• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 4182445342

pending completion
4182445342

Pull #3320

github-actions

GitHub
Merge f135d5076 into 0e5906373
Pull Request #3320: chore: fix coveralls comments

12864 of 23835 relevant lines covered (53.97%)

1.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.34
/renku/command/command_builder/command.py
1
# -*- coding: utf-8 -*-
2
#
3
# Copyright 2018-2022 - Swiss Data Science Center (SDSC)
4
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
5
# Eidgenössische Technische Hochschule Zürich (ETHZ).
6
#
7
# Licensed under the Apache License, Version 2.0 (the "License");
8
# you may not use this file except in compliance with the License.
9
# You may obtain a copy of the License at
10
#
11
#     http://www.apache.org/licenses/LICENSE-2.0
12
#
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS,
15
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
# See the License for the specific language governing permissions and
17
# limitations under the License.
18
"""Command builder."""
3✔
19

20
import contextlib
3✔
21
import functools
3✔
22
import threading
3✔
23
from collections import defaultdict
3✔
24
from pathlib import Path
3✔
25
from typing import Any, Callable, Dict, List, Optional, Type, Union
3✔
26

27
import inject
3✔
28

29
from renku.core import errors
3✔
30
from renku.core.util.communication import CommunicationCallback
3✔
31
from renku.core.util.git import get_git_path
3✔
32
from renku.domain_model.project_context import project_context
3✔
33

34
_LOCAL = threading.local()
3✔
35

36

37
def check_finalized(f):
3✔
38
    """Decorator to prevent modification of finalized builders.
39

40
    Args:
41
        f: Decorated function.
42

43
    Returns:
44
        Wrapped function.
45
    """
46

47
    @functools.wraps(f)
3✔
48
    def wrapper(*args, **kwargs):
3✔
49
        """Decorator to prevent modification of finalized builders."""
50
        if not args or not isinstance(args[0], Command):
3✔
51
            raise errors.ParameterError("Command hooks need to be `Command` object methods.")
×
52

53
        if args[0].finalized:
3✔
54
            raise errors.CommandFinalizedError("Cannot modify a finalized `Command`.")
×
55

56
        return f(*args, **kwargs)
3✔
57

58
    return wrapper
3✔
59

60

61
def _patched_get_injector_or_die() -> inject.Injector:
3✔
62
    """Patched version of get_injector_or_die with thread local injectors.
63

64
    Allows deferred definition of an injector per thread.
65
    """
66
    injector = getattr(_LOCAL, "injector", None)
3✔
67
    if not injector:
3✔
68
        raise inject.InjectorException("No injector is configured")
×
69

70
    return injector
3✔
71

72

73
def _patched_configure(config: Optional[inject.BinderCallable] = None, bind_in_runtime: bool = True) -> inject.Injector:
3✔
74
    """Create an injector with a callable config or raise an exception when already configured.
75

76
    Args:
77
        config(Optional[inject.BinderCallable], optional): Injection binding config (Default value = None).
78
        bind_in_runtime(bool, optional): Whether to allow binding at runtime (Default value = True).
79

80
    Returns:
81
        Injector: Thread-safe injector with bindings applied.
82
    """
83

84
    if getattr(_LOCAL, "injector", None):
3✔
85
        raise inject.InjectorException("Injector is already configured")
×
86

87
    _LOCAL.injector = inject.Injector(config, bind_in_runtime=bind_in_runtime)
3✔
88

89
    return _LOCAL.injector
3✔
90

91

92
inject.configure = _patched_configure
3✔
93
inject.get_injector_or_die = _patched_get_injector_or_die
3✔
94

95

96
def remove_injector():
3✔
97
    """Remove a thread-local injector."""
98
    if getattr(_LOCAL, "injector", None):
3✔
99
        del _LOCAL.injector
3✔
100

101

102
@contextlib.contextmanager
3✔
103
def replace_injection(bindings: Dict, constructor_bindings=None):
3✔
104
    """Temporarily inject various test objects.
105

106
    Args:
107
        bindings: New normal injection bindings to apply.
108
        constructor_bindings: New constructor bindings to apply (Default value = None).
109
    """
110
    constructor_bindings = constructor_bindings or {}
×
111

112
    def bind(binder):
×
113
        for key, value in bindings.items():
×
114
            binder.bind(key, value)
×
115
        for key, value in constructor_bindings.items():
×
116
            binder.bind_to_constructor(key, value)
×
117

118
    old_injector = getattr(_LOCAL, "injector", None)
×
119
    try:
×
120
        if old_injector:
×
121
            remove_injector()
×
122
        inject.configure(bind, bind_in_runtime=False)
×
123

124
        yield
×
125
    finally:
126
        remove_injector()
×
127

128
        if old_injector:
×
129
            _LOCAL.injector = old_injector
×
130

131

132
class Command:
3✔
133
    """Base renku command builder."""
134

135
    HOOK_ORDER = 1
3✔
136

137
    def __init__(self) -> None:
3✔
138
        """__init__ of Command."""
139
        self.injection_pre_hooks: Dict[int, List[Callable]] = defaultdict(list)
3✔
140
        self.pre_hooks: Dict[int, List[Callable]] = defaultdict(list)
3✔
141
        self.post_hooks: Dict[int, List[Callable]] = defaultdict(list)
3✔
142
        self._operation: Optional[Callable] = None
3✔
143
        self._finalized: bool = False
3✔
144
        self._track_std_streams: bool = False
3✔
145
        self._working_directory: Optional[str] = None
3✔
146
        self._context_added: bool = False
3✔
147

148
    def __getattr__(self, name: str) -> Any:
3✔
149
        """Bubble up attributes of wrapped builders."""
150
        if "_builder" in self.__dict__:
3✔
151
            return getattr(self._builder, name)
×
152

153
        raise AttributeError(f"{self.__class__.__name__} object has no attribute {name}")
3✔
154

155
    def __setattr__(self, name: str, value: Any) -> None:
3✔
156
        """Set attributes of wrapped builders."""
157
        if hasattr(self, "_builder") and self.__class__ is not self._builder.__class__:
3✔
158
            self._builder.__setattr__(name, value)
3✔
159

160
        object.__setattr__(self, name, value)
3✔
161

162
    def _injection_pre_hook(self, builder: "Command", context: dict, *args, **kwargs) -> None:
3✔
163
        """Setup dependency injections.
164

165
        Args:
166
            builder("Command"): Current ``CommandBuilder``.
167
            context(dict): Current context dictionary.
168
        """
169
        if not project_context.has_context():
3✔
170
            path = get_git_path(self._working_directory or ".")
×
171
            project_context.push_path(path)
×
172
            self._context_added = True
×
173

174
        context["bindings"] = {}
3✔
175
        context["constructor_bindings"] = {}
3✔
176

177
    def _pre_hook(self, builder: "Command", context: dict, *args, **kwargs) -> None:
3✔
178
        """Setup project.
179

180
        Args:
181
            builder("Command"): Current ``CommandBuilder``.
182
            context(dict): Current context dictionary.
183
        """
184

185
        stack = contextlib.ExitStack()
3✔
186
        context["stack"] = stack
3✔
187

188
    def _post_hook(self, builder: "Command", context: dict, result: "CommandResult", *args, **kwargs) -> None:
3✔
189
        """Post-hook method.
190

191
        Args:
192
            builder("Command"): Current ``CommandBuilder``.
193
            context(dict): Current context dictionary.
194
            result("CommandResult"): Result of command execution.
195
        """
196
        remove_injector()
3✔
197

198
        if self._context_added:
3✔
199
            project_context.pop_context()
×
200

201
        if result.error:
3✔
202
            raise result.error
×
203

204
    def execute(self, *args, **kwargs) -> "CommandResult":
3✔
205
        """Execute the wrapped operation.
206

207
        First executes `pre_hooks` in ascending `order`, passing a read/write context between them.
208
        It then calls the wrapped `operation`. The result of the operation then gets pass to all the `post_hooks`,
209
        but in descending `order`. It then returns the result or error if there was one.
210

211
        Returns:
212
            CommandResult: Result of execution of command.
213
        """
214
        if not self.finalized:
3✔
215
            raise errors.CommandNotFinalizedError("Call `build()` before executing a command")
×
216

217
        context: Dict[str, Any] = {}
3✔
218
        if any(self.injection_pre_hooks):
3✔
219
            order = sorted(self.injection_pre_hooks.keys())
3✔
220

221
            for o in order:
3✔
222
                for hook in self.injection_pre_hooks[o]:
3✔
223
                    hook(self, context, *args, **kwargs)
3✔
224

225
        def _bind(binder):
3✔
226
            for key, value in context["bindings"].items():
3✔
227
                binder.bind(key, value)
×
228
            for key, value in context["constructor_bindings"].items():
3✔
229
                binder.bind_to_constructor(key, value)
3✔
230

231
            return binder
3✔
232

233
        inject.configure(_bind, bind_in_runtime=False)
3✔
234

235
        if any(self.pre_hooks):
3✔
236
            order = sorted(self.pre_hooks.keys())
3✔
237

238
            for o in order:
3✔
239
                for hook in self.pre_hooks[o]:
3✔
240
                    try:
3✔
241
                        hook(self, context, *args, **kwargs)
3✔
242
                    except (Exception, BaseException):
×
243
                        # don't leak injections from failed hook
244
                        remove_injector()
×
245
                        raise
×
246

247
        output = None
3✔
248
        error = None
3✔
249

250
        try:
3✔
251
            with context["stack"]:
3✔
252
                output = self._operation(*args, **kwargs)  # type: ignore
3✔
253
        except errors.RenkuException as e:
×
254
            error = e
×
255
        except (Exception, BaseException):
×
256
            remove_injector()
×
257
            raise
×
258

259
        result = CommandResult(output, error, CommandResult.FAILURE if error else CommandResult.SUCCESS)
3✔
260

261
        if any(self.post_hooks):
3✔
262
            order = sorted(self.post_hooks.keys(), reverse=True)
3✔
263

264
            for o in order:
3✔
265
                for hook in self.post_hooks[o]:
3✔
266
                    hook(self, context, result, *args, **kwargs)
3✔
267

268
        return result
3✔
269

270
    @property
3✔
271
    def finalized(self) -> bool:
3✔
272
        """Whether this builder is still being constructed or has been finalized."""
273
        if hasattr(self, "_builder"):
3✔
274
            return self._builder.finalized
3✔
275
        return self._finalized
3✔
276

277
    def any_builder_is_instance_of(self, cls: Type) -> bool:
3✔
278
        """Check if any 'chained' command builder is an instance of a specific command builder class."""
279
        if isinstance(self, cls):
×
280
            return True
×
281
        elif "_builder" in self.__dict__:
×
282
            return self._builder.any_builder_is_instance_of(cls)
×
283
        else:
284
            return False
×
285

286
    @property
3✔
287
    def will_write_to_database(self) -> bool:
3✔
288
        """Will running the command write anything to the metadata store."""
289
        try:
×
290
            return self._write
×
291
        except AttributeError:
×
292
            return False
×
293

294
    @check_finalized
3✔
295
    def add_injection_pre_hook(self, order: int, hook: Callable):
3✔
296
        """Add a pre-execution hook for dependency injection.
297

298
        Args:
299
            order(int): Determines the order of executed hooks, lower numbers get executed first.
300
            hook(Callable): The hook to add.
301
        """
302
        if hasattr(self, "_builder"):
3✔
303
            self._builder.add_injection_pre_hook(order, hook)
2✔
304
        else:
305
            self.injection_pre_hooks[order].append(hook)
3✔
306

307
    @check_finalized
3✔
308
    def add_pre_hook(self, order: int, hook: Callable):
3✔
309
        """Add a pre-execution hook.
310

311
        Args:
312
            order(int): Determines the order of executed hooks, lower numbers get executed first.
313
            hook(Callable): The hook to add.
314
        """
315
        if hasattr(self, "_builder"):
3✔
316
            self._builder.add_pre_hook(order, hook)
2✔
317
        else:
318
            self.pre_hooks[order].append(hook)
3✔
319

320
    @check_finalized
3✔
321
    def add_post_hook(self, order: int, hook: Callable):
3✔
322
        """Add a post-execution hook.
323

324
        Args:
325
            order(int): Determines the order of executed hooks, higher numbers get executed first.
326
            hook(Callable): The hook to add.
327
        """
328
        if hasattr(self, "_builder"):
3✔
329
            self._builder.add_post_hook(order, hook)
2✔
330
        else:
331
            self.post_hooks[order].append(hook)
3✔
332

333
    @check_finalized
3✔
334
    def build(self) -> "Command":
3✔
335
        """Build (finalize) the command.
336

337
        Returns:
338
            Command: Finalized command that cannot be modified.
339
        """
340
        if not self._operation:
3✔
341
            raise errors.ConfigurationError("`Command` needs to have a wrapped `command` set")
×
342
        self.add_injection_pre_hook(self.HOOK_ORDER, self._injection_pre_hook)
3✔
343
        self.add_pre_hook(self.HOOK_ORDER, self._pre_hook)
3✔
344
        self.add_post_hook(self.HOOK_ORDER, self._post_hook)
3✔
345

346
        self._finalized = True
3✔
347

348
        return self
3✔
349

350
    @check_finalized
3✔
351
    def command(self, operation: Callable):
3✔
352
        """Set the wrapped command.
353

354
        Args:
355
            operation(Callable): The function to wrap in the command builder.
356

357
        Returns:
358
            Command: This command.
359
        """
360

361
        self._operation = operation
3✔
362

363
        return self
3✔
364

365
    @check_finalized
3✔
366
    def working_directory(self, directory: str) -> "Command":
3✔
367
        """Set the working directory for the command.
368

369
        WARNING: Should not be used in the core service.
370

371
        Args:
372
            directory(str): The working directory to work in.
373

374
        Returns:
375
            Command: This command.
376
        """
377
        self._working_directory = directory
×
378

379
        return self
×
380

381
    @check_finalized
3✔
382
    def track_std_streams(self) -> "Command":
3✔
383
        """Whether to track STD streams or not.
384

385
        Returns:
386
            Command: This command.
387
        """
388
        self._track_std_streams = True
×
389

390
        return self
×
391

392
    @check_finalized
3✔
393
    def with_git_isolation(self) -> "Command":
3✔
394
        """Whether to run in git isolation or not."""
395
        from renku.command.command_builder.repo import Isolation
×
396

397
        return Isolation(self)
×
398

399
    @check_finalized
3✔
400
    def with_commit(
3✔
401
        self,
402
        message: Optional[str] = None,
403
        commit_if_empty: bool = False,
404
        raise_if_empty: bool = False,
405
        commit_only: Optional[Union[str, List[Union[str, Path]]]] = None,
406
        skip_staging: bool = False,
407
        skip_dirty_checks: bool = False,
408
    ) -> "Command":
409
        """Create a commit.
410

411
        Args:
412
            message(str, optional): The commit message. Auto-generated if left empty (Default value = None).
413
            commit_if_empty(bool, optional): Whether to commit if there are no modified files (Default value = False).
414
            raise_if_empty(bool, optional): Whether to raise an exception if there are no modified files
415
                (Default value = False).
416
            commit_only(bool, optional): Only commit the supplied paths (Default value = None).
417
            skip_staging(bool): Don't commit staged files.
418
            skip_dirty_checks(bool): Don't check if paths are dirty or staged.
419
        """
420
        from renku.command.command_builder.repo import Commit
2✔
421

422
        return Commit(
2✔
423
            self,
424
            message=message,
425
            commit_if_empty=commit_if_empty,
426
            raise_if_empty=raise_if_empty,
427
            commit_only=commit_only,
428
            skip_staging=skip_staging,
429
            skip_dirty_checks=skip_dirty_checks,
430
        )
431

432
    @check_finalized
3✔
433
    def lock_project(self) -> "Command":
3✔
434
        """Acquire a lock for the whole project."""
435
        from renku.command.command_builder.lock import ProjectLock
×
436

437
        return ProjectLock(self)
×
438

439
    @check_finalized
3✔
440
    def lock_dataset(self) -> "Command":
3✔
441
        """Acquire a lock for a dataset."""
442
        from renku.command.command_builder.lock import DatasetLock
×
443

444
        return DatasetLock(self)
×
445

446
    @check_finalized
3✔
447
    def require_migration(self) -> "Command":
3✔
448
        """Check if a migration is needed."""
449
        from renku.command.command_builder.migration import RequireMigration
2✔
450

451
        return RequireMigration(self)
2✔
452

453
    @check_finalized
3✔
454
    def require_clean(self) -> "Command":
3✔
455
        """Check that the repository is clean."""
456
        from renku.command.command_builder.repo import RequireClean
2✔
457

458
        return RequireClean(self)
2✔
459

460
    @check_finalized
3✔
461
    def with_communicator(self, communicator: CommunicationCallback) -> "Command":
3✔
462
        """Create a communicator.
463

464
        Args:
465
            communicator(CommunicationCallback): Communicator to use for writing to user.
466
        """
467
        from renku.command.command_builder.communication import Communicator
2✔
468

469
        return Communicator(self, communicator)
2✔
470

471
    @check_finalized
3✔
472
    def with_database(self, write: bool = False, path: Optional[str] = None, create: bool = False) -> "Command":
3✔
473
        """Provide an object database connection.
474

475
        Args:
476
            write(bool, optional): Whether or not to persist changes to the database (Default value = False).
477
            path(str, optional): Location of the database (Default value = None).
478
            create(bool, optional): Whether the database should be created if it doesn't exist (Default value = False).
479
        """
480
        from renku.command.command_builder.database import DatabaseCommand
3✔
481

482
        return DatabaseCommand(self, write, path, create)
3✔
483

484

485
class CommandResult:
3✔
486
    """The result of a command.
487

488
    The return value of the command is set as `.output`, if there was an error, it is set as `.error`, and
489
    the status of the command is set to either `CommandResult.SUCCESS` or CommandResult.FAILURE`.
490
    """
491

492
    SUCCESS = 0
3✔
493

494
    FAILURE = 1
3✔
495

496
    def __init__(self, output, error, status) -> None:
3✔
497
        """__init__ of CommandResult."""
498
        self.output = output
3✔
499
        self.error = error
3✔
500
        self.status = status
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc