• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 5065889892

pending completion
5065889892

push

github-actions

GitHub
feat(core): shell completion for sessions (#3450)

74 of 142 new or added lines in 9 files covered. (52.11%)

5761 existing lines in 209 files now uncovered.

17023 of 26599 relevant lines covered (64.0%)

2.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.1
/renku/command/command_builder/command.py
1
# Copyright Swiss Data Science Center (SDSC). A partnership between
2
# École Polytechnique Fédérale de Lausanne (EPFL) and
3
# Eidgenössische Technische Hochschule Zürich (ETHZ).
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16
"""Command builder."""
5✔
17

18
import contextlib
5✔
19
import functools
5✔
20
import threading
5✔
21
from collections import defaultdict
5✔
22
from pathlib import Path
5✔
23
from typing import Any, Callable, Dict, List, Optional, Type, Union
5✔
24

25
import inject
5✔
26

27
from renku.core import errors
5✔
28
from renku.core.util.communication import CommunicationCallback
5✔
29
from renku.core.util.git import get_git_path
5✔
30
from renku.domain_model.project_context import project_context
5✔
31

32
# Thread-local storage; the functions in this module keep the current thread's injector in its `injector` attribute.
_LOCAL = threading.local()
5✔
33

34

35
def check_finalized(f):
    """Guard a builder method so that it cannot be called on a finalized ``Command``.

    Args:
        f: Method of a ``Command`` (or subclass) to protect.

    Returns:
        Wrapped function that validates its receiver before delegating to ``f``.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        """Validate the receiver, then call the decorated method."""
        # The receiver (``self``) is the first positional argument; a missing one is treated like a wrong type.
        receiver = args[0] if args else None

        if not isinstance(receiver, Command):
            raise errors.ParameterError("Command hooks need to be `Command` object methods.")

        if receiver.finalized:
            raise errors.CommandFinalizedError("Cannot modify a finalized `Command`.")

        return f(*args, **kwargs)

    return wrapper
5✔
57

58

59
def _patched_get_injector_or_die() -> inject.Injector:
    """Return the injector of the current thread.

    Replacement for ``inject.get_injector_or_die`` that reads the injector from thread-local storage, which allows
    each thread to define its own injector at a later point.

    Returns:
        Injector: The injector stored for the current thread.

    Raises:
        inject.InjectorException: If no injector is set for the current thread.
    """
    current = getattr(_LOCAL, "injector", None)
    if current:
        return current

    raise inject.InjectorException("No injector is configured")
5✔
69

70

71
def _patched_configure(config: Optional[inject.BinderCallable] = None, bind_in_runtime: bool = True) -> inject.Injector:
    """Create an injector for the current thread from a callable config.

    Args:
        config(Optional[inject.BinderCallable], optional): Injection binding config (Default value = None).
        bind_in_runtime(bool, optional): Whether to allow binding at runtime (Default value = True).

    Returns:
        Injector: The newly created injector, stored in thread-local storage, with bindings applied.

    Raises:
        inject.InjectorException: If the current thread already has an injector.
    """
    already_configured = getattr(_LOCAL, "injector", None)
    if already_configured:
        raise inject.InjectorException("Injector is already configured")

    injector = inject.Injector(config, bind_in_runtime=bind_in_runtime)
    _LOCAL.injector = injector

    return injector
5✔
88

89

90
# Monkey-patch `inject.configure` so that injectors are created per thread (see `_patched_configure`).
inject.configure = _patched_configure
5✔
91
# Monkey-patch `inject.get_injector_or_die` so that lookups use the current thread's injector.
inject.get_injector_or_die = _patched_get_injector_or_die
5✔
92

93

94
def remove_injector():
    """Drop the injector of the current thread, if one is set."""
    if not getattr(_LOCAL, "injector", None):
        # Nothing configured for this thread.
        return

    del _LOCAL.injector
5✔
98

99

100
@contextlib.contextmanager
def replace_injection(bindings: Dict, constructor_bindings=None):
    """Context manager that swaps the current thread's injector for one built from the given bindings.

    The injector that was active on entry (if any) is put back on exit, even if the managed block raises.

    Args:
        bindings: New normal injection bindings to apply.
        constructor_bindings: New constructor bindings to apply (Default value = None).
    """
    if not constructor_bindings:
        constructor_bindings = {}

    def bind(binder):
        for interface, instance in bindings.items():
            binder.bind(interface, instance)
        for interface, constructor in constructor_bindings.items():
            binder.bind_to_constructor(interface, constructor)

    # Remember the injector that is active right now so it can be restored afterwards.
    previous_injector = getattr(_LOCAL, "injector", None)

    try:
        if previous_injector:
            remove_injector()

        inject.configure(bind, bind_in_runtime=False)

        yield
    finally:
        # Always discard the temporary injector before restoring the previous one.
        remove_injector()

        if previous_injector:
            _LOCAL.injector = previous_injector
×
128

129

130
class Command:
    """Base renku command builder.

    A builder collects hooks and a wrapped operation, is finalized with ``build()`` and is then run with
    ``execute()``. Builders can be chained: a wrapping builder is expected to keep the wrapped one in ``_builder``;
    attribute access and hook registration are delegated to it.
    """

    # Order under which ``build()`` registers this class' own hooks.
    HOOK_ORDER = 1

    def __init__(self) -> None:
        """__init__ of Command."""
        self.injection_pre_hooks: Dict[int, List[Callable]] = defaultdict(list)
        self.pre_hooks: Dict[int, List[Callable]] = defaultdict(list)
        self.post_hooks: Dict[int, List[Callable]] = defaultdict(list)
        self._operation: Optional[Callable] = None
        self._finalized: bool = False
        self._track_std_streams: bool = False
        self._working_directory: Optional[str] = None
        self._context_added: bool = False

    def __getattr__(self, name: str) -> Any:
        """Bubble up attributes of wrapped builders.

        Only called when normal attribute lookup fails.

        Args:
            name(str): Name of the missing attribute.

        Returns:
            Any: The attribute of the wrapped builder.

        Raises:
            AttributeError: If there is no wrapped builder to delegate to.
        """
        # NOTE: Check ``__dict__`` directly; ``hasattr(self, "_builder")`` would call back into this method.
        if "_builder" in self.__dict__:
            return getattr(self._builder, name)

        raise AttributeError(f"{self.__class__.__name__} object has no attribute {name}")

    def __setattr__(self, name: str, value: Any) -> None:
        """Set attributes of wrapped builders.

        The value is always set on this object; it is additionally set on the wrapped builder when that builder is of
        a different class than this one.

        Args:
            name(str): Attribute name.
            value(Any): Attribute value.
        """
        if hasattr(self, "_builder") and self.__class__ is not self._builder.__class__:
            self._builder.__setattr__(name, value)

        object.__setattr__(self, name, value)

    def _injection_pre_hook(self, builder: "Command", context: dict, *args, **kwargs) -> None:
        """Setup dependency injections.

        Pushes a project context for the working directory if none exists yet and initializes the binding
        dictionaries that ``execute()`` uses to configure the injector.

        Args:
            builder("Command"): Current ``CommandBuilder``.
            context(dict): Current context dictionary.
        """
        if not project_context.has_context():
            path = get_git_path(self._working_directory or ".")
            project_context.push_path(path)
            # Remember that the context is ours so that ``_post_hook`` knows it has to pop it again.
            self._context_added = True

        context["bindings"] = {}
        context["constructor_bindings"] = {}

    def _pre_hook(self, builder: "Command", context: dict, *args, **kwargs) -> None:
        """Setup project.

        Stores a fresh ``contextlib.ExitStack`` under ``context["stack"]``; ``execute()`` runs the wrapped operation
        inside of it.

        Args:
            builder("Command"): Current ``CommandBuilder``.
            context(dict): Current context dictionary.
        """
        context["stack"] = contextlib.ExitStack()

    def _post_hook(self, builder: "Command", context: dict, result: "CommandResult", *args, **kwargs) -> None:
        """Post-hook method.

        Removes the injector, pops the project context if this command pushed it and re-raises the error stored in
        the result.

        Args:
            builder("Command"): Current ``CommandBuilder``.
            context(dict): Current context dictionary.
            result("CommandResult"): Result of command execution.

        Raises:
            Exception: ``result.error`` if the result contains an error.
        """
        remove_injector()

        if self._context_added:
            project_context.pop_context()
            # Reset the flag so that a later execution doesn't pop a context that it didn't push.
            self._context_added = False

        if result.error:
            raise result.error

    def execute(self, *args, **kwargs) -> "CommandResult":
        """Execute the wrapped operation.

        First executes the injection pre-hooks and then the `pre_hooks`, both in ascending `order`, passing a
        read/write context between them. It then calls the wrapped `operation`. The result of the operation is then
        passed to all the `post_hooks`, but in descending `order`. It then returns the result or error if there was
        one.

        Returns:
            CommandResult: Result of execution of command.

        Raises:
            errors.CommandNotFinalizedError: If ``build()`` wasn't called before.
        """
        if not self.finalized:
            raise errors.CommandNotFinalizedError("Call `build()` before executing a command")

        context: Dict[str, Any] = {}

        # NOTE: Iterate the sorted keys directly. Guarding the loop with ``any(hooks)`` would test the truthiness of
        # the keys and therefore skip hooks that are registered only under order ``0``.
        injection_pre_hooks = self.injection_pre_hooks
        for order in sorted(injection_pre_hooks):
            for hook in injection_pre_hooks[order]:
                hook(self, context, *args, **kwargs)

        def _bind(binder):
            for key, value in context["bindings"].items():
                binder.bind(key, value)
            for key, value in context["constructor_bindings"].items():
                binder.bind_to_constructor(key, value)

            return binder

        inject.configure(_bind, bind_in_runtime=False)

        pre_hooks = self.pre_hooks
        try:
            for order in sorted(pre_hooks):
                for hook in pre_hooks[order]:
                    hook(self, context, *args, **kwargs)
        except BaseException:
            # don't leak injections from failed hook
            remove_injector()
            raise

        output = None
        error = None

        try:
            with context["stack"]:
                output = self._operation(*args, **kwargs)  # type: ignore
        except errors.RenkuException as e:
            # Renku errors are handed to the post-hooks as part of the result instead of being raised here.
            error = e
        except BaseException:
            remove_injector()
            raise

        result = CommandResult(output, error, CommandResult.FAILURE if error else CommandResult.SUCCESS)

        post_hooks = self.post_hooks
        for order in sorted(post_hooks, reverse=True):
            for hook in post_hooks[order]:
                hook(self, context, result, *args, **kwargs)

        return result

    @property
    def finalized(self) -> bool:
        """Whether this builder is still being constructed or has been finalized."""
        if hasattr(self, "_builder"):
            return self._builder.finalized
        return self._finalized

    def any_builder_is_instance_of(self, cls: Type) -> bool:
        """Check if any 'chained' command builder is an instance of a specific command builder class.

        Args:
            cls(Type): The command builder class to look for.

        Returns:
            bool: True if this builder or any builder it wraps is an instance of ``cls``.
        """
        if isinstance(self, cls):
            return True

        if "_builder" in self.__dict__:
            return self._builder.any_builder_is_instance_of(cls)

        return False

    @property
    def will_write_to_database(self) -> bool:
        """Will running the command write anything to the metadata store."""
        # ``_write`` is not set by this class; a missing attribute means that nothing is written.
        return getattr(self, "_write", False)

    @check_finalized
    def add_injection_pre_hook(self, order: int, hook: Callable):
        """Add a pre-execution hook for dependency injection.

        Args:
            order(int): Determines the order of executed hooks, lower numbers get executed first.
            hook(Callable): The hook to add.
        """
        if hasattr(self, "_builder"):
            self._builder.add_injection_pre_hook(order, hook)
        else:
            self.injection_pre_hooks[order].append(hook)

    @check_finalized
    def add_pre_hook(self, order: int, hook: Callable):
        """Add a pre-execution hook.

        Args:
            order(int): Determines the order of executed hooks, lower numbers get executed first.
            hook(Callable): The hook to add.
        """
        if hasattr(self, "_builder"):
            self._builder.add_pre_hook(order, hook)
        else:
            self.pre_hooks[order].append(hook)

    @check_finalized
    def add_post_hook(self, order: int, hook: Callable):
        """Add a post-execution hook.

        Args:
            order(int): Determines the order of executed hooks, higher numbers get executed first.
            hook(Callable): The hook to add.
        """
        if hasattr(self, "_builder"):
            self._builder.add_post_hook(order, hook)
        else:
            self.post_hooks[order].append(hook)

    @check_finalized
    def build(self) -> "Command":
        """Build (finalize) the command.

        Returns:
            Command: Finalized command that cannot be modified.

        Raises:
            errors.ConfigurationError: If no operation was set with ``command()``.
        """
        if not self._operation:
            raise errors.ConfigurationError("`Command` needs to have a wrapped `command` set")

        # Hooks have to be registered before the builder is marked as finalized.
        self.add_injection_pre_hook(self.HOOK_ORDER, self._injection_pre_hook)
        self.add_pre_hook(self.HOOK_ORDER, self._pre_hook)
        self.add_post_hook(self.HOOK_ORDER, self._post_hook)

        self._finalized = True

        return self

    @check_finalized
    def command(self, operation: Callable):
        """Set the wrapped command.

        Args:
            operation(Callable): The function to wrap in the command builder.

        Returns:
            Command: This command.
        """
        self._operation = operation

        return self

    @check_finalized
    def working_directory(self, directory: str) -> "Command":
        """Set the working directory for the command.

        WARNING: Should not be used in the core service.

        Args:
            directory(str): The working directory to work in.

        Returns:
            Command: This command.
        """
        self._working_directory = directory

        return self

    @check_finalized
    def track_std_streams(self) -> "Command":
        """Whether to track STD streams or not.

        Returns:
            Command: This command.
        """
        self._track_std_streams = True

        return self

    @check_finalized
    def with_git_isolation(self) -> "Command":
        """Whether to run in git isolation or not.

        Returns:
            Command: An ``Isolation`` builder wrapping this command.
        """
        from renku.command.command_builder.repo import Isolation

        return Isolation(self)

    @check_finalized
    def with_commit(
        self,
        message: Optional[str] = None,
        commit_if_empty: bool = False,
        raise_if_empty: bool = False,
        commit_only: Optional[Union[str, List[Union[str, Path]]]] = None,
        skip_staging: bool = False,
        skip_dirty_checks: bool = False,
    ) -> "Command":
        """Create a commit.

        Args:
            message(str, optional): The commit message. Auto-generated if left empty (Default value = None).
            commit_if_empty(bool, optional): Whether to commit if there are no modified files (Default value = False).
            raise_if_empty(bool, optional): Whether to raise an exception if there are no modified files
                (Default value = False).
            commit_only(Union[str, List[Union[str, Path]]], optional): Only commit the supplied paths
                (Default value = None).
            skip_staging(bool, optional): Don't commit staged files (Default value = False).
            skip_dirty_checks(bool, optional): Don't check if paths are dirty or staged (Default value = False).

        Returns:
            Command: A ``Commit`` builder wrapping this command.
        """
        from renku.command.command_builder.repo import Commit

        return Commit(
            self,
            message=message,
            commit_if_empty=commit_if_empty,
            raise_if_empty=raise_if_empty,
            commit_only=commit_only,
            skip_staging=skip_staging,
            skip_dirty_checks=skip_dirty_checks,
        )

    @check_finalized
    def lock_project(self) -> "Command":
        """Acquire a lock for the whole project.

        Returns:
            Command: A ``ProjectLock`` builder wrapping this command.
        """
        from renku.command.command_builder.lock import ProjectLock

        return ProjectLock(self)

    @check_finalized
    def lock_dataset(self) -> "Command":
        """Acquire a lock for a dataset.

        Returns:
            Command: A ``DatasetLock`` builder wrapping this command.
        """
        from renku.command.command_builder.lock import DatasetLock

        return DatasetLock(self)

    @check_finalized
    def require_migration(self) -> "Command":
        """Check if a migration is needed.

        Returns:
            Command: A ``RequireMigration`` builder wrapping this command.
        """
        from renku.command.command_builder.migration import RequireMigration

        return RequireMigration(self)

    @check_finalized
    def require_clean(self) -> "Command":
        """Check that the repository is clean.

        Returns:
            Command: A ``RequireClean`` builder wrapping this command.
        """
        from renku.command.command_builder.repo import RequireClean

        return RequireClean(self)

    @check_finalized
    def with_communicator(self, communicator: CommunicationCallback) -> "Command":
        """Create a communicator.

        Args:
            communicator(CommunicationCallback): Communicator to use for writing to user.

        Returns:
            Command: A ``Communicator`` builder wrapping this command.
        """
        from renku.command.command_builder.communication import Communicator

        return Communicator(self, communicator)

    @check_finalized
    def with_database(self, write: bool = False, path: Optional[str] = None, create: bool = False) -> "Command":
        """Provide an object database connection.

        Args:
            write(bool, optional): Whether or not to persist changes to the database (Default value = False).
            path(str, optional): Location of the database (Default value = None).
            create(bool, optional): Whether the database should be created if it doesn't exist (Default value = False).

        Returns:
            Command: A ``DatabaseCommand`` builder wrapping this command.
        """
        from renku.command.command_builder.database import DatabaseCommand

        return DatabaseCommand(self, write, path, create)
5✔
481

482

483
class CommandResult:
    """Outcome of running a command.

    ``output`` carries the return value of the command, ``error`` carries the error if there was one, and
    ``status`` is either ``CommandResult.SUCCESS`` or ``CommandResult.FAILURE``.
    """

    SUCCESS = 0

    FAILURE = 1

    def __init__(self, output, error, status) -> None:
        """Store output, error and status of a command execution."""
        self.output, self.error, self.status = output, error, status
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc