• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chanzuckerberg / miniwdl / 26370286027

24 May 2026 07:13PM UTC coverage: 95.698% (+0.02%) from 95.679%
26370286027

Pull #880

github

web-flow
Merge dcdb32b30 into 3331d6c09
Pull Request #880: refactor runtime code

759 of 776 new or added lines in 5 files covered. (97.81%)

15 existing lines in 2 files now uncovered.

8476 of 8857 relevant lines covered (95.7%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.64
/WDL/runtime/_stdlib.py
1
"""
2
Runtime StdLib implementations for tasks and workflows.
3
"""
4

5
import glob
1✔
6
import hashlib
1✔
7
import logging
1✔
8
import os
1✔
9
from typing import TYPE_CHECKING, Optional
1✔
10

11
from .. import Env, Error, StdLib, Type, Value
1✔
12
from .._util import WDLVersion, wdl_version_geq
1✔
13
from .._util import StructuredLogMessage as _
1✔
14
from . import config
1✔
15
from .cache import CallCache
1✔
16
from .download import able as downloadable
1✔
17
from .error import OutputError
1✔
18
from ._io_helpers import _resolve_source_relative_path, _resolve_workflow_path, _source_directory
1✔
19

20
if TYPE_CHECKING:
1✔
NEW
21
    from .task_container import TaskContainer
×
NEW
22
    from ._workflow_state import StateMachine
×
23

24

25
class TaskStdLib(StdLib.Base):
    """
    StdLib for task evaluation, with file access mediated by the task's container.
    """

    logger: logging.Logger
    container: "TaskContainer"
    inputs_only: bool  # when True, only input files may be accessed
    source_directory: str

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        inputs_only: bool,
        source_directory: str = "",
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        # files produced by write_*() land under "write_" in the container's host directory
        write_dir = os.path.join(container.host_dir, "write_")
        super().__init__(wdl_version, write_dir=write_dir, eval_context=eval_context)
        self.logger = logger
        self.container = container
        self.inputs_only = inputs_only
        self.source_directory = source_directory

    def _source_relative_host_path(self, filename: str, desc: str) -> str:
        """
        Resolve ``filename`` against ``self.source_directory`` and return the resulting path.

        A trailing "/" marks ``filename`` as a Directory rather than a File. ``desc`` is passed
        through to the resolver and used in the error message. Raises ``Error.InputError`` when
        the resolver yields nothing.
        """
        if filename.endswith("/"):
            value = Value.Directory(filename)
        else:
            value = Value.File(filename)
        host_path = _resolve_source_relative_path(
            self.container.cfg, self.source_directory, desc, value
        )
        if host_path is not None:
            return host_path
        raise Error.InputError(f"File/Directory path not found in {desc}: {filename}")

    def _devirtualize_filename(self, filename: str) -> str:
        """
        Return the host path for task StdLib direct file access.

        Directory paths are denoted by a trailing "/".
        Input/private evaluation may read WDL 1.2 source-relative paths directly from the host
        source directory. Output evaluation keeps existing task-output semantics and resolves paths
        only through the execution directory or already-localized inputs.
        """
        is_dir = filename.endswith("/")
        # check that reading this path is allowed, mapping in-container -> host
        host_path = self.container.host_path(filename, inputs_only=self.inputs_only)
        if host_path is None:
            # fall back to the source directory only for relative, non-downloadable paths
            # during input/private evaluation of WDL 1.2+
            try_source_directory = (
                self.inputs_only
                and wdl_version_geq(self.wdl_version, WDLVersion.V1_2)
                and not os.path.isabs(filename)
                and not downloadable(self.container.cfg, filename, directory=is_dir)
            )
            if try_source_directory:
                host_path = self._source_relative_host_path(filename, "read_*() argument")
        if host_path is None:
            raise OutputError("function was passed non-existent file " + filename)
        self.logger.debug(_("read_", container=filename, host=host_path))
        return host_path

    def _resolve_source_relative_path(self, filename: str) -> str:
        """
        Resolve a WDL 1.2 source-relative File/Directory StdLib/operator value for a task.

        Directory paths are denoted by a trailing "/".
        This is used during input/private evaluation, where source-relative paths are mounted into
        the task container and returned as in-container paths. ``container`` is intentionally
        mutated when a new source-relative path must be mounted.
        """
        is_dir = filename.endswith("/")
        source_relative = (
            self.inputs_only
            and wdl_version_geq(self.wdl_version, WDLVersion.V1_2)
            and not os.path.isabs(filename)
            and not downloadable(self.container.cfg, filename, directory=is_dir)
        )
        if not source_relative:
            # anything else passes through untouched
            return filename
        host_path = self._source_relative_host_path(filename, "File/Directory StdLib argument")
        # the container keys directories with a trailing "/"
        mount_key = (host_path + "/") if is_dir else host_path
        self.container.add_paths([mount_key])
        return self.container.input_path_map[mount_key].rstrip("/")

    def _virtualize_filename(self, filename: str) -> str:
        """
        Register a newly written host file with the container and return its mapped path.
        """
        self.container.add_paths([filename])
        container_path = self.container.input_path_map[filename]
        self.logger.debug(_("write_", host=filename, container=container_path))
        self.logger.info(_("wrote", file=container_path))
        return container_path

    def _join_paths_default_directory(self) -> str:
        """
        Return the "work" subdirectory of the container directory.
        """
        container_root = self.container.container_dir
        return os.path.join(container_root, "work")
116

117

118
class TaskInputStdLib(TaskStdLib):
    """
    StdLib for evaluation of task inputs and command.
    """

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        source_directory: str = "",
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        # always constructed with inputs_only=True
        super().__init__(
            wdl_version,
            logger,
            container,
            inputs_only=True,
            source_directory=source_directory,
            eval_context=eval_context,
        )
136

137

138
class TaskOutputStdLib(TaskStdLib):
    """
    StdLib for evaluation of task outputs.

    Constructs the base with ``inputs_only=False`` and installs the output-only functions
    ``stdout()``, ``stderr()`` and ``glob()``.
    """

    def __init__(
        self,
        wdl_version: str,
        logger: logging.Logger,
        container: "TaskContainer",
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        super().__init__(wdl_version, logger, container, False, eval_context=eval_context)

        # stdout()/stderr() return the in-container paths of the captured stream files
        setattr(
            self,
            "stdout",
            StdLib.StaticFunction(
                "stdout",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stdout.txt")),
            ),
        )
        setattr(
            self,
            "stderr",
            StdLib.StaticFunction(
                "stderr",
                [],
                Type.File(),
                lambda: Value.File(os.path.join(self.container.container_dir, "stderr.txt")),
            ),
        )

        def _glob(pattern: Value.String, lib: TaskOutputStdLib = self) -> Value.Array:
            """
            Expand ``pattern`` relative to the task working directory.

            Returns the matching regular files (directories are skipped) as a sorted
            ``Array[File]`` of in-container paths. Raises ``OutputError`` for an empty pattern,
            an absolute pattern, or one that uses ``..`` uplevels.
            """
            pat = pattern.coerce(Type.String()).value
            if not pat:
                raise OutputError("empty glob() pattern")
            assert isinstance(pat, str)
            if pat[0] == "/":
                raise OutputError("glob() pattern must be relative to task working directory")
            if pat.startswith("..") or "/.." in pat:
                raise OutputError("glob() pattern must not use .. uplevels")
            if pat.startswith("./"):
                pat = pat[2:]

            # Glob the host directory. Only the user's pattern may carry glob syntax: escape
            # the working directory so that any *, ? or [ in the run path is matched literally.
            host_work_dir = lib.container.host_work_dir()
            host_pattern = os.path.join(glob.escape(host_work_dir), pat)
            host_files = sorted(fn for fn in glob.glob(host_pattern) if os.path.isfile(fn))

            # convert the host filenames to in-container filenames
            host_prefix = host_work_dir if host_work_dir.endswith("/") else host_work_dir + "/"
            container_work_dir = os.path.join(lib.container.container_dir, "work")
            container_files = []
            for hf in host_files:
                assert hf.startswith(host_prefix)
                container_files.append(os.path.join(container_work_dir, hf[len(host_prefix) :]))
            return Value.Array(Type.File(), [Value.File(fn) for fn in container_files])

        setattr(
            self,
            "glob",
            StdLib.StaticFunction("glob", [Type.String()], Type.Array(Type.File()), _glob),
        )
200

201

202
class WorkflowStdLib(StdLib.Base):
    """
    Workflow-level StdLib: checks against & updates the file/directory allowlist for the read_*
    and write_* functions.
    """

    cfg: config.Loader
    state: "StateMachine"
    cache: CallCache

    def __init__(
        self,
        cfg: config.Loader,
        wdl_version: str,
        state: "StateMachine",
        cache: CallCache,
        eval_context: Optional[StdLib.EvalContext] = None,
    ) -> None:
        # files produced by write_*() land under "write_" in the workflow run directory
        write_dir = os.path.join(state.run_dir, "write_")
        super().__init__(wdl_version, write_dir=write_dir, eval_context=eval_context)
        self.cfg = cfg
        self.state = state
        self.cache = cache

    def _source_relative_host_path(self, filename: str, desc: str) -> str:
        """
        Resolve ``filename`` against the workflow's source directory and return the result.

        A trailing "/" marks ``filename`` as a Directory rather than a File. ``desc`` is passed
        through to the resolver and used in the error message. Raises ``Error.InputError`` when
        the resolver yields nothing.
        """
        if filename.endswith("/"):
            value = Value.Directory(filename)
        else:
            value = Value.File(filename)
        host_path = _resolve_source_relative_path(
            self.cfg, _source_directory(self.state.workflow), desc, value
        )
        if host_path is not None:
            return host_path
        raise Error.InputError(f"File/Directory path not found in {desc}: {filename}")

    def _devirtualize_filename(self, filename: str) -> str:
        """
        Return the path that read_*() should open for ``filename``.

        Directory paths are denoted by a trailing "/". A downloadable URI with a cached download
        resolves to the cached copy. Under WDL 1.2+, relative non-downloadable paths are first
        resolved against the workflow source directory and added to the allowlist. The result
        then goes through the workflow path resolver.
        """
        is_dir = filename.endswith("/")
        if downloadable(self.cfg, filename, directory=is_dir):
            cached_download = self.cache.get_download(filename)
            if cached_download:
                return cached_download
        source_relative = (
            wdl_version_geq(self.wdl_version, WDLVersion.V1_2)
            and not os.path.isabs(filename)
            and not downloadable(self.cfg, filename, directory=is_dir)
        )
        if source_relative:
            host_path = self._source_relative_host_path(filename, "read_*() argument")
            # allowlist entries for directories carry a trailing "/"
            self.state.fspath_allowlist.add((host_path + "/") if is_dir else host_path)
            filename = host_path
        value = Value.Directory(filename) if is_dir else Value.File(filename)
        resolved = _resolve_workflow_path(
            self.cfg, self.state.fspath_allowlist, "read_*() argument", value
        )
        assert resolved is not None
        return resolved

    def _resolve_source_relative_path(self, filename: str) -> str:
        """
        Resolve a File/Directory StdLib/operator value in a workflow.

        Directory paths are denoted by a trailing "/".
        WDL 1.2 source-relative paths resolve against the workflow source directory, and are
        intentionally added to the workflow allowlist as a side effect. Pre-1.2 relative paths don't
        get source-directory semantics, but they still pass through the workflow path boundary for
        compatibility with legacy ``allow_any_input`` workflows whose File/Directory values have been
        resolved to host paths during declaration binding.
        """
        is_dir = filename.endswith("/")
        source_relative = (
            wdl_version_geq(self.wdl_version, WDLVersion.V1_2)
            and not os.path.isabs(filename)
            and not downloadable(self.cfg, filename, directory=is_dir)
        )
        if source_relative:
            host_path = self._source_relative_host_path(
                filename, "File/Directory StdLib argument"
            )
            self.state.fspath_allowlist.add((host_path + "/") if is_dir else host_path)
            return host_path
        relative_local = not os.path.isabs(filename) and not downloadable(
            self.cfg, filename, directory=is_dir
        )
        if not relative_local:
            # absolute paths and downloadable URIs pass through untouched
            return filename
        value = Value.Directory(filename) if is_dir else Value.File(filename)
        resolved = _resolve_workflow_path(
            self.cfg, self.state.fspath_allowlist, "File/Directory StdLib argument", value
        )
        assert resolved is not None
        return resolved

    def _join_paths_default_directory(self) -> str:
        """
        Return the directory containing the workflow's source file.

        Raises ``NotImplementedError`` when the workflow has no source path or was loaded from
        "(buffer)".
        """
        source = self.state.workflow.pos.abspath
        if source and source != "(buffer)":
            return os.path.dirname(source)
        raise NotImplementedError("join_paths() relative path resolution requires WDL source")

    def _virtualize_filename(self, filename: str) -> str:
        """
        Allowlist a file written by write_*() and return the path to use for it.

        After write_* generates a file at the workflow level, query CallCache for an existing
        identical file; if available, then return that copy. This improves cacheability of
        downstream tasks that consume the written file, given the unique temp filename for each
        such file.
        """
        assert not filename.endswith("/")  # FIXME if/when stdlib functions handle directories
        # We fully content-digest the file, but it can't be too large since it was originally
        # serialized from miniwdl memory.
        digest = hashlib.sha256()
        with open(filename, "rb") as infile:
            while True:
                block = infile.read(1048576)
                if not block:
                    break
                digest.update(block)
        cache_in: Env.Bindings[Value.Base] = Env.Bindings().bind(
            "file_sha256", Value.String(digest.hexdigest())
        )
        cache_key = "write_/" + Value.digest_env(cache_in)
        cache_out_types: Env.Bindings[Type.Base] = Env.Bindings().bind("file", Type.File())
        cache_out = self.cache.get(cache_key, cache_in, cache_out_types)
        if not cache_out:
            # cache miss: put our newly-written file to the cache, and proceed to use it
            self.cache.put(
                cache_key,
                Env.Bindings(Env.Binding("file", Value.File(filename))),
                run_dir=self.state.run_dir,
            )
        else:
            filename = cache_out.resolve("file").value

        # whichever path we took: allow-list the filename
        self.state.fspath_allowlist.add(filename)
        return filename
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc