• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chanzuckerberg / miniwdl / 25303939873

04 May 2026 06:11AM UTC coverage: 95.285% (+0.04%) from 95.248%
25303939873

Pull #828

github

web-flow
Merge 8e3f59da1 into cf8a4b71f
Pull Request #828: WDL 1.2 standard library additions

267 of 279 new or added lines in 2 files covered. (95.7%)

3 existing lines in 3 files now uncovered.

7882 of 8272 relevant lines covered (95.29%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.72
/WDL/runtime/task.py
1
"""
2
Local task runner
3
"""
4

5
import logging
1✔
6
import math
1✔
7
import os
1✔
8
import json
1✔
9
import traceback
1✔
10
import glob
1✔
11
import threading
1✔
12
import shutil
1✔
13
import regex
1✔
14
from typing import Tuple, List, Dict, Optional, Callable, Set, Any, Union, TYPE_CHECKING
1✔
15
from contextlib import ExitStack, suppress
1✔
16
from collections import Counter
1✔
17

18
from .. import Error, Type, Env, Value, StdLib, Tree, Expr, _util
1✔
19
from .._util import (
1✔
20
    write_atomic,
21
    write_values_json,
22
    provision_run_dir,
23
    TerminationSignalFlag,
24
    chmod_R_plus,
25
    path_really_within,
26
    LoggingFileHandler,
27
    compose_coroutines,
28
    pathsize,
29
    link_force,
30
    symlink_force,
31
    rmtree_atomic,
32
)
33
from .._util import StructuredLogMessage as _
1✔
34
from . import config, _statusbar
1✔
35
from .download import able as downloadable, run_cached as download
1✔
36
from .cache import CallCache, new as new_call_cache
1✔
37
from .error import OutputError, Interrupted, Terminated, RunFailed, error_json
1✔
38

39
if TYPE_CHECKING:  # otherwise-delayed heavy imports
1✔
40
    from .task_container import TaskContainer
×
41

42

43
def run_local_task(  # type: ignore[return]
    cfg: config.Loader,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    run_id: Optional[str] = None,
    run_dir: Optional[str] = None,
    logger_prefix: Optional[List[str]] = None,
    _run_id_stack: Optional[List[str]] = None,
    _cache: Optional[CallCache] = None,
    _plugins: Optional[List[Callable[..., Any]]] = None,
) -> Tuple[str, Env.Bindings[Value.Base]]:
    """
    Run a task locally.

    Inputs shall have been typechecked already. File inputs are presumed to be local POSIX file
    paths that can be mounted into a container.

    :param run_id: unique ID for the run, defaults to workflow name
    :param run_dir: directory under which to create a timestamp-named subdirectory for this run
                    (defaults to current working directory).
                    If the final path component is ".", then operate in run_dir directly.
    :param logger_prefix: component names for the hierarchical logger; "t:<run_id>" is appended
    :param _run_id_stack: (internal) run IDs of enclosing workflow/task calls, if any
    :param _cache: (internal) call cache shared by an enclosing workflow run
    :param _plugins: (internal) additional task plugin coroutines, appended after configured ones
    :return: tuple of the run directory and the task's output values
    :raises RunFailed: wrapping any exception raised during the run (original as __cause__)
    """
    from .task_container import new as new_task_container  # delay heavy import

    _run_id_stack = _run_id_stack or []
    run_id = run_id or task.name
    logger_prefix = (logger_prefix or ["wdl"]) + ["t:" + run_id]
    logger = logging.getLogger(".".join(logger_prefix))
    with ExitStack() as cleanup:
        terminating = cleanup.enter_context(TerminationSignalFlag(logger))
        if terminating():
            # termination signal already received before we started; bail out quietly
            raise Terminated(quiet=True)

        # provision run directory and log file
        run_dir = provision_run_dir(task.name, run_dir, last_link=not _run_id_stack)
        logfile = os.path.join(run_dir, "task.log")
        cleanup.enter_context(
            LoggingFileHandler(
                logger,
                logfile,
            )
        )
        if cfg.has_option("logging", "json") and cfg["logging"].get_bool("json"):
            # optional machine-readable log alongside the human-readable one
            cleanup.enter_context(
                LoggingFileHandler(
                    logger,
                    logfile + ".json",
                    json=True,
                )
            )
        logger.notice(
            _(
                "task setup",
                name=task.name,
                source=task.pos.uri,
                line=task.pos.line,
                column=task.pos.column,
                dir=run_dir,
                thread=threading.get_ident(),
            )
        )
        write_values_json(inputs, os.path.join(run_dir, "inputs.json"))

        if not _run_id_stack:
            # top-level run: open our own call cache unless one was injected
            cache = _cache or cleanup.enter_context(new_call_cache(cfg, logger))
            cache.flock(logfile, exclusive=True)  # no containing workflow; flock task.log
        else:
            # nested within a workflow run, which must supply its cache
            cache = _cache
        assert cache

        cleanup.enter_context(_statusbar.task_slotted())
        maybe_container = None  # set once the container exists, for cleanup on error
        try:
            # check call cache for a prior run of this task digest with these exact inputs
            cache_key = f"{task.name}/{task.digest}/{Value.digest_env(inputs)}"
            cached = cache.get(cache_key, inputs, task.effective_outputs)
            if cached is not None:
                for decl in task.outputs:
                    v = cached[decl.name]
                    vj = json.dumps(v.json)
                    logger.info(
                        _(
                            "cached output",
                            name=decl.name,
                            value=(v.json if len(vj) < 4096 else "(((large)))"),
                        )
                    )
                # create out/ and outputs.json
                _outputs = link_outputs(
                    cache,
                    cached,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )
                write_values_json(
                    cached, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done (cached)")
                # returning `cached`, not the rewritten `_outputs`, to retain opportunity to find
                # cached downstream inputs
                return (run_dir, cached)
            # start plugin coroutines and process inputs through them
            with compose_coroutines(
                [
                    (
                        # cor=cor binds the loop variable at definition time (late-binding fix)
                        lambda kwargs, cor=cor: cor(  # type: ignore
                            cfg, logger, _run_id_stack + [run_id], run_dir, task, **kwargs
                        )
                    )
                    for cor in (
                        [cor2 for _, cor2 in sorted(config.load_plugins(cfg, "task"))]
                        + (_plugins or [])
                    )
                ],
                {"inputs": inputs},
            ) as plugins:
                recv = next(plugins)
                inputs = recv["inputs"]

                # download input files, if needed
                posix_inputs = _download_input_files(
                    cfg,
                    logger,
                    logger_prefix,
                    run_dir,
                    _add_downloadable_defaults(cfg, task.available_inputs, inputs),
                    cache,
                )

                # create TaskContainer according to configuration
                container = new_task_container(cfg, logger, run_id, run_dir)
                maybe_container = container

                # evaluate input/postinput declarations, including mapping from host to
                # in-container file paths
                container_env = _eval_task_inputs(logger, task, posix_inputs, container)

                # evaluate runtime fields
                stdlib = InputStdLib(task.effective_wdl_version, logger, container)
                _eval_task_runtime(
                    cfg, logger, run_id, task, posix_inputs, container, container_env, stdlib
                )
                if task.effective_wdl_version not in ("draft-2", "1.0", "1.1"):
                    # WDL >= 1.2: expose the `task` runtime info struct in the value environment
                    container.build_task_runtime_info_struct(logger, run_id, task)
                    assert container.task_runtime_info_struct is not None
                    container_env = container_env.bind("task", container.task_runtime_info_struct)

                # interpolate command
                old_command_dedent = cfg["task_runtime"].get_bool("old_command_dedent")
                # pylint: disable=E1101
                placeholder_re = regex.compile(
                    cfg["task_runtime"]["placeholder_regex"], flags=regex.POSIX
                )
                setattr(
                    stdlib,
                    "_placeholder_regex",
                    placeholder_re,
                )  # hack to pass regex to WDL.Expr.Placeholder._eval
                assert isinstance(task.command, Expr.TaskCommand)
                command = task.command.eval(
                    container_env, stdlib, dedent=not old_command_dedent
                ).value
                delattr(stdlib, "_placeholder_regex")
                if old_command_dedent:  # see issue #674
                    command = _util.strip_leading_whitespace(command)[1]
                logger.debug(_("command", command=command.strip()))

                # process command & container through plugins
                recv = plugins.send({"command": command, "container": container})
                command, container = (recv[k] for k in ("command", "container"))

                # start container & run command (and retry if needed)
                _try_task(cfg, task, logger, container, command, terminating)

                # bind output declarations to task runtime info with the final return code
                if task.effective_wdl_version not in ("draft-2", "1.0", "1.1"):
                    container.update_task_runtime_info_struct(
                        return_code=(
                            Value.Int(container.last_exit_code)
                            if container.last_exit_code is not None
                            else Value.Null()
                        ),
                    )
                    assert container.task_runtime_info_struct is not None
                    container_env = container_env.bind("task", container.task_runtime_info_struct)

                # evaluate output declarations
                outputs = _eval_task_outputs(logger, run_id, task, container_env, container)

                # create output_links
                outputs = link_outputs(
                    cache,
                    outputs,
                    run_dir,
                    hardlinks=cfg["file_io"].get_bool("output_hardlinks"),
                    use_relative_output_paths=cfg["file_io"].get_bool("use_relative_output_paths"),
                )

                # process outputs through plugins
                recv = plugins.send({"outputs": outputs})
                outputs = recv["outputs"]

                # clean up, if so configured, and make sure output files will be accessible to
                # downstream tasks
                _delete_work(cfg, logger, container, True)
                chmod_R_plus(run_dir, file_bits=0o660, dir_bits=0o770)
                _warn_output_basename_collisions(logger, outputs)

                # write outputs.json
                write_values_json(
                    outputs, os.path.join(run_dir, "outputs.json"), namespace=task.name
                )
                logger.notice("done")
                if not run_id.startswith("download-"):
                    # don't cache internal download tasks; their results are cached separately
                    cache.put(cache_key, outputs, run_dir=run_dir)
                return (run_dir, outputs)
        except Exception as exn:
            # wrap any failure in RunFailed; log + persist error.json; best-effort cleanup
            tbtxt = traceback.format_exc()
            logger.debug(tbtxt)
            wrapper = RunFailed(task, run_id, run_dir)
            logmsg = _(
                str(wrapper),
                dir=run_dir,
                **error_json(
                    exn, traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None
                ),
            )
            if isinstance(exn, Terminated) and getattr(exn, "quiet", False):
                # quiet termination: demote to debug to avoid noisy logs
                logger.debug(logmsg)
            else:
                logger.error(logmsg)
            try:
                write_atomic(
                    json.dumps(
                        error_json(
                            wrapper,
                            cause=exn,
                            traceback=tbtxt if not isinstance(exn, Error.RuntimeError) else None,
                        ),
                        indent=2,
                    ),
                    os.path.join(run_dir, "error.json"),
                )
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.critical(_("failed to write error.json", dir=run_dir, message=str(exn2)))
            try:
                if maybe_container:
                    _delete_work(cfg, logger, maybe_container, False)
            except Exception as exn2:
                logger.debug(traceback.format_exc())
                logger.error(_("delete_work also failed", exception=str(exn2)))
            raise wrapper from exn
296

297

298
def _download_input_files(
    cfg: config.Loader,
    logger: logging.Logger,
    logger_prefix: List[str],
    run_dir: str,
    inputs: Env.Bindings[Value.Base],
    cache: CallCache,
) -> Env.Bindings[Value.Base]:
    """
    Scan the inputs for File & Directory values whose paths are downloadable URIs (including any
    nested inside compound values), fetch each one into a subdirectory of run_dir, and return a
    copy of the inputs with those URIs replaced by the local downloaded paths. Non-URI paths pass
    through untouched.
    """

    n_downloaded = 0  # downloads actually performed (cache misses)
    total_bytes = 0  # cumulative size of cache-miss downloads
    n_cached = 0  # downloads satisfied from cache

    def rewriter(v: Union[Value.Directory, Value.File]) -> str:
        # Rewrite one File/Directory path: download it if it's a URI, else leave as-is.
        nonlocal n_downloaded, total_bytes, n_cached
        is_dir = isinstance(v, Value.Directory)
        uri = v.value
        if not downloadable(cfg, uri, directory=is_dir):
            return uri
        logger.info(_(f"download input {'directory' if is_dir else 'file'}", uri=uri))
        was_cached, filename = download(
            cfg,
            logger,
            cache,
            uri,
            directory=is_dir,
            run_dir=os.path.join(run_dir, "download", str(n_downloaded), "."),
            logger_prefix=logger_prefix + [f"download{n_downloaded}"],
        )
        if was_cached:
            n_cached += 1
        else:
            sz = pathsize(filename)
            logger.info(_("downloaded input", uri=uri, path=filename, bytes=sz))
            n_downloaded += 1
            total_bytes += sz
        return filename

    rewritten = Value.rewrite_env_paths(inputs, rewriter)
    if n_downloaded or n_cached:
        logger.notice(
            _(
                "processed input URIs",
                downloaded=n_downloaded,
                downloaded_bytes=total_bytes,
                cached=n_cached,
            )
        )
    return rewritten
352

353

354
def _add_downloadable_defaults(
    cfg: config.Loader, available_inputs: Env.Bindings[Tree.Decl], inputs: Env.Bindings[Value.Base]
) -> Env.Bindings[Value.Base]:
    """
    For each available File/Directory input whose default is a string literal that looks like a
    downloadable URI, and which the caller hasn't overridden, add a binding of that default URI to
    the supplied inputs.

    This triggers download of the default URIs even though input declarations aren't otherwise
    evaluated until after download processing.
    """
    augmented = inputs
    for binding in available_inputs:
        decl = binding.value
        # skip anything that isn't an un-overridden File/Directory with a string-literal default
        if binding.name in augmented:
            continue
        if not isinstance(decl.type, (Type.File, Type.Directory)):
            continue
        if not isinstance(decl.expr, Expr.String):
            continue
        directory = isinstance(decl.type, Type.Directory)
        literal = decl.expr.literal
        if literal and downloadable(cfg, literal.value, directory=directory):
            ctor = Value.Directory if directory else Value.File
            augmented = augmented.bind(binding.name, ctor(literal.value, decl.expr))
    return augmented
382

383

384
def _eval_task_inputs(
    logger: logging.Logger,
    task: Tree.Task,
    posix_inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    """
    Build the task's value environment: mount the provided File/Directory inputs into the
    container (rewriting paths to their in-container equivalents), then evaluate the remaining
    input/postinput declarations in dependency order. Returns the complete environment used for
    command interpolation and output evaluation.
    """
    # Preprocess inputs: if None value is supplied for an input declared with a default but without
    # the ? type quantifier, remove the binding entirely so that the default will be used. In
    # contrast, if the input declaration has an -explicitly- optional type, then we'll allow the
    # supplied None to override any default.
    input_decls = task.available_inputs
    posix_inputs = posix_inputs.filter(
        lambda b: (
            not (
                isinstance(b.value, Value.Null)
                and b.name in input_decls
                and input_decls[b.name].expr
                and not input_decls[b.name].type.optional
            )
        )
    )

    # Map all the provided input File & Directory paths to in-container paths
    container.add_paths(_fspaths(posix_inputs))
    _warn_input_basename_collisions(logger, container)

    # copy posix_inputs with all File & Directory values mapped to their in-container paths
    def map_paths(fn: Union[Value.File, Value.Directory]) -> str:
        # Directory keys in input_path_map carry exactly one trailing slash
        p = fn.value.rstrip("/")
        if isinstance(fn, Value.Directory):
            p += "/"
        return container.input_path_map[p]

    container_inputs = Value.rewrite_env_paths(posix_inputs, map_paths)

    # initialize value environment with the inputs
    container_env: Env.Bindings[Value.Base] = Env.Bindings()
    for b in container_inputs:
        assert isinstance(b, Env.Binding)
        v = b.value
        assert isinstance(v, Value.Base)
        container_env = container_env.bind(b.name, v)
        vj = json.dumps(v.json)
        # elide very large values from the log
        logger.info(_("input", name=b.name, value=(v.json if len(vj) < 4096 else "(((large)))")))

    # collect remaining declarations requiring evaluation.
    decls_to_eval = []
    for decl in (task.inputs or []) + (task.postinputs or []):
        if not container_env.has_binding(decl.name):
            decls_to_eval.append(decl)

    # topsort them according to internal dependencies. prior static validation
    # should have ensured they're acyclic.
    decls_by_id, decls_adj = Tree._decl_dependency_matrix(decls_to_eval)
    decls_to_eval = [decls_by_id[did] for did in _util.topsort(decls_adj)]
    assert len(decls_by_id) == len(decls_to_eval)

    # evaluate each declaration in that order
    # note: the write_* functions call container.add_paths as a side-effect
    stdlib = InputStdLib(task.effective_wdl_version, logger, container)
    for decl in decls_to_eval:
        assert isinstance(decl, Tree.Decl)
        v = Value.Null()
        if decl.expr:
            try:
                v = decl.expr.eval(container_env, stdlib=stdlib).coerce(decl.type)
            except Error.RuntimeError as exn:
                # tag the failing declaration's node id for error reporting
                setattr(exn, "job_id", decl.workflow_node_id)
                raise exn
            except Exception as exn:
                # wrap unexpected errors as EvalError pointing at this declaration
                exn2 = Error.EvalError(decl, str(exn))
                setattr(exn2, "job_id", decl.workflow_node_id)
                raise exn2 from exn
        else:
            # declarations with no expression must be optional (default to Null)
            assert decl.type.optional
        vj = json.dumps(v.json)
        logger.info(_("eval", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))")))
        container_env = container_env.bind(decl.name, v)

    return container_env
464

465

466
def _fspaths(env: Env.Bindings[Value.Base]) -> Set[str]:
    """
    Collect the unique paths of every File & Directory value in the environment, traversing into
    compound values. Directory paths are normalized to carry a trailing '/'.
    """
    paths: Set[str] = set()
    # depth-first traversal with an explicit stack instead of recursion
    pending = [binding.value for binding in env]
    while pending:
        v = pending.pop()
        if isinstance(v, Value.File):
            assert not v.value.endswith("/")
            paths.add(v.value)
        elif isinstance(v, Value.Directory):
            paths.add(v.value.rstrip("/") + "/")
        pending.extend(v.children)
    return paths
485

486

487
def _warn_input_basename_collisions(logger: logging.Logger, container: "TaskContainer") -> None:
1✔
488
    basenames = Counter(
1✔
489
        [os.path.basename((p[:-1] if p.endswith("/") else p)) for p in container.input_path_map_rev]
490
    )
491
    collisions = [nm for nm, n in basenames.items() if n > 1]
1✔
492
    if collisions:
1✔
493
        logger.warning(
1✔
494
            _(
495
                "mounting input files with colliding basenames in separate container directories",
496
                basenames=collisions,
497
            )
498
        )
499

500

501
def _eval_task_runtime(
    cfg: config.Loader,
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    inputs: Env.Bindings[Value.Base],
    container: "TaskContainer",
    env: Env.Bindings[Value.Base],
    stdlib: StdLib.Base,
) -> None:
    """
    Evaluate the task's runtime{} section, merging (in increasing precedence) configured defaults,
    source-code expressions, and runtime/requirements input overrides; then have the container
    implementation validate them into container.runtime_values. Also assembles the in-container
    environment-variable overrides. Mutates container.runtime_values as a side effect.
    """
    # evaluate runtime{} expressions (merged with any configured defaults)
    runtime_defaults = cfg.get_dict("task_runtime", "defaults")
    if run_id.startswith("download-"):
        # internal download tasks get their own default overlay
        runtime_defaults.update(cfg.get_dict("task_runtime", "download_defaults"))
    runtime_values = {}
    for key, v in runtime_defaults.items():
        runtime_values[key] = Value.from_json(Type.Any(), v)
    for key, expr in task.runtime.items():  # evaluate expressions in source code
        runtime_values[key] = expr.eval(env, stdlib)
    for b in inputs.enter_namespace("runtime"):
        runtime_values[b.name] = b.value  # input overrides
    for b in inputs.enter_namespace("requirements"):
        runtime_values[b.name] = b.value
    logger.debug(_("runtime values", **dict((key, str(v)) for key, v in runtime_values.items())))

    # have container implementation validate & postprocess into container.runtime_values
    container.process_runtime(logger, runtime_values)

    if container.runtime_values:
        logger.info(_("effective runtime", **container.runtime_values))

    # add any configured overrides for in-container environment variables
    container.runtime_values.setdefault("env", {})
    env_vars_override = {}
    env_vars_skipped = []
    for ev_name, ev_value in cfg["task_runtime"].get_dict("env").items():
        if ev_value is None:
            # null config value means pass through the host environment variable, if defined
            try:
                env_vars_override[ev_name] = os.environ[ev_name]
            except KeyError:
                env_vars_skipped.append(ev_name)
        else:
            env_vars_override[ev_name] = str(ev_value)
    if env_vars_skipped:
        logger.warning(
            _("skipping pass-through of undefined environment variable(s)", names=env_vars_skipped)
        )
    if cfg.get_bool("file_io", "mount_tmpdir") or task.name in cfg.get_list(
        "file_io", "mount_tmpdir_for"
    ):
        # point TMPDIR at the per-task temp directory mounted under the container work dir
        env_vars_override["TMPDIR"] = os.path.join(
            container.container_dir, "work", "_miniwdl_tmpdir"
        )
    if env_vars_override:
        # usually don't dump values into log, as they may often be auth tokens
        logger.notice(
            _(
                "overriding environment variables (portability warning)",
                names=list(env_vars_override.keys()),
            )
        )
        logger.debug(
            _("overriding environment variables (portability warning)", **env_vars_override)
        )
        container.runtime_values["env"].update(env_vars_override)

    # process decls with "env" decorator (EXPERIMENTAL)
    env_decls: Dict[str, Value.Base] = {}
    for decl in (task.inputs or []) + task.postinputs:
        if decl.decor.get("env", False) is True:
            if not env_decls:
                # warn only once, on the first env decl encountered
                logger.warning(
                    "task env declarations are an experimental feature, subject to change"
                )
            v = env[decl.name]
            if isinstance(v, (Value.String, Value.File, Value.Directory)):
                # string-like values export verbatim; others as JSON
                v = v.value
            else:
                v = json.dumps(v.json)
            env_decls[decl.name] = v
    container.runtime_values["env"].update(env_decls)

    # warn about runtime keys that the container implementation didn't consume
    # ("memory"/"docker"/"container" are aliases it may have normalized away)
    unused_keys = list(
        key
        for key in runtime_values
        if key not in ("memory", "docker", "container") and key not in container.runtime_values
    )
    if unused_keys:
        logger.warning(_("ignored runtime settings", keys=unused_keys))
590

591

592
def _try_task(
    cfg: config.Loader,
    task: Tree.Task,
    logger: logging.Logger,
    container: "TaskContainer",
    command: str,
    terminating: Callable[[], bool],
) -> None:
    """
    Run the task command in the container, retrying up to runtime.preemptible occurrences of
    Interrupted errors, plus up to runtime.maxRetries occurrences of any error.

    :param terminating: callable polled before each attempt (and after failures) to detect a
                        pending termination signal
    :raises Terminated: if termination was requested before or during an attempt
    """
    from docker.errors import BuildError as DockerBuildError  # delay heavy import

    max_retries = container.runtime_values.get("maxRetries", 0)
    max_interruptions = container.runtime_values.get("preemptible", 0)
    retries = 0
    interruptions = 0

    while True:
        if terminating():
            raise Terminated()
        # copy input files, if needed
        if cfg.get_bool("file_io", "copy_input_files") or task.name in cfg.get_list(
            "file_io", "copy_input_files_for"
        ):
            container.copy_input_files(logger)
        # per-attempt temp directory under the work dir, when TMPDIR mounting is configured
        host_tmpdir = (
            os.path.join(container.host_work_dir(), "_miniwdl_tmpdir")
            if cfg.get_bool("file_io", "mount_tmpdir")
            or task.name in cfg.get_list("file_io", "mount_tmpdir_for")
            else None
        )

        try:
            # start container & run command
            if host_tmpdir:
                logger.debug(_("creating task temp directory", TMPDIR=host_tmpdir))
                os.mkdir(host_tmpdir, mode=0o770)
            try:
                if task.effective_wdl_version not in ("draft-2", "1.0", "1.1"):
                    # WDL >= 1.2: refresh task.attempt/return_code before each attempt
                    container.update_task_runtime_info_struct(
                        attempt=Value.Int(max(0, container.try_counter - 1)),
                        return_code=Value.Null(),
                    )
                    # FIXME: The command has already been interpolated, so retry attempts won't see
                    # the updated task.attempt value; output declarations will.
                return container.run(logger, command)
            finally:
                # runs whether the attempt succeeded or failed
                if host_tmpdir:
                    logger.info(_("deleting task temp directory", TMPDIR=host_tmpdir))
                    rmtree_atomic(host_tmpdir)
                # test hook: simulate preemptible-instance interruptions
                if (
                    "preemptible" in container.runtime_values
                    and cfg.has_option("task_runtime", "_mock_interruptions")
                    and interruptions < cfg["task_runtime"].get_int("_mock_interruptions")
                ):
                    raise Interrupted("mock interruption") from None
        except Exception as exn:
            # decide whether to retry (interruption budget first, then general retries)
            if isinstance(exn, Interrupted) and interruptions < max_interruptions:
                logger.error(
                    _(
                        "interrupted task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_interruptions=interruptions,
                        max_interruptions=max_interruptions,
                    )
                )
                interruptions += 1
            elif (
                # never retry termination or docker image build failures
                not isinstance(exn, (Terminated, DockerBuildError))
                and retries < max_retries
                and not terminating()
            ):
                logger.error(
                    _(
                        "failed task will be retried",
                        error=exn.__class__.__name__,
                        message=str(exn),
                        prev_retries=retries,
                        max_retries=max_retries,
                    )
                )
                retries += 1
            else:
                raise
            # reset container state for the next attempt
            _delete_work(cfg, logger, container, False)
            container.reset(logger)
681

682

683
def _eval_task_outputs(
    logger: logging.Logger,
    run_id: str,
    task: Tree.Task,
    env: Env.Bindings[Value.Base],
    container: "TaskContainer",
) -> Env.Bindings[Value.Base]:
    """
    Evaluate the task's output declarations following successful command completion.

    File/Directory values in the evaluated outputs are rewritten from in-container paths to
    host paths. Output paths that don't exist in the container become None, which coercion
    turns into an error unless the declared type is optional.

    :param run_id: used to skip the courtesy stdout notice for download tasks
    :param env: task evaluation environment; each output is bound into it as evaluated so that
                subsequent output expressions can reference earlier ones
    :raises OutputError: when a non-optional File/Directory output path doesn't exist
    """
    stdout_file = os.path.join(container.host_dir, "stdout.txt")
    # stdout.txt may not exist at all; in that case there's nothing to advise about
    with suppress(FileNotFoundError):
        if os.path.getsize(stdout_file) > 0 and not run_id.startswith("download-"):
            # If the task produced nonempty stdout that isn't used in the WDL outputs, generate a
            # courtesy log message directing user where to find it
            stdout_used = False
            # walk all output expression trees looking for any application of stdout()
            expr_stack = [outp.expr for outp in task.outputs]
            while expr_stack:
                expr = expr_stack.pop()
                assert isinstance(expr, Expr.Base)
                if isinstance(expr, Expr.Apply) and expr.function_name == "stdout":
                    stdout_used = True
                else:
                    expr_stack.extend(expr.children)  # type: ignore[arg-type]
            if not stdout_used:
                logger.info(
                    _(
                        "command stdout unused; consider output `File cmd_out = stdout()`"
                        " or redirect command to stderr log >&2",
                        stdout_file=stdout_file,
                    )
                )

    # Helpers to rewrite File/Directory from in-container paths to host paths
    # First pass -- convert nonexistent output paths to None/Null
    def rewriter1(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        # normalize Directory paths to end in '/' before the host_path lookup
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        if container.host_path(container_path) is None:
            logger.warning(
                _(
                    "output path not found in container (error unless declared type is optional)",
                    output=output_name,
                    path=container_path,
                )
            )
            return None
        return v.value

    # Second pass -- convert in-container paths to host paths
    def rewriter2(v: Union[Value.File, Value.Directory], output_name: str) -> Optional[str]:
        container_path = v.value
        if isinstance(v, Value.Directory) and not container_path.endswith("/"):
            container_path += "/"
        host_path = container.host_path(container_path)
        # rewriter1 already nulled out nonexistent paths, so this must resolve
        assert host_path is not None
        if isinstance(v, Value.Directory):
            if host_path.endswith("/"):
                host_path = host_path[:-1]
            # reject output directories containing absolute/dangling/escaping symlinks
            _check_directory(host_path, output_name)
            logger.debug(_("output dir", container=container_path, host=host_path))
        else:
            logger.debug(_("output file", container=container_path, host=host_path))
        return host_path

    stdlib = OutputStdLib(task.effective_wdl_version, logger, container)
    outputs: Env.Bindings[Value.Base] = Env.Bindings()
    for decl in task.outputs:
        assert decl.expr
        try:
            v = decl.expr.eval(env, stdlib=stdlib).coerce(decl.type)
        except Error.RuntimeError as exn:
            # tag the exception with the output decl for error reporting, then re-raise
            setattr(exn, "job_id", decl.workflow_node_id)
            raise exn
        except Exception as exn:
            # wrap unexpected exceptions as EvalError positioned at this declaration
            exn2 = Error.EvalError(decl, str(exn))
            setattr(exn2, "job_id", decl.workflow_node_id)
            raise exn2 from exn
        _warn_struct_extra(logger, decl.name, v)
        vj = json.dumps(v.json)
        # elide very large values from the log
        logger.info(
            _("output", name=decl.name, value=(v.json if len(vj) < 4096 else "(((large)))"))
        )

        # Now, a delicate sequence for postprocessing File outputs (including Files nested within
        # compound values)

        # First convert nonexistent paths to None/Null, and bind this in the environment for
        # evaluating subsequent output expressions.
        v = Value.rewrite_paths(v, lambda w: rewriter1(w, decl.name))
        env = env.bind(decl.name, v)
        # check if any nonexistent paths were provided for (non-optional) File/Directory types
        # Value.Null.coerce has a special behavior for us to raise FileNotFoundError for a
        # non-optional File/Directory type.
        try:
            v = v.coerce(decl.type)
        except FileNotFoundError:
            err = OutputError("File/Directory path not found in task output " + decl.name)
            setattr(err, "job_id", decl.workflow_node_id)
            raise err
        # Rewrite in-container paths to host paths
        v = Value.rewrite_paths(v, lambda w: rewriter2(w, decl.name))
        outputs = outputs.bind(decl.name, v)

    return outputs
786

787

788
def _check_directory(host_path: str, output_name: str) -> None:
    """
    Traverse the output directory to check that all symlinks are relative, resolve to existing
    paths, and stay inside the directory.

    :param host_path: host filesystem path of the output Directory
    :param output_name: WDL output name, used in the error message
    :raises OutputError: if any symlink is dangling, absolute, or escapes ``host_path``
    """

    def raiser(exc: OSError):
        # propagate errors from os.walk instead of silently skipping subtrees
        raise exc

    def check_link(fn: str) -> None:
        # flag symlinks that are dangling, absolute, or resolve outside host_path
        if os.path.islink(fn) and (
            not os.path.exists(fn)
            or os.path.isabs(os.readlink(fn))
            or not path_really_within(fn, host_path)
        ):
            raise OutputError(f"Directory in output {output_name} contains unusable symlink")

    for root, subdirs, files in os.walk(host_path, onerror=raiser, followlinks=False):
        for fn in files:
            check_link(os.path.join(root, fn))
        # Symlinks to directories are listed in subdirs (not files), and with followlinks=False
        # os.walk never descends into them -- so check them here too, lest a directory symlink
        # escaping host_path evade validation entirely.
        for dn in subdirs:
            check_link(os.path.join(root, dn))
805

806

807
def _warn_struct_extra(
    logger: logging.Logger, decl_name: str, v: Value.Base, warned_keys: Optional[Set[str]] = None
) -> None:
    """
    Log notices about extraneous keys found in struct initialization from JSON/Map/Object
    """
    warned_keys = set() if warned_keys is None else warned_keys
    if isinstance(v, Value.Struct) and v.extra:
        # keys beginning with '#' are excluded from the notice
        notable = {k for k in v.extra if not k.startswith("#")}
        if notable - warned_keys:
            logger.notice(
                _(
                    "extraneous keys in struct initializer",
                    decl=decl_name,
                    struct=str(v.type),
                    extra_keys=list(notable),
                )
            )
            # remember these keys so nested occurrences don't repeat the notice
            warned_keys.update(notable)
    # descend into compound values, sharing the warned-keys accumulator
    for child in v.children:
        _warn_struct_extra(logger, decl_name, child, warned_keys)
829

830

831
def link_outputs(
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
    use_relative_output_paths: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    Following a successful run, the output files may be scattered throughout a complex directory
    tree used for execution. To help navigating this, generate a subdirectory of the run directory
    containing nicely organized symlinks to the output files, and rewrite File values in the
    outputs env to use these symlinks.

    :param cache: call cache, consulted to resolve paths that don't exist on the host (URIs with
                  cached downloads)
    :param run_dir: the run directory; an ``out/`` subdirectory is created beneath it
    :param hardlinks: create hardlinks (copying directory trees) instead of symlinks
    :param use_relative_output_paths: delegate organization to ``link_outputs_relative``
    """

    def link1(target: str, link: str, directory: bool) -> None:
        # create a single link from `link` to `target`, hard or symbolic per the hardlinks flag
        if hardlinks:
            # TODO: what if target is an input from a different filesystem?
            if directory:
                shutil.copytree(target, link, symlinks=True, copy_function=link_force)
            else:
                link_force(target, link)
        else:
            symlink_force(target, link)

    def map_paths(v: Value.Base, dn: str) -> Value.Base:
        # Recursively create links under directory `dn` for each File/Directory found in v,
        # mutating v in place so its paths point at the links.
        if isinstance(v, (Value.File, Value.Directory)):
            target = (
                v.value
                if os.path.exists(v.value)
                else cache.get_download(v.value, isinstance(v, Value.Directory))
            )
            if target:
                target = os.path.realpath(target)
                assert os.path.exists(target)
                if not hardlinks and path_really_within(target, os.path.dirname(run_dir)):
                    # make symlink relative
                    target = os.path.relpath(target, start=os.path.realpath(dn))
                link = os.path.join(dn, os.path.basename(v.value.rstrip("/")))
                # exist_ok=False: each value gets its own fresh subdirectory
                os.makedirs(dn, exist_ok=False)
                link1(target, link, isinstance(v, Value.Directory))
                # Drop a dotfile alongside Directory outputs, to inform a program crawling the out/
                # directory without reference to the output types or JSON for whatever reason. It
                # might otherwise have trouble distinguishing Directory outputs among the
                # structured subdirectories we create for compound types.
                if isinstance(v, Value.Directory):
                    with open(os.path.join(dn, ".WDL_Directory"), "w") as _dotfile:
                        pass
                v.value = link
        # recurse into compound values
        elif isinstance(v, Value.Array) and v.value:
            d = int(math.ceil(math.log10(len(v.value))))  # how many digits needed
            for i in range(len(v.value)):
                # zero-pad element subdirectory names so they sort lexicographically
                v.value[i] = map_paths(v.value[i], os.path.join(dn, str(i).rjust(d, "0")))
        elif isinstance(v, Value.Map) and v.value:
            # create a subdirectory for each key, as long as the key names seem to make reasonable
            # path components; otherwise, treat the dict as a list of its values
            keys_ok = (
                sum(
                    1
                    for b in v.value
                    if regex.fullmatch("[-_a-zA-Z0-9][-_a-zA-Z0-9.]*", str(b[0]).strip("'\""))
                    is None
                )
                == 0
            )
            d = int(math.ceil(math.log10(len(v.value))))
            for i, b in enumerate(v.value):
                v.value[i] = (
                    b[0],
                    map_paths(
                        b[1],
                        os.path.join(
                            dn, str(b[0]).strip("'\"") if keys_ok else str(i).rjust(d, "0")
                        ),
                    ),
                )
        elif isinstance(v, Value.Pair):
            v.value = (
                map_paths(v.value[0], os.path.join(dn, "left")),
                map_paths(v.value[1], os.path.join(dn, "right")),
            )
        elif isinstance(v, Value.Struct):
            for key in v.value:
                v.value[key] = map_paths(v.value[key], os.path.join(dn, key))
        return v

    os.makedirs(os.path.join(run_dir, "out"), exist_ok=False)

    if use_relative_output_paths:
        return link_outputs_relative(link1, cache, outputs, run_dir, hardlinks=hardlinks)

    return outputs.map(
        lambda binding: Env.Binding(
            binding.name,
            map_paths(
                Value.rewrite_paths(binding.value, lambda v: v.value),  # nop to deep copy
                os.path.join(run_dir, "out", binding.name),
            ),
        )
    )

932

933
def link_outputs_relative(
    link1: Callable[[str, str, bool], None],
    cache: CallCache,
    outputs: Env.Bindings[Value.Base],
    run_dir: str,
    hardlinks: bool = False,
) -> Env.Bindings[Value.Base]:
    """
    link_outputs with [file_io] use_relative_output_paths = true. We organize the links to reflect
    the generated files' paths relative to their task working directory.

    :param link1: linking routine (hard or symbolic) supplied by link_outputs
    :raises FileExistsError: if two distinct targets would collide at the same relative link path
    """
    # maps each created link path -> its real target, to detect collisions between outputs
    link_destinations: Dict[str, str] = dict()

    def map_path_relative(v: Union[Value.File, Value.Directory]) -> str:
        target = (
            v.value
            if os.path.exists(v.value)
            else cache.get_download(v.value, isinstance(v, Value.Directory))
        )
        if target:
            real_target = os.path.realpath(target)
            rel_link = None
            if path_really_within(target, os.path.join(run_dir, "work")):
                # target was generated by current task; use its path relative to the task work dir
                if not os.path.basename(run_dir).startswith("download-"):  # except download tasks
                    rel_link = os.path.relpath(
                        real_target, os.path.realpath(os.path.join(run_dir, "work"))
                    )
            else:
                # target is an out/ link generated by a call in the current workflow OR a cached
                # run; use the link's path relative to that out/ dir, which by induction should
                # equal its path relative to the original work/ dir.
                # we need heuristic to find the out/ dir in a task/workflow run directory, since the
                # user's cwd or the task-generated relative path might coincidentally have
                # something named 'out'.
                p = None
                # scan '/out/' candidates right-to-left, accepting one whose parent directory
                # looks like a run directory (contains task.log or workflow.log)
                for p in reversed([m.span()[0] for m in regex.finditer("/out(?=/)", target)]):
                    if p and (
                        os.path.isfile(os.path.join(target[:p], "task.log"))
                        or os.path.isfile(os.path.join(target[:p], "workflow.log"))
                    ):
                        break
                    p = None
                # p+5 positions just past the '/out/' component; also require a nonempty remainder
                if p and p + 5 < len(target):
                    rel_link = os.path.relpath(target, target[: p + 5])
            # if neither of the above cases applies, then fall back to just the target basename
            rel_link = rel_link or os.path.basename(target)
            abs_link = os.path.join(os.path.join(run_dir, "out"), rel_link)
            if link_destinations.get(abs_link, real_target) != real_target:
                raise FileExistsError(
                    "Output filename collision; to allow this, set"
                    " [file_io] use_relative_output_paths = false. Affected path: " + abs_link
                )
            os.makedirs(os.path.dirname(abs_link), exist_ok=True)
            link1(real_target, abs_link, isinstance(v, Value.Directory))
            link_destinations[abs_link] = real_target
            return abs_link
        return v.value

    return Value.rewrite_env_paths(outputs, map_path_relative)
993

994

995
def _warn_output_basename_collisions(
    logger: logging.Logger, outputs: Env.Bindings[Value.Base]
) -> None:
    """
    Warn when two or more distinct output files/directories share the same basename.
    """
    seen: Dict[str, Set[str]] = {}

    def note_path(v: Union[Value.File, Value.Directory]) -> str:
        # resolve to the real path (when it exists) so multiple links to one target don't count
        p = v.value
        if os.path.exists(p):
            p = os.path.realpath(p)
        seen.setdefault(os.path.basename(p), set()).add(p)
        return v.value  # leave the value unchanged; we are only walking

    Value.rewrite_env_paths(outputs, note_path)

    clashing = [name for name, paths in seen.items() if len(paths) > 1]
    if clashing:
        logger.warning(
            _(
                "multiple output files share the same basename; while miniwdl supports this,"
                " consider modifying WDL to ensure distinct output basenames",
                basenames=clashing,
            )
        )

1020

1021
def _delete_work(
    cfg: config.Loader,
    logger: logging.Logger,
    container: "Optional[TaskContainer]",
    success: bool,
) -> None:
    """
    Delete the task working directory contents if the [file_io] delete_work setting applies to
    this run's disposition (always / success / failure).
    """
    policy = cfg["file_io"]["delete_work"].strip().lower()
    if not container:
        return
    if policy != "always" and policy != ("success" if success else "failure"):
        return
    if success and not cfg["file_io"].get_bool("output_hardlinks"):
        # without hardlinked outputs, deleting the work dir would leave output symlinks dangling
        logger.warning(
            "ignoring configuration [file_io] delete_work because it requires also output_hardlinks = true"
        )
        return
    container.delete_work(logger, delete_streams=not success)
1037

1038

1039
class _StdLib(StdLib.Base):
    # Base WDL standard library for task evaluation, handling host<->container path translation.
    logger: logging.Logger
    container: "TaskContainer"
    inputs_only: bool  # if True then only permit access to input files

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        inputs_only: bool,
    ) -> None:
        # write_() outputs are staged under the container's host directory
        super().__init__(wdl_version, write_dir=os.path.join(container.host_dir, "write_"))
        self.logger = logger
        self.container = container
        self.inputs_only = inputs_only

    def _devirtualize_filename(self, filename: str) -> str:
        # Map an in-container filename to its host path, checking it's permitted to be read.
        host_filename = self.container.host_path(filename, inputs_only=self.inputs_only)
        if host_filename is None:
            raise OutputError("function was passed non-existent file " + filename)
        self.logger.debug(_("read_", container=filename, host=host_filename))
        return host_filename

    def _virtualize_filename(self, filename: str) -> str:
        # Register a freshly-written host file with the container's input path map, and return
        # its in-container path.
        self.container.add_paths([filename])
        in_container = self.container.input_path_map[filename]
        self.logger.debug(_("write_", host=filename, container=in_container))
        self.logger.info(_("wrote", file=in_container))
        return in_container
1072

1073

1074
class InputStdLib(_StdLib):
    """WDL standard library available while evaluating task inputs and the command template."""

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
    ) -> None:
        # inputs_only=True restricts file access to the declared task inputs
        super().__init__(wdl_version, logger, container, True)
1083

1084

1085
class OutputStdLib(_StdLib):
    # StdLib for evaluation of task outputs
    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
    ) -> None:
        super().__init__(wdl_version, logger, container, False)

        # Override stdout()/stderr() to return the command's captured streams, as in-container
        # paths under the container directory.
        setattr(
            self,
            "stdout",
            StdLib.StaticFunction(
                "stdout",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stdout.txt")),
            ),
        )
        setattr(
            self,
            "stderr",
            StdLib.StaticFunction(
                "stderr",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stderr.txt")),
            ),
        )

        def _glob(pattern: Value.String, lib: "OutputStdLib" = self) -> Value.Array:
            # glob() implementation: match the pattern against the host working directory and
            # translate matched host filenames back to in-container paths
            pat = pattern.coerce(Type.String()).value
            if not pat:
                raise OutputError("empty glob() pattern")
            assert isinstance(pat, str)
            # restrict patterns to relative paths with no uplevels, confined to the work dir
            if pat[0] == "/":
                raise OutputError("glob() pattern must be relative to task working directory")
            if pat.startswith("..") or "/.." in pat:
                raise OutputError("glob() pattern must not use .. uplevels")
            if pat.startswith("./"):
                pat = pat[2:]
            # glob the host directory
            pat = os.path.join(lib.container.host_work_dir(), pat)
            # sorted for deterministic ordering; only regular files are returned
            host_files = sorted(fn for fn in glob.glob(pat) if os.path.isfile(fn))
            # convert the host filenames to in-container filenames
            container_files = []
            for hf in host_files:
                # strip the host work dir prefix (with trailing slash) from each match
                dstrip = lib.container.host_work_dir()
                dstrip += "" if dstrip.endswith("/") else "/"
                assert hf.startswith(dstrip)
                container_files.append(
                    os.path.join(lib.container.container_dir, "work", hf[len(dstrip) :])
                )
            return Value.Array(Type.File(), [Value.File(fn) for fn in container_files])

        setattr(
            self,
            "glob",
            StdLib.StaticFunction("glob", [Type.String()], Type.Array(Type.File()), _glob),
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc