chanzuckerberg / miniwdl / 21776649852

07 Feb 2026 07:45AM UTC coverage: 95.222% (+0.01%) from 95.211%

Pull Request #828: WDL 1.2 standard library additions
Commit: [WDL 1.2] add values() (#832)
Co-authored-by: Mike Lin <mlin@contractor.chanzuckerberg.com>

51 of 54 new or added lines in 1 file covered (94.44%)
1 existing line in 1 file now uncovered
7533 of 7911 relevant lines covered (95.22%)
0.95 hits per line

Source File: /WDL/runtime/task.py (95.63% covered)
"""
Local task runner
"""

import logging
import math
import os
import json
import traceback
import glob
import threading
import shutil
import regex
from typing import Tuple, List, Dict, Optional, Callable, Set, Any, Union, TYPE_CHECKING
from contextlib import ExitStack, suppress
from collections import Counter

from .. import Error, Type, Env, Value, StdLib, Tree, Expr, _util
from .._util import (
    write_atomic,
    write_values_json,
    provision_run_dir,
    TerminationSignalFlag,
    chmod_R_plus,
    path_really_within,
    LoggingFileHandler,
    compose_coroutines,
    pathsize,
    link_force,
    symlink_force,
    rmtree_atomic,
)
from .._util import StructuredLogMessage as _
from . import config, _statusbar
from .download import able as downloadable, run_cached as download
from .cache import CallCache, new as new_call_cache
from .error import OutputError, Interrupted, Terminated, RunFailed, error_json

if TYPE_CHECKING:  # otherwise-delayed heavy imports
    from .task_container import TaskContainer


def run_local_task(  # type: ignore[return]
    cfg: config.Loader,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    run_id: Optional[str] = None,
    run_dir: Optional[str] = None,
    logger_prefix: Optional[List[str]] = None,
    _run_id_stack: Optional[List[str]] = None,
    _cache: Optional[CallCache] = None,
    _plugins: Optional[List[Callable[..., Any]]] = None,
) -> Tuple[str, Env.Bindings[Value.Base]]:
    """
    Run a task locally.

    Inputs shall have been typechecked already. File inputs are presumed to be local POSIX file
    paths that can be mounted into a container.

    :param run_id: unique ID for the run, defaults to workflow name
    :param run_dir: directory under which to create a timestamp-named subdirectory for this run
                    (defaults to current working directory).
                    If the final path component is ".", then operate in run_dir directly.
    """
    from .task_container import new as new_task_container  # delay heavy import

    _run_id_stack = _run_id_stack or []
    run_id = run_id or task.name
    logger_prefix = (logger_prefix or ["wdl"]) + ["t:" + run_id]
    logger = logging.getLogger(".".join(logger_prefix))
    with ExitStack() as cleanup:
        terminating = cleanup.enter_context(TerminationSignalFlag(logger))
        if terminating():
            raise Terminated(quiet=True)

        # provision run directory and log file
        run_dir = provision_run_dir(task.name, run_dir, last_link=not _run_id_stack)
        logfile = os.path.join(run_dir, "task.log")
        cleanup.enter_context(
            LoggingFileHandler(
                logger,
                logfile,
            )
        )
        if cfg.has_option("logging", "json") and cfg["logging"].get_bool("json"):
            cleanup.enter_context(
                LoggingFileHandler(
                    logger,
                    logfile + ".json",
                    json=True,
                )
            )
        logger.notice(
            _(
                "task setup",
                name=task.name,
                source=task.pos.uri,
                line=task.pos.line,
                column=task.pos.column,
                dir=run_dir,
                thread=threading.get_ident(),
            )
        )
        write_values_json(inputs, os.path.join(run_dir, "inputs.json"))

        if not _run_id_stack:
            cache = _cache or cleanup.enter_context(new_call_cache(cfg, logger))
            cache.flock(logfile, exclusive=True)  # no containing workflow; flock task.log
        else:
            cache = _cache
        assert cache

        cleanup.enter_context(_statusbar.task_slotted())
        maybe_container = None
        try:
            cache_key = f"{task.name}/{task.digest}/{Value.digest_env(inputs)}"
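            # (the key combines the task name, a digest of the task source, and a digest of the
            # inputs, so a cache hit requires identical task source and identical inputs)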
            cached = cache.get(cache_key, inputs, task.effective_outputs)
            if cached is not None:
                for decl in task.outputs:
                    v = cached[decl.name]
                    vj = json.dumps(v.json)
                    logger.info(
                        _(
                            "cached output",
                            name=decl.name,
                            value=(v.json if len(vj) < 4096 else "(((large)))"),
                        )
                    )
                # create out/ and outputs.json
                _outputs = link_outputs(
                    cache,
                    cached,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )
                write_values_json(
                    cached, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done (cached)")
                # returning `cached`, not the rewritten `_outputs`, to retain opportunity to find
                # cached downstream inputs
                return (run_dir, cached)
            # start plugin coroutines and process inputs through them
            with compose_coroutines(
                [
                    (
                        lambda kwargs, cor=cor: cor(  # type: ignore
                            cfg, logger, _run_id_stack + [run_id], run_dir, task, **kwargs
                        )
                    )
                    for cor in (
                        [cor2 for _, cor2 in sorted(config.load_plugins(cfg, "task"))]
                        + (_plugins or [])
                    )
                ],
                {"inputs": inputs},
            ) as plugins:
                recv = next(plugins)
                inputs = recv["inputs"]

                # download input files, if needed
                posix_inputs = _download_input_files(
                    cfg,
                    logger,
                    logger_prefix,
                    run_dir,
                    _add_downloadable_defaults(cfg, task.available_inputs, inputs),
                    cache,
                )

                # create TaskContainer according to configuration
                container = new_task_container(cfg, logger, run_id, run_dir)
                maybe_container = container

                # evaluate input/postinput declarations, including mapping from host to
                # in-container file paths
                container_env = _eval_task_inputs(logger, task, posix_inputs, container)

                # evaluate runtime fields
                stdlib = InputStdLib(task.effective_wdl_version, logger, container)
                _eval_task_runtime(
                    cfg, logger, run_id, task, posix_inputs, container, container_env, stdlib
                )

                # interpolate command
                old_command_dedent = cfg["task_runtime"].get_bool("old_command_dedent")
                # pylint: disable=E1101
                placeholder_re = regex.compile(
                    cfg["task_runtime"]["placeholder_regex"], flags=regex.POSIX
                )
                setattr(
                    stdlib,
                    "_placeholder_regex",
                    placeholder_re,
                )  # hack to pass regex to WDL.Expr.Placeholder._eval
                assert isinstance(task.command, Expr.TaskCommand)
                command = task.command.eval(
                    container_env, stdlib, dedent=not old_command_dedent
                ).value
                delattr(stdlib, "_placeholder_regex")
                if old_command_dedent:  # see issue #674
                    command = _util.strip_leading_whitespace(command)[1]
                logger.debug(_("command", command=command.strip()))

                # process command & container through plugins
                recv = plugins.send({"command": command, "container": container})
                command, container = (recv[k] for k in ("command", "container"))

                # start container & run command (and retry if needed)
                _try_task(cfg, task, logger, container, command, terminating)

                # evaluate output declarations
                outputs = _eval_task_outputs(logger, run_id, task, container_env, container)

                # create output_links
                outputs = link_outputs(
                    cache,
                    outputs,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )

                # process outputs through plugins
                recv = plugins.send({"outputs": outputs})
                outputs = recv["outputs"]

                # clean up, if so configured, and make sure output files will be accessible to
                # downstream tasks
                _delete_work(cfg, logger, container, True)
                chmod_R_plus(run_dir, file_bits=0o660, dir_bits=0o770)
                _warn_output_basename_collisions(logger, outputs)

                # write outputs.json
                write_values_json(
                    outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done")
                if not run_id.startswith("download-"):
                    cache.put(cache_key, outputs, run_dir=run_dir)
                return (run_dir, outputs)
        except Exception as exn:
            tbtxt = traceback.format_exc()
            logger.debug(tbtxt)
            wrapper = RunFailed(task, run_id, run_dir)
            logmsg = _(
                str(wrapper),
                dir=run_dir,
                **error_json(
                    exn, traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None
                ),
            )
            if isinstance(exn, Terminated) and getattr(exn, "quiet", False):
                logger.debug(logmsg)
            else:
                logger.error(logmsg)
            try:
                write_atomic(
                    json.dumps(
                        error_json(
                            wrapper,
                            cause=exn,
                            traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None,
                        ),
                        indent=2,
                    ),
                    os.path.join(run_dir, "error.json"),
                )
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.critical(_("failed to write error.json", dir=run_dir, message=str(exn2)))
            try:
                if maybe_container:
                    _delete_work(cfg, logger, maybe_container, False)
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.error(_("delete_work also failed", exception=str(exn2)))
            raise wrapper from exn


def _download_input_files(
    cfg: config.Loader,
    logger: logging.Logger,
    logger_prefix: List[str],
    run_dir: str,
    inputs: Env.Bindings[Value.Base],
    cache: CallCache,
) -> Env.Bindings[Value.Base]:
    """
    Find all File & Directory input values that are downloadable URIs (including any nested within
    compound values). Download them to some location under run_dir and return a copy of the inputs
    with the URI values replaced by the downloaded paths.
    """

    downloads = 0
    download_bytes = 0
    cached_hits = 0

    def rewriter(v: Union[Value.Directory, Value.File]) -> str:
        nonlocal downloads, download_bytes, cached_hits
        directory = isinstance(v, Value.Directory)
        uri = v.value
        if downloadable(cfg, uri, directory=directory):
            logger.info(_(f"download input {'directory' if directory else 'file'}", uri=uri))
            cached, filename = download(
                cfg,
                logger,
                cache,
                uri,
                directory=directory,
                run_dir=os.path.join(run_dir, "download", str(downloads), "."),
                logger_prefix=logger_prefix + [f"download{downloads}"],
            )
            if cached:
                cached_hits += 1
            else:
                sz = pathsize(filename)
                logger.info(_("downloaded input", uri=uri, path=filename, bytes=sz))
                downloads += 1
                download_bytes += sz
            return filename
        return uri

    ans = Value.rewrite_env_paths(inputs, rewriter)
    if downloads or cached_hits:
        logger.notice(
            _(
                "processed input URIs",
                downloaded=downloads,
                downloaded_bytes=download_bytes,
                cached=cached_hits,
            )
        )
    return ans


def _add_downloadable_defaults(
    cfg: config.Loader, available_inputs: Env.Bindings[Tree.Decl], inputs: Env.Bindings[Value.Base]
) -> Env.Bindings[Value.Base]:
    """
    Look for available File/Directory inputs that default to a string constant appearing to be a
    downloadable URI. For each one, add a binding for that default to the user-supplied inputs (if
    not already overridden in them).

    This is to trigger download of the default URIs even though we otherwise don't evaluate input
    declarations until after processing downloads.
    """
    ans = inputs
    for b in available_inputs:
        if (
            isinstance(b.value.type, (Type.File, Type.Directory))
            and b.name not in ans
            and isinstance(b.value.expr, Expr.String)
        ):
            directory = isinstance(b.value.type, Type.Directory)
            maybe_uri = b.value.expr.literal
            if maybe_uri and downloadable(cfg, maybe_uri.value, directory=directory):
                v = (
                    Value.Directory(maybe_uri.value, b.value.expr)
                    if directory
                    else Value.File(maybe_uri.value, b.value.expr)
                )
                ans = ans.bind(b.name, v)
    return ans


def _eval_task_inputs(
    logger: logging.Logger,
    task: Tree.Task,
    posix_inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    # Preprocess inputs: if None value is supplied for an input declared with a default but without
    # the ? type quantifier, remove the binding entirely so that the default will be used. In
    # contrast, if the input declaration has an -explicitly- optional type, then we'll allow the
    # supplied None to override any default.
    input_decls = task.available_inputs
    posix_inputs = posix_inputs.filter(
        lambda b: (
            not (
                isinstance(b.value, Value.Null)
                and b.name in input_decls
                and input_decls[b.name].expr
                and not input_decls[b.name].type.optional
            )
        )
    )

    # Map all the provided input File & Directory paths to in-container paths
    container.add_paths(_fspaths(posix_inputs))
    _warn_input_basename_collisions(logger, container)

    # copy posix_inputs with all File & Directory values mapped to their in-container paths
    def map_paths(fn: Union[Value.File, Value.Directory]) -> str:
        p = fn.value.rstrip("/")
        if isinstance(fn, Value.Directory):
            p += "/"
        return container.input_path_map[p]

    container_inputs = Value.rewrite_env_paths(posix_inputs, map_paths)

    # initialize value environment with the inputs
    container_env: Env.Bindings[Value.Base] = Env.Bindings()
    for b in container_inputs:
        assert isinstance(b, Env.Binding)
        v = b.value
        assert isinstance(v, Value.Base)
        container_env = container_env.bind(b.name, v)
        vj = json.dumps(v.json)
        logger.info(_("input", name=b.name, value=(v.json if len(vj) < 4096 else "(((large)))")))

    # collect remaining declarations requiring evaluation.
    decls_to_eval = []
    for decl in (task.inputs or []) + (task.postinputs or []):
        if not container_env.has_binding(decl.name):
            decls_to_eval.append(decl)

    # topsort them according to internal dependencies. prior static validation
    # should have ensured they're acyclic.
    decls_by_id, decls_adj = Tree._decl_dependency_matrix(decls_to_eval)
    decls_to_eval = [decls_by_id[did] for did in _util.topsort(decls_adj)]
    assert len(decls_by_id) == len(decls_to_eval)
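    # (e.g., given declarations `Int n = 1` and `Int n2 = n * n` (hypothetical), the topsort
    # guarantees n is evaluated and bound before n2)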

    # evaluate each declaration in that order
    # note: the write_* functions call container.add_paths as a side-effect
    stdlib = InputStdLib(task.effective_wdl_version, logger, container)
    for decl in decls_to_eval:
        assert isinstance(decl, Tree.Decl)
        v = Value.Null()
        if decl.expr:
            try:
                v = decl.expr.eval(container_env, stdlib=stdlib).coerce(decl.type)
            except Error.RuntimeError as exn:
                setattr(exn, "job_id", decl.workflow_node_id)
                raise exn
            except Exception as exn:
                exn2 = Error.EvalError(decl, str(exn))
                setattr(exn2, "job_id", decl.workflow_node_id)
                raise exn2 from exn
        else:
            assert decl.type.optional
        vj = json.dumps(v.json)
        logger.info(_("eval", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))")))
        container_env = container_env.bind(decl.name, v)

    return container_env


def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]:
    """
    Get the unique paths of all File & Directory values in the environment. Directory paths will
    have a trailing '/'.
    """
    ans = set()

    def collector(v: Value.Base) -> None:
        if isinstance(v, Value.File):
            assert not v.value.endswith("/")
            ans.add(v.value)
        elif isinstance(v, Value.Directory):
            ans.add(v.value.rstrip("/") + "/")
        for ch in v.children:
            collector(ch)

    for b in env:
        collector(b.value)
    return ans


def _warn_input_basename_collisions(logger: logging.Logger, container: "TaskContainer") -> None:
    basenames = Counter(
        [os.path.basename((p[:-1] if p.endswith("/") else p)) for p in container.input_path_map_rev]
    )
    collisions = [nm for nm, n in basenames.items() if n > 1]
    if collisions:
        logger.warning(
            _(
                "mounting input files with colliding basenames in separate container directories",
                basenames=collisions,
            )
        )


def _eval_task_runtime(
    cfg: config.Loader,
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
    env: Env.Bindings[Value.Base],
    stdlib: StdLib.Base,
) -> None:
    # evaluate runtime{} expressions (merged with any configured defaults)
    runtime_defaults = cfg.get_dict("task_runtime", "defaults")
    if run_id.startswith("download-"):
        runtime_defaults.update(cfg.get_dict("task_runtime", "download_defaults"))
    runtime_values = {}
    for key, v in runtime_defaults.items():
        runtime_values[key] = Value.from_json(Type.Any(), v)
    for key, expr in task.runtime.items():  # evaluate expressions in source code
        runtime_values[key] = expr.eval(env, stdlib)
    for b in inputs.enter_namespace("runtime"):
        runtime_values[b.name] = b.value  # input overrides
    for b in inputs.enter_namespace("requirements"):
        runtime_values[b.name] = b.value
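    # (precedence: configured defaults < runtime{} expressions < runtime.*/requirements.* inputs,
    # since later assignments overwrite earlier ones)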
    logger.debug(_("runtime values", **dict((key, str(v)) for key, v in runtime_values.items())))

    # have container implementation validate & postprocess into container.runtime_values
    container.process_runtime(logger, runtime_values)

    if container.runtime_values:
        logger.info(_("effective runtime", **container.runtime_values))

    # add any configured overrides for in-container environment variables
    container.runtime_values.setdefault("env", {})
    env_vars_override = {}
    env_vars_skipped = []
    for ev_name, ev_value in cfg["task_runtime"].get_dict("env").items():
        if ev_value is None:
            try:
                env_vars_override[ev_name] = os.environ[ev_name]
            except KeyError:
                env_vars_skipped.append(ev_name)
        else:
            env_vars_override[ev_name] = str(ev_value)
    if env_vars_skipped:
        logger.warning(
            _("skipping pass-through of undefined environment variable(s)", names=env_vars_skipped)
        )
    if cfg.get_bool("file_io", "mount_tmpdir") or task.name in cfg.get_list(
        "file_io", "mount_tmpdir_for"
    ):
        env_vars_override["TMPDIR"] = os.path.join(
            container.container_dir, "work", "_miniwdl_tmpdir"
        )
    if env_vars_override:
        # usually don't dump values into log, as they may often be auth tokens
        logger.notice(
            _(
                "overriding environment variables (portability warning)",
                names=list(env_vars_override.keys()),
            )
        )
        logger.debug(
            _("overriding environment variables (portability warning)", **env_vars_override)
        )
        container.runtime_values["env"].update(env_vars_override)

    # process decls with "env" decorator (EXPERIMENTAL)
    env_decls: Dict[str, Value.Base] = {}
    for decl in (task.inputs or []) + task.postinputs:
        if decl.decor.get("env", False) is True:
            if not env_decls:
                logger.warning(
                    "task env declarations are an experimental feature, subject to change"
                )
            v = env[decl.name]
            if isinstance(v, (Value.String, Value.File, Value.Directory)):
                v = v.value
            else:
                v = json.dumps(v.json)
            env_decls[decl.name] = v
    container.runtime_values["env"].update(env_decls)

    unused_keys = list(
        key
        for key in runtime_values
        if key not in ("memory", "docker", "container") and key not in container.runtime_values
    )
    if unused_keys:
        logger.warning(_("ignored runtime settings", keys=unused_keys))


def _try_task(
    cfg: config.Loader,
    task: Tree.Task,
    logger: logging.Logger,
    container: "TaskContainer",
    command: str,
    terminating: Callable[[], bool],
) -> None:
    """
    Run the task command in the container, retrying up to runtime.preemptible occurrences of
    Interrupted errors, plus up to runtime.maxRetries occurrences of any error.
    """
    from docker.errors import BuildError as DockerBuildError  # delay heavy import

    max_retries = container.runtime_values.get("maxRetries", 0)
    max_interruptions = container.runtime_values.get("preemptible", 0)
    retries = 0
    interruptions = 0

    while True:
        if terminating():
            raise Terminated()
        # copy input files, if needed
        if cfg.get_bool("file_io", "copy_input_files") or task.name in cfg.get_list(
            "file_io", "copy_input_files_for"
        ):
            container.copy_input_files(logger)
        host_tmpdir = (
            os.path.join(container.host_work_dir(), "_miniwdl_tmpdir")
            if cfg.get_bool("file_io", "mount_tmpdir")
            or task.name in cfg.get_list("file_io", "mount_tmpdir_for")
            else None
        )

        try:
            # start container & run command
            if host_tmpdir:
                logger.debug(_("creating task temp directory", TMPDIR=host_tmpdir))
                os.mkdir(host_tmpdir, mode=0o770)
            try:
                return container.run(logger, command)
            finally:
                if host_tmpdir:
                    logger.info(_("deleting task temp directory", TMPDIR=host_tmpdir))
                    rmtree_atomic(host_tmpdir)
                if (
                    "preemptible" in container.runtime_values
                    and cfg.has_option("task_runtime", "_mock_interruptions")
                    and interruptions < cfg["task_runtime"].get_int("_mock_interruptions")
                ):
                    raise Interrupted("mock interruption") from None
        except Exception as exn:
            if isinstance(exn, Interrupted) and interruptions < max_interruptions:
                logger.error(
                    _(
                        "interrupted task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_interruptions=interruptions,
                        max_interruptions=max_interruptions,
                    )
                )
                interruptions += 1
            elif (
                not isinstance(exn, (Terminated, DockerBuildError))
                and retries < max_retries
                and not terminating()
            ):
                logger.error(
                    _(
                        "failed task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_retries=retries,
                        max_retries=max_retries,
                    )
                )
                retries += 1
            else:
                raise
            _delete_work(cfg, logger, container, False)
            container.reset(logger)


def _eval_task_outputs(
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    env: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    stdout_file = os.path.join(container.host_dir, "stdout.txt")
    with suppress(FileNotFoundError):
        if os.path.getsize(stdout_file) > 0 and not run_id.startswith("download-"):
            # If the task produced nonempty stdout that isn't used in the WDL outputs, generate a
            # courtesy log message directing the user where to find it
            stdout_used = False
            expr_stack = [outp.expr for outp in task.outputs]
            while expr_stack:
                expr = expr_stack.pop()
                assert isinstance(expr, Expr.Base)
                if isinstance(expr, Expr.Apply) and expr.function_name == "stdout":
                    stdout_used = True
                else:
                    expr_stack.extend(expr.children)  # type: ignore[arg-type]
            if not stdout_used:
                logger.info(
                    _(
                        "command stdout unused; consider output `File cmd_out = stdout()`"
                        " or redirect command to stderr log >&2",
                        stdout_file=stdout_file,
                    )
                )

    # Helpers to rewrite File/Directory from in-container paths to host paths
    # First pass -- convert nonexistent output paths to None/Null
    def rewriter1(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        if container.host_path(container_path) is None:
            logger.warning(
                _(
                    "output path not found in container (error unless declared type is optional)",
                    output=output_name,
                    path=container_path,
                )
            )
            return None
        return v.value

    # Second pass -- convert in-container paths to host paths
    def rewriter2(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        host_path = container.host_path(container_path)
        assert host_path is not None
        if isinstance(v, Value.Directory):
            if host_path.endswith("/"):
                host_path = host_path[:-1]
            _check_directory(host_path, output_name)
            logger.debug(_("output dir", container=container_path, host=host_path))
        else:
            logger.debug(_("output file", container=container_path, host=host_path))
        return host_path

    stdlib = OutputStdLib(task.effective_wdl_version, logger, container)
    outputs: Env.Bindings[Value.Base] = Env.Bindings()
    for decl in task.outputs:
        assert decl.expr
        try:
            v = decl.expr.eval(env, stdlib=stdlib).coerce(decl.type)
        except Error.RuntimeError as exn:
            setattr(exn, "job_id", decl.workflow_node_id)
            raise exn
        except Exception as exn:
            exn2 = Error.EvalError(decl, str(exn))
            setattr(exn2, "job_id", decl.workflow_node_id)
            raise exn2 from exn
        _warn_struct_extra(logger, decl.name, v)
        vj = json.dumps(v.json)
        logger.info(
            _("output", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))"))
        )

        # Now, a delicate sequence for postprocessing File outputs (including Files nested within
        # compound values)

        # First convert nonexistent paths to None/Null, and bind this in the environment for
        # evaluating subsequent output expressions.
        v = Value.rewrite_paths(v, lambda w: rewriter1(w, decl.name))
        env = env.bind(decl.name, v)
        # check if any nonexistent paths were provided for (non-optional) File/Directory types
        # Value.Null.coerce has a special behavior for us to raise FileNotFoundError for a
        # non-optional File/Directory type.
        try:
            v = v.coerce(decl.type)
        except FileNotFoundError:
            err = OutputError("File/Directory path not found in task output " + decl.name)
            setattr(err, "job_id", decl.workflow_node_id)
            raise err
        # Rewrite in-container paths to host paths
        v = Value.rewrite_paths(v, lambda w: rewriter2(w, decl.name))
        outputs = outputs.bind(decl.name, v)

    return outputs


def _check_directory(host_path: str, output_name: str) -> None:
    """
    traverse output directory to check that all symlinks are relative & resolve inside the dir
    """

    def raiser(exc: OSError):
        raise exc

    for root, subdirs, files in os.walk(host_path, onerror=raiser, followlinks=False):
        for fn in files:
            fn = os.path.join(root, fn)
            if os.path.islink(fn) and (
                not os.path.exists(fn)
                or os.path.isabs(os.readlink(fn))
                or not path_really_within(fn, host_path)
            ):
                raise OutputError(f"Directory in output {output_name} contains unusable symlink")


def _warn_struct_extra(
    logger: logging.Logger, decl_name: str, v: Value.Base, warned_keys: Optional[Set[str]] = None
) -> None:
    """
    Log notices about extraneous keys found in struct initialization from JSON/Map/Object
    """
    if warned_keys is None:
        warned_keys = set()
    if isinstance(v, Value.Struct) and v.extra:
        extra_keys = set(k for k in v.extra if not k.startswith("#"))
        if extra_keys - warned_keys:
            logger.notice(
                _(
                    "extraneous keys in struct initializer",
                    decl=decl_name,
                    struct=str(v.type),
                    extra_keys=list(extra_keys),
                )
            )
            warned_keys.update(extra_keys)
    for ch in v.children:
        _warn_struct_extra(logger, decl_name, ch, warned_keys)


def link_outputs(
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
    use_relative_output_paths: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    Following a successful run, the output files may be scattered throughout a complex directory
    tree used for execution. To help navigating this, generate a subdirectory of the run directory
    containing nicely organized symlinks to the output files, and rewrite File values in the
    outputs env to use these symlinks.
    """

    def link1(target: str, link: str, directory: bool) -> None:
        if hardlinks:
            # TODO: what if target is an input from a different filesystem?
            if directory:
                shutil.copytree(target, link, symlinks=True, copy_function=link_force)
            else:
                link_force(target, link)
        else:
            symlink_force(target, link)

    def map_paths(v: Value.Base, dn: str) -> Value.Base:
        if isinstance(v, (Value.File, Value.Directory)):
            target = (
                v.value
                if os.path.exists(v.value)
                else cache.get_download(v.value, isinstance(v, Value.Directory))
            )
            if target:
                target = os.path.realpath(target)
                assert os.path.exists(target)
                if not hardlinks and path_really_within(target, os.path.dirname(run_dir)):
                    # make symlink relative
                    target = os.path.relpath(target, start=os.path.realpath(dn))
                link = os.path.join(dn, os.path.basename(v.value.rstrip("/")))
                os.makedirs(dn, exist_ok=False)
                link1(target, link, isinstance(v, Value.Directory))
                # Drop a dotfile alongside Directory outputs, to inform a program crawling the out/
                # directory without reference to the output types or JSON for whatever reason. It
                # might otherwise have trouble distinguishing Directory outputs among the
                # structured subdirectories we create for compound types.
                if isinstance(v, Value.Directory):
                    with open(os.path.join(dn, ".WDL_Directory"), "w") as _dotfile:
                        pass
                v.value = link
        # recurse into compound values
        elif isinstance(v, Value.Array) and v.value:
            d = int(math.ceil(math.log10(len(v.value))))  # how many digits needed
            for i in range(len(v.value)):
                v.value[i] = map_paths(v.value[i], os.path.join(dn, str(i).rjust(d, "0")))
        elif isinstance(v, Value.Map) and v.value:
            # create a subdirectory for each key, as long as the key names seem to make reasonable
            # path components; otherwise, treat the dict as a list of its values
            keys_ok = (
                sum(
                    1
                    for b in v.value
                    if regex.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", str(b[0]).strip("'\""))
                    is None
                )
                == 0
            )
            d = int(math.ceil(math.log10(len(v.value))))
            for i, b in enumerate(v.value):
                v.value[i] = (
                    b[0],
                    map_paths(
                        b[1],
                        os.path.join(
                            dn, str(b[0]).strip("'\"") if keys_ok else str(i).rjust(d, "0")
                        ),
                    ),
                )
        elif isinstance(v, Value.Pair):
            v.value = (
                map_paths(v.value[0], os.path.join(dn, "left")),
                map_paths(v.value[1], os.path.join(dn, "right")),
            )
        elif isinstance(v, Value.Struct):
            for key in v.value:
                v.value[key] = map_paths(v.value[key], os.path.join(dn, key))
        return v

    os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)

    if use_relative_output_paths:
        return link_outputs_relative(link1, cache, outputs, run_dir, hardlinks=hardlinks)

    return outputs.map(
        lambda binding: Env.Binding(
            binding.name,
            map_paths(
                Value.rewrite_paths(binding.value, lambda v: v.value),  # nop to deep copy
                os.path.join(run_dir, "out", binding.name),
            ),
        )
    )


def link_outputs_relative(
    link1: Callable[[str, str, bool], None],
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    link_outputs with [file_io] use_relative_output_paths = true. We organize the links to reflect
    the generated files' paths relative to their task working directory.
    """
    link_destinations: Dict[str, str] = dict()

    def map_path_relative(v: Union[Value.File, Value.Directory]) -> str:
        target = (
            v.value
            if os.path.exists(v.value)
            else cache.get_download(v.value, isinstance(v, Value.Directory))
        )
        if target:
            real_target = os.path.realpath(target)
            rel_link = None
            if path_really_within(target, os.path.join(run_dir, "work")):
                # target was generated by current task; use its path relative to the task work dir
                if not os.path.basename(run_dir).startswith("download-"):  # except download tasks
                    rel_link = os.path.relpath(
                        real_target, os.path.realpath(os.path.join(run_dir, "work"))
                    )
            else:
                # target is an out/ link generated by a call in the current workflow OR a cached
                # run; use the link's path relative to that out/ dir, which by induction should
                # equal its path relative to the original work/ dir.
                # we need a heuristic to find the out/ dir in a task/workflow run directory, since
                # the user's cwd or the task-generated relative path might coincidentally have
                # something named 'out'.
                p = None
                for p in reversed([m.span()[0] for m in regex.finditer("/out(?=/)", target)]):
                    if p and (
                        os.path.isfile(os.path.join(target[:p], "task.log"))
                        or os.path.isfile(os.path.join(target[:p], "workflow.log"))
                    ):
                        break
                    p = None
                if p and p + 5 < len(target):
                    rel_link = os.path.relpath(target, target[: p + 5])
            # if neither of the above cases applies, then fall back to just the target basename
            rel_link = rel_link or os.path.basename(target)
            abs_link = os.path.join(os.path.join(run_dir, "out"), rel_link)
            if link_destinations.get(abs_link, real_target) != real_target:
                raise FileExistsError(
                    "Output filename collision; to allow this, set"
                    " [file_io] use_relative_output_paths = false. Affected path: " + abs_link
                )
            os.makedirs(os.path.dirname(abs_link), exist_ok=True)
            link1(real_target, abs_link, isinstance(v, Value.Directory))
            link_destinations[abs_link] = real_target
            return abs_link
        return v.value

    return Value.rewrite_env_paths(outputs, map_path_relative)


def _warn_output_basename_collisions(
    logger: logging.Logger, outputs: Env.Bindings[Value.Base]
) -> None:
    targets_by_basename: Dict[str, Set[str]] = {}

    def walker(v: Union[Value.File, Value.Directory]) -> str:
        target = v.value
        if os.path.exists(target):
            target = os.path.realpath(target)
        basename = os.path.basename(target)
        targets_by_basename.setdefault(basename, set()).add(target)
        return v.value

    Value.rewrite_env_paths(outputs, walker)

    collisions = [bn for bn, targets in targets_by_basename.items() if len(targets) > 1]
    if collisions:
        logger.warning(
            _(
                "multiple output files share the same basename; while miniwdl supports this,"
                " consider modifying WDL to ensure distinct output basenames",
                basenames=collisions,
            )
        )


def _delete_work(
    cfg: config.Loader,
    logger: logging.Logger,
    container: "Optional[TaskContainer]",
    success: bool,
) -> None:
    opt = cfg["file_io"]["delete_work"].strip().lower()
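    # opt values "always", "success", and "failure" trigger deletion; any other setting
    # leaves the work directory in place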
    if container and (
        opt == "always" or (success and opt == "success") or (not success and opt == "failure")
    ):
        if success and not cfg["file_io"].get_bool("output_hardlinks"):
            logger.warning(
                "ignoring configuration [file_io] delete_work because it requires also output_hardlinks = true"
            )
            return
        container.delete_work(logger, delete_streams=not success)

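# The _StdLib subclasses below specialize the WDL standard library for task execution:
# InputStdLib (input/command evaluation) permits reading input files only, while OutputStdLib
# (output evaluation) also reads files created by the command and adds stdout()/stderr()/glob().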

class _StdLib(StdLib.Base):
    logger: logging.Logger
    container: "TaskContainer"
    inputs_only: bool  # if True then only permit access to input files

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        inputs_only: bool,
    ) -> None:
        super().__init__(wdl_version, write_dir=os.path.join(container.host_dir, "write_"))
        self.logger = logger
        self.container = container
        self.inputs_only = inputs_only

    def _devirtualize_filename(self, filename: str) -> str:
        # check allowability of reading this file, & map from in-container to host
        ans = self.container.host_path(filename, inputs_only=self.inputs_only)
        if ans is None:
            raise OutputError("function was passed non-existent file " + filename)
        self.logger.debug(_("read_", container=filename, host=ans))
        return ans

    def _virtualize_filename(self, filename: str) -> str:
        # register new file with container input_path_map
        self.container.add_paths([filename])
        self.logger.debug(
            _("write_", host=filename, container=self.container.input_path_map[filename])
        )
        self.logger.info(_("wrote", file=self.container.input_path_map[filename]))
        return self.container.input_path_map[filename]


class InputStdLib(_StdLib):
    # StdLib for evaluation of task inputs and command
    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
    ) -> None:
        super().__init__(wdl_version, logger, container, True)


class OutputStdLib(_StdLib):
    # StdLib for evaluation of task outputs
    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
    ) -> None:
        super().__init__(wdl_version, logger, container, False)

        setattr(
            self,
            "stdout",
            StdLib.StaticFunction(
                "stdout",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stdout.txt")),
            ),
        )
        setattr(
            self,
            "stderr",
            StdLib.StaticFunction(
                "stderr",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stderr.txt")),
            ),
        )

        def _glob(pattern: Value.String, lib: OutputStdLib = self) -> Value.Array:
            pat = pattern.coerce(Type.String()).value
            if not pat:
                raise OutputError("empty glob() pattern")
            assert isinstance(pat, str)
            if pat[0] == "/":
                raise OutputError("glob() pattern must be relative to task working directory")
            if pat.startswith("..") or "/.." in pat:
                raise OutputError("glob() pattern must not use .. uplevels")
            if pat.startswith("./"):
                pat = pat[2:]
            # glob the host directory
            pat = os.path.join(lib.container.host_work_dir(), pat)
            host_files = sorted(fn for fn in glob.glob(pat) if os.path.isfile(fn))
            # convert the host filenames to in-container filenames
            container_files = []
            for hf in host_files:
                dstrip = lib.container.host_work_dir()
                dstrip += "" if dstrip.endswith("/") else "/"
                assert hf.startswith(dstrip)
                container_files.append(
                    os.path.join(lib.container.container_dir, "work", hf[len(dstrip) :])
                )
            return Value.Array(Type.File(), [Value.File(fn) for fn in container_files])

        setattr(
            self,
            "glob",
            StdLib.StaticFunction("glob", [Type.String()], Type.Array(Type.File()), _glob),
        )