chanzuckerberg / miniwdl, build 19203544105
09 Nov 2025 04:47AM UTC. Coverage: 95.202% (-0.01%) from 95.214%

Pull Request #821: [WDL 1.2] override task requirements in inputs
(github: web-flow merged 67125ed79 into beaa9a968)

8 of 8 new or added lines in 3 files covered (100.0%).
1 existing line in 1 file now uncovered.
7481 of 7858 relevant lines covered (95.2%).
0.95 hits per line.
Source File: /WDL/runtime/task.py (95.63% covered)
"""
Local task runner
"""

import logging
import math
import os
import json
import traceback
import glob
import threading
import shutil
import regex
from typing import Tuple, List, Dict, Optional, Callable, Set, Any, Union, TYPE_CHECKING
from contextlib import ExitStack, suppress
from collections import Counter

from .. import Error, Type, Env, Value, StdLib, Tree, Expr, _util
from .._util import (
    write_atomic,
    write_values_json,
    provision_run_dir,
    TerminationSignalFlag,
    chmod_R_plus,
    path_really_within,
    LoggingFileHandler,
    compose_coroutines,
    pathsize,
    link_force,
    symlink_force,
    rmtree_atomic,
)
from .._util import StructuredLogMessage as _
from . import config, _statusbar
from .download import able as downloadable, run_cached as download
from .cache import CallCache, new as new_call_cache
from .error import OutputError, Interrupted, Terminated, RunFailed, error_json

if TYPE_CHECKING:  # otherwise-delayed heavy imports
    from .task_container import TaskContainer


def run_local_task(  # type: ignore[return]
    cfg: config.Loader,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    run_id: Optional[str] = None,
    run_dir: Optional[str] = None,
    logger_prefix: Optional[List[str]] = None,
    _run_id_stack: Optional[List[str]] = None,
    _cache: Optional[CallCache] = None,
    _plugins: Optional[List[Callable[..., Any]]] = None,
) -> Tuple[str, Env.Bindings[Value.Base]]:
    """
    Run a task locally.

    Inputs shall have been typechecked already. File inputs are presumed to be local POSIX file
    paths that can be mounted into a container.

    :param run_id: unique ID for the run, defaults to workflow name
    :param run_dir: directory under which to create a timestamp-named subdirectory for this run
                    (defaults to current working directory).
                    If the final path component is ".", then operate in run_dir directly.
    """
    from .task_container import new as new_task_container  # delay heavy import

    _run_id_stack = _run_id_stack or []
    run_id = run_id or task.name
    logger_prefix = (logger_prefix or ["wdl"]) + ["t:" + run_id]
    logger = logging.getLogger(".".join(logger_prefix))
    with ExitStack() as cleanup:
        terminating = cleanup.enter_context(TerminationSignalFlag(logger))
        if terminating():
            raise Terminated(quiet=True)

        # provision run directory and log file
        run_dir = provision_run_dir(task.name, run_dir, last_link=not _run_id_stack)
        logfile = os.path.join(run_dir, "task.log")
        cleanup.enter_context(
            LoggingFileHandler(
                logger,
                logfile,
            )
        )
        if cfg.has_option("logging", "json") and cfg["logging"].get_bool("json"):
            cleanup.enter_context(
                LoggingFileHandler(
                    logger,
                    logfile + ".json",
                    json=True,
                )
            )
        logger.notice(
            _(
                "task setup",
                name=task.name,
                source=task.pos.uri,
                line=task.pos.line,
                column=task.pos.column,
                dir=run_dir,
                thread=threading.get_ident(),
            )
        )
        write_values_json(inputs, os.path.join(run_dir, "inputs.json"))

        if not _run_id_stack:
            cache = _cache or cleanup.enter_context(new_call_cache(cfg, logger))
            cache.flock(logfile, exclusive=True)  # no containing workflow; flock task.log
        else:
            cache = _cache
        assert cache

        cleanup.enter_context(_statusbar.task_slotted())
        maybe_container = None
        try:
            cache_key = f"{task.name}/{task.digest}/{Value.digest_env(inputs)}"
            cached = cache.get(cache_key, inputs, task.effective_outputs)
            if cached is not None:
                for decl in task.outputs:
                    v = cached[decl.name]
                    vj = json.dumps(v.json)
                    logger.info(
                        _(
                            "cached output",
                            name=decl.name,
                            value=(v.json if len(vj) < 4096 else "(((large)))"),
                        )
                    )
                # create out/ and outputs.json
                _outputs = link_outputs(
                    cache,
                    cached,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )
                write_values_json(
                    cached, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done (cached)")
                # returning `cached`, not the rewritten `_outputs`, to retain the opportunity to
                # find cached downstream inputs
                return (run_dir, cached)
            # start plugin coroutines and process inputs through them
            with compose_coroutines(
                [
                    (
                        lambda kwargs, cor=cor: cor(  # type: ignore
                            cfg, logger, _run_id_stack + [run_id], run_dir, task, **kwargs
                        )
                    )
                    for cor in (
                        [cor2 for _, cor2 in sorted(config.load_plugins(cfg, "task"))]
                        + (_plugins or [])
                    )
                ],
                {"inputs": inputs},
            ) as plugins:
                recv = next(plugins)
                inputs = recv["inputs"]

                # download input files, if needed
                posix_inputs = _download_input_files(
                    cfg,
                    logger,
                    logger_prefix,
                    run_dir,
                    _add_downloadable_defaults(cfg, task.available_inputs, inputs),
                    cache,
                )

                # create TaskContainer according to configuration
                container = new_task_container(cfg, logger, run_id, run_dir)
                maybe_container = container

                # evaluate input/postinput declarations, including mapping from host to
                # in-container file paths
                container_env = _eval_task_inputs(logger, task, posix_inputs, container)

                # evaluate runtime fields
                stdlib = InputStdLib(task.effective_wdl_version, logger, container)
                _eval_task_runtime(
                    cfg, logger, run_id, task, posix_inputs, container, container_env, stdlib
                )

                # interpolate command
                old_command_dedent = cfg["task_runtime"].get_bool("old_command_dedent")
                # pylint: disable=E1101
                placeholder_re = regex.compile(
                    cfg["task_runtime"]["placeholder_regex"], flags=regex.POSIX
                )
                setattr(
                    stdlib,
                    "_placeholder_regex",
                    placeholder_re,
                )  # hack to pass regex to WDL.Expr.Placeholder._eval
                assert isinstance(task.command, Expr.TaskCommand)
                command = task.command.eval(
                    container_env, stdlib, dedent=not old_command_dedent
                ).value
                delattr(stdlib, "_placeholder_regex")
                if old_command_dedent:  # see issue #674
                    command = _util.strip_leading_whitespace(command)[1]
                logger.debug(_("command", command=command.strip()))

                # process command & container through plugins
                recv = plugins.send({"command": command, "container": container})
                command, container = (recv[k] for k in ("command", "container"))

                # start container & run command (and retry if needed)
                _try_task(cfg, task, logger, container, command, terminating)

                # evaluate output declarations
                outputs = _eval_task_outputs(logger, run_id, task, container_env, container)

                # create output links
                outputs = link_outputs(
                    cache,
                    outputs,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )

                # process outputs through plugins
                recv = plugins.send({"outputs": outputs})
                outputs = recv["outputs"]

                # clean up, if so configured, and make sure output files will be accessible to
                # downstream tasks
                _delete_work(cfg, logger, container, True)
                chmod_R_plus(run_dir, file_bits=0o660, dir_bits=0o770)
                _warn_output_basename_collisions(logger, outputs)

                # write outputs.json
                write_values_json(
                    outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done")
                if not run_id.startswith("download-"):
                    cache.put(cache_key, outputs, run_dir=run_dir)
                return (run_dir, outputs)
        except Exception as exn:
            tbtxt = traceback.format_exc()
            logger.debug(tbtxt)
            wrapper = RunFailed(task, run_id, run_dir)
            logmsg = _(
                str(wrapper),
                dir=run_dir,
                **error_json(
                    exn, traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None
                ),
            )
            if isinstance(exn, Terminated) and getattr(exn, "quiet", False):
                logger.debug(logmsg)  # NB: flagged newly uncovered in this coverage report
            else:
                logger.error(logmsg)
            try:
                write_atomic(
                    json.dumps(
                        error_json(
                            wrapper,
                            cause=exn,
                            traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None,
                        ),
                        indent=2,
                    ),
                    os.path.join(run_dir, "error.json"),
                )
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.critical(_("failed to write error.json", dir=run_dir, message=str(exn2)))
            try:
                if maybe_container:
                    _delete_work(cfg, logger, maybe_container, False)
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.error(_("delete_work also failed", exception=str(exn2)))
            raise wrapper from exn


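# A minimal usage sketch for run_local_task (illustrative, not part of this module;
# assumes a one-task WDL file "hello.wdl" and miniwdl's public WDL.load /
# WDL.values_from_json / WDL.values_to_json helpers):
#
#     import logging
#     import WDL
#     from WDL.runtime import config
#     from WDL.runtime.task import run_local_task
#
#     doc = WDL.load("hello.wdl")
#     task = doc.tasks[0]
#     inputs = WDL.values_from_json({"who": "world"}, task.available_inputs, task.required_inputs)
#     cfg = config.Loader(logging.getLogger("demo"))
#     run_dir, outputs = run_local_task(cfg, task, inputs)
#     print(WDL.values_to_json(outputs, namespace=task.name))
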
def _download_input_files(
    cfg: config.Loader,
    logger: logging.Logger,
    logger_prefix: List[str],
    run_dir: str,
    inputs: Env.Bindings[Value.Base],
    cache: CallCache,
) -> Env.Bindings[Value.Base]:
    """
    Find all File & Directory input values that are downloadable URIs (including any nested within
    compound values). Download them to some location under run_dir and return a copy of the inputs
    with the URI values replaced by the downloaded paths.
    """

    downloads = 0
    download_bytes = 0
    cached_hits = 0

    def rewriter(v: Union[Value.Directory, Value.File]) -> str:
        nonlocal downloads, download_bytes, cached_hits
        directory = isinstance(v, Value.Directory)
        uri = v.value
        if downloadable(cfg, uri, directory=directory):
            logger.info(_(f"download input {'directory' if directory else 'file'}", uri=uri))
            cached, filename = download(
                cfg,
                logger,
                cache,
                uri,
                directory=directory,
                run_dir=os.path.join(run_dir, "download", str(downloads), "."),
                logger_prefix=logger_prefix + [f"download{downloads}"],
            )
            if cached:
                cached_hits += 1
            else:
                sz = pathsize(filename)
                logger.info(_("downloaded input", uri=uri, path=filename, bytes=sz))
                downloads += 1
                download_bytes += sz
            return filename
        return uri

    ans = Value.rewrite_env_paths(inputs, rewriter)
    if downloads or cached_hits:
        logger.notice(
            _(
                "processed input URIs",
                downloaded=downloads,
                downloaded_bytes=download_bytes,
                cached=cached_hits,
            )
        )
    return ans


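# Illustration of the Value.rewrite_env_paths traversal used above (hypothetical values;
# the rewriter visits every File/Directory, including those nested in compound values):
#
#     env = WDL.Env.Bindings().bind(
#         "files", WDL.Value.Array(WDL.Type.File(), [WDL.Value.File("s3://bucket/a.txt")])
#     )
#     env2 = WDL.Value.rewrite_env_paths(
#         env, lambda v: "/tmp/dl/a.txt" if v.value.startswith("s3://") else v.value
#     )
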
def _add_downloadable_defaults(
    cfg: config.Loader, available_inputs: Env.Bindings[Tree.Decl], inputs: Env.Bindings[Value.Base]
) -> Env.Bindings[Value.Base]:
    """
    Look for available File/Directory inputs that default to a string constant appearing to be a
    downloadable URI. For each one, add a binding for that default to the user-supplied inputs (if
    not already overridden in them).

    This is to trigger download of the default URIs even though we otherwise don't evaluate input
    declarations until after processing downloads.
    """
    ans = inputs
    for b in available_inputs:
        if (
            isinstance(b.value.type, (Type.File, Type.Directory))
            and b.name not in ans
            and isinstance(b.value.expr, Expr.String)
        ):
            directory = isinstance(b.value.type, Type.Directory)
            maybe_uri = b.value.expr.literal
            if maybe_uri and downloadable(cfg, maybe_uri.value, directory=directory):
                v = (
                    Value.Directory(maybe_uri.value, b.value.expr)
                    if directory
                    else Value.File(maybe_uri.value, b.value.expr)
                )
                ans = ans.bind(b.name, v)
    return ans


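# Example of the pattern handled above (hypothetical WDL source):
#
#     task t {
#         input {
#             File ref = "https://example.com/ref.fa"
#         }
#         ...
#     }
#
# If the caller doesn't override `ref`, its URI default is bound into the inputs here so
# that the download step fetches it before input declarations are evaluated.
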
def _eval_task_inputs(
    logger: logging.Logger,
    task: Tree.Task,
    posix_inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    # Preprocess inputs: if None value is supplied for an input declared with a default but without
    # the ? type quantifier, remove the binding entirely so that the default will be used. In
    # contrast, if the input declaration has an -explicitly- optional type, then we'll allow the
    # supplied None to override any default.
    input_decls = task.available_inputs
    posix_inputs = posix_inputs.filter(
        lambda b: not (
            isinstance(b.value, Value.Null)
            and b.name in input_decls
            and input_decls[b.name].expr
            and not input_decls[b.name].type.optional
        )
    )

    # Map all the provided input File & Directory paths to in-container paths
    container.add_paths(_fspaths(posix_inputs))
    _warn_input_basename_collisions(logger, container)

    # copy posix_inputs with all File & Directory values mapped to their in-container paths
    def map_paths(fn: Union[Value.File, Value.Directory]) -> str:
        p = fn.value.rstrip("/")
        if isinstance(fn, Value.Directory):
            p += "/"
        return container.input_path_map[p]

    container_inputs = Value.rewrite_env_paths(posix_inputs, map_paths)

    # initialize value environment with the inputs
    container_env: Env.Bindings[Value.Base] = Env.Bindings()
    for b in container_inputs:
        assert isinstance(b, Env.Binding)
        v = b.value
        assert isinstance(v, Value.Base)
        container_env = container_env.bind(b.name, v)
        vj = json.dumps(v.json)
        logger.info(_("input", name=b.name, value=(v.json if len(vj) < 4096 else "(((large)))")))

    # collect remaining declarations requiring evaluation.
    decls_to_eval = []
    for decl in (task.inputs or []) + (task.postinputs or []):
        if not container_env.has_binding(decl.name):
            decls_to_eval.append(decl)

    # topsort them according to internal dependencies. prior static validation
    # should have ensured they're acyclic.
    decls_by_id, decls_adj = Tree._decl_dependency_matrix(decls_to_eval)
    decls_to_eval = [decls_by_id[did] for did in _util.topsort(decls_adj)]
    assert len(decls_by_id) == len(decls_to_eval)

    # evaluate each declaration in that order
    # note: the write_* functions call container.add_paths as a side-effect
    stdlib = InputStdLib(task.effective_wdl_version, logger, container)
    for decl in decls_to_eval:
        assert isinstance(decl, Tree.Decl)
        v = Value.Null()
        if decl.expr:
            try:
                v = decl.expr.eval(container_env, stdlib=stdlib).coerce(decl.type)
            except Error.RuntimeError as exn:
                setattr(exn, "job_id", decl.workflow_node_id)
                raise exn
            except Exception as exn:
                exn2 = Error.EvalError(decl, str(exn))
                setattr(exn2, "job_id", decl.workflow_node_id)
                raise exn2 from exn
        else:
            assert decl.type.optional
        vj = json.dumps(v.json)
        logger.info(_("eval", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))")))
        container_env = container_env.bind(decl.name, v)

    return container_env


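# Illustration of the None-preprocessing above (hypothetical WDL input declarations):
#
#     Int n = 5     # caller supplies n=None -> binding dropped; default 5 is used
#     Int? m = 5    # caller supplies m=None -> None overrides the default
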
def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]:
    """
    Get the unique paths of all File & Directory values in the environment. Directory paths will
    have a trailing '/'.
    """
    ans = set()

    def collector(v: Value.Base) -> None:
        if isinstance(v, Value.File):
            assert not v.value.endswith("/")
            ans.add(v.value)
        elif isinstance(v, Value.Directory):
            ans.add(v.value.rstrip("/") + "/")
        for ch in v.children:
            collector(ch)

    for b in env:
        collector(b.value)
    return ans


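# Sketch (hypothetical values):
#
#     env = (WDL.Env.Bindings()
#            .bind("f", WDL.Value.File("/data/a.txt"))
#            .bind("d", WDL.Value.Directory("/data/d")))
#     _fspaths(env)  # -> {"/data/a.txt", "/data/d/"}
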
def _warn_input_basename_collisions(logger: logging.Logger, container: "TaskContainer") -> None:
    basenames = Counter(
        [os.path.basename((p[:-1] if p.endswith("/") else p)) for p in container.input_path_map_rev]
    )
    collisions = [nm for nm, n in basenames.items() if n > 1]
    if collisions:
        logger.warning(
            _(
                "mounting input files with colliding basenames in separate container directories",
                basenames=collisions,
            )
        )


def _eval_task_runtime(
    cfg: config.Loader,
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
    env: Env.Bindings[Value.Base],
    stdlib: StdLib.Base,
) -> None:
    # evaluate runtime{} expressions (merged with any configured defaults)
    runtime_defaults = cfg.get_dict("task_runtime", "defaults")
    if run_id.startswith("download-"):
        runtime_defaults.update(cfg.get_dict("task_runtime", "download_defaults"))
    runtime_values = {}
    for key, v in runtime_defaults.items():
        runtime_values[key] = Value.from_json(Type.Any(), v)
    for key, expr in task.runtime.items():  # evaluate expressions in source code
        runtime_values[key] = expr.eval(env, stdlib)
    for b in inputs.enter_namespace("runtime"):
        runtime_values[b.name] = b.value  # input overrides
    for b in inputs.enter_namespace("requirements"):
        runtime_values[b.name] = b.value
    logger.debug(_("runtime values", **dict((key, str(v)) for key, v in runtime_values.items())))

    # have container implementation validate & postprocess into container.runtime_values
    container.process_runtime(logger, runtime_values)

    if container.runtime_values:
        logger.info(_("effective runtime", **container.runtime_values))

    # add any configured overrides for in-container environment variables
    container.runtime_values.setdefault("env", {})
    env_vars_override = {}
    env_vars_skipped = []
    for ev_name, ev_value in cfg["task_runtime"].get_dict("env").items():
        if ev_value is None:
            try:
                env_vars_override[ev_name] = os.environ[ev_name]
            except KeyError:
                env_vars_skipped.append(ev_name)
        else:
            env_vars_override[ev_name] = str(ev_value)
    if env_vars_skipped:
        logger.warning(
            _("skipping pass-through of undefined environment variable(s)", names=env_vars_skipped)
        )
    if cfg.get_bool("file_io", "mount_tmpdir") or task.name in cfg.get_list(
        "file_io", "mount_tmpdir_for"
    ):
        env_vars_override["TMPDIR"] = os.path.join(
            container.container_dir, "work", "_miniwdl_tmpdir"
        )
    if env_vars_override:
        # usually don't dump values into log, as they may often be auth tokens
        logger.notice(
            _(
                "overriding environment variables (portability warning)",
                names=list(env_vars_override.keys()),
            )
        )
        logger.debug(
            _("overriding environment variables (portability warning)", **env_vars_override)
        )
        container.runtime_values["env"].update(env_vars_override)

    # process decls with "env" decorator (EXPERIMENTAL)
    env_decls: Dict[str, Value.Base] = {}
    for decl in (task.inputs or []) + task.postinputs:
        if decl.decor.get("env", False) is True:
            if not env_decls:
                logger.warning(
                    "task env declarations are an experimental feature, subject to change"
                )
            v = env[decl.name]
            if isinstance(v, (Value.String, Value.File, Value.Directory)):
                v = v.value
            else:
                v = json.dumps(v.json)
            env_decls[decl.name] = v
    container.runtime_values["env"].update(env_decls)

    unused_keys = list(
        key
        for key in runtime_values
        if key not in ("memory", "docker", "container") and key not in container.runtime_values
    )
    if unused_keys:
        logger.warning(_("ignored runtime settings", keys=unused_keys))


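# Override precedence sketch: configured defaults < the task's runtime{} expressions <
# caller inputs bound under the "runtime"/"requirements" namespaces (the WDL 1.2
# mechanism exercised by Pull Request #821). Hypothetical example:
#
#     # task declares: runtime { cpu: 4 }
#     inputs = inputs.bind("runtime.cpu", WDL.Value.Int(8))
#     # -> the effective cpu runtime value for this run is 8
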
def _try_task(
    cfg: config.Loader,
    task: Tree.Task,
    logger: logging.Logger,
    container: "TaskContainer",
    command: str,
    terminating: Callable[[], bool],
) -> None:
    """
    Run the task command in the container, retrying up to runtime.preemptible occurrences of
    Interrupted errors, plus up to runtime.maxRetries occurrences of any error.
    """
    from docker.errors import BuildError as DockerBuildError  # delay heavy import

    max_retries = container.runtime_values.get("maxRetries", 0)
    max_interruptions = container.runtime_values.get("preemptible", 0)
    retries = 0
    interruptions = 0

    while True:
        if terminating():
            raise Terminated()
        # copy input files, if needed
        if cfg.get_bool("file_io", "copy_input_files") or task.name in cfg.get_list(
            "file_io", "copy_input_files_for"
        ):
            container.copy_input_files(logger)
        host_tmpdir = (
            os.path.join(container.host_work_dir(), "_miniwdl_tmpdir")
            if cfg.get_bool("file_io", "mount_tmpdir")
            or task.name in cfg.get_list("file_io", "mount_tmpdir_for")
            else None
        )

        try:
            # start container & run command
            if host_tmpdir:
                logger.debug(_("creating task temp directory", TMPDIR=host_tmpdir))
                os.mkdir(host_tmpdir, mode=0o770)
            try:
                return container.run(logger, command)
            finally:
                if host_tmpdir:
                    logger.info(_("deleting task temp directory", TMPDIR=host_tmpdir))
                    rmtree_atomic(host_tmpdir)
                if (
                    "preemptible" in container.runtime_values
                    and cfg.has_option("task_runtime", "_mock_interruptions")
                    and interruptions < cfg["task_runtime"].get_int("_mock_interruptions")
                ):
                    raise Interrupted("mock interruption") from None
        except Exception as exn:
            if isinstance(exn, Interrupted) and interruptions < max_interruptions:
                logger.error(
                    _(
                        "interrupted task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_interruptions=interruptions,
                        max_interruptions=max_interruptions,
                    )
                )
                interruptions += 1
            elif (
                not isinstance(exn, (Terminated, DockerBuildError))
                and retries < max_retries
                and not terminating()
            ):
                logger.error(
                    _(
                        "failed task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_retries=retries,
                        max_retries=max_retries,
                    )
                )
                retries += 1
            else:
                raise
            _delete_work(cfg, logger, container, False)
            container.reset(logger)


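# Retry accounting sketch: with preemptible=2 and maxRetries=1, the command may run up
# to four times in total -- two extra attempts consumed by Interrupted errors, plus one
# extra attempt for any other failure (except Terminated/DockerBuildError).
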
def _eval_task_outputs(
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    env: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    stdout_file = os.path.join(container.host_dir, "stdout.txt")
    with suppress(FileNotFoundError):
        if os.path.getsize(stdout_file) > 0 and not run_id.startswith("download-"):
            # If the task produced nonempty stdout that isn't used in the WDL outputs, generate a
            # courtesy log message directing the user where to find it
            stdout_used = False
            expr_stack = [outp.expr for outp in task.outputs]
            while expr_stack:
                expr = expr_stack.pop()
                assert isinstance(expr, Expr.Base)
                if isinstance(expr, Expr.Apply) and expr.function_name == "stdout":
                    stdout_used = True
                else:
                    expr_stack.extend(expr.children)  # type: ignore[arg-type]
            if not stdout_used:
                logger.info(
                    _(
                        "command stdout unused; consider output `File cmd_out = stdout()`"
                        " or redirect command to stderr log >&2",
                        stdout_file=stdout_file,
                    )
                )

    # Helpers to rewrite File/Directory from in-container paths to host paths
    # First pass -- convert nonexistent output paths to None/Null
    def rewriter1(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        if container.host_path(container_path) is None:
            logger.warning(
                _(
                    "output path not found in container (error unless declared type is optional)",
                    output=output_name,
                    path=container_path,
                )
            )
            return None
        return v.value

    # Second pass -- convert in-container paths to host paths
    def rewriter2(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        host_path = container.host_path(container_path)
        assert host_path is not None
        if isinstance(v, Value.Directory):
            if host_path.endswith("/"):
                host_path = host_path[:-1]
            _check_directory(host_path, output_name)
            logger.debug(_("output dir", container=container_path, host=host_path))
        else:
            logger.debug(_("output file", container=container_path, host=host_path))
        return host_path

    stdlib = OutputStdLib(task.effective_wdl_version, logger, container)
    outputs: Env.Bindings[Value.Base] = Env.Bindings()
    for decl in task.outputs:
        assert decl.expr
        try:
            v = decl.expr.eval(env, stdlib=stdlib).coerce(decl.type)
        except Error.RuntimeError as exn:
            setattr(exn, "job_id", decl.workflow_node_id)
            raise exn
        except Exception as exn:
            exn2 = Error.EvalError(decl, str(exn))
            setattr(exn2, "job_id", decl.workflow_node_id)
            raise exn2 from exn
        _warn_struct_extra(logger, decl.name, v)
        vj = json.dumps(v.json)
        logger.info(
            _("output", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))"))
        )

        # Now, a delicate sequence for postprocessing File outputs (including Files nested within
        # compound values)

        # First convert nonexistent paths to None/Null, and bind this in the environment for
        # evaluating subsequent output expressions.
        v = Value.rewrite_paths(v, lambda w: rewriter1(w, decl.name))
        env = env.bind(decl.name, v)
        # check if any nonexistent paths were provided for (non-optional) File/Directory types
        # Value.Null.coerce has a special behavior for us to raise FileNotFoundError for a
        # non-optional File/Directory type.
        try:
            v = v.coerce(decl.type)
        except FileNotFoundError:
            err = OutputError("File/Directory path not found in task output " + decl.name)
            setattr(err, "job_id", decl.workflow_node_id)
            raise err
        # Rewrite in-container paths to host paths
        v = Value.rewrite_paths(v, lambda w: rewriter2(w, decl.name))
        outputs = outputs.bind(decl.name, v)

    return outputs


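# Sketch of the two-pass rewrite (hypothetical outputs): for `File? maybe = "no-such.txt"`,
# rewriter1 nulls out the nonexistent path and the coercion to the optional type succeeds;
# for `File out = "result.txt"`, rewriter2 then maps the in-container work/ path to the
# corresponding host path under the run directory. A nonexistent path declared with a
# non-optional type raises OutputError instead.
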
def _check_directory(host_path: str, output_name: str) -> None:
    """
    traverse output directory to check that all symlinks are relative & resolve inside the dir
    """

    def raiser(exc: OSError):
        raise exc

    for root, subdirs, files in os.walk(host_path, onerror=raiser, followlinks=False):
        for fn in files:
            fn = os.path.join(root, fn)
            if os.path.islink(fn) and (
                not os.path.exists(fn)
                or os.path.isabs(os.readlink(fn))
                or not path_really_within(fn, host_path)
            ):
                raise OutputError(f"Directory in output {output_name} contains unusable symlink")


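# e.g. a Directory output containing `ln -s /etc/passwd escape` fails this check
# (absolute symlink target), as does a relative symlink resolving outside the directory;
# a relative symlink to a sibling file within the directory passes.
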
def _warn_struct_extra(
    logger: logging.Logger, decl_name: str, v: Value.Base, warned_keys: Optional[Set[str]] = None
) -> None:
    """
    Log notices about extraneous keys found in struct initialization from JSON/Map/Object
    """
    if warned_keys is None:
        warned_keys = set()
    if isinstance(v, Value.Struct) and v.extra:
        extra_keys = set(k for k in v.extra if not k.startswith("#"))
        if extra_keys - warned_keys:
            logger.notice(
                _(
                    "extraneous keys in struct initializer",
                    decl=decl_name,
                    struct=str(v.type),
                    extra_keys=list(extra_keys),
                )
            )
            warned_keys.update(extra_keys)
    for ch in v.children:
        _warn_struct_extra(logger, decl_name, ch, warned_keys)


def link_outputs(
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
    use_relative_output_paths: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    Following a successful run, the output files may be scattered throughout a complex directory
    tree used for execution. To help navigate this, generate a subdirectory of the run directory
    containing nicely organized symlinks to the output files, and rewrite File values in the
    outputs env to use these symlinks.
    """

    def link1(target: str, link: str, directory: bool) -> None:
        if hardlinks:
            # TODO: what if target is an input from a different filesystem?
            if directory:
                shutil.copytree(target, link, symlinks=True, copy_function=link_force)
            else:
                link_force(target, link)
        else:
            symlink_force(target, link)

    def map_paths(v: Value.Base, dn: str) -> Value.Base:
        if isinstance(v, (Value.File, Value.Directory)):
            target = (
                v.value
                if os.path.exists(v.value)
                else cache.get_download(v.value, isinstance(v, Value.Directory))
            )
            if target:
                target = os.path.realpath(target)
                assert os.path.exists(target)
                if not hardlinks and path_really_within(target, os.path.dirname(run_dir)):
                    # make symlink relative
                    target = os.path.relpath(target, start=os.path.realpath(dn))
                link = os.path.join(dn, os.path.basename(v.value.rstrip("/")))
                os.makedirs(dn, exist_ok=False)
                link1(target, link, isinstance(v, Value.Directory))
                # Drop a dotfile alongside Directory outputs, to inform a program crawling the out/
                # directory without reference to the output types or JSON for whatever reason. It
                # might otherwise have trouble distinguishing Directory outputs among the
                # structured subdirectories we create for compound types.
                if isinstance(v, Value.Directory):
                    with open(os.path.join(dn, ".WDL_Directory"), "w") as _dotfile:
                        pass
                v.value = link
        # recurse into compound values
        elif isinstance(v, Value.Array) and v.value:
            d = int(math.ceil(math.log10(len(v.value))))  # how many digits needed
            for i in range(len(v.value)):
                v.value[i] = map_paths(v.value[i], os.path.join(dn, str(i).rjust(d, "0")))
        elif isinstance(v, Value.Map) and v.value:
            # create a subdirectory for each key, as long as the key names seem to make reasonable
            # path components; otherwise, treat the dict as a list of its values
            keys_ok = (
                sum(
                    1
                    for b in v.value
                    if regex.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", str(b[0]).strip("'\""))
                    is None
                )
                == 0
            )
            d = int(math.ceil(math.log10(len(v.value))))
            for i, b in enumerate(v.value):
                v.value[i] = (
                    b[0],
                    map_paths(
                        b[1],
                        os.path.join(
                            dn, str(b[0]).strip("'\"") if keys_ok else str(i).rjust(d, "0")
                        ),
                    ),
                )
        elif isinstance(v, Value.Pair):
            v.value = (
                map_paths(v.value[0], os.path.join(dn, "left")),
                map_paths(v.value[1], os.path.join(dn, "right")),
            )
        elif isinstance(v, Value.Struct):
            for key in v.value:
                v.value[key] = map_paths(v.value[key], os.path.join(dn, key))
        return v

    os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)

    if use_relative_output_paths:
        return link_outputs_relative(link1, cache, outputs, run_dir, hardlinks=hardlinks)

    return outputs.map(
        lambda binding: Env.Binding(
            binding.name,
            map_paths(
                Value.rewrite_paths(binding.value, lambda v: v.value),  # nop to deep copy
                os.path.join(run_dir, "out", binding.name),
            ),
        )
    )


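# Resulting out/ layout sketch (hypothetical outputs `File report` and two-element
# `Array[File] shards`):
#
#     out/
#         report/report.txt      (link into the task work/ directory)
#         shards/0/shard0.txt    (array elements get index-named subdirectories)
#         shards/1/shard1.txt
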
def link_outputs_relative(
    link1: Callable[[str, str, bool], None],
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    link_outputs with [file_io] use_relative_output_paths = true. We organize the links to reflect
    the generated files' paths relative to their task working directory.
    """
    link_destinations: Dict[str, str] = dict()

    def map_path_relative(v: Union[Value.File, Value.Directory]) -> str:
        target = (
            v.value
            if os.path.exists(v.value)
            else cache.get_download(v.value, isinstance(v, Value.Directory))
        )
        if target:
            real_target = os.path.realpath(target)
            rel_link = None
            if path_really_within(target, os.path.join(run_dir, "work")):
                # target was generated by current task; use its path relative to the task work dir
                if not os.path.basename(run_dir).startswith("download-"):  # except download tasks
                    rel_link = os.path.relpath(
                        real_target, os.path.realpath(os.path.join(run_dir, "work"))
                    )
            else:
                # target is an out/ link generated by a call in the current workflow OR a cached
                # run; use the link's path relative to that out/ dir, which by induction should
                # equal its path relative to the original work/ dir.
                # we need a heuristic to find the out/ dir in a task/workflow run directory, since
                # the user's cwd or the task-generated relative path might coincidentally have
                # something named 'out'.
                p = None
                for p in reversed([m.span()[0] for m in regex.finditer("/out(?=/)", target)]):
                    if p and (
                        os.path.isfile(os.path.join(target[:p], "task.log"))
                        or os.path.isfile(os.path.join(target[:p], "workflow.log"))
                    ):
                        break
                    p = None
                if p and p + 5 < len(target):
                    rel_link = os.path.relpath(target, target[: p + 5])
            # if neither of the above cases applies, then fall back to just the target basename
            rel_link = rel_link or os.path.basename(target)
            abs_link = os.path.join(os.path.join(run_dir, "out"), rel_link)
            if link_destinations.get(abs_link, real_target) != real_target:
                raise FileExistsError(
                    "Output filename collision; to allow this, set"
                    " [file_io] use_relative_output_paths = false. Affected path: " + abs_link
                )
            os.makedirs(os.path.dirname(abs_link), exist_ok=True)
            link1(real_target, abs_link, isinstance(v, Value.Directory))
            link_destinations[abs_link] = real_target
            return abs_link
        return v.value

    return Value.rewrite_env_paths(outputs, map_path_relative)


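# With use_relative_output_paths, a task that wrote work/subdir/a.txt exposes it as
# out/subdir/a.txt, mirroring its path relative to the working directory, instead of
# out/<output_name>/a.txt; two outputs mapping to the same relative path raise
# FileExistsError as above.
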
def _warn_output_basename_collisions(
    logger: logging.Logger, outputs: Env.Bindings[Value.Base]
) -> None:
    targets_by_basename: Dict[str, Set[str]] = {}

    def walker(v: Union[Value.File, Value.Directory]) -> str:
        target = v.value
        if os.path.exists(target):
            target = os.path.realpath(target)
        basename = os.path.basename(target)
        targets_by_basename.setdefault(basename, set()).add(target)
        return v.value

    Value.rewrite_env_paths(outputs, walker)

    collisions = [bn for bn, targets in targets_by_basename.items() if len(targets) > 1]
    if collisions:
        logger.warning(
            _(
                "multiple output files share the same basename; while miniwdl supports this,"
                " consider modifying WDL to ensure distinct output basenames",
                basenames=collisions,
            )
        )


def _delete_work(
    cfg: config.Loader,
    logger: logging.Logger,
    container: "Optional[TaskContainer]",
    success: bool,
) -> None:
    opt = cfg["file_io"]["delete_work"].strip().lower()
    if container and (
        opt == "always" or (success and opt == "success") or (not success and opt == "failure")
    ):
        if success and not cfg["file_io"].get_bool("output_hardlinks"):
            logger.warning(
                "ignoring configuration [file_io] delete_work because it requires also output_hardlinks = true"
            )
            return
        container.delete_work(logger, delete_streams=not success)


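# Configuration sketch (miniwdl configuration file): delete task working directories
# after success; per the warning above, output_hardlinks must also be enabled so that
# the linked outputs survive the deletion.
#
#     [file_io]
#     delete_work = success
#     output_hardlinks = true
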
class _StdLib(StdLib.Base):
    logger: logging.Logger
    container: "TaskContainer"
    inputs_only: bool  # if True then only permit access to input files

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        inputs_only: bool,
    ) -> None:
        super().__init__(wdl_version, write_dir=os.path.join(container.host_dir, "write_"))
        self.logger = logger
        self.container = container
        self.inputs_only = inputs_only

    def _devirtualize_filename(self, filename: str) -> str:
        # check allowability of reading this file, & map from in-container to host
        ans = self.container.host_path(filename, inputs_only=self.inputs_only)
        if ans is None:
            raise OutputError("function was passed non-existent file " + filename)
        self.logger.debug(_("read_", container=filename, host=ans))
        return ans

    def _virtualize_filename(self, filename: str) -> str:
        # register new file with container input_path_map
        self.container.add_paths([filename])
        self.logger.debug(
            _("write_", host=filename, container=self.container.input_path_map[filename])
        )
        self.logger.info(_("wrote", file=self.container.input_path_map[filename]))
        return self.container.input_path_map[filename]


class InputStdLib(_StdLib):
    # StdLib for evaluation of task inputs and command
    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
    ) -> None:
        super().__init__(wdl_version, logger, container, True)


class OutputStdLib(_StdLib):
    # StdLib for evaluation of task outputs
    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
    ) -> None:
        super().__init__(wdl_version, logger, container, False)

        setattr(
            self,
            "stdout",
            StdLib.StaticFunction(
                "stdout",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stdout.txt")),
            ),
        )
        setattr(
            self,
            "stderr",
            StdLib.StaticFunction(
                "stderr",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stderr.txt")),
            ),
        )

        def _glob(pattern: Value.String, lib: OutputStdLib = self) -> Value.Array:
            pat = pattern.coerce(Type.String()).value
            if not pat:
                raise OutputError("empty glob() pattern")
            assert isinstance(pat, str)
            if pat[0] == "/":
                raise OutputError("glob() pattern must be relative to task working directory")
            if pat.startswith("..") or "/.." in pat:
                raise OutputError("glob() pattern must not use .. uplevels")
            if pat.startswith("./"):
                pat = pat[2:]
            # glob the host directory
            pat = os.path.join(lib.container.host_work_dir(), pat)
            host_files = sorted(fn for fn in glob.glob(pat) if os.path.isfile(fn))
            # convert the host filenames to in-container filenames
            container_files = []
            for hf in host_files:
                dstrip = lib.container.host_work_dir()
                dstrip += "" if dstrip.endswith("/") else "/"
                assert hf.startswith(dstrip)
                container_files.append(
                    os.path.join(lib.container.container_dir, "work", hf[len(dstrip) :])
                )
            return Value.Array(Type.File(), [Value.File(fn) for fn in container_files])

        setattr(
            self,
            "glob",
            StdLib.StaticFunction("glob", [Type.String()], Type.Array(Type.File()), _glob),
        )
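
# glob() usage sketch: a task output like `Array[File] logs = glob("logs/*.txt")` is
# served by _glob above, which matches files (not directories) under the task working
# directory on the host, then returns their in-container paths in sorted order.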