# pyre-strict
"""
Local task runner
"""
import logging
import math
import os
import json
import traceback
import glob
import threading
import shutil
import regex
from typing import Tuple, List, Dict, Optional, Callable, Set, Any, Union
from contextlib import ExitStack, suppress
from collections import Counter

from .. import Error, Type, Env, Value, StdLib, Tree, Expr, _util
from .._util import (
    write_atomic,
    write_values_json,
    provision_run_dir,
    TerminationSignalFlag,
    chmod_R_plus,
    path_really_within,
    LoggingFileHandler,
    compose_coroutines,
    pathsize,
    link_force,
    symlink_force,
    rmtree_atomic,
)
from .._util import StructuredLogMessage as _
from . import config, _statusbar
from .download import able as downloadable, run_cached as download
from .cache import CallCache, new as new_call_cache
from .error import OutputError, Interrupted, Terminated, RunFailed, error_json


def run_local_task(
    cfg: config.Loader,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    run_id: Optional[str] = None,
    run_dir: Optional[str] = None,
    logger_prefix: Optional[List[str]] = None,
    _run_id_stack: Optional[List[str]] = None,
    _cache: Optional[CallCache] = None,
    _plugins: Optional[List[Callable[..., Any]]] = None,
) -> Tuple[str, Env.Bindings[Value.Base]]:
    """
    Run a task locally.

    Inputs shall have been typechecked already. File inputs are presumed to be local POSIX file
    paths that can be mounted into a container.

    :param run_id: unique ID for the run, defaults to the task name
    :param run_dir: directory under which to create a timestamp-named subdirectory for this run
                    (defaults to current working directory).
                    If the final path component is ".", then operate in run_dir directly.
    """
    from .task_container import new as new_task_container  # delay heavy import

    _run_id_stack = _run_id_stack or []
    run_id = run_id or task.name
    logger_prefix = (logger_prefix or ["wdl"]) + ["t:" + run_id]
    logger = logging.getLogger(".".join(logger_prefix))
    with ExitStack() as cleanup:
        terminating = cleanup.enter_context(TerminationSignalFlag(logger))
        if terminating():
            raise Terminated(quiet=True)

        # provision run directory and log file
        run_dir = provision_run_dir(task.name, run_dir, last_link=not _run_id_stack)
        logfile = os.path.join(run_dir, "task.log")
        cleanup.enter_context(
            LoggingFileHandler(
                logger,
                logfile,
            )
        )
        if cfg.has_option("logging", "json") and cfg["logging"].get_bool("json"):
            cleanup.enter_context(
                LoggingFileHandler(
                    logger,
                    logfile + ".json",
                    json=True,
                )
            )
        logger.notice(  # pyre-fixme
            _(
                "task setup",
                name=task.name,
                source=task.pos.uri,
                line=task.pos.line,
                column=task.pos.column,
                dir=run_dir,
                thread=threading.get_ident(),
            )
        )
        write_values_json(inputs, os.path.join(run_dir, "inputs.json"))

        if not _run_id_stack:
            cache = _cache or cleanup.enter_context(new_call_cache(cfg, logger))
            cache.flock(logfile, exclusive=True)  # no containing workflow; flock task.log
        else:
            cache = _cache
        assert cache

        cleanup.enter_context(_statusbar.task_slotted())
        container = None
        try:
            cache_key = f"{task.name}/{task.digest}/{Value.digest_env(inputs)}"
            cached = cache.get(cache_key, inputs, task.effective_outputs)
            if cached is not None:
                for decl in task.outputs:
                    v = cached[decl.name]
                    vj = json.dumps(v.json)
                    logger.info(
                        _(
                            "cached output",
                            name=decl.name,
                            value=(v.json if len(vj) < 4096 else "(((large)))"),
                        )
                    )
                # create out/ and outputs.json
                _outputs = link_outputs(
                    cache,
                    cached,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )
                write_values_json(
                    cached, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done (cached)")  # pyre-fixme
                # returning `cached`, not the rewritten `_outputs`, to retain opportunity to find
                # cached downstream inputs
                return (run_dir, cached)
            # start plugin coroutines and process inputs through them
            with compose_coroutines(
                [
                    (
                        lambda kwargs, cor=cor: cor(
                            cfg, logger, _run_id_stack + [run_id], run_dir, task, **kwargs
                        )
                    )
                    for cor in (
                        [cor2 for _, cor2 in sorted(config.load_plugins(cfg, "task"))]
                        + (_plugins or [])
                    )
                ],
                {"inputs": inputs},
            ) as plugins:
                recv = next(plugins)
                inputs = recv["inputs"]

                # download input files, if needed
                posix_inputs = _download_input_files(
                    cfg,
                    logger,
                    logger_prefix,
                    run_dir,
                    _add_downloadable_defaults(cfg, task.available_inputs, inputs),
                    cache,
                )

                # create TaskContainer according to configuration
                container = new_task_container(cfg, logger, run_id, run_dir)

                # evaluate input/postinput declarations, including mapping from host to
                # in-container file paths
                container_env = _eval_task_inputs(logger, task, posix_inputs, container)

                # evaluate runtime fields
                stdlib = InputStdLib(task.effective_wdl_version, logger, container)
                _eval_task_runtime(
                    cfg, logger, run_id, task, posix_inputs, container, container_env, stdlib
                )

                # interpolate command
                # pylint: disable=E1101
                placeholder_re = regex.compile(
                    cfg["task_runtime"]["placeholder_regex"], flags=regex.POSIX
                )
                setattr(
                    stdlib,
                    "_placeholder_regex",
                    placeholder_re,
                )  # hack to pass regex to WDL.Expr.Placeholder._eval
                command = _util.strip_leading_whitespace(
                    task.command.eval(container_env, stdlib).value
                )[1]
                delattr(stdlib, "_placeholder_regex")
                logger.debug(_("command", command=command.strip()))

                # process command & container through plugins
                recv = plugins.send({"command": command, "container": container})
                command, container = (recv[k] for k in ("command", "container"))

                # start container & run command (and retry if needed)
                _try_task(cfg, task, logger, container, command, terminating)

                # evaluate output declarations
                outputs = _eval_task_outputs(logger, run_id, task, container_env, container)

                # create output links
                outputs = link_outputs(
                    cache,
                    outputs,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )

                # process outputs through plugins
                recv = plugins.send({"outputs": outputs})
                outputs = recv["outputs"]

                # clean up, if so configured, and make sure output files will be accessible to
                # downstream tasks
                _delete_work(cfg, logger, container, True)
                chmod_R_plus(run_dir, file_bits=0o660, dir_bits=0o770)
                _warn_output_basename_collisions(logger, outputs)

                # write outputs.json
                write_values_json(
                    outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done")  # pyre-fixme
                if not run_id.startswith("download-"):
                    cache.put(cache_key, outputs, run_dir=run_dir)
                return (run_dir, outputs)
        except Exception as exn:
            tbtxt = traceback.format_exc()
            logger.debug(tbtxt)
            wrapper = RunFailed(task, run_id, run_dir)
            logmsg = _(
                str(wrapper),
                dir=run_dir,
                **error_json(
                    exn, traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None
                ),
            )
            if isinstance(exn, Terminated) and getattr(exn, "quiet", False):
                logger.debug(logmsg)
            else:
                logger.error(logmsg)
            try:
                write_atomic(
                    json.dumps(
                        error_json(
                            wrapper,
                            cause=exn,
                            traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None,
                        ),
                        indent=2,
                    ),
                    os.path.join(run_dir, "error.json"),
                )
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.critical(_("failed to write error.json", dir=run_dir, message=str(exn2)))
            try:
                _delete_work(cfg, logger, container, False)
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.error(_("delete_work also failed", exception=str(exn2)))
            raise wrapper from exn
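
# Usage sketch (illustrative; the WDL file name and inputs are hypothetical):
# given a parsed, typechecked task, a caller might invoke the runner like so --
#
#     import WDL
#     doc = WDL.load("hello.wdl")
#     cfg = config.Loader(logging.getLogger("wdl.config"))
#     inputs = Env.Bindings().bind("who", Value.String("world"))
#     run_dir, outputs = run_local_task(cfg, doc.tasks[0], inputs)
#
# run_dir is the provisioned run directory; outputs is an Env.Bindings whose
# File/Directory values have been rewritten to host paths under run_dir/out/.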


def _download_input_files(
    cfg: config.Loader,
    logger: logging.Logger,
    logger_prefix: List[str],
    run_dir: str,
    inputs: Env.Bindings[Value.Base],
    cache: CallCache,
) -> Env.Bindings[Value.Base]:
    """
    Find all File & Directory input values that are downloadable URIs (including any nested within
    compound values). Download them to some location under run_dir and return a copy of the inputs
    with the URI values replaced by the downloaded paths.
    """

    downloads = 0
    download_bytes = 0
    cached_hits = 0

    def rewriter(v: Union[Value.Directory, Value.File]) -> str:
        nonlocal downloads, download_bytes, cached_hits
        directory = isinstance(v, Value.Directory)
        uri = v.value
        if downloadable(cfg, uri, directory=directory):
            logger.info(_(f"download input {'directory' if directory else 'file'}", uri=uri))
            cached, filename = download(
                cfg,
                logger,
                cache,
                uri,
                directory=directory,
                run_dir=os.path.join(run_dir, "download", str(downloads), "."),
                logger_prefix=logger_prefix + [f"download{downloads}"],
            )
            if cached:
                cached_hits += 1
            else:
                sz = pathsize(filename)
                logger.info(_("downloaded input", uri=uri, path=filename, bytes=sz))
                downloads += 1
                download_bytes += sz
            return filename
        return uri

    ans = Value.rewrite_env_paths(inputs, rewriter)
    if downloads or cached_hits:
        logger.notice(  # pyre-fixme
            _(
                "processed input URIs",
                downloaded=downloads,
                downloaded_bytes=download_bytes,
                cached=cached_hits,
            )
        )
    return ans
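
# Note: Value.rewrite_env_paths applies `rewriter` to every File/Directory
# value in the environment, including values nested inside compound values,
# and returns a rewritten copy. For example, with a downloader enabled for the
# URI scheme, File("s3://bucket/a.txt") would be replaced by the local path
# that download() produced under <run_dir>/download/0/.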


def _add_downloadable_defaults(
    cfg: config.Loader, available_inputs: Env.Bindings[Tree.Decl], inputs: Env.Bindings[Value.Base]
) -> Env.Bindings[Value.Base]:
    """
    Look for available File/Directory inputs that default to a string constant appearing to be a
    downloadable URI. For each one, add a binding for that default to the user-supplied inputs (if
    not already overridden in them).

    This is to trigger download of the default URIs even though we otherwise don't evaluate input
    declarations until after processing downloads.
    """
    ans = inputs
    for b in available_inputs:
        if (
            isinstance(b.value.type, (Type.File, Type.Directory))
            and b.name not in ans
            and isinstance(b.value.expr, Expr.String)
        ):
            directory = isinstance(b.value.type, Type.Directory)
            maybe_uri = b.value.expr.literal
            if maybe_uri and downloadable(cfg, maybe_uri.value, directory=directory):
                v = (
                    Value.Directory(maybe_uri.value, b.value.expr)
                    if directory
                    else Value.File(maybe_uri.value, b.value.expr)
                )
                ans = ans.bind(b.name, v)
    return ans
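
# Illustrative example: a task declaring
#
#     input {
#         File reference = "https://example.org/ref.fa"
#     }
#
# with no user-supplied override for `reference` gets a binding
# reference -> Value.File("https://example.org/ref.fa") added here, so the
# default URI is downloaded alongside the explicitly-supplied inputs.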


def _eval_task_inputs(
    logger: logging.Logger,
    task: Tree.Task,
    posix_inputs: Env.Bindings[Value.Base],
    container: "runtime.task_container.TaskContainer",
) -> Env.Bindings[Value.Base]:
    # Preprocess inputs: if None value is supplied for an input declared with a default but without
    # the ? type quantifier, remove the binding entirely so that the default will be used. In
    # contrast, if the input declaration has an -explicitly- optional type, then we'll allow the
    # supplied None to override any default.
    input_decls = task.available_inputs
    posix_inputs = posix_inputs.filter(
        lambda b: not (
            isinstance(b.value, Value.Null)
            and b.name in input_decls
            and input_decls[b.name].expr
            and not input_decls[b.name].type.optional
        )
    )

    # Map all the provided input File & Directory paths to in-container paths
    container.add_paths(_fspaths(posix_inputs))
    _warn_input_basename_collisions(logger, container)

    # copy posix_inputs with all File & Directory values mapped to their in-container paths
    def map_paths(fn: Union[Value.File, Value.Directory]) -> str:
        p = fn.value.rstrip("/")
        if isinstance(fn, Value.Directory):
            p += "/"
        return container.input_path_map[p]

    container_inputs = Value.rewrite_env_paths(posix_inputs, map_paths)

    # initialize value environment with the inputs
    container_env = Env.Bindings()
    for b in container_inputs:
        assert isinstance(b, Env.Binding)
        v = b.value
        assert isinstance(v, Value.Base)
        container_env = container_env.bind(b.name, v)
        vj = json.dumps(v.json)
        logger.info(_("input", name=b.name, value=(v.json if len(vj) < 4096 else "(((large)))")))

    # collect remaining declarations requiring evaluation.
    decls_to_eval = []
    for decl in (task.inputs or []) + (task.postinputs or []):
        if not container_env.has_binding(decl.name):
            decls_to_eval.append(decl)

    # topsort them according to internal dependencies. prior static validation
    # should have ensured they're acyclic.
    decls_by_id, decls_adj = Tree._decl_dependency_matrix(decls_to_eval)
    decls_to_eval = [decls_by_id[did] for did in _util.topsort(decls_adj)]
    assert len(decls_by_id) == len(decls_to_eval)

    # evaluate each declaration in that order
    # note: the write_* functions call container.add_paths as a side-effect
    stdlib = InputStdLib(task.effective_wdl_version, logger, container)
    for decl in decls_to_eval:
        assert isinstance(decl, Tree.Decl)
        v = Value.Null()
        if decl.expr:
            try:
                v = decl.expr.eval(container_env, stdlib=stdlib).coerce(decl.type)
            except Error.RuntimeError as exn:
                setattr(exn, "job_id", decl.workflow_node_id)
                raise exn
            except Exception as exn:
                exn2 = Error.EvalError(decl, str(exn))
                setattr(exn2, "job_id", decl.workflow_node_id)
                raise exn2 from exn
        else:
            assert decl.type.optional
        vj = json.dumps(v.json)
        logger.info(_("eval", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))")))
        container_env = container_env.bind(decl.name, v)

    return container_env
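
# For example, declarations such as
#
#     Int n = length(xs)
#     Int m = n + 1
#
# are evaluated in dependency order (n before m, and only after the bound
# inputs), with each value logged and added to container_env as it resolves.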


def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]:
    """
    Get the unique paths of all File & Directory values in the environment. Directory paths will
    have a trailing '/'.
    """
    ans = set()

    def collector(v: Value.Base) -> None:
        if isinstance(v, Value.File):
            assert not v.value.endswith("/")
            ans.add(v.value)
        elif isinstance(v, Value.Directory):
            ans.add(v.value.rstrip("/") + "/")
        for ch in v.children:
            collector(ch)

    for b in env:
        collector(b.value)
    return ans
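
# Sketch: for bindings {f: File("/data/a.txt"), d: Directory("/data/dir")},
# _fspaths returns {"/data/a.txt", "/data/dir/"}; the trailing slash is how
# downstream code distinguishes directory paths from file paths.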


def _warn_input_basename_collisions(
    logger: logging.Logger, container: "runtime.task_container.TaskContainer"
) -> None:
    basenames = Counter(
        [os.path.basename((p[:-1] if p.endswith("/") else p)) for p in container.input_path_map_rev]
    )
    collisions = [nm for nm, n in basenames.items() if n > 1]
    if collisions:
        logger.warning(
            _(
                "mounting input files with colliding basenames in separate container directories",
                basenames=collisions,
            )
        )


def _eval_task_runtime(
    cfg: config.Loader,
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    container: "runtime.task_container.TaskContainer",
    env: Env.Bindings[Value.Base],
    stdlib: StdLib.Base,
) -> None:
    # evaluate runtime{} expressions (merged with any configured defaults)
    runtime_defaults = cfg.get_dict("task_runtime", "defaults")
    if run_id.startswith("download-"):
        runtime_defaults.update(cfg.get_dict("task_runtime", "download_defaults"))
    runtime_values = {}
    for key, v in runtime_defaults.items():
        runtime_values[key] = Value.from_json(Type.Any(), v)
    for key, expr in task.runtime.items():  # evaluate expressions in source code
        runtime_values[key] = expr.eval(env, stdlib)
    for b in inputs.enter_namespace("runtime"):
        runtime_values[b.name] = b.value  # input overrides
    logger.debug(_("runtime values", **dict((key, str(v)) for key, v in runtime_values.items())))

    # have container implementation validate & postprocess into container.runtime_values
    container.process_runtime(logger, runtime_values)

    if container.runtime_values:
        logger.info(_("effective runtime", **container.runtime_values))

    # add any configured overrides for in-container environment variables
    container.runtime_values.setdefault("env", {})
    env_vars_override = {}
    env_vars_skipped = []
    for ev_name, ev_value in cfg["task_runtime"].get_dict("env").items():
        if ev_value is None:
            try:
                env_vars_override[ev_name] = os.environ[ev_name]
            except KeyError:
                env_vars_skipped.append(ev_name)
        else:
            env_vars_override[ev_name] = str(ev_value)
    if env_vars_skipped:
        logger.warning(
            _("skipping pass-through of undefined environment variable(s)", names=env_vars_skipped)
        )
    if cfg.get_bool("file_io", "mount_tmpdir") or task.name in cfg.get_list(
        "file_io", "mount_tmpdir_for"
    ):
        env_vars_override["TMPDIR"] = os.path.join(
            container.container_dir, "work", "_miniwdl_tmpdir"
        )
    if env_vars_override:
        # usually don't dump values into log, as they may often be auth tokens
        logger.notice(  # pyre-ignore
            _(
                "overriding environment variables (portability warning)",
                names=list(env_vars_override.keys()),
            )
        )
        logger.debug(
            _("overriding environment variables (portability warning)", **env_vars_override)
        )
        container.runtime_values["env"].update(env_vars_override)

    # process decls with "env" decorator (EXPERIMENTAL)
    env_decls = {}
    for decl in (task.inputs or []) + task.postinputs:
        if decl.decor.get("env", False) is True:
            if not env_decls:
                logger.warning(
                    "task env declarations are an experimental feature, subject to change"
                )
            v = env[decl.name]
            if isinstance(v, (Value.String, Value.File, Value.Directory)):
                v = v.value
            else:
                v = json.dumps(v.json)
            env_decls[decl.name] = v
    container.runtime_values["env"].update(env_decls)

    unused_keys = list(
        key
        for key in runtime_values
        if key not in ("memory", "docker", "container") and key not in container.runtime_values
    )
    if unused_keys:
        logger.warning(_("ignored runtime settings", keys=unused_keys))
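
# Precedence recap: runtime values are merged in increasing priority order --
# configured [task_runtime] defaults, then the task's runtime{} expressions,
# then any runtime.* input overrides; e.g. an input binding runtime.cpu
# supersedes both `runtime { cpu: 4 }` in the WDL source and a configured
# default.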


def _try_task(
    cfg: config.Loader,
    task: Tree.Task,
    logger: logging.Logger,
    container: "runtime.task_container.TaskContainer",
    command: str,
    terminating: Callable[[], bool],
) -> None:
    """
    Run the task command in the container, retrying up to runtime.preemptible occurrences of
    Interrupted errors, plus up to runtime.maxRetries occurrences of any error.
    """
    from docker.errors import BuildError as DockerBuildError  # delay heavy import

    max_retries = container.runtime_values.get("maxRetries", 0)
    max_interruptions = container.runtime_values.get("preemptible", 0)
    retries = 0
    interruptions = 0

    while True:
        if terminating():
            raise Terminated()
        # copy input files, if needed
        if cfg.get_bool("file_io", "copy_input_files") or task.name in cfg.get_list(
            "file_io", "copy_input_files_for"
        ):
            container.copy_input_files(logger)
        host_tmpdir = (
            os.path.join(container.host_work_dir(), "_miniwdl_tmpdir")
            if cfg.get_bool("file_io", "mount_tmpdir")
            or task.name in cfg.get_list("file_io", "mount_tmpdir_for")
            else None
        )

        try:
            # start container & run command
            if host_tmpdir:
                logger.debug(_("creating task temp directory", TMPDIR=host_tmpdir))
                os.mkdir(host_tmpdir, mode=0o770)
            try:
                return container.run(logger, command)
            finally:
                if host_tmpdir:
                    logger.info(_("deleting task temp directory", TMPDIR=host_tmpdir))
                    rmtree_atomic(host_tmpdir)
                if (
                    "preemptible" in container.runtime_values
                    and cfg.has_option("task_runtime", "_mock_interruptions")
                    and interruptions < cfg["task_runtime"].get_int("_mock_interruptions")
                ):
                    raise Interrupted("mock interruption") from None
        except Exception as exn:
            if isinstance(exn, Interrupted) and interruptions < max_interruptions:
                logger.error(
                    _(
                        "interrupted task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_interruptions=interruptions,
                        max_interruptions=max_interruptions,
                    )
                )
                interruptions += 1
            elif (
                not isinstance(exn, (Terminated, DockerBuildError))
                and retries < max_retries
                and not terminating()
            ):
                logger.error(
                    _(
                        "failed task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_retries=retries,
                        max_retries=max_retries,
                    )
                )
                retries += 1
            else:
                raise
            _delete_work(cfg, logger, container, False)
            container.reset(logger)
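
# Retry accounting sketch: with runtime values preemptible=2 and maxRetries=1,
# up to two Interrupted errors and one other failure are absorbed (at most
# four attempts) before the exception propagates; Terminated and Docker build
# errors are never retried.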


def _eval_task_outputs(
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    env: Env.Bindings[Value.Base],
    container: "runtime.task_container.TaskContainer",
) -> Env.Bindings[Value.Base]:
    stdout_file = os.path.join(container.host_dir, "stdout.txt")
    with suppress(FileNotFoundError):
        if os.path.getsize(stdout_file) > 0 and not run_id.startswith("download-"):
            # If the task produced nonempty stdout that isn't used in the WDL outputs, generate a
            # courtesy log message directing the user where to find it
            stdout_used = False
            expr_stack = [outp.expr for outp in task.outputs]
            while expr_stack:
                expr = expr_stack.pop()
                assert isinstance(expr, Expr.Base)
                if isinstance(expr, Expr.Apply) and expr.function_name == "stdout":
                    stdout_used = True
                else:
                    expr_stack.extend(expr.children)
            if not stdout_used:
                logger.info(
                    _(
                        "command stdout unused; consider output `File cmd_out = stdout()`"
                        " or redirect command to stderr log >&2",
                        stdout_file=stdout_file,
                    )
                )

    # Helpers to rewrite File/Directory from in-container paths to host paths
    # First pass -- convert nonexistent output paths to None/Null
    def rewriter1(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        if container.host_path(container_path) is None:
            logger.warning(
                _(
                    "output path not found in container (error unless declared type is optional)",
                    output=output_name,
                    path=container_path,
                )
            )
            return None
        return v.value

    # Second pass -- convert in-container paths to host paths
    def rewriter2(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        host_path = container.host_path(container_path)
        assert host_path is not None
        if isinstance(v, Value.Directory):
            if host_path.endswith("/"):
                host_path = host_path[:-1]
            _check_directory(host_path, output_name)
            logger.debug(_("output dir", container=container_path, host=host_path))
        else:
            logger.debug(_("output file", container=container_path, host=host_path))
        return host_path

    stdlib = OutputStdLib(task.effective_wdl_version, logger, container)
    outputs = Env.Bindings()
    for decl in task.outputs:
        assert decl.expr
        try:
            v = decl.expr.eval(env, stdlib=stdlib).coerce(decl.type)
        except Error.RuntimeError as exn:
            setattr(exn, "job_id", decl.workflow_node_id)
            raise exn
        except Exception as exn:
            exn2 = Error.EvalError(decl, str(exn))
            setattr(exn2, "job_id", decl.workflow_node_id)
            raise exn2 from exn
        _warn_struct_extra(logger, decl.name, v)
        vj = json.dumps(v.json)
        logger.info(
            _("output", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))"))
        )

        # Now, a delicate sequence for postprocessing File outputs (including Files nested within
        # compound values)

        # First convert nonexistent paths to None/Null, and bind this in the environment for
        # evaluating subsequent output expressions.
        v = Value.rewrite_paths(v, lambda w: rewriter1(w, decl.name))
        env = env.bind(decl.name, v)
        # check if any nonexistent paths were provided for (non-optional) File/Directory types
        # Value.Null.coerce has a special behavior for us to raise FileNotFoundError for a
        # non-optional File/Directory type.
        try:
            v = v.coerce(decl.type)
        except FileNotFoundError:
            exn = OutputError("File/Directory path not found in task output " + decl.name)
            setattr(exn, "job_id", decl.workflow_node_id)
            raise exn
        # Rewrite in-container paths to host paths
        v = Value.rewrite_paths(v, lambda w: rewriter2(w, decl.name))
        outputs = outputs.bind(decl.name, v)

    return outputs
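
# Two-pass example: for an output `File? maybe = "missing.txt"` where the
# command never created missing.txt, rewriter1 nulls the value and the
# optional type tolerates it; a non-optional `File out = "missing.txt"` would
# instead raise OutputError via the FileNotFoundError from coercion.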


def _check_directory(host_path: str, output_name: str) -> None:
    """
    traverse output directory to check that all symlinks are relative & resolve inside the dir
    """

    def raiser(exc: OSError):
        raise exc

    for root, subdirs, files in os.walk(host_path, onerror=raiser, followlinks=False):
        for fn in files:
            fn = os.path.join(root, fn)
            if os.path.islink(fn) and (
                not os.path.exists(fn)
                or os.path.isabs(os.readlink(fn))
                or not path_really_within(fn, host_path)
            ):
                raise OutputError(f"Directory in output {output_name} contains unusable symlink")


def _warn_struct_extra(
    logger: logging.Logger, decl_name: str, v: Value.Base, warned_keys: Optional[Set[str]] = None
) -> None:
    """
    Log notices about extraneous keys found in struct initialization from JSON/Map/Object
    """
    if warned_keys is None:
        warned_keys = set()
    if isinstance(v, Value.Struct) and v.extra:
        extra_keys = set(k for k in v.extra if not k.startswith("#"))
        if extra_keys - warned_keys:
            logger.notice(  # pyre-fixme
                _(
                    "extraneous keys in struct initializer",
                    decl=decl_name,
                    struct=str(v.type),
                    extra_keys=list(extra_keys),
                )
            )
            warned_keys.update(extra_keys)
    for ch in v.children:
        _warn_struct_extra(logger, decl_name, ch, warned_keys)


def link_outputs(
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
    use_relative_output_paths: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    Following a successful run, the output files may be scattered throughout a complex directory
    tree used for execution. To help navigate this, generate a subdirectory of the run directory
    containing nicely organized symlinks to the output files, and rewrite File values in the
    outputs env to use these symlinks.
    """

    def link1(target: str, link: str, directory: bool) -> None:
        if hardlinks:
            # TODO: what if target is an input from a different filesystem?
            if directory:
                shutil.copytree(target, link, symlinks=True, copy_function=link_force)
            else:
                link_force(target, link)
        else:
            symlink_force(target, link)

    def map_paths(v: Value.Base, dn: str) -> Value.Base:
        if isinstance(v, (Value.File, Value.Directory)):
            target = (
                v.value
                if os.path.exists(v.value)
                else cache.get_download(v.value, isinstance(v, Value.Directory))
            )
            if target:
                target = os.path.realpath(target)
                assert os.path.exists(target)
                if not hardlinks and path_really_within(target, os.path.dirname(run_dir)):
                    # make symlink relative
                    target = os.path.relpath(target, start=os.path.realpath(dn))
                link = os.path.join(dn, os.path.basename(v.value.rstrip("/")))
                os.makedirs(dn, exist_ok=False)
                link1(target, link, isinstance(v, Value.Directory))
                # Drop a dotfile alongside Directory outputs, to inform a program crawling the out/
                # directory without reference to the output types or JSON for whatever reason. It
                # might otherwise have trouble distinguishing Directory outputs among the
                # structured subdirectories we create for compound types.
                if isinstance(v, Value.Directory):
                    with open(os.path.join(dn, ".WDL_Directory"), "w") as dotfile:
                        pass
                v.value = link
        # recurse into compound values
        elif isinstance(v, Value.Array) and v.value:
            d = int(math.ceil(math.log10(len(v.value))))  # how many digits needed
            for i in range(len(v.value)):
                v.value[i] = map_paths(v.value[i], os.path.join(dn, str(i).rjust(d, "0")))
        elif isinstance(v, Value.Map):
            # create a subdirectory for each key, as long as the key names seem to make reasonable
            # path components; otherwise, treat the dict as a list of its values
            keys_ok = (
                sum(
                    1
                    for b in v.value
                    if regex.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", str(b[0]).strip("'\""))
                    is None
                )
                == 0
            )
            d = int(math.ceil(math.log10(len(v.value))))
            for i, b in enumerate(v.value):
                v.value[i] = (
                    b[0],
                    map_paths(
                        b[1],
                        os.path.join(
                            dn, str(b[0]).strip("'\"") if keys_ok else str(i).rjust(d, "0")
                        ),
                    ),
                )
        elif isinstance(v, Value.Pair):
            v.value = (
                map_paths(v.value[0], os.path.join(dn, "left")),
                map_paths(v.value[1], os.path.join(dn, "right")),
            )
        elif isinstance(v, Value.Struct):
            for key in v.value:
                v.value[key] = map_paths(v.value[key], os.path.join(dn, key))
        return v

    os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)

    if use_relative_output_paths:
        return link_outputs_relative(link1, cache, outputs, run_dir, hardlinks=hardlinks)

    return outputs.map(
        lambda binding: Env.Binding(
            binding.name,
            map_paths(
                Value.rewrite_paths(binding.value, lambda v: v.value),  # nop to deep copy
                os.path.join(run_dir, "out", binding.name),
            ),
        )
    )
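
# Layout sketch: for outputs {File a, Array[File] bs} the run directory gains
#
#     out/a/<basename of a>
#     out/bs/0/<basename of bs[0]>
#     out/bs/1/<basename of bs[1]>
#
# where each leaf is a symlink (or hardlink with output_hardlinks) to the
# actual file; non-path values pass through unchanged.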


def link_outputs_relative(
    link1: Callable[[str, str, bool], None],
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    link_outputs with [file_io] use_relative_output_paths = true. We organize the links to reflect
    the generated files' paths relative to their task working directory.
    """
    link_destinations = dict()

    def map_path_relative(v: Union[Value.File, Value.Directory]) -> str:
        target = (
            v.value
            if os.path.exists(v.value)
            else cache.get_download(v.value, isinstance(v, Value.Directory))
        )
        if target:
            real_target = os.path.realpath(target)
            rel_link = None
            if path_really_within(target, os.path.join(run_dir, "work")):
                # target was generated by current task; use its path relative to the task work dir
                if not os.path.basename(run_dir).startswith("download-"):  # except download tasks
                    rel_link = os.path.relpath(
                        real_target, os.path.realpath(os.path.join(run_dir, "work"))
                    )
            else:
                # target is an out/ link generated by a call in the current workflow OR a cached
                # run; use the link's path relative to that out/ dir, which by induction should
                # equal its path relative to the original work/ dir.
                # we need a heuristic to find the out/ dir in a task/workflow run directory, since
                # the user's cwd or the task-generated relative path might coincidentally have
                # something named 'out'.
                p = None
                for p in reversed([m.span()[0] for m in regex.finditer("/out(?=/)", target)]):
                    if p and (
                        os.path.isfile(os.path.join(target[:p], "task.log"))
                        or os.path.isfile(os.path.join(target[:p], "workflow.log"))
                    ):
                        break
                    p = None
                if p and p + 5 < len(target):
                    rel_link = os.path.relpath(target, target[: p + 5])
            # if neither of the above cases applies, then fall back to just the target basename
            rel_link = rel_link or os.path.basename(target)
            abs_link = os.path.join(os.path.join(run_dir, "out"), rel_link)
            if link_destinations.get(abs_link, real_target) != real_target:
                raise FileExistsError(
                    "Output filename collision; to allow this, set"
                    " [file_io] use_relative_output_paths = false. Affected path: " + abs_link
                )
            os.makedirs(os.path.dirname(abs_link), exist_ok=True)
            link1(real_target, abs_link, isinstance(v, Value.Directory))
            link_destinations[abs_link] = real_target
            return abs_link
        return v.value

    return Value.rewrite_env_paths(outputs, map_path_relative)
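
# Collision sketch: if two calls each produce result.txt relative to their
# work directories, linking both into the shared out/ tree raises
# FileExistsError unless the links resolve to the same real target; setting
# use_relative_output_paths = false restores the per-output subdirectories.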


def _warn_output_basename_collisions(
    logger: logging.Logger, outputs: Env.Bindings[Value.Base]
) -> None:
    targets_by_basename = {}

    def walker(v: Union[Value.File, Value.Directory]) -> str:
        target = v.value
        if os.path.exists(target):
            target = os.path.realpath(target)
        basename = os.path.basename(target)
        targets_by_basename.setdefault(basename, set()).add(target)
        return v.value

    Value.rewrite_env_paths(outputs, walker)

    collisions = [bn for bn, targets in targets_by_basename.items() if len(targets) > 1]
    if collisions:
        logger.warning(
            _(
                "multiple output files share the same basename; while miniwdl supports this,"
                " consider modifying WDL to ensure distinct output basenames",
                basenames=collisions,
            )
        )


def _delete_work(
    cfg: config.Loader,
    logger: logging.Logger,
    container: "Optional[runtime.task_container.TaskContainer]",
    success: bool,
) -> None:
    opt = cfg["file_io"]["delete_work"].strip().lower()
    if container and (
        opt == "always" or (success and opt == "success") or (not success and opt == "failure")
    ):
        if success and not cfg["file_io"].get_bool("output_hardlinks"):
            logger.warning(
                "ignoring configuration [file_io] delete_work because it requires also output_hardlinks = true"
            )
            return
        container.delete_work(logger, delete_streams=not success)


class _StdLib(StdLib.Base):
    logger: logging.Logger
    container: "runtime.task_container.TaskContainer"
    inputs_only: bool  # if True then only permit access to input files

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "runtime.task_container.TaskContainer",
        inputs_only: bool,
    ) -> None:
        super().__init__(wdl_version, write_dir=os.path.join(container.host_dir, "write_"))
        self.logger = logger
        self.container = container
        self.inputs_only = inputs_only

    def _devirtualize_filename(self, filename: str) -> str:
        # check allowability of reading this file, & map from in-container to host
        ans = self.container.host_path(filename, inputs_only=self.inputs_only)
        if ans is None:
            raise OutputError("function was passed non-existent file " + filename)
        self.logger.debug(_("read_", container=filename, host=ans))
        return ans

    def _virtualize_filename(self, filename: str) -> str:
        # register new file with container input_path_map
        self.container.add_paths([filename])
        self.logger.debug(
            _("write_", host=filename, container=self.container.input_path_map[filename])
        )
        self.logger.info(_("wrote", file=self.container.input_path_map[filename]))
        return self.container.input_path_map[filename]


class InputStdLib(_StdLib):
    # StdLib for evaluation of task inputs and command
    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "runtime.task_container.TaskContainer",
    ) -> None:
        super().__init__(wdl_version, logger, container, True)


class OutputStdLib(_StdLib):
    # StdLib for evaluation of task outputs
    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "runtime.task_container.TaskContainer",
    ) -> None:
        super().__init__(wdl_version, logger, container, False)

        setattr(
            self,
            "stdout",
            StdLib.StaticFunction(
                "stdout",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stdout.txt")),
            ),
        )
        setattr(
            self,
            "stderr",
            StdLib.StaticFunction(
                "stderr",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stderr.txt")),
            ),
        )

        def _glob(pattern: Value.String, lib: OutputStdLib = self) -> Value.Array:
            pat = pattern.coerce(Type.String()).value
            if not pat:
                raise OutputError("empty glob() pattern")
            assert isinstance(pat, str)
            if pat[0] == "/":
                raise OutputError("glob() pattern must be relative to task working directory")
            if pat.startswith("..") or "/.." in pat:
                raise OutputError("glob() pattern must not use .. uplevels")
            if pat.startswith("./"):
                pat = pat[2:]
            # glob the host directory
            pat = os.path.join(lib.container.host_work_dir(), pat)
            host_files = sorted(fn for fn in glob.glob(pat) if os.path.isfile(fn))
            # convert the host filenames to in-container filenames
            container_files = []
            for hf in host_files:
                dstrip = lib.container.host_work_dir()
                dstrip += "" if dstrip.endswith("/") else "/"
                assert hf.startswith(dstrip)
                container_files.append(
                    os.path.join(lib.container.container_dir, "work", hf[len(dstrip) :])
                )
            return Value.Array(Type.File(), [Value.File(fn) for fn in container_files])

        setattr(
            self,
            "glob",
            StdLib.StaticFunction("glob", [Type.String()], Type.Array(Type.File()), _glob),
        )
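
# glob() sketch: with work directory contents hello1.txt and hello2.txt, the
# pattern "hello*.txt" is expanded against the host work directory and mapped
# back to in-container paths, yielding files like
# <container_dir>/work/hello1.txt and <container_dir>/work/hello2.txt
# (sorted; regular files only).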