
SwissDataScienceCenter / renku-python — build 5264241663 (push, via github-actions), pending completion

Commit by Ralf Grubenmann: fix(service): do not suggest updating dockerfile for release candidate versions

Repository coverage: 26163 of 30025 relevant lines covered (87.14%), 5.38 hits per line.

Source file: /renku/core/workflow/providers/toil.py — 43.63% covered.
Per-line coverage markers from the original report, summarized: module-level definition lines were hit 12 times each, workflow_provider 9 times, and the body of ToilProvider.workflow_execute twice; the execution bodies of the job classes and the DAG-building helpers were not covered.
#
# Copyright 2017-2023 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Toil-based provider."""

import functools
import itertools
import os
import shutil
import uuid
from abc import abstractmethod
from pathlib import Path
from subprocess import call
from typing import Any, Callable, Dict, List, Union, cast

import networkx as nx
from toil.common import Toil
from toil.fileStores import FileID
from toil.fileStores.abstractFileStore import AbstractFileStore
from toil.job import Job, Promise
from toil.leader import FailedJobsException
from toil.lib.docker import apiDockerCall

from renku.command.util import progressbar
from renku.core import errors
from renku.core.constant import RENKU_HOME, RENKU_TMP
from renku.core.errors import WorkflowExecuteError
from renku.core.plugin import hookimpl
from renku.core.plugin.provider import RENKU_ENV_PREFIX
from renku.domain_model.workflow.parameter import CommandParameterBase
from renku.domain_model.workflow.plan import Plan
from renku.domain_model.workflow.provider import IWorkflowProvider

class AbstractToilJob(Job):
    """Toil job implementation for a renku ``Plan``."""

    def __init__(self, workflow: Plan, *args, **kwargs):
        super().__init__(unitName=workflow.name, displayName=workflow.name, *args, **kwargs)
        self.workflow: Plan = workflow
        self._input_files: Dict[str, FileID] = {}
        self._parents_promise: List[Promise] = []
        self._environment = os.environ.copy()

    @abstractmethod
    def _execute(self, command_line: List[str], mapped_std: Dict[str, str]) -> int:
        """Executes a given command line."""
        raise NotImplementedError

    def set_input_files(self, input_files: Dict[str, Any]):
        """Set the location of inputs that are available in the original project."""
        self._input_files = input_files

    def add_input_promise(self, promise: Dict[str, FileID]):
        """Adds a given job's output promise.

        The promise of a job contains its output locations in the global storage.
        """
        self._parents_promise.append(promise)

    def run(self, storage):
        """Execute a renku ``Plan``."""
        mapped_std = dict()
        parent_inputs = dict()
        for p in self._parents_promise:
            parent_inputs.update(p)

        def _read_input(input: str, file_metadata):
            input_path = Path(input)

            if isinstance(file_metadata, dict):
                # A directory: recreate it and restore its content recursively.
                input_path.mkdir(parents=True, exist_ok=True)
                for path, file_id in file_metadata.items():
                    _read_input(path, file_id)
            elif not input_path.exists():
                if len(input_path.parts) > 1:
                    input_path.parent.mkdir(parents=True, exist_ok=True)
                storage.readGlobalFile(file_metadata, userPath=input)

        for i in self.workflow.inputs:
            file_metadata = (
                parent_inputs[i.actual_value] if i.actual_value in parent_inputs else self._input_files[i.actual_value]
            )
            _read_input(i.actual_value, file_metadata)

            self._environment[f"{RENKU_ENV_PREFIX}{i.name}"] = str(i.actual_value)

            if i.mapped_to:
                mapped_std[i.mapped_to.stream_type] = i.actual_value

        for o in self.workflow.outputs:
            self._environment[f"{RENKU_ENV_PREFIX}{o.name}"] = str(o.actual_value)
            output_path = Path(o.actual_value)
            if len(output_path.parts) > 1:
                output_path.parent.mkdir(parents=True, exist_ok=True)

            if o.mapped_to:
                mapped_std[o.mapped_to.stream_type] = o.actual_value

        for p in self.workflow.parameters:
            self._environment[f"{RENKU_ENV_PREFIX}{p.name}"] = str(p.actual_value)

        # Construct the command line from the command and its sorted arguments.
        cmd = []

        if self.workflow.command:
            cmd.extend(self.workflow.command.split(" "))

        arguments = itertools.chain(self.workflow.inputs, self.workflow.outputs, self.workflow.parameters)

        arguments_filtered = filter(lambda x: x.position is not None and not getattr(x, "mapped_to", None), arguments)
        arguments_sorted = sorted(arguments_filtered, key=lambda x: cast(int, x.position))

        for a in arguments_sorted:
            v = str(a.actual_value) if not isinstance(a.actual_value, str) else a.actual_value
            if a.prefix:
                if a.prefix.endswith(" "):
                    cmd.append(a.prefix[:-1])
                else:
                    v = f"{a.prefix}{v}"
            cmd.append(v)

        return_code = self._execute(cmd, mapped_std)
        if return_code not in (self.workflow.success_codes or {0}):
            raise errors.InvalidSuccessCode(return_code, success_codes=self.workflow.success_codes)

        return _upload_files(
            storage.writeGlobalFile, cast(List[CommandParameterBase], self.workflow.outputs), Path.cwd()
        )

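A minimal, self-contained sketch of the prefix handling in ``run`` above (hypothetical tool and argument values; ``SimpleNamespace`` stands in for renku's parameter objects): a prefix ending in a space becomes its own token, while any other prefix is glued to its value.

from types import SimpleNamespace

arguments_sorted = [
    SimpleNamespace(prefix="--input ", actual_value="data.csv"),  # trailing space
    SimpleNamespace(prefix="-n", actual_value=10),                # no trailing space
    SimpleNamespace(prefix=None, actual_value="out.txt"),         # positional only
]

cmd = ["my-tool"]
for a in arguments_sorted:
    v = str(a.actual_value) if not isinstance(a.actual_value, str) else a.actual_value
    if a.prefix:
        if a.prefix.endswith(" "):
            cmd.append(a.prefix[:-1])  # "--input " -> separate token "--input"
        else:
            v = f"{a.prefix}{v}"       # "-n" -> glued token "-n10"
    cmd.append(v)

assert cmd == ["my-tool", "--input", "data.csv", "-n10", "out.txt"]
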
class SubprocessToilJob(AbstractToilJob):
    """A toil job that uses subprocess.call to execute a renku ``Plan``."""

    def __init__(self, workflow: Plan, *args, **kwargs):
        super().__init__(workflow, *args, **kwargs)

    def _execute(self, command_line: List[str], mapped_std: Dict[str, str]) -> int:
        """Executes a given command line."""
        return call(
            command_line,
            cwd=os.getcwd(),
            **{  # type: ignore
                # Streams mapped to stdin are opened for reading; stdout/stderr for writing.
                key: open(value, mode="r" if key == "stdin" else "w")
                for key, value in mapped_std.items()  # type: ignore
            },
            env=self._environment,
        )

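For a hypothetical step ``my-tool < input.txt > output.txt``, ``mapped_std`` would be ``{"stdin": "input.txt", "stdout": "output.txt"}``, so the call above effectively expands to:

call(
    command_line,
    cwd=os.getcwd(),
    stdin=open("input.txt", mode="r"),    # redirected input
    stdout=open("output.txt", mode="w"),  # redirected output
    env=self._environment,
)
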
class DockerToilJob(AbstractToilJob):
    """A toil job that uses apiDockerCall to execute a renku ``Plan``."""

    def __init__(self, workflow: Plan, docker_config: Dict[str, Any], *args, **kwargs):
        super().__init__(workflow, *args, **kwargs)
        self._docker_config: Dict[str, Any] = docker_config

    def _execute(self, command_line: List[str], mapped_std: Dict[str, str]) -> int:
        """Executes a given command line."""
        # NOTE: Disable detached mode so that ``apiDockerCall`` blocks until it finishes
        self._docker_config.pop("detach", None)

        parameters: Union[List[str], List[List[str]]] = command_line

        stderr = mapped_std.get("stderr")
        stdin = mapped_std.get("stdin")
        if stderr or stdin:
            raise errors.OperationError("Cannot run workflows that have stdin or stderr redirection with Docker")
        stdout = mapped_std.get("stdout")
        if stdout:
            parameters = [command_line, ["tee", stdout]]

        volumes = self._docker_config.pop("volumes", {})

        if "working_dir" not in self._docker_config:
            # Mount the current project directory as the container's working directory.
            working_dir = "/renku"
            volumes[os.getcwd()] = {"bind": working_dir, "mode": "rw"}
        else:
            working_dir = self._docker_config.pop("working_dir")

        # NOTE: We cannot get the exit code back from the docker container. The Docker API checks for the exit code and
        # raises an exception if it's not 0, so Plan.success_codes is ignored when running with Docker.
        apiDockerCall(
            self,
            parameters=parameters,
            detach=False,
            environment=self._environment,
            volumes=volumes,
            working_dir=working_dir,
            **self._docker_config,
        )

        return 0

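When a step redirects stdout, the job cannot rely on shell redirection inside the container; instead it hands ``apiDockerCall`` a list of command lists, which Toil runs as a pipe. For a hypothetical step ``my-tool --flag > result.txt``:

parameters = [["my-tool", "--flag"], ["tee", "result.txt"]]  # my-tool --flag | tee result.txt
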
def _store_location(
    import_function: Callable[[str], FileID], basedir: Path, location: Path
) -> Union[FileID, Dict[str, Any]]:
    # Import a single file, or recursively a directory tree, into the global storage.
    if location.is_dir():
        directory_content = dict()
        for f in location.rglob("*"):
            directory_content[str(f.relative_to(basedir))] = _store_location(import_function, basedir, f)
        return directory_content
    else:
        return import_function(str(location))


def _upload_files(
    import_function: Callable[[str], FileID], params: List[CommandParameterBase], basedir: Path
) -> Dict[str, FileID]:
    # Upload every existing parameter location, keyed by its path in the project.
    file_locations = dict()
    for p in params:
        location = basedir / p.actual_value
        if not location.exists():
            continue

        file_locations[p.actual_value] = _store_location(import_function, basedir, location)

    return file_locations

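The mapping these two helpers build mirrors the project layout: a plain file maps to a single ``FileID``, while a directory maps to a nested dictionary keyed by paths relative to ``basedir``. For a hypothetical plan with a file output ``model.bin`` and a flat directory output ``results``, ``_upload_files`` would return something shaped like:

# {
#     "model.bin": FileID(...),
#     "results": {
#         "results/metrics.json": FileID(...),
#         "results/summary.txt": FileID(...),
#     },
# }
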
def import_file_wrapper(storage: AbstractFileStore, file_path: str) -> FileID:
    """Wrap importFile to accept file:// URIs."""
    file_uri = file_path if ":/" in file_path else f"file://{file_path}"
    return storage.importFile(file_uri)

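The ``":/"`` test is a small heuristic: plain paths get a ``file://`` scheme, URIs that already carry one pass through unchanged (paths here are hypothetical).

for file_path in ("/repo/data.csv", "s3://bucket/data.csv"):
    file_uri = file_path if ":/" in file_path else f"file://{file_path}"
    print(file_uri)  # file:///repo/data.csv, then s3://bucket/data.csv unchanged
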
def process_children(
    parent: AbstractToilJob,
    dag: nx.DiGraph,
    jobs: Dict[int, AbstractToilJob],
    basedir: Path,
    storage: AbstractFileStore,
):
    """Recursively process children of a workflow."""
    outputs = list()
    import_function = functools.partial(import_file_wrapper, storage)
    for child in nx.neighbors(dag, parent.workflow):
        child_job = jobs[id(child)]
        # Inputs already present in the project are uploaded eagerly; the parent's
        # outputs arrive as a promise that resolves once the parent has run.
        file_metadata = _upload_files(import_function, child.inputs, basedir)
        child_job.set_input_files(file_metadata)
        child_job.add_input_promise(parent.rv())
        outputs.append(parent.addFollowOn(child_job).rv())
        outputs += process_children(child_job, dag, jobs, basedir, storage)
    return outputs

def initialize_jobs(job, basedir, dag, docker_config):
    """Creates the Toil execution plan for the given workflow DAG."""
    job.fileStore.logToMaster("executing renku DAG")
    outputs = list()
    if docker_config:
        job.fileStore.logToMaster("executing with Docker")
        jobs = {id(n): DockerToilJob(n, docker_config) for n in dag.nodes}
    else:
        jobs = {id(n): SubprocessToilJob(n) for n in dag.nodes}
    import_function = functools.partial(import_file_wrapper, job.fileStore)
    # The first topological generation holds the root plans (nodes without parents).
    children = next(nx.topological_generations(dag))
    for workflow in children:
        child_job = jobs[id(workflow)]
        file_metadata = _upload_files(import_function, workflow.inputs, basedir)
        child_job.set_input_files(file_metadata)
        outputs.append(job.addChild(child_job).rv())
        outputs += process_children(child_job, dag, jobs, basedir, job.fileStore)

    return outputs

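Taken together, ``initialize_jobs`` and ``process_children`` translate a networkx DAG of ``Plan`` nodes, with edges pointing from producers to consumers, into a Toil job tree. A sketch for a hypothetical two-step pipeline:

# dag = nx.DiGraph()
# dag.add_edge(preprocess_plan, train_plan)  # train consumes preprocess's outputs
#
# next(nx.topological_generations(dag))      # -> [preprocess_plan]: becomes a root child job
# nx.neighbors(dag, preprocess_plan)         # -> [train_plan]: attached as a follow-on job
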
class ToilProvider(IWorkflowProvider):
    """A workflow executor provider using toil."""

    @hookimpl
    def workflow_provider(self):
        """Workflow provider name."""
        return self, "toil"

    @hookimpl
    def workflow_execute(self, dag: nx.DiGraph, basedir: Path, config: Dict[str, Any]):
        """Executes a given workflow DAG using Toil."""
        tmpdir = (basedir / RENKU_HOME / RENKU_TMP).resolve()
        if not tmpdir.exists():
            tmpdir.mkdir()
        options = Job.Runner.getDefaultOptions(str(tmpdir / uuid.uuid4().hex))
        options.logLevel = "ERROR"
        options.clean = "always"

        config = config or {}
        docker_config = config.pop("docker", {})
        if docker_config and "image" not in docker_config:
            raise errors.ConfigurationError("Docker configuration must provide an 'image' property")

        if config:
            # Remaining config keys are forwarded verbatim to the Toil options.
            for k, v in config.items():
                setattr(options, k, v)

        outputs = list()
        try:
            with Toil(options) as toil:
                root_job = Job.wrapJobFn(initialize_jobs, basedir, dag, docker_config)
                job_outputs = toil.start(root_job)

                num_outputs = sum(map(lambda x: len(x.values()), job_outputs))
                with progressbar(length=num_outputs, label="Moving outputs") as bar:
                    for collection in job_outputs:
                        for name, fid in collection.items():
                            if isinstance(fid, dict):
                                # A directory output: recreate it and export its files.
                                directory = basedir / name
                                if directory.exists():
                                    shutil.rmtree(str(directory))
                                directory.mkdir()

                                for path, file_id in fid.items():
                                    destination = (basedir / path).absolute()
                                    destination.parent.mkdir(parents=True, exist_ok=True)
                                    toil.exportFile(file_id, str(destination))
                                    outputs.append(path)
                            else:
                                destination = (basedir / name).absolute()
                                destination.parent.mkdir(parents=True, exist_ok=True)
                                toil.exportFile(fid, str(destination))
                                outputs.append(name)
                            bar.update(1)
        except FailedJobsException as e:
            raise WorkflowExecuteError(e.msg)

        return outputs
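
A sketch of the ``config`` mapping ``workflow_execute`` accepts (all values hypothetical): the ``"docker"`` entry switches execution to ``DockerToilJob`` and must contain an ``"image"``; every remaining key is set verbatim on the Toil options object.

config = {
    "logLevel": "INFO",               # overrides the default "ERROR" via setattr
    "docker": {
        "image": "python:3.10-slim",  # required whenever "docker" is given
        "volumes": {"/data": {"bind": "/data", "mode": "ro"}},
    },
}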