• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 4145649460

pending completion
4145649460

push

github-actions

GitHub
Merge branch 'develop' into allow-ref-target-for-release-action

25096 of 28903 relevant lines covered (86.83%)

4.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.76
/renku/core/workflow/execute.py
1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2017-2022 - Swiss Data Science Center (SDSC)
4
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
5
# Eidgenössische Technische Hochschule Zürich (ETHZ).
6
#
7
# Licensed under the Apache License, Version 2.0 (the "License");
8
# you may not use this file except in compliance with the License.
9
# You may obtain a copy of the License at
10
#
11
#     http://www.apache.org/licenses/LICENSE-2.0
12
#
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS,
15
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
# See the License for the specific language governing permissions and
17
# limitations under the License.
18
"""Plan execution."""
10✔
19

20
import itertools
10✔
21
import re
10✔
22
from functools import reduce
10✔
23
from pathlib import Path
10✔
24
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
10✔
25

26
from pydantic import validate_arguments
10✔
27

28
from renku.command.command_builder import inject
10✔
29
from renku.core import errors
10✔
30
from renku.core.interface.activity_gateway import IActivityGateway
10✔
31
from renku.core.interface.plan_gateway import IPlanGateway
10✔
32
from renku.core.plugin.provider import execute
10✔
33
from renku.core.storage import check_external_storage, pull_paths_from_storage
10✔
34
from renku.core.util import communication
10✔
35
from renku.core.util.datetime8601 import local_now
10✔
36
from renku.core.util.os import is_subpath, safe_read_yaml
10✔
37
from renku.core.workflow.model.concrete_execution_graph import ExecutionGraph
10✔
38
from renku.core.workflow.plan import is_plan_removed
10✔
39
from renku.core.workflow.plan_factory import delete_indirect_files_list
10✔
40
from renku.core.workflow.value_resolution import ValueResolver
10✔
41
from renku.domain_model.project_context import project_context
10✔
42
from renku.domain_model.provenance.activity import Activity, ActivityCollection, WorkflowFileActivityCollection
10✔
43
from renku.domain_model.workflow.plan import AbstractPlan
10✔
44
from renku.domain_model.workflow.workflow_file import WorkflowFileCompositePlan
10✔
45

46
if TYPE_CHECKING:
10✔
47
    from networkx import DiGraph
×
48

49

50
@inject.params(activity_gateway=IActivityGateway, plan_gateway=IPlanGateway)
def execute_workflow_graph(
    dag: "DiGraph",
    activity_gateway: IActivityGateway,
    plan_gateway: IPlanGateway,
    provider="toil",
    config=None,
    workflow_file_plan: Optional[WorkflowFileCompositePlan] = None,
):
    """Execute all plans of a workflow graph and store an activity for each of them.

    Args:
        dag(DiGraph): The workflow graph to execute.
        activity_gateway(IActivityGateway): The injected activity gateway.
        plan_gateway(IPlanGateway): The injected plan gateway.
        provider: Provider to run the workflow with (Default value = "toil").
        config: Path to config for the workflow provider (Default value = None).
        workflow_file_plan (Optional[WorkflowFileCompositePlan]): If passed, a workflow file is executed, so the
            activities are stored as a ``WorkflowFileActivityCollection`` that references this plan.

    Raises:
        errors.ParameterError: If an input that is not generated by a plan of the graph does not exist.
    """
    all_inputs = {parameter.actual_value for node in dag.nodes for parameter in node.inputs}

    # NOTE: Pull inputs from Git LFS or other storage backends
    if check_external_storage():
        pull_paths_from_storage(project_context.repository, *all_inputs)

    generated = {parameter.actual_value for node in dag.nodes for parameter in node.outputs}

    def _is_generated(path) -> bool:
        """Return whether ``path`` is an output of the graph or lies below one of its outputs."""
        return path in generated or any(is_subpath(path=path, base=output) for output in generated)

    # NOTE: Inputs that no plan in the graph generates must already be available before execution
    external_inputs = {path for path in all_inputs if not _is_generated(path)}
    for external_input in external_inputs:
        if not Path(external_input).exists():
            raise errors.ParameterError(
                f"Invalid input value: Input '{external_input}' does not exist!", show_prefix=False
            )

    delete_indirect_files_list(project_context.path)

    # NOTE: A falsy ``config`` is handed to the provider unchanged; otherwise the YAML file is loaded
    provider_config = safe_read_yaml(config) if config else config

    started_at_time = local_now()
    execute(dag=dag, basedir=project_context.path, provider=provider, config=provider_config)
    ended_at_time = local_now()

    activities = []

    for node in dag.nodes:
        # NOTE: Update plans are copies of Plan objects. We need to use the original Plan objects to avoid
        # duplicates. Workflow files don't have an original plan, so the executed plan is used for them.
        stored_plan = plan_gateway.get_by_id(node.id) or node

        activity = Activity.from_plan(
            plan=node,
            repository=project_context.repository,
            started_at_time=started_at_time,
            ended_at_time=ended_at_time,
        )
        activity.association.plan = stored_plan
        activity_gateway.add(activity)
        activities.append(activity)

    if workflow_file_plan:
        collection = WorkflowFileActivityCollection.from_activities(activities=activities, plan=workflow_file_plan)
    elif len(activities) > 1:
        collection = ActivityCollection(activities=activities)
    else:
        # NOTE: A single activity outside of a workflow file is not grouped into a collection
        return

    activity_gateway.add_activity_collection(collection)
121

122

123
def check_for_cycles(graph: ExecutionGraph):
    """Raise an error that lists every cycle of the graph; do nothing if the graph has no cycles.

    Args:
        graph(ExecutionGraph): The execution graph to check.

    Raises:
        errors.GraphCycleError: If ``graph.cycles`` is not empty.
    """

    def _describe(node) -> str:
        """Render one node of a cycle: plans by name, anything else by class name and actual value."""
        if isinstance(node, AbstractPlan):
            return f"[{node.name}]"

        kind = node.__class__.__name__.replace("Command", "")
        return f"{kind}: {node.actual_value}"

    if graph.cycles:
        rendered_cycles = [" -> ".join(_describe(node) for node in cycle) for cycle in graph.cycles]
        message = "Circular workflows are not supported in Renku. Please remove these cycles:\n\t" + "\n\t".join(
            rendered_cycles
        )
        raise errors.GraphCycleError(message=message, cycles=[])
142

143

144
@inject.autoparams()
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def execute_workflow(
    name_or_id: str,
    set_params: List[str],
    provider: str,
    config: Optional[str],
    values: Optional[str],
    plan_gateway: IPlanGateway,
):
    """Execute a plan with specified values.

    Args:
        name_or_id(str): Name or id of the Plan to execute.
        set_params(List[str]): List of ``name=value`` entries specified for workflow parameters; a dotted name
            addresses a nested parameter.
        provider(str): Name of the workflow provider backend to use for execution.
        config(Optional[str]): Path to config for the workflow provider.
        values(Optional[str]): Path to YAML file containing values specified for workflow parameters.
        plan_gateway(IPlanGateway): The plan gateway.

    Raises:
        errors.ParameterError: If the workflow is removed or an entry of ``set_params`` has no ``=``.
    """
    workflow = plan_gateway.get_by_name_or_id(name_or_id)

    if is_plan_removed(workflow):
        raise errors.ParameterError(f"The specified workflow '{name_or_id}' cannot be found.")

    # NOTE: Values from the YAML file are loaded first; ``set_params`` entries are merged on top of them
    override_params: Dict[str, Any] = {}
    if values:
        override_params.update(safe_read_yaml(values))

    if set_params:
        from deepmerge import always_merger

        for param in set_params:
            name, separator, value = param.partition("=")
            if not separator:
                # NOTE: Report a malformed entry as a parameter error instead of failing on tuple unpacking
                raise errors.ParameterError(f"Invalid parameter value '{param}': expected '<name>=<value>'.")

            # NOTE: Expand a dotted name like ``a.b`` into the nested mapping ``{"a": {"b": value}}``
            set_param = reduce(lambda x, y: {y: x}, reversed(name.split(".")), value)  # type: ignore
            override_params = always_merger.merge(override_params, set_param)

    rv = ValueResolver.get(workflow, override_params)

    workflow = rv.apply()

    if rv.missing_parameters:
        communication.warn(
            f'Could not resolve the following parameters in "{workflow.name}" workflow: '
            f'{",".join(rv.missing_parameters)}'
        )

    graph = ExecutionGraph([workflow], virtual_links=True)
    execute_workflow_graph(dag=graph.workflow_graph, provider=provider, config=config)
196

197

198
def _extract_iterate_parameters(values: Dict[str, Any], index_pattern: re.Pattern, tag_separator: str = "@"):
10✔
199
    """Recursively extracts the iteration parameters from the workflow values given by the user.
200

201
    Args:
202
        values(Dict[str, Any]): Plan values to iterate over.
203
        index_pattern(re.Pattern): Pattern for parameter indexes.
204
        tag_separator(str, optional): Separator for tagged values (Default value = "@").
205

206
    Returns:
207
        Tuple of ``(iter_params, params)`` where ``params`` are regular parameters
208
        and ``iter_params`` are parameters with iteration values.
209
    """
210
    iter_params: Dict[str, Any] = {"indexed": {}, "params": {}, "tagged": {}}
×
211
    params: Dict[str, Any] = {}
×
212
    for param_name, param_value in values.items():
×
213
        if isinstance(param_value, str) and index_pattern.search(param_value):
×
214
            iter_params["indexed"][param_name] = param_value
×
215
            params[param_name] = param_value
×
216
        elif isinstance(param_value, list):
×
217
            if len(param_value) == 1:
×
218
                communication.warn(
×
219
                    f"The parameter '{param_name}' has only one element '{param_value}', "
220
                    "changing it to be a fixed parameter!"
221
                )
222
                params[param_name] = param_value[0]
×
223
                continue
×
224

225
            if tag_separator in param_name:
×
226
                name, tag = param_name.split(tag_separator, maxsplit=1)
×
227
                if tag in iter_params["tagged"]:
×
228
                    iter_params["tagged"][tag][name] = param_value
×
229
                else:
230
                    iter_params["tagged"][tag] = {name: param_value}
×
231

232
                params[name] = param_value
×
233
            else:
234
                iter_params["params"][param_name] = param_value
×
235
                params[param_name] = param_value
×
236
        elif isinstance(param_value, dict):
×
237
            inner_iter_params, inner_params = _extract_iterate_parameters(param_value, index_pattern, tag_separator)
×
238
            iter_params["params"].update([(f"{param_name}.{ik}", iv) for ik, iv in inner_iter_params["params"].items()])
×
239
            iter_params["indexed"].update(
×
240
                [(f"{param_name}.{ik}", iv) for ik, iv in inner_iter_params["indexed"].items()]
241
            )
242
            for tag, param in inner_iter_params["tagged"].items():
×
243
                if tag in iter_params["tagged"]:
×
244
                    iter_params["tagged"][tag].update([(f"{param_name}.{ik}", iv) for ik, iv in param.items()])
×
245
                else:
246
                    iter_params["tagged"][tag] = dict([(f"{param_name}.{ik}", iv) for ik, iv in param.items()])
×
247
            params[param_name] = inner_params
×
248
        else:
249
            params[param_name] = param_value
×
250
    return iter_params, params
×
251

252

253
def _validate_iterate_parameters(
    workflow: AbstractPlan, workflow_params: Dict[str, Any], iter_params: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Validates the user provided iteration parameters.

    Iteration parameters that the workflow cannot resolve are dropped from ``iter_params`` in place, and tags
    that end up without parameters are removed.

    Args:
        workflow(AbstractPlan): The Plan to validate parameters against.
        workflow_params(Dict[str, Any]): The plain parameters to check.
        iter_params(Dict[str, Any]): The iterative parameters to check.

    Returns:
        Dictionary of validated iteration parameters, or ``None`` if the parameters of a tag have a different
        number of values.

    Raises:
        errors.UsageError: If no iteration parameter is left after validation.
    """
    import copy

    # NOTE: Resolve against a copy so that the given workflow itself stays untouched
    resolver = ValueResolver.get(copy.deepcopy(workflow), workflow_params)
    resolver.apply()

    missing_paths = [missing.split(".") for missing in resolver.missing_parameters]

    def _is_missing(key: str) -> bool:
        """Return whether ``key`` equals a missing parameter or is nested below one."""
        path = key.split(".")
        return any(path[: len(missing)] == missing for missing in missing_paths)

    for collection in [iter_params["indexed"], iter_params["params"], *iter_params["tagged"].values()]:
        # NOTE: Collect the keys first because a dict must not change size while being iterated
        for unresolved in [key for key in collection if _is_missing(key)]:
            del collection[unresolved]

    # validate tagged
    empty_tags = []
    for tag, tagged_params in iter_params["tagged"].items():
        if not tagged_params:
            empty_tags.append(tag)
            continue

        sizes = [len(candidates) for candidates in tagged_params.values()]
        if any(size != sizes[0] for size in sizes[1:]):
            communication.error(f"'{tag}' tagged parameters '{tagged_params}' has different number of possible values!")
            return None

    for tag in empty_tags:
        iter_params["tagged"].pop(tag)

    if not (iter_params["indexed"] or iter_params["params"] or iter_params["tagged"]):
        raise errors.UsageError(
            "Please check the provided mappings as none of the "
            f"parameters are present in the '{workflow.name}' workflow"
        )

    if resolver.missing_parameters:
        communication.confirm(
            f'Could not resolve the following parameters in "{workflow.name}" workflow: '
            f'{", ".join(resolver.missing_parameters)}. Resume the execution?',
            abort=True,
        )

    return iter_params
315

316

317
def _build_iterations(
    workflow: AbstractPlan, workflow_params: Dict[str, Any], iter_params: Dict[str, Any], index_pattern: re.Pattern
) -> Tuple[List[AbstractPlan], List[Dict]]:
    """Instantiate the workflows for each iteration.

    One iteration is created per element of the cartesian product of all plain iteration parameters and all
    tags; the parameters of one tag advance together.

    Args:
        workflow(AbstractPlan): The base workflow to use as a template.
        workflow_params(Dict[str, Any]): The plain parameters to use.
        iter_params(Dict[str, Any]): The iterative parameters to use.
        index_pattern(re.Pattern): The pattern for the index placeholder.

    Returns:
        Tuple of ``(plans, itervalues)`` with ``plans`` being a list of all
        plans for each iteration and ``itervalues`` being a list of all values
        for each iteration.
    """
    import copy

    from deepmerge import always_merger

    plans = []
    execute_plan = []

    plain_params = iter_params["params"]
    plain_count = len(plain_params)

    columns = list(plain_params.keys())
    tagged_values = []
    for tag in iter_params["tagged"].values():
        columns.extend(tag.keys())
        # NOTE: Parameters sharing a tag are zipped so each combination holds one tuple per tag
        tagged_values.append(zip(*tag.values()))

    for i, values in enumerate(itertools.product(*plain_params.values(), *tagged_values)):
        plan_params = copy.deepcopy(workflow_params)
        iteration_values = {}
        for k, v in iter_params["indexed"].items():
            # NOTE: Indexed parameters get the iteration number substituted for the placeholder
            value = index_pattern.sub(str(i), v)
            set_param = reduce(lambda x, y: {y: x}, reversed(k.split(".")), value)  # type: ignore
            plan_params = always_merger.merge(plan_params, set_param)
            iteration_values[k] = value

        # NOTE: Only the per-tag tuples are expanded into columns. A plain parameter whose candidate value is
        # itself a list or tuple is kept as one value so that the following columns stay aligned.
        row = [*values[:plain_count], *itertools.chain.from_iterable(values[plain_count:])]

        for param_key, param_value in zip(columns, row):
            set_param = reduce(lambda x, y: {y: x}, reversed(param_key.split(".")), param_value)  # type: ignore
            plan_params = always_merger.merge(plan_params, set_param)
            iteration_values[param_key] = param_value

        execute_plan.append(iteration_values)
        rv = ValueResolver.get(copy.deepcopy(workflow), plan_params)
        plans.append(rv.apply())

    return plans, execute_plan
373

374

375
@inject.autoparams()
@validate_arguments(config=dict(arbitrary_types_allowed=True))
def iterate_workflow(
    name_or_id: str,
    mapping_path: Optional[str],
    mappings: List[str],
    dry_run: bool,
    provider: str,
    config: Optional[str],
    plan_gateway: IPlanGateway,
):
    """Iterate a workflow repeatedly with differing values.

    Args:
        name_or_id(str): Name or id of the Plan to iterate.
        mapping_path(str): Path to file defining workflow mappings.
        mappings(List[str]): List of workflow mappings in ``name=value`` form.
        dry_run(bool): Whether to preview execution or actually run it.
        provider(str): Name of the workflow provider backend to use for execution.
        config(Optional[str]): Path to config for the workflow provider.
        plan_gateway(IPlanGateway): The plan gateway.

    Raises:
        errors.UsageError: If no mapping is given or the mappings do not produce any iteration.
        errors.ParameterError: If the workflow is removed or a mapping is malformed.
    """
    import ast

    from deepmerge import always_merger

    from renku.core.util.tabulate import tabulate

    if mapping_path is None and len(mappings) == 0:
        raise errors.UsageError("No mapping has been given for the iteration!")

    workflow = plan_gateway.get_by_name_or_id(name_or_id)

    if is_plan_removed(workflow):
        raise errors.ParameterError(f"The specified workflow '{name_or_id}' cannot be found.")

    tag_separator = "@"
    index_pattern = re.compile(r"{iter_index}")

    iter_params: Dict[str, Any] = {"indexed": {}, "params": {}, "tagged": {}}
    workflow_params = {}
    if mapping_path:
        mapping = safe_read_yaml(mapping_path)
        iter_params, workflow_params = _extract_iterate_parameters(mapping, index_pattern, tag_separator=tag_separator)

    for m in mappings:
        param_name, separator, param_value = m.partition("=")
        if not separator:
            # NOTE: Report a malformed mapping as a parameter error instead of failing on tuple unpacking
            raise errors.ParameterError(f"Invalid mapping '{m}': expected '<name>=<value>'.")

        if index_pattern.search(param_value):
            iter_params["indexed"][param_name] = param_value  # type: ignore
        else:
            try:
                evaluated_param_value = ast.literal_eval(param_value)
            except Exception as e:
                raise errors.ParameterError(
                    f"The value of '{param_name}' parameter is neither a list nor templated variable!"
                ) from e

            if isinstance(evaluated_param_value, list) and len(evaluated_param_value) == 1:
                communication.warn(
                    f"The parameter '{param_name}' has only one element '{param_value}', "
                    "changing it to be a fixed parameter!"
                )
                # NOTE: Take the element of the parsed list; indexing the raw string would yield "["
                workflow_params[param_name] = evaluated_param_value[0]
                continue
            elif not isinstance(evaluated_param_value, list):
                workflow_params[param_name] = param_value
                continue

            if tag_separator in param_name:
                name, tag = param_name.split(tag_separator, maxsplit=1)
                iter_params["tagged"].setdefault(tag, {})[name] = evaluated_param_value

                param_name = name
            else:
                iter_params["params"][param_name] = evaluated_param_value

        # NOTE: Iterated parameters are also merged into the plain parameters, expanding dotted names into
        # nested mappings, so that they take part in the validation below
        set_param = reduce(lambda x, y: {y: x}, reversed(param_name.split(".")), param_value)  # type: ignore
        workflow_params = always_merger.merge(workflow_params, set_param)

    validated_iter_params = _validate_iterate_parameters(workflow, workflow_params, cast(Dict[str, Any], iter_params))
    if validated_iter_params is None:
        return

    plans, execute_plan = _build_iterations(workflow, workflow_params, validated_iter_params, index_pattern)

    if not execute_plan:
        # NOTE: An empty list of values yields no iterations, so there is no row to take the table headers from
        raise errors.UsageError("The provided mappings do not produce any iteration!")

    communication.echo(f"\n\n{tabulate(execute_plan, execute_plan[0].keys())}")
    if not dry_run:
        graph = ExecutionGraph(workflows=plans, virtual_links=True)
        execute_workflow_graph(dag=graph.workflow_graph, provider=provider, config=config)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc