• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chanzuckerberg / miniwdl / 26354823463

24 May 2026 07:11AM UTC coverage: 95.602% (-0.02%) from 95.619%
26354823463

push

github

web-flow
[WDL 1.2] add new deprecation warnings (#879)

15 of 15 new or added lines in 3 files covered. (100.0%)

2 existing lines in 2 files now uncovered.

8283 of 8664 relevant lines covered (95.6%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.4
/WDL/runtime/task.py
1
"""
2
Local task runner
3
"""
4

5
import logging
1✔
6
import math
1✔
7
import os
1✔
8
import json
1✔
9
import traceback
1✔
10
import glob
1✔
11
import threading
1✔
12
import shutil
1✔
13
import regex
1✔
14
from typing import (
1✔
15
    Tuple,
16
    List,
17
    Dict,
18
    Optional,
19
    Callable,
20
    Set,
21
    Any,
22
    Union,
23
    TYPE_CHECKING,
24
    Generator,
25
    Iterable,
26
)
27
from contextlib import ExitStack, suppress
1✔
28
from collections import Counter
1✔
29

30
from .. import Error, Type, Env, Value, StdLib, Tree, Expr, _util
1✔
31
from .._util import (
1✔
32
    WDLVersion,
33
    write_atomic,
34
    write_values_json,
35
    provision_run_dir,
36
    TerminationSignalFlag,
37
    chmod_R_plus,
38
    path_really_within,
39
    LoggingFileHandler,
40
    compose_coroutines,
41
    pathsize,
42
    link_force,
43
    symlink_force,
44
    rmtree_atomic,
45
    wdl_version_geq,
46
)
47
from .._util import StructuredLogMessage as _
1✔
48
from . import config, _statusbar
1✔
49
from .download import able as downloadable, run_cached as download
1✔
50
from .cache import CallCache, new as new_call_cache
1✔
51
from .error import OutputError, Interrupted, Terminated, RunFailed, error_json
1✔
52

53
if TYPE_CHECKING:  # otherwise-delayed heavy imports
1✔
54
    from .task_container import TaskContainer
×
55

56
# Type of the composed task-plugin coroutine driven by the task runner: a generator that yields a
# dict and is sent a dict in return. run_local_task reads an "inputs" dict from it via next(), and
# later send()s an "outputs" dict and reads back the (possibly rewritten) "outputs".
TaskPluginCoroutine = Generator[Dict[str, Any], Dict[str, Any], None]
1✔
57

58

59
def run_local_task(  # type: ignore[return]
    cfg: config.Loader,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    run_id: Optional[str] = None,
    run_dir: Optional[str] = None,
    logger_prefix: Optional[List[str]] = None,
    _run_id_stack: Optional[List[str]] = None,
    _cache: Optional[CallCache] = None,
    _plugins: Optional[List[Callable[..., Any]]] = None,
) -> Tuple[str, Env.Bindings[Value.Base]]:
    """
    Run a task locally, returning the run directory and the task outputs.

    Inputs shall have been typechecked already. File inputs are presumed to be local POSIX file
    paths that can be mounted into a container (downloadable URIs are fetched first).

    If the call cache already holds outputs for this task & inputs, they're linked into the run
    directory and returned without running the command. Any exception raised while processing the
    task is logged, recorded in error.json within the run directory, and re-raised wrapped in
    RunFailed.

    :param run_id: unique ID for the run, defaults to the task name
    :param run_dir: directory under which to create a timestamp-named subdirectory for this run
                    (defaults to current working directory).
                    If the final path component is ".", then operate in run_dir directly.
    """
    from .task_container import new as new_task_container  # delay heavy import

    _run_id_stack = _run_id_stack or []
    run_id = run_id or task.name
    logger_prefix = (logger_prefix or ["wdl"]) + ["t:" + run_id]
    logger = logging.getLogger(".".join(logger_prefix))

    def plugin_starter(cor: Callable[..., Any]) -> Callable[..., Any]:
        # adapt one plugin coroutine function to the single-argument form handed to
        # compose_coroutines, supplying this run's context when it's eventually started
        def start(kwargs: Dict[str, Any]) -> Any:
            return cor(  # type: ignore
                cfg, logger, _run_id_stack + [run_id], run_dir, task, **kwargs
            )

        return start

    with ExitStack() as stack:
        terminating = stack.enter_context(TerminationSignalFlag(logger))
        if terminating():
            raise Terminated(quiet=True)

        # set up the run directory and attach the log file handler(s)
        run_dir = provision_run_dir(task.name, run_dir, last_link=not _run_id_stack)
        log_path = os.path.join(run_dir, "task.log")
        stack.enter_context(LoggingFileHandler(logger, log_path))
        if cfg.has_option("logging", "json") and cfg["logging"].get_bool("json"):
            stack.enter_context(LoggingFileHandler(logger, log_path + ".json", json=True))
        logger.notice(
            _(
                "task setup",
                name=task.name,
                source=task.pos.uri,
                line=task.pos.line,
                column=task.pos.column,
                dir=run_dir,
                thread=threading.get_ident(),
            )
        )
        write_values_json(inputs, os.path.join(run_dir, "inputs.json"))

        cache = _cache
        if not _run_id_stack:
            # no containing workflow: open a call cache if none was given, and flock task.log
            cache = cache or stack.enter_context(new_call_cache(cfg, logger))
            cache.flock(log_path, exclusive=True)
        assert cache

        stack.enter_context(_statusbar.task_slotted())
        maybe_container = None  # set once a container exists, so the error path can clean it up
        try:
            cache_key = f"{task.name}/{task.digest}/{Value.digest_env(inputs)}"
            cached_outputs = cache.get(cache_key, inputs, task.effective_outputs)
            if cached_outputs is not None:
                for decl in task.outputs:
                    v = cached_outputs[decl.name]
                    too_large = len(json.dumps(v.json)) >= 4096
                    logger.info(
                        _(
                            "cached output",
                            name=decl.name,
                            value=("(((large)))" if too_large else v.json),
                        )
                    )
                # create out/ and outputs.json
                link_outputs(
                    cache,
                    cached_outputs,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )
                write_values_json(
                    cached_outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done (cached)")
                # return the cached values themselves, not the rewritten result of link_outputs,
                # to retain the opportunity to find cached downstream inputs
                return (run_dir, cached_outputs)

            # start plugin coroutines (configured ones in sorted order, then any passed in by the
            # caller) and process inputs through them
            plugin_functions = [
                cor for _name, cor in sorted(config.load_plugins(cfg, "task"))
            ]
            plugin_functions += _plugins or []
            with compose_coroutines(
                [plugin_starter(cor) for cor in plugin_functions],
                {"inputs": inputs},
            ) as plugins:
                reply = next(plugins)
                inputs = reply["inputs"]

                # download input files, if needed
                posix_inputs = _download_input_files(
                    cfg,
                    logger,
                    logger_prefix,
                    run_dir,
                    _add_downloadable_defaults(cfg, task.available_inputs, inputs),
                    cache,
                )

                # create TaskContainer according to configuration
                container = new_task_container(cfg, logger, run_id, run_dir)
                maybe_container = container

                # evaluate input/postinput declarations, including mapping from host to
                # in-container file paths
                container_env = _eval_task_inputs(logger, task, posix_inputs, container)

                # evaluate runtime fields
                stdlib = InputStdLib(task.effective_wdl_version, logger, container)
                _eval_task_runtime(
                    cfg, logger, run_id, task, posix_inputs, container, container_env, stdlib
                )
                has_task_struct = wdl_version_geq(task.effective_wdl_version, WDLVersion.V1_2)
                if has_task_struct:
                    container.build_task_runtime_info_struct(logger, run_id, task)
                    assert container.task_runtime_info_struct is not None
                    container_env = container_env.bind("task", container.task_runtime_info_struct)

                # start container & run command (and retry if needed)
                container = _try_task(
                    cfg,
                    task,
                    logger,
                    plugins,
                    container,
                    container_env,
                    terminating,
                )

                # rebind the task runtime info, now with the final attempt & return code, for use
                # by the output declarations
                if wdl_version_geq(task.effective_wdl_version, WDLVersion.V1_2):
                    assert container.try_counter >= 1
                    if container.last_exit_code is not None:
                        return_code = Value.Int(container.last_exit_code)
                    else:
                        return_code = Value.Null()
                    container.update_task_runtime_info_struct(
                        attempt=Value.Int(container.try_counter - 1),
                        return_code=return_code,
                    )
                    assert container.task_runtime_info_struct is not None
                    container_env = container_env.bind("task", container.task_runtime_info_struct)

                # evaluate output declarations
                outputs = _eval_task_outputs(logger, run_id, task, container_env, container)

                # create output_links
                outputs = link_outputs(
                    cache,
                    outputs,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )

                # process outputs through plugins
                reply = plugins.send({"outputs": outputs})
                outputs = reply["outputs"]

                # clean up, if so configured, and make sure output files will be accessible to
                # downstream tasks
                _delete_work(cfg, logger, container, True)
                chmod_R_plus(run_dir, file_bits=0o660, dir_bits=0o770)
                _warn_output_basename_collisions(logger, outputs)

                # write outputs.json
                write_values_json(
                    outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done")
                if not run_id.startswith("download-"):
                    cache.put(cache_key, outputs, run_dir=run_dir)
                return (run_dir, outputs)
        except Exception as exn:
            tb_text = traceback.format_exc()
            logger.debug(tb_text)
            failure = RunFailed(task, run_id, run_dir)
            # the traceback is only reported for exceptions other than Error.RuntimeError
            reported_tb = None if isinstance(exn, Error.RuntimeError) else tb_text
            logmsg = _(
                str(failure),
                dir=run_dir,
                **error_json(exn, traceback=reported_tb),
            )
            quiet = isinstance(exn, Terminated) and getattr(exn, "quiet", False)
            if quiet:
                logger.debug(logmsg)
            else:
                logger.error(logmsg)
            # best-effort: record the error in the run directory
            try:
                error_doc = json.dumps(
                    error_json(failure, cause=exn, traceback=reported_tb),
                    indent=2,
                )
                write_atomic(error_doc, os.path.join(run_dir, "error.json"))
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.critical(_("failed to write error.json", dir=run_dir, message=str(exn2)))
            # best-effort: clean up the container's working files
            try:
                if maybe_container:
                    _delete_work(cfg, logger, maybe_container, False)
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.error(_("delete_work also failed", exception=str(exn2)))
            raise failure from exn
1✔
298

299

300
def _download_input_files(
    cfg: config.Loader,
    logger: logging.Logger,
    logger_prefix: List[str],
    run_dir: str,
    inputs: Env.Bindings[Value.Base],
    cache: CallCache,
) -> Env.Bindings[Value.Base]:
    """
    Localize any File & Directory input values that are downloadable URIs (including those nested
    within compound values).

    Each such URI is downloaded (or found in the download cache) under run_dir/download/, and the
    returned copy of the inputs has the URI replaced by the resulting local path. Values that
    aren't downloadable URIs pass through unchanged.
    """

    # running totals, updated by the rewriter as it visits each File/Directory value
    tally = {"downloads": 0, "bytes": 0, "cached": 0}

    def localize(v: Union[Value.Directory, Value.File]) -> str:
        is_dir = isinstance(v, Value.Directory)
        uri = v.value
        if not downloadable(cfg, uri, directory=is_dir):
            return uri

        kind = "directory" if is_dir else "file"
        logger.info(_(f"download input {kind}", uri=uri))
        seq = tally["downloads"]  # index of this download = number of fresh downloads so far
        was_cached, local_path = download(
            cfg,
            logger,
            cache,
            uri,
            directory=is_dir,
            run_dir=os.path.join(run_dir, "download", str(seq), "."),
            logger_prefix=logger_prefix + [f"download{seq}"],
        )
        if was_cached:
            tally["cached"] += 1
        else:
            nbytes = pathsize(local_path)
            logger.info(_("downloaded input", uri=uri, path=local_path, bytes=nbytes))
            tally["downloads"] += 1
            tally["bytes"] += nbytes
        return local_path

    localized = Value.rewrite_env_paths(inputs, localize)
    if tally["downloads"] or tally["cached"]:
        logger.notice(
            _(
                "processed input URIs",
                downloaded=tally["downloads"],
                downloaded_bytes=tally["bytes"],
                cached=tally["cached"],
            )
        )
    return localized
1✔
354

355

356
def _add_downloadable_defaults(
    cfg: config.Loader, available_inputs: Env.Bindings[Tree.Decl], inputs: Env.Bindings[Value.Base]
) -> Env.Bindings[Value.Base]:
    """
    Supplement the user-supplied inputs with bindings for any available File/Directory input
    whose default is a string constant that appears to be a downloadable URI, unless the input is
    already bound.

    We otherwise don't evaluate input declarations until after processing downloads, so without
    this step the default URIs would never be downloaded.
    """
    result = inputs
    for binding in available_inputs:
        decl = binding.value
        if not isinstance(decl.type, (Type.File, Type.Directory)):
            continue
        if binding.name in result:
            continue  # the caller supplied (or we've already added) a value
        if not isinstance(decl.expr, Expr.String):
            continue

        is_dir = isinstance(decl.type, Type.Directory)
        maybe_uri = decl.expr.literal  # falsy when there is no constant value to inspect
        if maybe_uri and downloadable(cfg, maybe_uri.value, directory=is_dir):
            value_class = Value.Directory if is_dir else Value.File
            result = result.bind(binding.name, value_class(maybe_uri.value, decl.expr))
    return result
1✔
384

385

386
def _eval_task_inputs(
    logger: logging.Logger,
    task: Tree.Task,
    posix_inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    """
    Build the value environment for the task: the supplied inputs, with File & Directory paths
    mapped to their in-container paths, plus the evaluated values of all remaining input and
    post-input declarations.
    """
    input_decls = task.available_inputs

    def keep_binding(b: Env.Binding[Value.Base]) -> bool:
        # Drop a supplied None for an input declared with a default but without the ? type
        # quantifier, so that the default will be used. If the declaration has an -explicitly-
        # optional type, the supplied None is kept and overrides any default.
        if not isinstance(b.value, Value.Null) or b.name not in input_decls:
            return True
        declared = input_decls[b.name]
        return not (declared.expr and not declared.type.optional)

    posix_inputs = posix_inputs.filter(keep_binding)

    # Register all the provided input File & Directory paths with the container
    container.add_paths(_fspaths(posix_inputs))
    _warn_input_basename_collisions(logger, container)

    def to_container_path(fn: Union[Value.File, Value.Directory]) -> str:
        # look up by the same key form _fspaths produces (Directory paths end with '/')
        key = fn.value.rstrip("/")
        if isinstance(fn, Value.Directory):
            key += "/"
        return container.input_path_map[key]

    # copy posix_inputs with all File & Directory values mapped to their in-container paths
    container_inputs = Value.rewrite_env_paths(posix_inputs, to_container_path)

    # seed the value environment with the inputs
    container_env: Env.Bindings[Value.Base] = Env.Bindings()
    for b in container_inputs:
        assert isinstance(b, Env.Binding)
        v = b.value
        assert isinstance(v, Value.Base)
        container_env = container_env.bind(b.name, v)
        too_large = len(json.dumps(v.json)) >= 4096
        logger.info(_("input", name=b.name, value=("(((large)))" if too_large else v.json)))

    # the declarations not already bound by an input still need evaluation, in dependency order
    decls_to_eval = _task_decl_eval_order(
        decl
        for decl in (task.inputs or []) + task.postinputs
        if not container_env.has_binding(decl.name)
    )

    def missing_error(name: str) -> Error.RuntimeError:
        return Error.InputError(f"File/Directory path not found in task declaration {name}")

    # note: the write_* functions call container.add_paths as a side-effect
    stdlib = InputStdLib(task.effective_wdl_version, logger, container)
    for decl in decls_to_eval:
        assert isinstance(decl, Tree.Decl)

        # these closures over `decl` are consumed within this iteration's _eval_task_decl call
        def check_child_path(w: Union[Value.File, Value.Directory]) -> Optional[str]:
            return _task_decl_input_directory_child_path(decl.name, w, container)

        def postprocess(value: Value.Base) -> Value.Base:
            return _postprocess_task_decl_paths(decl, value, check_child_path, missing_error)

        v = _eval_task_decl(logger, decl, container_env, stdlib, postprocess)
        too_large = len(json.dumps(v.json)) >= 4096
        logger.info(_("eval", name=decl.name, value=("(((large)))" if too_large else v.json)))
        container_env = container_env.bind(decl.name, v)

    return container_env
1✔
462

463

464
def _task_decl_eval_order(decls: Iterable[Tree.Decl]) -> List[Tree.Decl]:
    """
    Return the given task declarations in a topologically sorted order for evaluation.
    """
    by_id, adjacency = Tree._decl_dependency_matrix(list(decls))
    ordered: List[Tree.Decl] = []
    for decl_id in _util.topsort(adjacency):
        ordered.append(by_id[decl_id])
    # NOTE: _util.topsort() throws on cycles, but those should have been rejected in static
    # typechecking prior to this.
    assert len(by_id) == len(ordered)
    return ordered
1✔
474

475

476
def _eval_task_decl(
    logger: logging.Logger,
    decl: Tree.Decl,
    env: Env.Bindings[Value.Base],
    stdlib: StdLib.Base,
    postprocess_paths: Callable[[Value.Base], Value.Base],
) -> Value.Base:
    """
    Evaluate a single task declaration in env, then pass the value through postprocess_paths
    (the File/Directory path rewriting logic, which differs between input/private and output
    declarations). A declaration without an expression evaluates to Null.

    Any exception raised is tagged with the declaration's workflow_node_id as job_id; exceptions
    other than Error.RuntimeError are wrapped in Error.EvalError.
    """
    try:
        if decl.expr:
            value = decl.expr.eval(env, stdlib=stdlib).coerce(decl.type)
        else:
            value = Value.Null()
        _warn_struct_extra(logger, decl.name, value)
        return postprocess_paths(value)
    except Error.RuntimeError as exn:
        exn.job_id = decl.workflow_node_id  # type: ignore[attr-defined]
        raise exn
    except Exception as exn:
        wrapped = Error.EvalError(decl, str(exn))
        wrapped.job_id = decl.workflow_node_id  # type: ignore[attr-defined]
        raise wrapped from exn
×
498

499

500
def _postprocess_task_decl_paths(
    decl: Tree.Decl,
    value: Value.Base,
    missing_path: Callable[[Union[Value.File, Value.Directory]], Optional[str]],
    missing_error: Callable[[str], Error.RuntimeError],
) -> Value.Base:
    """
    Rewrite the File/Directory paths within value using missing_path (which returns None for a
    non-existent path, turning it into Value.Null), then coerce the result to the declaration
    type.

    If the coercion raises FileNotFoundError (a Null where a non-optional File/Directory is
    required), raise the error built by missing_error(decl.name) instead, tagged with the
    declaration's workflow_node_id as job_id.
    """
    rewritten = Value.rewrite_paths(value, missing_path)
    try:
        coerced = rewritten.coerce(decl.type)
    except FileNotFoundError:  # from Value.Null.coerce(File|Directory)
        err = missing_error(decl.name)
        err.job_id = decl.workflow_node_id  # type: ignore[attr-defined]
        raise err
    else:
        return coerced
1✔
517

518

519
def _task_decl_input_directory_child_path(
    decl_name: str, v: Union[Value.File, Value.Directory], container: "TaskContainer"
) -> Optional[str]:
    """
    Validate a task File/Directory path that was formulated by path logic from another input
    Directory, e.g.

    input {
        Directory d
    }
    File f = join_paths(d, "file.txt")
    Directory? maybe = join_paths(d, "maybe/")

    A path the container doesn't recognize as an input is returned unchanged. Otherwise:
    - if nothing exists at the corresponding host path, return None (so that an optional
      declaration can become Null);
    - if it exists with the declared kind, return the path (for a Directory, with the trailing
      slash expected by TaskContainer);
    - if it exists but as the other kind, raise Error.InputError.
    """
    wants_dir = isinstance(v, Value.Directory)
    container_path = v.value.rstrip("/")
    if wants_dir:
        container_path += "/"

    found_input, host_path = container._input_host_path(container_path)
    if not found_input:
        return v.value
    assert host_path is not None

    if not os.path.exists(host_path.rstrip("/")):
        return None  # induces to Value.Null()

    if wants_dir:
        kind_matches = os.path.isdir(host_path)
        accepted_path = container_path
    else:
        kind_matches = os.path.isfile(host_path)
        accepted_path = v.value
    if kind_matches:
        return accepted_path

    raise Error.InputError(
        f"task declaration {decl_name} uses file/directory with the wrong type: " + v.value
    )
548

549

550
def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]:
    """
    Collect the distinct paths of every File & Directory value found in the environment,
    including those nested within other values. Directory paths are given exactly one trailing
    '/'; File paths must not end with '/'.
    """
    paths: Set[str] = set()

    # walk each bound value and its descendants using an explicit stack
    pending = [binding.value for binding in env]
    while pending:
        v = pending.pop()
        if isinstance(v, Value.File):
            assert not v.value.endswith("/")
            paths.add(v.value)
        elif isinstance(v, Value.Directory):
            paths.add(v.value.rstrip("/") + "/")
        pending.extend(v.children)

    return paths
1✔
569

570

571
def _warn_input_basename_collisions(logger: logging.Logger, container: "TaskContainer") -> None:
    """
    Log a warning listing any basename shared by more than one entry of
    container.input_path_map_rev (a single trailing '/' is ignored when taking the basename).
    """

    def basename_of(path: str) -> str:
        trimmed = path[:-1] if path.endswith("/") else path
        return os.path.basename(trimmed)

    occurrences = Counter(basename_of(p) for p in container.input_path_map_rev)
    collisions = [name for name, count in occurrences.items() if count > 1]
    if not collisions:
        return
    logger.warning(
        _(
            "mounting input files with colliding basenames in separate container directories",
            basenames=collisions,
        )
    )
583

584

585
def _eval_task_runtime(
    cfg: config.Loader,
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
    env: Env.Bindings[Value.Base],
    stdlib: StdLib.Base,
) -> None:
    """
    Determine the task's runtime settings and in-container environment variables, leaving the
    result in container.runtime_values.

    Runtime values are layered, later sources overriding earlier ones: configured defaults (plus
    download defaults when run_id starts with "download-"), the task's runtime expressions
    evaluated in env, then inputs in the "runtime" and "requirements" namespaces. The container
    implementation validates & postprocesses them. Environment variables come from the
    configured overrides, an optional TMPDIR, and declarations carrying the "env" decorator.
    """
    # start from configured defaults
    runtime_defaults = cfg.get_dict("task_runtime", "defaults")
    if run_id.startswith("download-"):
        runtime_defaults.update(cfg.get_dict("task_runtime", "download_defaults"))
    runtime_values = {
        key: Value.from_json(Type.Any(), default) for key, default in runtime_defaults.items()
    }
    # expressions in the task source code
    for key, expr in task.runtime.items():
        runtime_values[key] = expr.eval(env, stdlib)
    # input overrides
    for namespace in ("runtime", "requirements"):
        for b in inputs.enter_namespace(namespace):
            runtime_values[b.name] = b.value
    # for WDL 1.2+, a return_codes entry is stored under the returnCodes key instead
    if "return_codes" in runtime_values and wdl_version_geq(
        task.effective_wdl_version, WDLVersion.V1_2
    ):
        runtime_values["returnCodes"] = runtime_values.pop("return_codes")
    logger.debug(_("runtime values", **{key: str(v) for key, v in runtime_values.items()}))

    # have container implementation validate & postprocess into container.runtime_values
    container.process_runtime(logger, runtime_values)

    if container.runtime_values:
        logger.info(_("effective runtime", **container.runtime_values))

    # add any configured overrides for in-container environment variables; a None value in the
    # configuration means passing through the variable from our own environment, if it's set
    container.runtime_values.setdefault("env", {})
    env_vars_override = {}
    env_vars_skipped = []
    for ev_name, ev_value in cfg["task_runtime"].get_dict("env").items():
        if ev_value is not None:
            env_vars_override[ev_name] = str(ev_value)
        elif ev_name in os.environ:
            env_vars_override[ev_name] = os.environ[ev_name]
        else:
            env_vars_skipped.append(ev_name)
    if env_vars_skipped:
        logger.warning(
            _("skipping pass-through of undefined environment variable(s)", names=env_vars_skipped)
        )

    mount_tmpdir = cfg.get_bool("file_io", "mount_tmpdir") or task.name in cfg.get_list(
        "file_io", "mount_tmpdir_for"
    )
    if mount_tmpdir:
        env_vars_override["TMPDIR"] = os.path.join(
            container.container_dir, "work", "_miniwdl_tmpdir"
        )

    if env_vars_override:
        override_msg = "overriding environment variables (portability warning)"
        # usually don't dump values into log, as they may often be auth tokens
        logger.notice(_(override_msg, names=list(env_vars_override.keys())))
        logger.debug(_(override_msg, **env_vars_override))
        container.runtime_values["env"].update(env_vars_override)

    # process decls with "env" decorator: strings & paths are passed verbatim, anything else as
    # JSON text
    env_decls: Dict[str, str] = {}
    for decl in (task.inputs or []) + task.postinputs:
        if decl.decor.get("env", False) is not True:
            continue
        bound = env[decl.name]
        if isinstance(bound, (Value.String, Value.File, Value.Directory)):
            env_decls[decl.name] = bound.value
        else:
            env_decls[decl.name] = json.dumps(bound.json)
    container.runtime_values["env"].update(env_decls)

    # warn about settings the container implementation did not carry over
    unused_keys = [
        key
        for key in runtime_values
        if key not in ("memory", "docker", "container") and key not in container.runtime_values
    ]
    if unused_keys:
        logger.warning(_("ignored runtime settings", keys=unused_keys))
1✔
674

675

676
def _try_task(
    cfg: config.Loader,
    task: Tree.Task,
    logger: logging.Logger,
    plugins: TaskPluginCoroutine,
    container: "TaskContainer",
    container_env: Env.Bindings[Value.Base],
    terminating: Callable[[], bool],
) -> "TaskContainer":
    """
    Run the task command in the container, looping until it succeeds or an error is re-raised.

    Interrupted errors are retried up to runtime.preemptible times; any other error (except
    Terminated and docker build errors) is retried up to runtime.maxRetries times, unless
    termination has been requested. Returns the container after a successful run.
    """
    from docker.errors import BuildError as DockerBuildError  # delay heavy import

    # budgets are read from the container as passed in, before plugins may swap it out
    retry_budget = container.runtime_values.get("maxRetries", 0)
    interruption_budget = container.runtime_values.get("preemptible", 0)
    retries_used = 0
    interruptions_used = 0

    command = None
    command_altered_by_plugin = False
    assert isinstance(task.command, Expr.TaskCommand)
    reeval_each_try = _task_command_uses_task_attempt(task.command)

    while True:
        if terminating():
            raise Terminated()

        # evaluate the command on the first pass, and again on each retry if it uses task.attempt
        if command is None or reeval_each_try:
            command = _eval_task_command(
                cfg,
                task,
                logger,
                container,
                container_env,
                attempt=container.try_counter - 1,
            )
            if container.try_counter == 1:
                assert (
                    retries_used == 0
                    and interruptions_used == 0
                    and not command_altered_by_plugin
                )
                # first try only: hand the command & container to the plugin(s)
                reply = plugins.send({"command": command, "container": container})
                plugin_command = reply["command"]
                container = reply["container"]
                if plugin_command != command:
                    command_altered_by_plugin = True
                    command = plugin_command
        assert isinstance(command, str)
        logger.debug(_("command", command=command.strip()))

        copy_inputs = cfg.get_bool("file_io", "copy_input_files") or task.name in cfg.get_list(
            "file_io", "copy_input_files_for"
        )
        if copy_inputs:
            # must follow command interpolation, which can add new input files via write_*
            container.copy_input_files(logger)

        host_tmpdir = None
        if cfg.get_bool("file_io", "mount_tmpdir") or task.name in cfg.get_list(
            "file_io", "mount_tmpdir_for"
        ):
            host_tmpdir = os.path.join(container.host_work_dir(), "_miniwdl_tmpdir")

        try:
            # start container & run command
            if host_tmpdir:
                logger.debug(_("creating task temp directory", TMPDIR=host_tmpdir))
                os.mkdir(host_tmpdir, mode=0o770)
            try:
                container.run(logger, command)
                return container
            finally:
                if host_tmpdir:
                    logger.info(_("deleting task temp directory", TMPDIR=host_tmpdir))
                    rmtree_atomic(host_tmpdir)
                mock_interruption = (
                    "preemptible" in container.runtime_values
                    and cfg.has_option("task_runtime", "_mock_interruptions")
                    and interruptions_used < cfg["task_runtime"].get_int("_mock_interruptions")
                )
                if mock_interruption:
                    # raising here supersedes the pending return/exception of the try body
                    raise Interrupted("mock interruption") from None
        except Exception as exn:
            if isinstance(exn, Interrupted) and interruptions_used < interruption_budget:
                logger.error(
                    _(
                        "interrupted task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_interruptions=interruptions_used,
                        max_interruptions=interruption_budget,
                    )
                )
                interruptions_used += 1
            elif (
                isinstance(exn, (Terminated, DockerBuildError))
                or not retries_used < retry_budget
                or terminating()
            ):
                # not retryable, retry budget exhausted, or shutting down
                raise
            else:
                logger.error(
                    _(
                        "failed task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_retries=retries_used,
                        max_retries=retry_budget,
                    )
                )
                retries_used += 1
            if reeval_each_try and command_altered_by_plugin:
                # Our plugin API, designed well before the addition of `task.attempt` in WDL 1.2,
                # doesn't allow for reprocessing the command after a retry; to be safe, we fail if
                # the command uses `task.attempt` and the plugin changed the (first-try) command.
                raise Error.RuntimeError(
                    "task command uses task.attempt, but a task plugin changed the command; "
                    "cannot retry with an updated task.attempt value"
                ) from exn
            # clean up the failed attempt before looping around for the next one
            _delete_work(cfg, logger, container, False)
            container.reset(logger)
1✔
794

795

796
def _eval_task_command(
    cfg: config.Loader,
    task: Tree.Task,
    logger: logging.Logger,
    container: "TaskContainer",
    container_env: Env.Bindings[Value.Base],
    attempt: int,
) -> str:
    """
    Evaluate the task command expression and return the resulting command string.

    For WDL 1.2 and later, the container's task runtime info struct is first updated with the
    given attempt number (and a null return code) and bound as `task` in the evaluation
    environment. This may therefore happen several times for one task, if it is retried and its
    command uses `task.attempt`.
    """
    assert attempt >= 0
    env_for_command = container_env
    if wdl_version_geq(task.effective_wdl_version, WDLVersion.V1_2):
        container.update_task_runtime_info_struct(
            attempt=Value.Int(attempt),
            return_code=Value.Null(),
        )
        assert container.task_runtime_info_struct is not None
        env_for_command = env_for_command.bind("task", container.task_runtime_info_struct)

    legacy_dedent = cfg["task_runtime"].get_bool("old_command_dedent")
    # pylint: disable=E1101
    placeholder_re = regex.compile(cfg["task_runtime"]["placeholder_regex"], flags=regex.POSIX)
    stdlib = InputStdLib(
        task.effective_wdl_version,
        logger,
        container,
        eval_context=StdLib.EvalContext(placeholder_regex=placeholder_re),
    )
    assert isinstance(task.command, Expr.TaskCommand)
    rendered = task.command.eval(env_for_command, stdlib, dedent=not legacy_dedent).value
    if not legacy_dedent:
        return rendered
    # old-style dedent is applied after evaluation instead; see issue #674
    return _util.strip_leading_whitespace(rendered)[1]
1✔
831

832

833
def _task_command_uses_task_attempt(command: Expr.TaskCommand) -> bool:
    """
    Report whether any placeholder expression in the command refers to WDL 1.2's `task.attempt`
    (in which case the command has to be re-evaluated on retry).
    """

    def refers_to_task_attempt(node: Expr.Base) -> bool:
        # form 1: member access `.attempt` on a Get that wraps the identifier `task`
        via_member_access = (
            isinstance(node, Expr.Get)
            and node.member == "attempt"
            and isinstance(node.expr, Expr.Get)
            and node.expr.member is None
            and isinstance(node.expr.expr, Expr.Ident)
            and node.expr.expr.name == "task"
        )
        if via_member_access:
            return True
        # form 2: a single identifier spelled "task.attempt"
        return isinstance(node, Expr.Ident) and node.name == "task.attempt"

    # walk every placeholder's expression tree using an explicit stack
    pending = [part.expr for part in command.parts if isinstance(part, Expr.Placeholder)]
    while pending:
        node = pending.pop()
        if refers_to_task_attempt(node):
            return True
        for child in node.children:
            if isinstance(child, Expr.Base):
                pending.append(child)
    return False
1✔
852

853

854
def _eval_task_outputs(
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    env: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    """
    Evaluate the task's output declarations and return them as an environment whose
    File/Directory values have been rewritten from in-container paths to host paths.

    Beforehand, if the command wrote something to stdout and no output expression calls
    stdout(), log a hint about where that stdout can be found (skipped for download- runs).
    """
    stdout_path = os.path.join(container.host_dir, "stdout.txt")
    with suppress(FileNotFoundError):
        if os.path.getsize(stdout_path) > 0 and not run_id.startswith("download-"):
            # If the task produced nonempty stdout that isn't used in the WDL outputs, generate a
            # courtesy log message directing user where to find it
            references_stdout = False
            todo = [output.expr for output in task.outputs]
            while todo:
                node = todo.pop()
                assert isinstance(node, Expr.Base)
                if isinstance(node, Expr.Apply) and node.function_name == "stdout":
                    references_stdout = True
                    continue
                todo.extend(node.children)  # type: ignore[arg-type]
            if not references_stdout:
                logger.info(
                    _(
                        "command stdout unused; consider output `File cmd_out = stdout()`"
                        " or redirect command to stderr log >&2",
                        stdout_file=stdout_path,
                    )
                )

    stdlib = OutputStdLib(task.effective_wdl_version, logger, container)
    outputs: Env.Bindings[Value.Base] = Env.Bindings()

    # evaluate output declarations in dependency order
    for decl in _task_decl_eval_order(task.outputs):
        assert decl.expr
        # evaluate and check existence of in-container File/Directory output paths (tolerating
        # non-existence for optional outputs); bind to env for subsequent decls
        container_value = _eval_task_decl(
            logger,
            decl,
            env,
            stdlib,
            lambda value: _postprocess_task_output_decl_paths(logger, decl, value, container),
        )
        env = env.bind(decl.name, container_value)

        # rewrite in-container File/Directory paths to host paths, bind in outputs env
        try:
            host_value = Value.rewrite_paths(
                container_value,
                lambda w: _task_output_host_path(logger, decl.name, w, container),
            )
        except Error.RuntimeError as exn:
            # tag the error with the originating declaration's node id before re-raising
            setattr(exn, "job_id", decl.workflow_node_id)
            raise exn
        outputs = outputs.bind(decl.name, host_value)

    return outputs
1✔
912

913

914
def _postprocess_task_output_decl_paths(
    logger: logging.Logger,
    decl: Tree.Decl,
    value: Value.Base,
    container: "TaskContainer",
) -> Value.Base:
    """
    Log a task output value, then delegate to _postprocess_task_decl_paths to normalize
    File/Directory paths that are missing from the container.
    """
    encoded = json.dumps(value.json)
    # keep the log readable: values whose JSON is 4096+ characters are replaced by a marker
    if len(encoded) < 4096:
        shown = value.json
    else:
        shown = "(((large)))"
    logger.info(_("output", name=decl.name, value=shown))

    return _postprocess_task_decl_paths(
        decl,
        value,
        lambda v: _task_output_missing_path(v, container),
        lambda name: OutputError("File/Directory path not found in task output " + name),
    )
933

934

935
def _task_output_missing_path(
    v: Union[Value.File, Value.Directory],
    container: "TaskContainer",
) -> Optional[str]:
    """
    Return None if the container cannot map the File/Directory path to a host path; otherwise
    return the path value unchanged.
    """
    probe = v.value
    # Directory paths are looked up with a trailing slash
    if isinstance(v, Value.Directory) and not probe.endswith("/"):
        probe = probe + "/"
    return None if container.host_path(probe) is None else v.value
1✔
948

949

950
def _task_output_host_path(
    logger: logging.Logger,
    output_name: str,
    v: Union[Value.File, Value.Directory],
    container: "TaskContainer",
) -> Optional[str]:
    """
    Translate an existing task output File/Directory path from its in-container form to the
    corresponding host path. Directory outputs are additionally vetted by _check_directory.
    """
    is_dir = isinstance(v, Value.Directory)
    container_path = v.value
    if is_dir and not container_path.endswith("/"):
        # Directory paths are looked up with a trailing slash
        container_path += "/"
    host_path = container.host_path(container_path)
    assert host_path is not None

    if not is_dir:
        logger.debug(_("output file", container=container_path, host=host_path))
        return host_path

    # report the directory's host path without a trailing slash
    if host_path.endswith("/"):
        host_path = host_path[:-1]
    _check_directory(host_path, output_name)
    logger.debug(_("output dir", container=container_path, host=host_path))
    return host_path
1✔
972

973

974
def _check_directory(host_path: str, output_name: str) -> None:
    """
    Traverse an output directory to check that all symlinks are relative & resolve inside the dir.

    With followlinks=False, os.walk() reports symlinks that point to directories among the
    subdirectory names (without descending into them) and every other symlink, including broken
    ones, among the file names. Both lists are therefore inspected, so that a symlinked
    directory is held to the same rules as a symlinked file.

    :param host_path: host path of the output directory to traverse
    :param output_name: name of the task output, used in the error message
    :raises OutputError: if a symlink is broken, has an absolute target, or resolves outside
                         host_path
    :raises OSError: if the traversal itself fails (errors from os.walk are re-raised)
    """

    def raiser(exc: OSError):
        # os.walk() swallows listing errors by default; surface them instead
        raise exc

    for root, subdirs, files in os.walk(host_path, onerror=raiser, followlinks=False):
        for name in subdirs + files:
            path = os.path.join(root, name)
            if os.path.islink(path) and (
                not os.path.exists(path)
                or os.path.isabs(os.readlink(path))
                or not path_really_within(path, host_path)
            ):
                raise OutputError(f"Directory in output {output_name} contains unusable symlink")
1✔
991

992

993
def _warn_struct_extra(
    logger: logging.Logger, decl_name: str, v: Value.Base, warned_keys: Optional[Set[str]] = None
) -> None:
    """
    Recursively log notices about extraneous keys found in struct initialization from
    JSON/Map/Object. Keys starting with '#' are disregarded, and a struct is reported only if it
    has at least one key not yet seen in warned_keys (which is shared across the recursion).
    """
    seen = set() if warned_keys is None else warned_keys
    if isinstance(v, Value.Struct) and v.extra:
        extra_keys = {key for key in v.extra if not key.startswith("#")}
        if not extra_keys.issubset(seen):
            logger.notice(
                _(
                    "extraneous keys in struct initializer",
                    decl=decl_name,
                    struct=str(v.type),
                    extra_keys=list(extra_keys),
                )
            )
            seen.update(extra_keys)
    for child in v.children:
        _warn_struct_extra(logger, decl_name, child, seen)
1✔
1015

1016

1017
def link_outputs(
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
    use_relative_output_paths: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    After a successful run, the output files can be spread all over the directory tree used for
    execution. To make them easier to find, create an out/ subdirectory of the run directory
    holding neatly organized links to the output files, and return a copy of the outputs env
    whose File/Directory values point at those links.

    Links are symlinks unless hardlinks is set. With use_relative_output_paths, the work is
    delegated to link_outputs_relative.
    """

    def link1(target: str, link: str, directory: bool) -> None:
        if not hardlinks:
            symlink_force(target, link)
            return
        # TODO: what if target is an input from a different filesystem?
        if directory:
            shutil.copytree(target, link, symlinks=True, copy_function=link_force)
        else:
            link_force(target, link)

    def map_paths(v: Value.Base, dn: str) -> Value.Base:
        if isinstance(v, (Value.File, Value.Directory)):
            is_dir = isinstance(v, Value.Directory)
            if os.path.exists(v.value):
                target = v.value
            else:
                target = cache.get_download(v.value, is_dir)
            if target:
                target = os.path.realpath(target)
                assert os.path.exists(target)
                if not hardlinks and path_really_within(target, os.path.dirname(run_dir)):
                    # make symlink relative
                    target = os.path.relpath(target, start=os.path.realpath(dn))
                link = os.path.join(dn, os.path.basename(v.value.rstrip("/")))
                os.makedirs(dn, exist_ok=False)
                link1(target, link, is_dir)
                # Drop a dotfile alongside Directory outputs, to inform a program crawling the out/
                # directory without reference to the output types or JSON for whatever reason. It
                # might otherwise have trouble distinguishing Directory outputs among the
                # structured subdirectories we create for compound types.
                if is_dir:
                    with open(os.path.join(dn, ".WDL_Directory"), "w") as _dotfile:
                        pass
                v.value = link
        # recurse into compound values
        elif isinstance(v, Value.Array) and v.value:
            width = int(math.ceil(math.log10(len(v.value))))  # how many digits needed
            for idx, item in enumerate(v.value):
                v.value[idx] = map_paths(item, os.path.join(dn, str(idx).rjust(width, "0")))
        elif isinstance(v, Value.Map) and v.value:
            # create a subdirectory for each key, as long as the key names seem to make reasonable
            # path components; otherwise, treat the dict as a list of its values
            def key_text(key: Value.Base) -> str:
                return str(key).strip("'\"")

            keys_ok = all(
                regex.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", key_text(entry[0])) is not None
                for entry in v.value
            )
            width = int(math.ceil(math.log10(len(v.value))))
            for idx, entry in enumerate(v.value):
                subdir = key_text(entry[0]) if keys_ok else str(idx).rjust(width, "0")
                v.value[idx] = (entry[0], map_paths(entry[1], os.path.join(dn, subdir)))
        elif isinstance(v, Value.Pair):
            v.value = (
                map_paths(v.value[0], os.path.join(dn, "left")),
                map_paths(v.value[1], os.path.join(dn, "right")),
            )
        elif isinstance(v, Value.Struct):
            for key, member in v.value.items():
                v.value[key] = map_paths(member, os.path.join(dn, key))
        return v

    # the out/ directory must not exist yet
    os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)

    if use_relative_output_paths:
        return link_outputs_relative(link1, cache, outputs, run_dir, hardlinks=hardlinks)

    def relink(binding: Env.Binding) -> Env.Binding:
        copied = Value.rewrite_paths(binding.value, lambda v: v.value)  # nop to deep copy
        return Env.Binding(
            binding.name,
            map_paths(copied, os.path.join(run_dir, "out", binding.name)),
        )

    return outputs.map(relink)
1117

1118

1119
def link_outputs_relative(
    link1: Callable[[str, str, bool], None],
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    link_outputs with [file_io] use_relative_output_paths = true. The links under out/ are laid
    out to mirror each generated file's path relative to its task working directory.

    Raises FileExistsError if two different targets would be linked at the same path.
    """
    # link path -> real target already linked there, for collision detection
    link_destinations: Dict[str, str] = {}

    def map_path_relative(v: Union[Value.File, Value.Directory]) -> str:
        if os.path.exists(v.value):
            target = v.value
        else:
            target = cache.get_download(v.value, isinstance(v, Value.Directory))
        if not target:
            return v.value

        real_target = os.path.realpath(target)
        rel_link = None
        if path_really_within(target, os.path.join(run_dir, "work")):
            # target was generated by current task; use its path relative to the task work dir
            if not os.path.basename(run_dir).startswith("download-"):  # except download tasks
                rel_link = os.path.relpath(
                    real_target, os.path.realpath(os.path.join(run_dir, "work"))
                )
        else:
            # target is an out/ link generated by a call in the current workflow OR a cached
            # run; use the link's path relative to that out/ dir, which by induction should
            # equal its path relative to the original work/ dir.
            # we need heuristic to find the out/ dir in a task/workflow run directory, since the
            # user's cwd or the task-generated relative path might coincidentally have
            # something named 'out'.
            out_pos = None
            out_starts = [m.start() for m in regex.finditer("/out(?=/)", target)]
            for candidate in reversed(out_starts):
                if candidate and (
                    os.path.isfile(os.path.join(target[:candidate], "task.log"))
                    or os.path.isfile(os.path.join(target[:candidate], "workflow.log"))
                ):
                    out_pos = candidate
                    break
            # 5 == len("/out/"); require something to follow the out/ directory
            if out_pos and out_pos + 5 < len(target):
                rel_link = os.path.relpath(target, target[: out_pos + 5])

        # if neither of the above cases applies, then fall back to just the target basename
        rel_link = rel_link or os.path.basename(target)
        abs_link = os.path.join(run_dir, "out", rel_link)
        if abs_link in link_destinations and link_destinations[abs_link] != real_target:
            raise FileExistsError(
                "Output filename collision; to allow this, set"
                " [file_io] use_relative_output_paths = false. Affected path: " + abs_link
            )
        os.makedirs(os.path.dirname(abs_link), exist_ok=True)
        link1(real_target, abs_link, isinstance(v, Value.Directory))
        link_destinations[abs_link] = real_target
        return abs_link

    return Value.rewrite_env_paths(outputs, map_path_relative)
1✔
1179

1180

1181
def _warn_output_basename_collisions(
    logger: logging.Logger, outputs: Env.Bindings[Value.Base]
) -> None:
    """
    Log a warning if distinct output File/Directory targets share the same basename.
    """
    targets_by_basename: Dict[str, Set[str]] = {}

    def walker(v: Union[Value.File, Value.Directory]) -> str:
        # record each path (resolved, if it exists) under its basename; leave the value as-is
        path = v.value
        if os.path.exists(path):
            path = os.path.realpath(path)
        name = os.path.basename(path)
        if name not in targets_by_basename:
            targets_by_basename[name] = set()
        targets_by_basename[name].add(path)
        return v.value

    Value.rewrite_env_paths(outputs, walker)

    collisions = []
    for name, targets in targets_by_basename.items():
        if len(targets) > 1:
            collisions.append(name)
    if not collisions:
        return
    logger.warning(
        _(
            "multiple output files share the same basename; while miniwdl supports this,"
            " consider modifying WDL to ensure distinct output basenames",
            basenames=collisions,
        )
    )
1205

1206

1207
def _delete_work(
    cfg: config.Loader,
    logger: logging.Logger,
    container: "Optional[TaskContainer]",
    success: bool,
) -> None:
    """
    Delete the container's working files if [file_io] delete_work calls for it, given whether
    the task succeeded. Deletion after success is skipped, with a warning, unless
    output_hardlinks is enabled.
    """
    opt = cfg["file_io"]["delete_work"].strip().lower()
    if not container:
        return
    if success:
        wanted = opt in ("always", "success")
    else:
        wanted = opt in ("always", "failure")
    if not wanted:
        return
    if success and not cfg["file_io"].get_bool("output_hardlinks"):
        logger.warning(
            "ignoring configuration [file_io] delete_work because it requires also output_hardlinks = true"
        )
        return
    container.delete_work(logger, delete_streams=not success)
1✔
1223

1224

1225
class _StdLib(StdLib.Base):
    """
    Standard library base for task evaluation, mapping filenames between their in-container
    and host forms through the task container.
    """

    logger: logging.Logger
    container: "TaskContainer"
    inputs_only: bool  # if True then only permit access to input files

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        inputs_only: bool,
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        # files created by write_*() go into write_/ under the container's host directory
        write_dir = os.path.join(container.host_dir, "write_")
        super().__init__(wdl_version, write_dir=write_dir, eval_context=eval_context)
        self.logger = logger
        self.container = container
        self.inputs_only = inputs_only

    def _devirtualize_filename(self, filename: str) -> str:
        # check allowability of reading this file, & map from in-container to host
        host_path = self.container.host_path(filename, inputs_only=self.inputs_only)
        if host_path is None:
            raise OutputError("function was passed non-existent file " + filename)
        self.logger.debug(_("read_", container=filename, host=host_path))
        return host_path

    def _virtualize_filename(self, filename: str) -> str:
        # register new file with container input_path_map
        self.container.add_paths([filename])
        container_path = self.container.input_path_map[filename]
        self.logger.debug(_("write_", host=filename, container=container_path))
        self.logger.info(_("wrote", file=container_path))
        return container_path

    def _join_paths_default_directory(self) -> str:
        # the task working directory, as seen inside the container
        work_dir = os.path.join(self.container.container_dir, "work")
        return work_dir
×
1266

1267

1268
class InputStdLib(_StdLib):
    """
    StdLib for evaluation of task inputs and command; file access is restricted to input files.
    """

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        super().__init__(
            wdl_version,
            logger,
            container,
            inputs_only=True,
            eval_context=eval_context,
        )
1✔
1278

1279

1280
class OutputStdLib(_StdLib):
    """
    StdLib for evaluation of task outputs; adds the stdout(), stderr() and glob() functions.
    """

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        super().__init__(
            wdl_version,
            logger,
            container,
            inputs_only=False,
            eval_context=eval_context,
        )

        def _stdout() -> Value.File:
            # in-container path of the command's captured standard output
            return Value.File(os.path.join(self.container.container_dir, "stdout.txt"))

        def _stderr() -> Value.File:
            # in-container path of the command's captured standard error
            return Value.File(os.path.join(self.container.container_dir, "stderr.txt"))

        setattr(self, "stdout", StdLib.StaticFunction("stdout", [], Type.File(), _stdout))
        setattr(self, "stderr", StdLib.StaticFunction("stderr", [], Type.File(), _stderr))

        def _glob(pattern: Value.String, lib: OutputStdLib = self) -> Value.Array:
            pat = pattern.coerce(Type.String()).value
            if not pat:
                raise OutputError("empty glob() pattern")
            assert isinstance(pat, str)
            # only patterns confined to the task working directory are accepted
            if pat.startswith("/"):
                raise OutputError("glob() pattern must be relative to task working directory")
            if pat.startswith("..") or "/.." in pat:
                raise OutputError("glob() pattern must not use .. uplevels")
            if pat.startswith("./"):
                pat = pat[2:]
            # glob the host directory
            host_pattern = os.path.join(lib.container.host_work_dir(), pat)
            matches = sorted(fn for fn in glob.glob(host_pattern) if os.path.isfile(fn))
            # convert the host filenames to in-container filenames
            members = []
            for host_file in matches:
                prefix = lib.container.host_work_dir()
                if not prefix.endswith("/"):
                    prefix += "/"
                assert host_file.startswith(prefix)
                container_file = os.path.join(
                    lib.container.container_dir, "work", host_file[len(prefix) :]
                )
                members.append(Value.File(container_file))
            return Value.Array(Type.File(), members)

        setattr(
            self,
            "glob",
            StdLib.StaticFunction("glob", [Type.String()], Type.Array(Type.File()), _glob),
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc