• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chanzuckerberg / miniwdl / 26351231487

24 May 2026 03:53AM UTC coverage: 95.712% (+0.09%) from 95.619%
26351231487

Pull #877

github

web-flow
Merge ecd0618a1 into 8443ffdca
Pull Request #877: [WDL 1.2] source-relative File/Directory declarations

167 of 169 new or added lines in 3 files covered. (98.82%)

31 existing lines in 3 files now uncovered.

8414 of 8791 relevant lines covered (95.71%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.96
/WDL/runtime/task.py
1
"""
2
Local task runner
3
"""
4

5
import logging
1✔
6
import math
1✔
7
import os
1✔
8
import json
1✔
9
import traceback
1✔
10
import glob
1✔
11
import threading
1✔
12
import shutil
1✔
13
import regex
1✔
14
from typing import (
1✔
15
    Tuple,
16
    List,
17
    Dict,
18
    Optional,
19
    Callable,
20
    Set,
21
    Any,
22
    Union,
23
    TYPE_CHECKING,
24
    Generator,
25
    Iterable,
26
)
27
from contextlib import ExitStack, suppress
1✔
28
from collections import Counter
1✔
29

30
from .. import Error, Type, Env, Value, StdLib, Tree, Expr, _util
1✔
31
from .._util import (
1✔
32
    WDLVersion,
33
    write_atomic,
34
    write_values_json,
35
    provision_run_dir,
36
    TerminationSignalFlag,
37
    chmod_R_plus,
38
    path_really_within,
39
    LoggingFileHandler,
40
    compose_coroutines,
41
    pathsize,
42
    link_force,
43
    symlink_force,
44
    rmtree_atomic,
45
    wdl_version_geq,
46
)
47
from .._util import StructuredLogMessage as _
1✔
48
from . import config, _statusbar
1✔
49
from .download import able as downloadable, run_cached as download
1✔
50
from .cache import CallCache, new as new_call_cache
1✔
51
from .error import OutputError, Interrupted, Terminated, RunFailed, error_json
1✔
52

53
if TYPE_CHECKING:  # otherwise-delayed heavy imports
1✔
54
    from .task_container import TaskContainer
×
55

56
# Type of a task plugin coroutine: a generator that yields Dict[str, Any] messages, receives
# Dict[str, Any] messages via send(), and returns None (typing.Generator[Yield, Send, Return]).
TaskPluginCoroutine = Generator[Dict[str, Any], Dict[str, Any], None]
57

58

59
# NOTE: the `type: ignore[return]` below presumably silences mypy's missing-return complaint,
# since mypy cannot tell that the ExitStack `with` body always returns or raises.
def run_local_task(  # type: ignore[return]
    cfg: config.Loader,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    run_id: Optional[str] = None,
    run_dir: Optional[str] = None,
    logger_prefix: Optional[List[str]] = None,
    _run_id_stack: Optional[List[str]] = None,
    _cache: Optional[CallCache] = None,
    _plugins: Optional[List[Callable[..., Any]]] = None,
) -> Tuple[str, Env.Bindings[Value.Base]]:
    """
    Run a task locally.

    Inputs shall have been typechecked already. File inputs are presumed to be local POSIX file
    paths that can be mounted into a container.

    :param cfg: configuration loader
    :param task: the task to run
    :param inputs: task input values (may include ``runtime.*`` / ``requirements.*`` overrides)
    :param run_id: unique ID for the run, defaults to the task name
    :param run_dir: directory under which to create a timestamp-named subdirectory for this run
                    (defaults to current working directory).
                    If the final path component is ".", then operate in run_dir directly.
    :param logger_prefix: logger name components; ``"t:<run_id>"`` is appended (defaults to
                          ``["wdl"]``)
    :param _run_id_stack: (internal) run IDs of enclosing calls; when empty, this is treated as a
                          standalone task run which opens its own call cache (unless ``_cache``
                          is given) and flocks task.log
    :param _cache: call cache to use; required when ``_run_id_stack`` is non-empty
    :param _plugins: extra task plugin coroutines, appended after those loaded from configuration
    :return: ``(run_dir, outputs)``; on a call-cache hit, the cached output values
    :raises Terminated: if a termination signal was already received before setup
    :raises RunFailed: wrapping any exception raised from the cache lookup through output
                       processing (also logged and written to ``error.json``)
    """
    from .task_container import new as new_task_container  # delay heavy import

    _run_id_stack = _run_id_stack or []
    run_id = run_id or task.name
    logger_prefix = (logger_prefix or ["wdl"]) + ["t:" + run_id]
    logger = logging.getLogger(".".join(logger_prefix))
    with ExitStack() as cleanup:
        terminating = cleanup.enter_context(TerminationSignalFlag(logger))
        if terminating():
            raise Terminated(quiet=True)

        # provision run directory and log file
        run_dir = provision_run_dir(task.name, run_dir, last_link=not _run_id_stack)
        logfile = os.path.join(run_dir, "task.log")
        cleanup.enter_context(
            LoggingFileHandler(
                logger,
                logfile,
            )
        )
        if cfg.has_option("logging", "json") and cfg["logging"].get_bool("json"):
            # optional structured (JSON) log alongside task.log
            cleanup.enter_context(
                LoggingFileHandler(
                    logger,
                    logfile + ".json",
                    json=True,
                )
            )
        logger.notice(
            _(
                "task setup",
                name=task.name,
                source=task.pos.uri,
                line=task.pos.line,
                column=task.pos.column,
                dir=run_dir,
                thread=threading.get_ident(),
            )
        )
        write_values_json(inputs, os.path.join(run_dir, "inputs.json"))

        if not _run_id_stack:
            cache = _cache or cleanup.enter_context(new_call_cache(cfg, logger))
            cache.flock(logfile, exclusive=True)  # no containing workflow; flock task.log
        else:
            # called from within a workflow: the caller must supply the cache
            cache = _cache
        assert cache

        cleanup.enter_context(_statusbar.task_slotted())
        # set once the container exists, so the failure path knows whether to clean up its work
        maybe_container = None
        try:
            cache_key = f"{task.name}/{task.digest}/{Value.digest_env(inputs)}"
            cached = cache.get(cache_key, inputs, task.effective_outputs)
            if cached is not None:
                for decl in task.outputs:
                    v = cached[decl.name]
                    vj = json.dumps(v.json)
                    logger.info(
                        _(
                            "cached output",
                            name=decl.name,
                            value=(v.json if len(vj) < 4096 else "(((large)))"),
                        )
                    )
                # create out/ and outputs.json
                _outputs = link_outputs(
                    cache,
                    cached,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )
                write_values_json(
                    cached, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done (cached)")
                # returning `cached`, not the rewritten `_outputs`, to retain opportunity to find
                # cached downstream inputs
                return (run_dir, cached)
            # start plugin coroutines and process inputs through them
            with compose_coroutines(
                [
                    (
                        # `cor=cor` binds each plugin now, avoiding late binding in the lambda
                        lambda kwargs, cor=cor: cor(  # type: ignore
                            cfg, logger, _run_id_stack + [run_id], run_dir, task, **kwargs
                        )
                    )
                    for cor in (
                        # configured plugins in sorted order, then any passed via _plugins; the
                        # `_` here is comprehension-local and does not shadow the log-message alias
                        [cor2 for _, cor2 in sorted(config.load_plugins(cfg, "task"))]
                        + (_plugins or [])
                    )
                ],
                {"inputs": inputs},
            ) as plugins:
                recv = next(plugins)
                inputs = recv["inputs"]

                # download input files, if needed
                posix_inputs = _download_input_files(
                    cfg,
                    logger,
                    logger_prefix,
                    run_dir,
                    _add_downloadable_defaults(cfg, task.available_inputs, inputs),
                    cache,
                )

                # create TaskContainer according to configuration
                container = new_task_container(cfg, logger, run_id, run_dir)
                maybe_container = container

                # evaluate input/postinput declarations, including mapping from host to
                # in-container file paths
                container_env = _eval_task_inputs(logger, task, posix_inputs, container)

                # evaluate runtime fields
                stdlib = InputStdLib(
                    task.effective_wdl_version,
                    logger,
                    container,
                    source_directory=_source_directory(task),
                )
                _eval_task_runtime(
                    cfg, logger, run_id, task, posix_inputs, container, container_env, stdlib
                )
                if wdl_version_geq(task.effective_wdl_version, WDLVersion.V1_2):
                    # WDL 1.2+: expose the `task` runtime-info struct to later expressions
                    container.build_task_runtime_info_struct(logger, run_id, task)
                    assert container.task_runtime_info_struct is not None
                    container_env = container_env.bind("task", container.task_runtime_info_struct)

                # start container & run command (and retry if needed)
                # NOTE(review): maybe_container is not updated here; if _try_task can return a
                # different container object, the failure path below would clean up the original
                # one -- confirm against _try_task.
                container = _try_task(
                    cfg,
                    task,
                    logger,
                    plugins,
                    container,
                    container_env,
                    terminating,
                )

                # bind output declarations to task runtime info with the final return code
                if wdl_version_geq(task.effective_wdl_version, WDLVersion.V1_2):
                    assert container.try_counter >= 1
                    container.update_task_runtime_info_struct(
                        # zero-based attempt index of the final try
                        attempt=Value.Int(container.try_counter - 1),
                        return_code=(
                            Value.Int(container.last_exit_code)
                            if container.last_exit_code is not None
                            else Value.Null()
                        ),
                    )
                    assert container.task_runtime_info_struct is not None
                    container_env = container_env.bind("task", container.task_runtime_info_struct)

                # evaluate output declarations
                outputs = _eval_task_outputs(logger, run_id, task, container_env, container)

                # create output_links
                outputs = link_outputs(
                    cache,
                    outputs,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )

                # process outputs through plugins
                recv = plugins.send({"outputs": outputs})
                outputs = recv["outputs"]

                # clean up, if so configured, and make sure output files will be accessible to
                # downstream tasks
                _delete_work(cfg, logger, container, True)
                chmod_R_plus(run_dir, file_bits=0o660, dir_bits=0o770)
                _warn_output_basename_collisions(logger, outputs)

                # write outputs.json
                write_values_json(
                    outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done")
                # runs whose ID starts with "download-" are not stored in the call cache
                if not run_id.startswith("download-"):
                    cache.put(cache_key, outputs, run_dir=run_dir)
                return (run_dir, outputs)
        except Exception as exn:
            tbtxt = traceback.format_exc()
            logger.debug(tbtxt)
            wrapper = RunFailed(task, run_id, run_dir)
            # the traceback is attached only for errors that are not Error.RuntimeError
            logmsg = _(
                str(wrapper),
                dir=run_dir,
                **error_json(
                    exn, traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None
                ),
            )
            # a quiet Terminated is logged at debug level instead of error
            if isinstance(exn, Terminated) and getattr(exn, "quiet", False):
                logger.debug(logmsg)
            else:
                logger.error(logmsg)
            # best-effort: a failure to write error.json is logged but does not mask `exn`
            try:
                write_atomic(
                    json.dumps(
                        error_json(
                            wrapper,
                            cause=exn,
                            traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None,
                        ),
                        indent=2,
                    ),
                    os.path.join(run_dir, "error.json"),
                )
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.critical(_("failed to write error.json", dir=run_dir, message=str(exn2)))
            # best-effort cleanup; the last argument is False here vs. True on the success path
            try:
                if maybe_container:
                    _delete_work(cfg, logger, maybe_container, False)
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.error(_("delete_work also failed", exception=str(exn2)))
            raise wrapper from exn
303

304

305
def _download_input_files(
    cfg: config.Loader,
    logger: logging.Logger,
    logger_prefix: List[str],
    run_dir: str,
    inputs: Env.Bindings[Value.Base],
    cache: CallCache,
) -> Env.Bindings[Value.Base]:
    """
    Localize every downloadable File/Directory URI among ``inputs`` (at any nesting depth).

    Each downloadable value is fetched (or found in the download cache) beneath
    ``<run_dir>/download/<n>/`` and replaced by the resulting local path. Values that are not
    downloadable are left unchanged. Returns the rewritten copy of ``inputs``; a summary is logged
    when anything was downloaded or served from cache.
    """
    # fresh downloads (also the index used for the next download subdirectory), bytes fetched,
    # and cache hits
    tally = {"downloads": 0, "bytes": 0, "cached": 0}

    def _localize(val: Union[Value.Directory, Value.File]) -> str:
        is_dir = isinstance(val, Value.Directory)
        uri = val.value
        if not downloadable(cfg, uri, directory=is_dir):
            return uri

        logger.info(_(f"download input {'directory' if is_dir else 'file'}", uri=uri))
        slot = tally["downloads"]
        from_cache, local_path = download(
            cfg,
            logger,
            cache,
            uri,
            directory=is_dir,
            run_dir=os.path.join(run_dir, "download", str(slot), "."),
            logger_prefix=logger_prefix + [f"download{slot}"],
        )
        if from_cache:
            tally["cached"] += 1
            return local_path

        nbytes = pathsize(local_path)
        logger.info(_("downloaded input", uri=uri, path=local_path, bytes=nbytes))
        tally["downloads"] += 1
        tally["bytes"] += nbytes
        return local_path

    localized = Value.rewrite_env_paths(inputs, _localize)
    if tally["downloads"] or tally["cached"]:
        logger.notice(
            _(
                "processed input URIs",
                downloaded=tally["downloads"],
                downloaded_bytes=tally["bytes"],
                cached=tally["cached"],
            )
        )
    return localized
359

360

361
def _add_downloadable_defaults(
    cfg: config.Loader, available_inputs: Env.Bindings[Tree.Decl], inputs: Env.Bindings[Value.Base]
) -> Env.Bindings[Value.Base]:
    """
    Bind downloadable-URI defaults of File/Directory inputs into ``inputs``.

    An available input qualifies when its type is File or Directory, it is not already bound in
    the (growing) result, its default expression is a string literal, and that literal is
    downloadable. Binding such defaults up front lets them be downloaded along with user-supplied
    inputs, since input declarations are otherwise evaluated only after downloads are processed.
    """
    result = inputs
    for binding in available_inputs:
        decl = binding.value
        if not isinstance(decl.type, (Type.File, Type.Directory)):
            continue
        if binding.name in result or not isinstance(decl.expr, Expr.String):
            continue
        as_directory = isinstance(decl.type, Type.Directory)
        literal = decl.expr.literal
        if not literal or not downloadable(cfg, literal.value, directory=as_directory):
            continue
        make_value = Value.Directory if as_directory else Value.File
        result = result.bind(binding.name, make_value(literal.value, decl.expr))
    return result
389

390

391
def _eval_task_inputs(
    logger: logging.Logger,
    task: Tree.Task,
    posix_inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    """
    Build the task's value environment from its inputs, in in-container path terms.

    Supplied File/Directory inputs are mounted into ``container`` and rewritten to their
    in-container paths. Input/postinput declarations that were not supplied are then evaluated
    in dependency order. Returns the resulting environment.

    ``container`` is mutated: supplied input paths are added to it, and evaluating declarations
    can add more (the write_* standard library functions and source-relative path resolution both
    call ``container.add_paths``).
    """
    declared = task.available_inputs

    # A supplied None for an input that has a default but a non-optional type is dropped, so that
    # the default is used instead. For an explicitly optional (?) input, a supplied None is kept
    # and overrides the default.
    def _keep_supplied(b: "Env.Binding[Value.Base]") -> bool:
        if not isinstance(b.value, Value.Null) or b.name not in declared:
            return True
        decl = declared[b.name]
        return not (decl.expr and not decl.type.optional)

    posix_inputs = posix_inputs.filter(_keep_supplied)

    # mount every supplied File/Directory path into the container
    container.add_paths(_fspaths(posix_inputs))
    _warn_input_basename_collisions(logger, container)

    def _in_container(fn: Union[Value.File, Value.Directory]) -> str:
        # input_path_map keys Directory paths with a single trailing "/"
        trailer = "/" if isinstance(fn, Value.Directory) else ""
        return container.input_path_map[fn.value.rstrip("/") + trailer]

    container_inputs = Value.rewrite_env_paths(posix_inputs, _in_container)

    # seed the environment with the (in-container) supplied inputs, logging each
    container_env: Env.Bindings[Value.Base] = Env.Bindings()
    for binding in container_inputs:
        assert isinstance(binding, Env.Binding)
        value = binding.value
        assert isinstance(value, Value.Base)
        container_env = container_env.bind(binding.name, value)
        shown = value.json if len(json.dumps(value.json)) < 4096 else "(((large)))"
        logger.info(_("input", name=binding.name, value=shown))

    # the declarations still lacking a value, topologically ordered
    pending = _task_decl_eval_order(
        d for d in (task.inputs or []) + task.postinputs if not container_env.has_binding(d.name)
    )

    # NOTE: the write_* stdlib functions call container.add_paths as a side effect
    stdlib = InputStdLib(
        task.effective_wdl_version, logger, container, source_directory=_source_directory(task)
    )
    for decl in pending:
        assert isinstance(decl, Tree.Decl)

        # `decl=decl` pins this iteration's declaration into the callback
        def _postprocess(value: Value.Base, decl: Tree.Decl = decl) -> Value.Base:
            return _postprocess_task_decl_paths(
                decl,
                value,
                lambda w: _task_decl_path(task, decl.name, w, container),
                lambda name: Error.InputError(
                    f"File/Directory path not found in task declaration {name}"
                ),
            )

        evaluated = _eval_task_decl(logger, decl, container_env, stdlib, _postprocess)
        shown = evaluated.json if len(json.dumps(evaluated.json)) < 4096 else "(((large)))"
        logger.info(_("eval", name=decl.name, value=shown))
        container_env = container_env.bind(decl.name, evaluated)

    return container_env
469

470

471
def _task_decl_eval_order(decls: Iterable[Tree.Decl]) -> List[Tree.Decl]:
    """
    Return ``decls`` reordered so each declaration comes after the declarations it depends on.

    ``_util.topsort`` raises on dependency cycles, but static typechecking is expected to have
    rejected those already.
    """
    by_id, dependencies = Tree._decl_dependency_matrix(list(decls))
    ordered = [by_id[node_id] for node_id in _util.topsort(dependencies)]
    # the ordering must account for every declaration
    assert len(ordered) == len(by_id)
    return ordered
481

482

483
def _eval_task_decl(
    logger: logging.Logger,
    decl: Tree.Decl,
    env: Env.Bindings[Value.Base],
    stdlib: StdLib.Base,
    postprocess_paths: Callable[[Value.Base], Value.Base],
) -> Value.Base:
    """
    Evaluate one task declaration and apply File/Directory path rewriting logic (which differs
    between input/private and output declarations).

    :param logger: receives warnings about extraneous struct members
    :param decl: the declaration; with no expression, it evaluates to ``Value.Null()``
    :param env: value environment for evaluating the expression
    :param stdlib: standard library for the expression
    :param postprocess_paths: rewrites File/Directory paths within the coerced value
    :return: the post-processed value
    :raises Error.RuntimeError: re-raised unchanged, tagged with ``job_id``
    :raises Error.EvalError: wrapping any other exception, tagged with ``job_id``
    """
    try:
        if decl.expr:
            value = decl.expr.eval(env, stdlib=stdlib).coerce(decl.type)
        else:
            value = Value.Null()
        _warn_struct_extra(logger, decl.name, value)
        return postprocess_paths(value)
    except Error.RuntimeError as exn:
        setattr(exn, "job_id", decl.workflow_node_id)
        # bare `raise` re-raises the active exception without adding a new traceback entry
        raise
    except Exception as exn:
        wrapped = Error.EvalError(decl, str(exn))
        setattr(wrapped, "job_id", decl.workflow_node_id)
        raise wrapped from exn
505

506

507
def _postprocess_task_decl_paths(
    decl: Tree.Decl,
    value: Value.Base,
    missing_path: Callable[[Union[Value.File, Value.Directory]], Optional[str]],
    missing_error: Callable[[str], Error.RuntimeError],
) -> Value.Base:
    """
    Rewrite the File/Directory paths in ``value`` through ``missing_path``, then coerce the result
    to ``decl.type``.

    ``missing_path`` returns None for paths that do not exist, which turns them into Null. If
    coercion then fails with FileNotFoundError (a Null that cannot become a non-optional
    File/Directory), the error built by ``missing_error(decl.name)`` is raised instead, tagged with
    the declaration's ``job_id``.
    """
    rewritten = Value.rewrite_paths(value, missing_path)
    try:
        return rewritten.coerce(decl.type)
    except FileNotFoundError:
        failure = missing_error(decl.name)
        setattr(failure, "job_id", decl.workflow_node_id)
        raise failure
524

525

526
def _task_decl_path(
    task: Tree.Task,
    decl_name: str,
    v: Union[Value.File, Value.Directory],
    container: "TaskContainer",
) -> Optional[str]:
    """
    Resolve a task input/private declaration File/Directory path into the task container.

    Resolution order:

    1. Paths inside an already-mounted input Directory are checked against the backing host
       directory (see ``_task_decl_input_directory_child_path``). A rewritten or missing (None)
       result is returned as-is.
    2. Before WDL 1.2, any other path is returned unchanged.
    3. From WDL 1.2, relative paths are source-relative. They are resolved under the WDL source
       directory, mounted into the container, and the in-container path is returned. Absolute
       paths and downloadable URIs come back unchanged, and missing source-relative paths return
       None.

    :return: the path to use inside the container, or None if it does not exist
    :raises Error.InputError: from the helpers, for kind mismatches or disallowed relative paths

    ``container`` is intentionally mutated when a source-relative path needs to be mounted; no other
    arguments are mutated.
    """
    ans = _task_decl_input_directory_child_path(decl_name, v, container)
    if ans is None or ans != v.value:
        return ans

    if not wdl_version_geq(task.effective_wdl_version, WDLVersion.V1_2):
        return v.value

    ans = _resolve_source_relative_path(
        container.cfg, _source_directory(task), f"task declaration {decl_name}", v
    )
    if ans is None or ans == v.value:
        return ans

    # mount the single resolved host path (Directories keyed with a trailing "/") and return
    # its in-container counterpart
    source_path = ans + ("/" if isinstance(v, Value.Directory) else "")
    container.add_paths({source_path})
    return container.input_path_map[source_path]
561

562

563
def _resolve_source_relative_path(
    cfg: config.Loader,
    source_directory: str,
    desc: str,
    v: Union[Value.File, Value.Directory],
) -> Optional[str]:
    """
    Resolve a single File/Directory path against a WDL source directory, if it is relative.

    :param source_directory: "" or a local WDL source directory ending in "/"
    :param desc: description of the referencing declaration, used in error messages
    :return: ``v.value`` unchanged for absolute paths and downloadable URIs; otherwise the
             realpath-resolved host path, or None when it lies inside the source tree but does
             not exist (so File?/Directory? values can become Null before final coercion)
    :raises Error.InputError: if a relative path has no local source directory, escapes the
             source directory tree, or exists with the wrong kind (file vs. directory)

    Has no side effects.
    """
    is_dir = isinstance(v, Value.Directory)
    raw = v.value
    if os.path.isabs(raw) or downloadable(cfg, raw, directory=is_dir):
        return raw

    if not source_directory:
        raise Error.InputError(
            f"relative File/Directory path in {desc} requires a local WDL source file: {raw}"
        )

    candidate = os.path.realpath(os.path.join(source_directory, raw.rstrip("/") if is_dir else raw))

    # reject anything that escapes the source tree before probing the filesystem further
    if not path_really_within(candidate, source_directory):
        raise Error.InputError(
            f"File/Directory path in {desc}"
            f" must reside within WDL source directory {source_directory}: {raw}"
        )

    if not os.path.exists(candidate):
        return None

    kind_ok = os.path.isdir(candidate) if is_dir else os.path.isfile(candidate)
    if not kind_ok:
        kind = "Directory" if is_dir else "File"
        expected = "directory" if is_dir else "file"
        raise Error.InputError(f"{kind} path is not a {expected}: {raw}")

    return candidate
611

612

613
def _resolve_source_relative_paths(
    cfg: config.Loader,
    source_directory: str,
    value: Value.Base,
    desired_type: Type.Base,
    desc: str,
) -> Tuple[Value.Base, Set[str]]:
    """
    Coerce ``value`` to ``desired_type`` and resolve every File/Directory path inside it.

    Each path leaf goes through ``_resolve_source_relative_path``. Paths that actually changed are
    collected into the returned set (Directories with a trailing "/"), so callers can perform
    allowlisting or container-mount side effects once validation has succeeded.

    :return: ``(resolved value coerced to desired_type, newly resolved local source paths)``
    :raises Error.InputError: if a path is missing and the type does not allow Null, or as raised
             by ``_resolve_source_relative_path``

    No arguments are mutated.
    """
    resolved_paths: Set[str] = set()

    def _resolve(leaf: Union[Value.File, Value.Directory]) -> Optional[str]:
        resolved = _resolve_source_relative_path(cfg, source_directory, desc, leaf)
        if resolved is not None and resolved != leaf.value:
            suffix = "/" if isinstance(leaf, Value.Directory) else ""
            resolved_paths.add(resolved + suffix)
        return resolved

    rewritten = Value.rewrite_paths(value.coerce(desired_type), _resolve)
    try:
        return rewritten.coerce(desired_type), resolved_paths
    except FileNotFoundError:
        raise Error.InputError(f"File/Directory path not found in {desc}") from None
647

648

649
def _source_directory(node: Tree.SourceNode) -> str:
    """
    Return the realpath-resolved directory of ``node``'s WDL source file, ending in "/".

    Returns "" when the source location is not an absolute local path (empty, the "(buffer)"
    placeholder, or relative). Callers then raise a declaration-specific error only if a relative
    File/Directory path actually needs resolving.
    """
    source = node.pos.abspath
    is_local_file = bool(source) and source != "(buffer)" and os.path.isabs(source)
    if is_local_file:
        # joining with "" appends the trailing separator
        return os.path.join(os.path.realpath(os.path.dirname(source)), "")
    return ""
661

662

663
def _task_decl_input_directory_child_path(
    decl_name: str, v: Union[Value.File, Value.Directory], container: "TaskContainer"
) -> Optional[str]:
    """
    Validate a File/Directory path derived from an already-mounted input Directory, for example::

        input {
            Directory d
        }
        File f = join_paths(d, "file.txt")
        Directory? maybe = join_paths(d, "maybe/")

    :return: ``v.value`` unchanged if ``container._input_host_path`` finds no backing input;
             None if the backing host path does not exist (so optional declarations become
             Null); otherwise the container path, normalized to end in "/" for a Directory
    :raises Error.InputError: if the host path exists but is the wrong kind (file vs. directory)
    """
    is_directory = isinstance(v, Value.Directory)
    normalized = v.value.rstrip("/")
    if is_directory:
        normalized += "/"

    found_input, host_path = container._input_host_path(normalized)
    if not found_input:
        return v.value
    assert host_path is not None

    if not os.path.exists(host_path.rstrip("/")):
        return None  # induces to Value.Null()

    kind_matches = os.path.isdir(host_path) if is_directory else os.path.isfile(host_path)
    if not kind_matches:
        raise Error.InputError(
            f"task declaration {decl_name} uses file/directory with the wrong type: " + v.value
        )
    return normalized if is_directory else v.value
692

693

694
def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]:
    """
    Collect the distinct File and Directory paths in ``env``, including those nested inside
    compound values.

    Directory paths are normalized to end in exactly one "/". File paths are asserted not to end
    in "/".
    """
    found: Set[str] = set()
    # explicit work stack instead of recursion; visiting order does not affect the resulting set
    stack: List[Value.Base] = [binding.value for binding in env]
    while stack:
        node = stack.pop()
        if isinstance(node, Value.File):
            assert not node.value.endswith("/")
            found.add(node.value)
        elif isinstance(node, Value.Directory):
            found.add(node.value.rstrip("/") + "/")
        stack.extend(node.children)
    return found
713

714

715
def _warn_input_basename_collisions(logger: logging.Logger, container: "TaskContainer") -> None:
1✔
716
    basenames = Counter(
1✔
717
        [os.path.basename((p[:-1] if p.endswith("/") else p)) for p in container.input_path_map_rev]
718
    )
719
    collisions = [nm for nm, n in basenames.items() if n > 1]
1✔
720
    if collisions:
1✔
721
        logger.warning(
1✔
722
            _(
723
                "mounting input files with colliding basenames in separate container directories",
724
                basenames=collisions,
725
            )
726
        )
727

728

729
def _eval_task_runtime(
    cfg: config.Loader,
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
    env: Env.Bindings[Value.Base],
    stdlib: StdLib.Base,
) -> None:
    """
    Determine the task's effective runtime settings and store them in ``container.runtime_values``.

    Precedence, lowest to highest:
    1. configured ``[task_runtime] defaults``, plus ``download_defaults`` for download tasks;
    2. the task's own runtime expressions (``task.runtime``), evaluated in ``env`` with ``stdlib``;
    3. ``runtime.*`` and then ``requirements.*`` call inputs.

    The container implementation validates and postprocesses the merged values. Afterwards the
    in-container environment (``container.runtime_values["env"]``) is extended with:
    - configured ``[task_runtime] env`` overrides;
    - TMPDIR, if the task temp directory is mounted;
    - the values of declarations decorated with ``env``.

    Settings the container implementation did not keep are logged as ignored.
    """
    # Evaluate runtime{} expressions, merged with any configured defaults. Copy the configured
    # dict so that merging download_defaults into it can't mutate a dict owned by cfg.
    runtime_defaults = dict(cfg.get_dict("task_runtime", "defaults"))
    if run_id.startswith("download-"):
        runtime_defaults.update(cfg.get_dict("task_runtime", "download_defaults"))
    runtime_values: Dict[str, Any] = {}
    for key, v in runtime_defaults.items():
        runtime_values[key] = Value.from_json(Type.Any(), v)
    for key, expr in task.runtime.items():  # evaluate expressions in source code
        runtime_values[key] = expr.eval(env, stdlib)
    for b in inputs.enter_namespace("runtime"):
        runtime_values[b.name] = b.value  # input overrides
    for b in inputs.enter_namespace("requirements"):
        runtime_values[b.name] = b.value
    if "return_codes" in runtime_values and wdl_version_geq(
        task.effective_wdl_version, WDLVersion.V1_2
    ):
        # WDL 1.2 spells this key return_codes; rename it to returnCodes (presumably the key the
        # container implementation expects -- NOTE(review): confirm in TaskContainer)
        runtime_values["returnCodes"] = runtime_values.pop("return_codes")
    logger.debug(_("runtime values", **dict((key, str(v)) for key, v in runtime_values.items())))

    # have container implementation validate & postprocess into container.runtime_values
    container.process_runtime(logger, runtime_values)

    if container.runtime_values:
        logger.info(_("effective runtime", **container.runtime_values))

    # add any configured overrides for in-container environment variables; a None value means
    # "pass through the host's value", skipped (with a warning) if the host doesn't define it
    container.runtime_values.setdefault("env", {})
    env_vars_override: Dict[str, str] = {}
    env_vars_skipped: List[str] = []
    for ev_name, ev_value in cfg["task_runtime"].get_dict("env").items():
        if ev_value is None:
            try:
                env_vars_override[ev_name] = os.environ[ev_name]
            except KeyError:
                env_vars_skipped.append(ev_name)
        else:
            env_vars_override[ev_name] = str(ev_value)
    if env_vars_skipped:
        logger.warning(
            _("skipping pass-through of undefined environment variable(s)", names=env_vars_skipped)
        )
    if cfg.get_bool("file_io", "mount_tmpdir") or task.name in cfg.get_list(
        "file_io", "mount_tmpdir_for"
    ):
        env_vars_override["TMPDIR"] = os.path.join(
            container.container_dir, "work", "_miniwdl_tmpdir"
        )
    if env_vars_override:
        # usually don't dump values into log, as they may often be auth tokens
        logger.notice(
            _(
                "overriding environment variables (portability warning)",
                names=list(env_vars_override.keys()),
            )
        )
        logger.debug(
            _("overriding environment variables (portability warning)", **env_vars_override)
        )
        container.runtime_values["env"].update(env_vars_override)

    # Process decls with the "env" decorator, exporting them as in-container environment
    # variables: String/File/Directory values verbatim, any other type JSON-encoded.
    env_decls: Dict[str, str] = {}
    for decl in (task.inputs or []) + task.postinputs:
        if decl.decor.get("env", False) is True:
            decl_value = env[decl.name]
            if isinstance(decl_value, (Value.String, Value.File, Value.Directory)):
                env_decls[decl.name] = decl_value.value
            else:
                env_decls[decl.name] = json.dumps(decl_value.json)
    container.runtime_values["env"].update(env_decls)

    # memory/docker/container are not reported as ignored even if the container implementation
    # did not keep them under those names
    unused_keys = [
        key
        for key in runtime_values
        if key not in ("memory", "docker", "container") and key not in container.runtime_values
    ]
    if unused_keys:
        logger.warning(_("ignored runtime settings", keys=unused_keys))
1✔
818

819

820
def _try_task(
    cfg: config.Loader,
    task: Tree.Task,
    logger: logging.Logger,
    plugins: TaskPluginCoroutine,
    container: "TaskContainer",
    container_env: Env.Bindings[Value.Base],
    terminating: Callable[[], bool],
) -> "TaskContainer":
    """
    Run the task command in the container, retrying up to runtime.preemptible occurrences of
    Interrupted errors, plus up to runtime.maxRetries occurrences of any error.

    The command is evaluated before the first attempt. It is re-evaluated before each retry only
    if it refers to WDL 1.2 ``task.attempt``. On the first attempt, task plugin(s) receive the
    command and container and may replace either. The container used for the successful attempt
    is returned.

    Raises:
    - Terminated, if ``terminating()`` is true at the start of an attempt.
    - Error.RuntimeError, if a retry is needed but the command uses ``task.attempt`` and a plugin
      changed the command.
    - Otherwise, the last attempt's error once the applicable retry budget is exhausted or the
      error is not retryable (Terminated, docker BuildError, or termination in progress).
    """
    from docker.errors import BuildError as DockerBuildError  # delay heavy import

    max_retries = container.runtime_values.get("maxRetries", 0)
    max_interruptions = container.runtime_values.get("preemptible", 0)
    retries = 0
    interruptions = 0

    command = None
    plugin_changed_command = False
    assert isinstance(task.command, Expr.TaskCommand)
    command_uses_task_attempt = _task_command_uses_task_attempt(task.command)

    while True:
        if terminating():
            raise Terminated()

        if command is None or command_uses_task_attempt:
            # try_counter is 1 on the first attempt (see the assertion below), so task.attempt
            # starts at 0
            command = _eval_task_command(
                cfg,
                task,
                logger,
                container,
                container_env,
                attempt=container.try_counter - 1,
            )
            if container.try_counter == 1:
                assert retries == 0 and interruptions == 0 and not plugin_changed_command
                # let plugin(s) process command & container
                recv = plugins.send({"command": command, "container": container})
                plugin_command, container = (recv[k] for k in ("command", "container"))
                if plugin_command != command:
                    plugin_changed_command = True
                    command = plugin_command
        assert isinstance(command, str)
        logger.debug(_("command", command=command.strip()))

        if cfg.get_bool("file_io", "copy_input_files") or task.name in cfg.get_list(
            "file_io", "copy_input_files_for"
        ):
            # must follow command interpolation, which can add new input files via write_*
            container.copy_input_files(logger)
        # host-side directory backing the in-container TMPDIR, when configured for this task
        host_tmpdir = (
            os.path.join(container.host_work_dir(), "_miniwdl_tmpdir")
            if cfg.get_bool("file_io", "mount_tmpdir")
            or task.name in cfg.get_list("file_io", "mount_tmpdir_for")
            else None
        )

        try:
            # start container & run command
            if host_tmpdir:
                logger.debug(_("creating task temp directory", TMPDIR=host_tmpdir))
                os.mkdir(host_tmpdir, mode=0o770)
            try:
                container.run(logger, command)
                return container
            finally:
                if host_tmpdir:
                    logger.info(_("deleting task temp directory", TMPDIR=host_tmpdir))
                    rmtree_atomic(host_tmpdir)
                # Testing aid: raising here supersedes the return (or exception) above, simulating
                # a preemption on each of the first _mock_interruptions attempts.
                if (
                    "preemptible" in container.runtime_values
                    and cfg.has_option("task_runtime", "_mock_interruptions")
                    and interruptions < cfg["task_runtime"].get_int("_mock_interruptions")
                ):
                    raise Interrupted("mock interruption") from None
        except Exception as exn:
            # Interruptions draw on their own budget first; any other retryable error draws on
            # maxRetries; everything else propagates.
            if isinstance(exn, Interrupted) and interruptions < max_interruptions:
                logger.error(
                    _(
                        "interrupted task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_interruptions=interruptions,
                        max_interruptions=max_interruptions,
                    )
                )
                interruptions += 1
            elif (
                not isinstance(exn, (Terminated, DockerBuildError))
                and retries < max_retries
                and not terminating()
            ):
                logger.error(
                    _(
                        "failed task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_retries=retries,
                        max_retries=max_retries,
                    )
                )
                retries += 1
            else:
                raise
            if command_uses_task_attempt and plugin_changed_command:
                # Our plugin API, designed well before the addition of `task.attempt` in WDL 1.2,
                # doesn't allow for reprocessing the command after a retry; to be safe, we fail if
                # the command uses `task.attempt` and the plugin changed the (first-try) command.
                raise Error.RuntimeError(
                    "task command uses task.attempt, but a task plugin changed the command; "
                    "cannot retry with an updated task.attempt value"
                ) from exn
            # clean up the failed attempt's work before looping around to retry
            _delete_work(cfg, logger, container, False)
            container.reset(logger)
1✔
938

939

940
def _eval_task_command(
    cfg: config.Loader,
    task: Tree.Task,
    logger: logging.Logger,
    container: "TaskContainer",
    container_env: Env.Bindings[Value.Base],
    attempt: int,
) -> str:
    """
    Render the task command string for the given (zero-based) attempt number.

    For WDL >= 1.2 the container's ``task`` runtime-info struct is refreshed with ``attempt``
    (and a null return code) and bound as ``task`` for the evaluation, so this may run once per
    retry when the command uses ``task.attempt``. Placeholders are evaluated with an
    ``InputStdLib`` using the configured ``placeholder_regex``. The configured
    ``old_command_dedent`` selects the legacy whitespace handling (see issue #674).
    """
    assert attempt >= 0
    if not wdl_version_geq(task.effective_wdl_version, WDLVersion.V1_2):
        eval_env = container_env
    else:
        container.update_task_runtime_info_struct(
            attempt=Value.Int(attempt),
            return_code=Value.Null(),
        )
        assert container.task_runtime_info_struct is not None
        eval_env = container_env.bind("task", container.task_runtime_info_struct)

    legacy_dedent = cfg["task_runtime"].get_bool("old_command_dedent")
    # pylint: disable=E1101
    pattern = regex.compile(cfg["task_runtime"]["placeholder_regex"], flags=regex.POSIX)
    source_dir = _source_directory(task)
    lib = InputStdLib(
        task.effective_wdl_version,
        logger,
        container,
        source_directory=source_dir,
        eval_context=StdLib.EvalContext(placeholder_regex=pattern),
    )

    assert isinstance(task.command, Expr.TaskCommand)
    rendered = task.command.eval(eval_env, lib, dedent=not legacy_dedent).value
    if not legacy_dedent:
        return rendered
    # legacy behavior, see issue #674
    return _util.strip_leading_whitespace(rendered)[1]
1✔
976

977

978
def _task_command_uses_task_attempt(command: Expr.TaskCommand) -> bool:
    """
    Report whether any command placeholder refers to WDL 1.2 ``task.attempt``. Such a command
    must be re-evaluated on each retry.

    Both parsed forms of the reference are recognized:
    - a member access ``Get(attempt)`` over ``Get(Ident("task"))``;
    - a single ``Ident("task.attempt")``.
    """

    def names_task_attempt(node: Expr.Base) -> bool:
        if isinstance(node, Expr.Get) and node.member == "attempt":
            inner = node.expr
            if (
                isinstance(inner, Expr.Get)
                and inner.member is None
                and isinstance(inner.expr, Expr.Ident)
                and inner.expr.name == "task"
            ):
                return True
        return isinstance(node, Expr.Ident) and node.name == "task.attempt"

    # depth-first walk over every expression reachable from the placeholders
    to_visit = [
        segment.expr for segment in command.parts if isinstance(segment, Expr.Placeholder)
    ]
    while to_visit:
        node = to_visit.pop()
        if names_task_attempt(node):
            return True
        to_visit.extend(sub for sub in node.children if isinstance(sub, Expr.Base))
    return False
1✔
997

998

999
def _eval_task_outputs(
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    env: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    """
    Evaluate the task's output declarations after the command has run.

    Output expressions see ``env`` plus the previously evaluated outputs, with File/Directory
    values as in-container paths. The returned bindings hold only the outputs, with File/Directory
    paths rewritten to host paths.

    As a courtesy, logs a hint when the command wrote nonempty stdout that no output expression
    consumes via ``stdout()`` (skipped for ``download-`` runs).

    A Error.RuntimeError raised while rewriting an output's paths is tagged with that
    declaration's ``workflow_node_id`` (as ``job_id``) before being re-raised.
    """
    stdout_file = os.path.join(container.host_dir, "stdout.txt")
    with suppress(FileNotFoundError):
        if os.path.getsize(stdout_file) > 0 and not run_id.startswith("download-"):
            # If the task produced nonempty stdout that isn't used in the WDL outputs, generate a
            # courtesy log message directing user where to find it
            stdout_used = False
            # search all output expression trees for a call to stdout()
            expr_stack = [outp.expr for outp in task.outputs]
            while expr_stack:
                expr = expr_stack.pop()
                assert isinstance(expr, Expr.Base)
                if isinstance(expr, Expr.Apply) and expr.function_name == "stdout":
                    stdout_used = True
                else:
                    expr_stack.extend(expr.children)  # type: ignore[arg-type]
            if not stdout_used:
                logger.info(
                    _(
                        "command stdout unused; consider output `File cmd_out = stdout()`"
                        " or redirect command to stderr log >&2",
                        stdout_file=stdout_file,
                    )
                )

    stdlib = OutputStdLib(task.effective_wdl_version, logger, container)
    outputs: Env.Bindings[Value.Base] = Env.Bindings()

    # evaluate output declarations in dependency order
    for decl in _task_decl_eval_order(task.outputs):
        assert decl.expr
        # evaluate and check existence of in-container File/Directory output paths (tolerating
        # non-existence for optional outputs); bind to env for subsequent decls
        v = _eval_task_decl(
            logger,
            decl,
            env,
            stdlib,
            lambda value: _postprocess_task_output_decl_paths(logger, decl, value, container),
        )
        # env keeps in-container paths, since later output expressions are evaluated "inside"
        # the container
        env = env.bind(decl.name, v)

        # rewrite in-container File/Directory paths to host paths, bind in outputs env
        try:
            v = Value.rewrite_paths(
                v, lambda w: _task_output_host_path(logger, decl.name, w, container)
            )
        except Error.RuntimeError as exn:
            setattr(exn, "job_id", decl.workflow_node_id)
            raise exn
        outputs = outputs.bind(decl.name, v)

    return outputs
1✔
1057

1058

1059
def _postprocess_task_output_decl_paths(
    logger: logging.Logger,
    decl: Tree.Decl,
    value: Value.Base,
    container: "TaskContainer",
) -> Value.Base:
    """
    Log an evaluated task output, then resolve File/Directory paths missing from the container.

    The logged value is replaced by a placeholder when its JSON encoding is 4096 characters or
    longer. Missing paths are handed to ``_postprocess_task_decl_paths`` together with a factory
    for the OutputError to raise where a missing path isn't tolerated.
    """
    encoded = json.dumps(value.json)
    shown = value.json if len(encoded) < 4096 else "(((large)))"
    logger.info(_("output", name=decl.name, value=shown))

    def resolve_missing(path_value: Union[Value.File, Value.Directory]) -> Optional[str]:
        return _task_output_missing_path(path_value, container)

    def not_found_error(name: str) -> OutputError:
        return OutputError("File/Directory path not found in task output " + name)

    return _postprocess_task_decl_paths(decl, value, resolve_missing, not_found_error)
1078

1079

1080
def _task_output_missing_path(
    v: Union[Value.File, Value.Directory],
    container: "TaskContainer",
) -> Optional[str]:
    """
    Check whether a task output File/Directory path exists in the container.

    Returns the original path string unchanged when ``container.host_path`` can map it, or None
    when it can't. Directory paths are looked up with a trailing '/'.
    """
    wants_dir = isinstance(v, Value.Directory)
    lookup = v.value + "/" if wants_dir and not v.value.endswith("/") else v.value
    found = container.host_path(lookup)
    return v.value if found is not None else None
1✔
1093

1094

1095
def _task_output_host_path(
    logger: logging.Logger,
    output_name: str,
    v: Union[Value.File, Value.Directory],
    container: "TaskContainer",
) -> Optional[str]:
    """
    Translate an existing task output File/Directory from its in-container path to its host path.

    The path must already be known to exist (asserted). For a Directory, the host path is
    returned without a trailing '/'. Its contents are checked with ``_check_directory``, which
    raises OutputError on unusable symlinks.
    """
    is_dir = isinstance(v, Value.Directory)
    in_container = v.value + "/" if is_dir and not v.value.endswith("/") else v.value
    on_host = container.host_path(in_container)
    assert on_host is not None
    if not is_dir:
        logger.debug(_("output file", container=in_container, host=on_host))
        return on_host
    if on_host.endswith("/"):
        on_host = on_host[:-1]
    _check_directory(on_host, output_name)
    logger.debug(_("output dir", container=in_container, host=on_host))
    return on_host
1✔
1117

1118

1119
def _check_directory(host_path: str, output_name: str) -> None:
1✔
1120
    """
1121
    traverse output directory to check that all symlinks are relative & resolve inside the dir
1122
    """
1123

1124
    def raiser(exc: OSError):
1✔
UNCOV
1125
        raise exc
×
1126

1127
    for root, subdirs, files in os.walk(host_path, onerror=raiser, followlinks=False):
1✔
1128
        for fn in files:
1✔
1129
            fn = os.path.join(root, fn)
1✔
1130
            if os.path.islink(fn) and (
1✔
1131
                not os.path.exists(fn)
1132
                or os.path.isabs(os.readlink(fn))
1133
                or not path_really_within(fn, host_path)
1134
            ):
1135
                raise OutputError(f"Directory in output {output_name} contains unusable symlink")
1✔
1136

1137

1138
def _warn_struct_extra(
    logger: logging.Logger, decl_name: str, v: Value.Base, warned_keys: Optional[Set[str]] = None
) -> None:
    """
    Log notices about extraneous keys found in struct initialization from JSON/Map/Object.

    Walks ``v`` and all of its children. Keys starting with '#' are ignored. ``warned_keys`` is
    shared across the recursion (and updated in place when supplied), so a notice is only logged
    for a struct that contributes at least one key not already reported.
    """
    seen: Set[str] = set() if warned_keys is None else warned_keys
    if isinstance(v, Value.Struct) and v.extra:
        extraneous = {key for key in v.extra if not key.startswith("#")}
        if extraneous - seen:
            logger.notice(
                _(
                    "extraneous keys in struct initializer",
                    decl=decl_name,
                    struct=str(v.type),
                    extra_keys=list(extraneous),
                )
            )
            seen.update(extraneous)
    for child in v.children:
        _warn_struct_extra(logger, decl_name, child, seen)
1✔
1160

1161

1162
def link_outputs(
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
    use_relative_output_paths: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    Following a successful run, the output files may be scattered throughout a complex directory
    tree used for execution. To help navigating this, generate a subdirectory of the run directory
    containing nicely organized symlinks to the output files, and rewrite File/Directory values in
    the outputs env to use these symlinks.

    Layout: ``run_dir/out/<output name>/...``, with one subdirectory level per compound value:
    - Array elements go under zero-padded indices.
    - Map entries go under their keys if all keys look like reasonable path components, and
      under zero-padded indices otherwise.
    - Pair values go under ``left``/``right``.
    - Struct members go under their member names.
    Directory outputs also get a ``.WDL_Directory`` marker file beside their link.

    :param cache: consulted via ``get_download`` for File/Directory values whose path doesn't
        exist locally (values for which it returns nothing are left unchanged)
    :param hardlinks: if True, hard-link files (and copy Directory trees with hard-linked
        contents) instead of creating symlinks
    :param use_relative_output_paths: if True, delegate to ``link_outputs_relative``
    :returns: a copy of ``outputs`` with File/Directory paths rewritten to the new links
    :raises FileExistsError: (from ``os.makedirs(..., exist_ok=False)``) if ``run_dir/out`` or a
        link's directory already exists
    """

    def link1(target: str, link: str, directory: bool) -> None:
        # create one link at `link` pointing to / reproducing `target`
        if hardlinks:
            # TODO: what if target is an input from a different filesystem?
            if directory:
                shutil.copytree(target, link, symlinks=True, copy_function=link_force)
            else:
                link_force(target, link)
        else:
            symlink_force(target, link)

    def map_paths(v: Value.Base, dn: str) -> Value.Base:
        # Link the File/Directory values within v under directory dn. This mutates v in place,
        # which is why the caller below passes a deep copy.
        if isinstance(v, (Value.File, Value.Directory)):
            target = (
                v.value
                if os.path.exists(v.value)
                else cache.get_download(v.value, isinstance(v, Value.Directory))
            )
            if target:
                target = os.path.realpath(target)
                assert os.path.exists(target)
                if not hardlinks and path_really_within(target, os.path.dirname(run_dir)):
                    # make symlink relative
                    target = os.path.relpath(target, start=os.path.realpath(dn))
                link = os.path.join(dn, os.path.basename(v.value.rstrip("/")))
                os.makedirs(dn, exist_ok=False)
                link1(target, link, isinstance(v, Value.Directory))
                # Drop a dotfile alongside Directory outputs, to inform a program crawling the out/
                # directory without reference to the output types or JSON for whatever reason. It
                # might otherwise have trouble distinguishing Directory outputs among the
                # structured subdirectories we create for compound types.
                if isinstance(v, Value.Directory):
                    with open(os.path.join(dn, ".WDL_Directory"), "w") as _dotfile:
                        pass
                v.value = link
        # recurse into compound values
        elif isinstance(v, Value.Array) and v.value:
            d = int(math.ceil(math.log10(len(v.value))))  # how many digits needed
            for i in range(len(v.value)):
                v.value[i] = map_paths(v.value[i], os.path.join(dn, str(i).rjust(d, "0")))
        elif isinstance(v, Value.Map) and v.value:
            # create a subdirectory for each key, as long as the key names seem to make reasonable
            # path components; otherwise, treat the dict as a list of its values
            # (str() of a key value may carry surrounding quotes, hence the strip)
            keys_ok = (
                sum(
                    1
                    for b in v.value
                    if regex.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", str(b[0]).strip("'\""))
                    is None
                )
                == 0
            )
            d = int(math.ceil(math.log10(len(v.value))))
            for i, b in enumerate(v.value):
                v.value[i] = (
                    b[0],
                    map_paths(
                        b[1],
                        os.path.join(
                            dn, str(b[0]).strip("'\"") if keys_ok else str(i).rjust(d, "0")
                        ),
                    ),
                )
        elif isinstance(v, Value.Pair):
            v.value = (
                map_paths(v.value[0], os.path.join(dn, "left")),
                map_paths(v.value[1], os.path.join(dn, "right")),
            )
        elif isinstance(v, Value.Struct):
            for key in v.value:
                v.value[key] = map_paths(v.value[key], os.path.join(dn, key))
        return v

    os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)

    if use_relative_output_paths:
        return link_outputs_relative(link1, cache, outputs, run_dir, hardlinks=hardlinks)

    return outputs.map(
        lambda binding: Env.Binding(
            binding.name,
            map_paths(
                Value.rewrite_paths(binding.value, lambda v: v.value),  # nop to deep copy
                os.path.join(run_dir, "out", binding.name),
            ),
        )
    )
1262

1263

1264
def link_outputs_relative(
    link1: Callable[[str, str, bool], None],
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    link_outputs with [file_io] use_relative_output_paths = true. We organize the links to reflect
    the generated files' paths relative to their task working directory.

    :param link1: creates one link, called as ``link1(target, link, is_directory)``
    :param hardlinks: not consulted here; the link style is already embodied in ``link1``
    :returns: ``outputs`` with File/Directory paths rewritten to the links under ``run_dir/out``
    :raises FileExistsError: if two different targets would be linked at the same out/ path
    """
    out_dir = os.path.join(run_dir, "out")
    work_dir = os.path.join(run_dir, "work")
    # out/ link path -> real path of the target linked there
    link_destinations: Dict[str, str] = dict()

    def map_path_relative(v: Union[Value.File, Value.Directory]) -> str:
        target = (
            v.value
            if os.path.exists(v.value)
            else cache.get_download(v.value, isinstance(v, Value.Directory))
        )
        if not target:
            return v.value
        real_target = os.path.realpath(target)
        rel_link = None
        if path_really_within(target, work_dir):
            # target was generated by current task; use its path relative to the task work dir
            if not os.path.basename(run_dir).startswith("download-"):  # except download tasks
                rel_link = os.path.relpath(real_target, os.path.realpath(work_dir))
        else:
            # target is an out/ link generated by a call in the current workflow OR a cached
            # run; use the link's path relative to that out/ dir, which by induction should
            # equal its path relative to the original work/ dir.
            # we need heuristic to find the out/ dir in a task/workflow run directory, since the
            # user's cwd or the task-generated relative path might coincidentally have
            # something named 'out'.
            p = None
            for p in reversed([m.span()[0] for m in regex.finditer("/out(?=/)", target)]):
                if p and (
                    os.path.isfile(os.path.join(target[:p], "task.log"))
                    or os.path.isfile(os.path.join(target[:p], "workflow.log"))
                ):
                    break
                p = None
            # len("/out/") == 5
            if p and p + 5 < len(target):
                rel_link = os.path.relpath(target, target[: p + 5])
        # if neither of the above cases applies, then fall back to just the target basename
        rel_link = rel_link or os.path.basename(target)
        abs_link = os.path.join(out_dir, rel_link)
        prior_target = link_destinations.get(abs_link)
        if prior_target is not None:
            if prior_target != real_target:
                raise FileExistsError(
                    "Output filename collision; to allow this, set"
                    " [file_io] use_relative_output_paths = false. Affected path: " + abs_link
                )
            # The same target is already linked here (e.g. one File/Directory referenced by
            # several outputs). Reuse the existing link: linking again is redundant, and for a
            # hard-linked Directory shutil.copytree would fail on the existing destination.
            return abs_link
        os.makedirs(os.path.dirname(abs_link), exist_ok=True)
        link1(real_target, abs_link, isinstance(v, Value.Directory))
        link_destinations[abs_link] = real_target
        return abs_link

    return Value.rewrite_env_paths(outputs, map_path_relative)
1✔
1324

1325

1326
def _warn_output_basename_collisions(
    logger: logging.Logger, outputs: Env.Bindings[Value.Base]
) -> None:
    """
    Warn when distinct output File/Directory targets share a basename.

    Paths that exist are resolved with ``os.path.realpath`` first, so several references to the
    same underlying file don't count as a collision. ``outputs`` is traversed only for
    inspection; the path rewriter returns each path unchanged.
    """
    by_basename: Dict[str, Set[str]] = {}

    def record(v: Union[Value.File, Value.Directory]) -> str:
        path = v.value
        resolved = os.path.realpath(path) if os.path.exists(path) else path
        by_basename.setdefault(os.path.basename(resolved), set()).add(resolved)
        return v.value

    Value.rewrite_env_paths(outputs, record)

    shared = [name for name, paths in by_basename.items() if len(paths) > 1]
    if not shared:
        return
    logger.warning(
        _(
            "multiple output files share the same basename; while miniwdl supports this,"
            " consider modifying WDL to ensure distinct output basenames",
            basenames=shared,
        )
    )
1350

1351

1352
def _delete_work(
    cfg: config.Loader,
    logger: logging.Logger,
    container: "Optional[TaskContainer]",
    success: bool,
) -> None:
    """
    Apply the ``[file_io] delete_work`` policy to a finished (or failed) task attempt.

    The policy (case- and whitespace-insensitive) is one of:
    - ``always``: applies to every attempt;
    - ``success``: applies only when ``success`` is true;
    - ``failure``: applies only when ``success`` is false.

    When the policy applies and there is a container, its work is deleted; stream files are also
    deleted for unsuccessful attempts. Deletion after success is refused, with a warning, unless
    ``[file_io] output_hardlinks`` is also enabled.
    """
    policy = cfg["file_io"]["delete_work"].strip().lower()
    applies = policy == "always" or policy == ("success" if success else "failure")
    if not (container and applies):
        return
    if success and not cfg["file_io"].get_bool("output_hardlinks"):
        logger.warning(
            "ignoring configuration [file_io] delete_work because it requires also output_hardlinks = true"
        )
        return
    container.delete_work(logger, delete_streams=not success)
1✔
1368

1369

1370
class _StdLib(StdLib.Base):
    # Task-evaluation StdLib: maps File/Directory paths between the task container and the host,
    # registers files written by write_* functions with the container, and (for WDL >= 1.2
    # input/private evaluation) resolves source-relative paths.
    logger: logging.Logger
    container: "TaskContainer"
    inputs_only: bool  # if True then only permit access to input files
    source_directory: str

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        inputs_only: bool,
        source_directory: str = "",
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        super().__init__(
            wdl_version,
            write_dir=os.path.join(container.host_dir, "write_"),
            eval_context=eval_context,
        )
        self.logger, self.container = logger, container
        self.inputs_only, self.source_directory = inputs_only, source_directory

    def _source_relative_applies(self, filename: str, directory: bool) -> bool:
        # Source-relative resolution is only for input/private evaluation in WDL >= 1.2, and only
        # for relative paths that aren't downloadable URIs.
        return (
            self.inputs_only
            and wdl_version_geq(self.wdl_version, WDLVersion.V1_2)
            and not os.path.isabs(filename)
            and not downloadable(self.container.cfg, filename, directory=directory)
        )

    def _source_relative_host_path(self, filename: str, directory: bool, desc: str) -> str:
        # Resolve `filename` against this task's source directory (via the module-level
        # _resolve_source_relative_path helper); raise InputError if it can't be found.
        wrapped = (Value.Directory if directory else Value.File)(filename)
        resolved = _resolve_source_relative_path(
            self.container.cfg, self.source_directory, desc, wrapped
        )
        if resolved is not None:
            return resolved
        raise Error.InputError(f"File/Directory path not found in {desc}: {filename}")

    def _devirtualize_filename(self, filename: str, directory: bool = False) -> str:
        """
        Return the host path for task StdLib direct file access.

        The in-container path is mapped through the container (restricted to input files when
        ``inputs_only``). If that fails during WDL >= 1.2 input/private evaluation, a relative,
        non-downloadable path is read directly from the host source directory instead. Output
        evaluation keeps existing task-output semantics and resolves paths only through the
        execution directory or already-localized inputs.
        """
        lookup = filename.rstrip("/") + ("/" if directory else "")
        host = self.container.host_path(lookup, inputs_only=self.inputs_only)
        if host is None and self._source_relative_applies(filename, directory):
            host = self._source_relative_host_path(filename, directory, "read_*() argument")
        if host is None:
            raise OutputError("function was passed non-existent file " + filename)
        self.logger.debug(_("read_", container=filename, host=host))
        return host

    def _resolve_source_relative_path(self, filename: str, directory: bool = False) -> str:
        """
        Resolve a WDL 1.2 source-relative File/Directory StdLib/operator value for a task.

        Used during input/private evaluation, where source-relative paths are mounted into the
        task container and returned as in-container paths (without a trailing '/'). Values that
        don't qualify for source-relative resolution are returned unchanged. ``container`` is
        intentionally mutated when a new source-relative path must be mounted.
        """
        if not self._source_relative_applies(filename, directory):
            return filename
        host = self._source_relative_host_path(
            filename, directory, "File/Directory StdLib argument"
        )
        mount_key = host + "/" if directory else host
        self.container.add_paths([mount_key])
        return self.container.input_path_map[mount_key].rstrip("/")

    def _virtualize_filename(self, filename: str) -> str:
        # register a newly written host file with the container input_path_map and return its
        # in-container path
        self.container.add_paths([filename])
        mounted = self.container.input_path_map[filename]
        self.logger.debug(_("write_", host=filename, container=mounted))
        self.logger.info(_("wrote", file=mounted))
        return mounted

    def _join_paths_default_directory(self) -> str:
        # the in-container task working directory
        return os.path.join(self.container.container_dir, "work")
×
1459

1460

1461
class InputStdLib(_StdLib):
    """
    StdLib for evaluation of task inputs and the command.

    Always runs the base StdLib in inputs-only mode. ``source_directory`` and ``eval_context``
    are forwarded unchanged to the base class; ``source_directory`` presumably anchors WDL 1.2
    source-relative File/Directory paths (TODO confirm in ``_StdLib``).
    """

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        source_directory: str = "",
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        inputs_only = True  # input/command evaluation mode
        super().__init__(
            wdl_version,
            logger,
            container,
            inputs_only,
            eval_context=eval_context,
            source_directory=source_directory,
        )
1479

1480

1481
class OutputStdLib(_StdLib):
    """
    StdLib for evaluation of task outputs.

    Runs the base StdLib with inputs-only mode off, and overrides:

    - ``stdout()`` / ``stderr()``: return ``<container_dir>/stdout.txt`` and
      ``<container_dir>/stderr.txt`` respectively;
    - ``glob()``: search the task working directory on the host and report the matching
      regular files as in-container paths under ``<container_dir>/work``.
    """

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        super().__init__(wdl_version, logger, container, False, eval_context=eval_context)

        setattr(
            self,
            "stdout",
            StdLib.StaticFunction(
                "stdout",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stdout.txt")),
            ),
        )
        setattr(
            self,
            "stderr",
            StdLib.StaticFunction(
                "stderr",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stderr.txt")),
            ),
        )

        def _glob(pattern: Value.String, lib: OutputStdLib = self) -> Value.Array:
            """
            WDL ``glob(pattern)``: regular files in the task working directory matching pattern.

            The pattern must be non-empty, relative, and must not contain ``..`` uplevels;
            otherwise OutputError is raised. A single leading ``./`` is ignored. Matches are
            sorted and returned as in-container paths under ``<container_dir>/work``.
            """
            pat = pattern.coerce(Type.String()).value
            if not pat:
                raise OutputError("empty glob() pattern")
            assert isinstance(pat, str)
            if pat[0] == "/":
                raise OutputError("glob() pattern must be relative to task working directory")
            if pat.startswith("..") or "/.." in pat:
                raise OutputError("glob() pattern must not use .. uplevels")
            if pat.startswith("./"):
                pat = pat[2:]

            # glob the host directory. Escape the directory part so that any glob
            # metacharacters in the run directory's path (e.g. "[") match literally; only the
            # user-supplied pattern should act as a wildcard. glob.escape() is a no-op for
            # ordinary paths, and glob returns the real (unescaped) names.
            host_dir = lib.container.host_work_dir()
            host_files = sorted(
                fn
                for fn in glob.glob(os.path.join(glob.escape(host_dir), pat))
                if os.path.isfile(fn)
            )

            # convert the host filenames to in-container filenames (loop-invariant parts hoisted)
            host_prefix = host_dir if host_dir.endswith("/") else host_dir + "/"
            container_work_dir = os.path.join(lib.container.container_dir, "work")
            container_files = []
            for hf in host_files:
                assert hf.startswith(host_prefix)
                container_files.append(os.path.join(container_work_dir, hf[len(host_prefix) :]))
            return Value.Array(Type.File(), [Value.File(fn) for fn in container_files])

        setattr(
            self,
            "glob",
            StdLib.StaticFunction("glob", [Type.String()], Type.Array(Type.File()), _glob),
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc