• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 14471580514

11 Jul 2024 02:47PM UTC coverage: 57.068% (-20.2%) from 77.316%
14471580514

push

github

web-flow
fix: properly remove activities in workflow remove if catalog is corrupt (#3729)

0 of 1 new or added line in 1 file covered. (0.0%)

6141 existing lines in 213 files now uncovered.

17395 of 30481 relevant lines covered (57.07%)

1.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.38
/renku/command/command_builder/command.py
1
# Copyright Swiss Data Science Center (SDSC). A partnership between
2
# École Polytechnique Fédérale de Lausanne (EPFL) and
3
# Eidgenössische Technische Hochschule Zürich (ETHZ).
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16
"""Command builder."""
4✔
17

18
import contextlib
4✔
19
import functools
4✔
20
import shutil
4✔
21
import threading
4✔
22
from collections import defaultdict
4✔
23
from pathlib import Path
4✔
24
from typing import Any, Callable, Dict, List, Optional, Type, Union
4✔
25

26
import inject
4✔
27

28
from renku.core import errors
4✔
29
from renku.core.util.communication import CommunicationCallback
4✔
30
from renku.core.util.git import get_git_path
4✔
31
from renku.domain_model.project_context import project_context
4✔
32

33
# Thread-local storage; the patched ``inject`` helpers in this module keep the active injector in ``_LOCAL.injector``.
_LOCAL = threading.local()
4✔
34

35

36
def check_finalized(f):
    """Guard a ``Command`` method so it cannot be called once the command is finalized.

    Args:
        f: The ``Command`` method to protect.

    Returns:
        A wrapper that validates the receiver and then delegates to ``f``.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        """Validate the receiver, refuse finalized commands, then call the wrapped method."""
        # The first positional argument is expected to be the ``Command`` instance (``self``).
        receiver = args[0] if args else None

        if not isinstance(receiver, Command):
            raise errors.ParameterError("Command hooks need to be `Command` object methods.")

        if receiver.finalized:
            raise errors.CommandFinalizedError("Cannot modify a finalized `Command`.")

        return f(*args, **kwargs)

    return wrapper
58

59

60
def _patched_get_injector_or_die() -> inject.Injector:
    """Return the injector stored for the current thread.

    Replacement for ``inject.get_injector_or_die`` that reads the injector from thread-local
    storage, which allows each thread to define its injector later on.

    Returns:
        inject.Injector: The injector found in ``_LOCAL``.

    Raises:
        inject.InjectorException: If no (truthy) injector is stored in ``_LOCAL``.
    """
    current = getattr(_LOCAL, "injector", None)
    if current:
        return current

    raise inject.InjectorException("No injector is configured")
70

71

72
def _patched_configure(config: Optional[inject.BinderCallable] = None, bind_in_runtime: bool = True) -> inject.Injector:
    """Create an injector from a callable config and store it in thread-local storage.

    Args:
        config(Optional[inject.BinderCallable], optional): Injection binding config (Default value = None).
        bind_in_runtime(bool, optional): Whether to allow binding at runtime (Default value = True).

    Returns:
        Injector: The newly created injector, which is also kept in ``_LOCAL.injector``.

    Raises:
        inject.InjectorException: If ``_LOCAL`` already holds an injector.
    """
    already_configured = getattr(_LOCAL, "injector", None)
    if already_configured:
        raise inject.InjectorException("Injector is already configured")

    new_injector = inject.Injector(config, bind_in_runtime=bind_in_runtime)
    _LOCAL.injector = new_injector

    return new_injector
89

90

91
# Replace the ``inject`` module's entry points with the thread-local variants defined in this module.
inject.get_injector_or_die = _patched_get_injector_or_die
inject.configure = _patched_configure
93

94

95
def remove_injector():
    """Drop the injector stored for the current thread, if one is set."""
    if not getattr(_LOCAL, "injector", None):
        # Nothing (truthy) stored, so there is nothing to delete.
        return

    del _LOCAL.injector
99

100

101
@contextlib.contextmanager
def replace_injection(bindings: Dict, constructor_bindings=None):
    """Context manager that swaps in a temporary injector, e.g. to inject test objects.

    Any injector currently stored in ``_LOCAL`` is set aside while the block runs and is put
    back afterwards, even if the block raises.

    Args:
        bindings: New normal injection bindings to apply.
        constructor_bindings: New constructor bindings to apply (Default value = None).
    """
    ctor_bindings = constructor_bindings or {}

    def bind(binder):
        for interface, instance in bindings.items():
            binder.bind(interface, instance)
        for interface, factory in ctor_bindings.items():
            binder.bind_to_constructor(interface, factory)

    previous = getattr(_LOCAL, "injector", None)

    try:
        # ``inject.configure`` refuses to run while an injector is set, so clear the current one first.
        if previous:
            remove_injector()

        inject.configure(bind, bind_in_runtime=False)

        yield
    finally:
        # Always discard the temporary injector, then restore whatever was there before.
        remove_injector()

        if previous:
            _LOCAL.injector = previous
129

130

131
class Command:
    """Base renku command builder.

    A ``Command`` wraps an operation (set with ``command()``) together with ordered hooks that run
    before and after it. Builder methods such as ``with_database()`` or ``lock_project()`` return a
    wrapper builder around this one; wrappers keep the wrapped builder in ``_builder`` and delegate
    hook registration and attribute access to it.
    """

    # Order under which this builder's own hooks are registered in ``build()``.
    HOOK_ORDER = 1

    def __init__(self) -> None:
        """__init__ of Command."""
        # Hooks are grouped by their integer order; each order maps to the hooks registered under it.
        self.injection_pre_hooks: Dict[int, List[Callable]] = defaultdict(list)
        self.pre_hooks: Dict[int, List[Callable]] = defaultdict(list)
        self.post_hooks: Dict[int, List[Callable]] = defaultdict(list)
        self._operation: Optional[Callable] = None
        self._finalized: bool = False
        self._track_std_streams: bool = False
        self._working_directory: Optional[str] = None
        # Set when ``_injection_pre_hook`` pushed a project context that ``_post_hook`` has to pop again.
        self._context_added: bool = False

    def __getattr__(self, name: str) -> Any:
        """Bubble up attributes of wrapped builders.

        Only called when normal attribute lookup fails.

        Args:
            name(str): Name of the attribute that was not found on this object.

        Returns:
            Any: The attribute as resolved on the wrapped builder.

        Raises:
            AttributeError: If this object does not wrap another builder.
        """
        # Check ``__dict__`` directly: ``hasattr(self, "_builder")`` would re-enter this method.
        if "_builder" in self.__dict__:
            return getattr(self._builder, name)

        raise AttributeError(f"{self.__class__.__name__} object has no attribute {name}")

    def __setattr__(self, name: str, value: Any) -> None:
        """Set attributes of wrapped builders.

        When this object wraps a builder of a different class, the assignment is first mirrored
        onto the wrapped builder; the attribute is then always set on this object as well.

        Args:
            name(str): Attribute name.
            value(Any): Value to assign.
        """
        if hasattr(self, "_builder") and self.__class__ is not self._builder.__class__:
            self._builder.__setattr__(name, value)

        object.__setattr__(self, name, value)

    def _injection_pre_hook(self, builder: "Command", context: dict, *args, **kwargs) -> None:
        """Setup dependency injections.

        Pushes a project context derived from the working directory if none exists yet and
        initializes the (empty) binding dictionaries that ``execute()`` reads afterwards.

        Args:
            builder("Command"): Current ``CommandBuilder``.
            context(dict): Current context dictionary.
        """
        if not project_context.has_context():
            path = get_git_path(self._working_directory or ".")
            project_context.push_path(path)
            # Remember that this command pushed the context so that ``_post_hook`` pops it again.
            self._context_added = True

        context["bindings"] = {}
        context["constructor_bindings"] = {}

    def _pre_hook(self, builder: "Command", context: dict, *args, **kwargs) -> None:
        """Setup project.

        Stores a fresh ``contextlib.ExitStack`` under ``context["stack"]``; ``execute()`` enters
        this stack around the wrapped operation.

        Args:
            builder("Command"): Current ``CommandBuilder``.
            context(dict): Current context dictionary.
        """
        context["stack"] = contextlib.ExitStack()

    def _post_hook(self, builder: "Command", context: dict, result: "CommandResult", *args, **kwargs) -> None:
        """Post-hook method.

        Removes the thread-local injector, pops the project context if this command pushed it and
        re-raises the error recorded in ``result``, if any.

        Args:
            builder("Command"): Current ``CommandBuilder``.
            context(dict): Current context dictionary.
            result("CommandResult"): Result of command execution.

        Raises:
            Exception: ``result.error`` if it is set.
        """
        remove_injector()

        if self._context_added:
            project_context.pop_context()

        if result.error:
            raise result.error

    def execute(self, *args, **kwargs) -> "CommandResult":
        """Execute the wrapped operation.

        First executes the injection pre-hooks and then the `pre_hooks`, each in ascending `order`, passing a
        read/write context between them. It then calls the wrapped `operation`. The result of the operation then
        gets passed to all the `post_hooks`, but in descending `order`. It then returns the result or error if
        there was one.

        Args:
            *args: Positional arguments forwarded to every hook and to the wrapped operation.
            **kwargs: Keyword arguments forwarded to every hook and to the wrapped operation.

        Returns:
            CommandResult: Result of execution of command.

        Raises:
            errors.CommandNotFinalizedError: If ``build()`` has not been called yet.
        """
        if not self.finalized:
            raise errors.CommandNotFinalizedError("Call `build()` before executing a command")

        context: Dict[str, Any] = {}

        # Iterate over the sorted orders directly. Guarding the loop with ``any(hooks)`` would test the
        # truthiness of the dictionary *keys* and therefore skip hooks that are only registered under order 0.
        for order in sorted(self.injection_pre_hooks):
            for hook in self.injection_pre_hooks[order]:
                hook(self, context, *args, **kwargs)

        def _bind(binder):
            for key, value in context["bindings"].items():
                binder.bind(key, value)
            for key, value in context["constructor_bindings"].items():
                binder.bind_to_constructor(key, value)

            return binder

        inject.configure(_bind, bind_in_runtime=False)

        try:
            for order in sorted(self.pre_hooks):
                for hook in self.pre_hooks[order]:
                    hook(self, context, *args, **kwargs)
        except BaseException:
            # don't leak injections from failed hook
            remove_injector()
            raise

        output = None
        error = None

        try:
            with context["stack"]:
                output = self._operation(*args, **kwargs)  # type: ignore
        except errors.RenkuException as e:
            # Renku errors are recorded on the result and handed to the post-hooks.
            error = e
        except BaseException:
            # Anything else propagates immediately; post-hooks do not run, so clean up the injector here.
            remove_injector()
            raise

        result = CommandResult(output, error, CommandResult.FAILURE if error else CommandResult.SUCCESS)

        for order in sorted(self.post_hooks, reverse=True):
            for hook in self.post_hooks[order]:
                hook(self, context, result, *args, **kwargs)

        return result

    @property
    def finalized(self) -> bool:
        """Whether this builder is still being constructed or has been finalized."""
        if hasattr(self, "_builder"):
            return self._builder.finalized
        return self._finalized

    def any_builder_is_instance_of(self, cls: Type) -> bool:
        """Check if any 'chained' command builder is an instance of a specific command builder class.

        Args:
            cls(Type): The builder class to look for.

        Returns:
            bool: ``True`` if this builder or any builder it (transitively) wraps is an instance of ``cls``.
        """
        if isinstance(self, cls):
            return True
        elif "_builder" in self.__dict__:
            return self._builder.any_builder_is_instance_of(cls)
        else:
            return False

    @property
    def will_write_to_database(self) -> bool:
        """Will running the command write anything to the metadata store.

        Reads ``_write``, which is not set by ``Command`` itself; the lookup bubbles up through
        wrapped builders and yields ``False`` when none of them defines it.
        """
        try:
            return self._write
        except AttributeError:
            return False

    @check_finalized
    def add_injection_pre_hook(self, order: int, hook: Callable):
        """Add a pre-execution hook for dependency injection.

        Args:
            order(int): Determines the order of executed hooks, lower numbers get executed first.
            hook(Callable): The hook to add.
        """
        if hasattr(self, "_builder"):
            self._builder.add_injection_pre_hook(order, hook)
        else:
            self.injection_pre_hooks[order].append(hook)

    @check_finalized
    def add_pre_hook(self, order: int, hook: Callable):
        """Add a pre-execution hook.

        Args:
            order(int): Determines the order of executed hooks, lower numbers get executed first.
            hook(Callable): The hook to add.
        """
        if hasattr(self, "_builder"):
            self._builder.add_pre_hook(order, hook)
        else:
            self.pre_hooks[order].append(hook)

    @check_finalized
    def add_post_hook(self, order: int, hook: Callable):
        """Add a post-execution hook.

        Args:
            order(int): Determines the order of executed hooks, higher numbers get executed first.
            hook(Callable): The hook to add.
        """
        if hasattr(self, "_builder"):
            self._builder.add_post_hook(order, hook)
        else:
            self.post_hooks[order].append(hook)

    @check_finalized
    def build(self) -> "Command":
        """Build (finalize) the command.

        Registers this builder's own hooks under ``HOOK_ORDER`` and marks the command as finalized.

        Returns:
            Command: Finalized command that cannot be modified.

        Raises:
            errors.ConfigurationError: If no operation was set with ``command()``.
        """
        if not self._operation:
            raise errors.ConfigurationError("`Command` needs to have a wrapped `command` set")
        self.add_injection_pre_hook(self.HOOK_ORDER, self._injection_pre_hook)
        self.add_pre_hook(self.HOOK_ORDER, self._pre_hook)
        self.add_post_hook(self.HOOK_ORDER, self._post_hook)

        self._finalized = True

        return self

    @check_finalized
    def command(self, operation: Callable):
        """Set the wrapped command.

        Args:
            operation(Callable): The function to wrap in the command builder.

        Returns:
            Command: This command.
        """
        self._operation = operation

        return self

    @check_finalized
    def working_directory(self, directory: str) -> "Command":
        """Set the working directory for the command.

        WARNING: Should not be used in the core service.

        Args:
            directory(str): The working directory to work in.

        Returns:
            Command: This command.
        """
        self._working_directory = directory

        return self

    @check_finalized
    def track_std_streams(self) -> "Command":
        """Whether to track STD streams or not.

        Returns:
            Command: This command.
        """
        self._track_std_streams = True

        return self

    @check_finalized
    def with_git_isolation(self) -> "Command":
        """Whether to run in git isolation or not.

        Returns:
            Command: An ``Isolation`` builder wrapping this command.
        """
        from renku.command.command_builder.repo import Isolation

        return Isolation(self)

    @check_finalized
    def with_commit(
        self,
        message: Optional[str] = None,
        commit_if_empty: bool = False,
        raise_if_empty: bool = False,
        commit_only: Optional[Union[str, List[Union[str, Path]]]] = None,
        skip_staging: bool = False,
        skip_dirty_checks: bool = False,
    ) -> "Command":
        """Create a commit.

        Args:
            message(str, optional): The commit message. Auto-generated if left empty (Default value = None).
            commit_if_empty(bool, optional): Whether to commit if there are no modified files (Default value = False).
            raise_if_empty(bool, optional): Whether to raise an exception if there are no modified files
                (Default value = False).
            commit_only(Union[str, List[Union[str, Path]]], optional): Only commit the supplied paths
                (Default value = None).
            skip_staging(bool): Don't commit staged files.
            skip_dirty_checks(bool): Don't check if paths are dirty or staged.

        Returns:
            Command: A ``Commit`` builder wrapping this command.
        """
        from renku.command.command_builder.repo import Commit

        return Commit(
            self,
            message=message,
            commit_if_empty=commit_if_empty,
            raise_if_empty=raise_if_empty,
            commit_only=commit_only,
            skip_staging=skip_staging,
            skip_dirty_checks=skip_dirty_checks,
        )

    @check_finalized
    def lock_project(self) -> "Command":
        """Acquire a lock for the whole project.

        Returns:
            Command: A ``ProjectLock`` builder wrapping this command.
        """
        from renku.command.command_builder.lock import ProjectLock

        return ProjectLock(self)

    @check_finalized
    def lock_dataset(self) -> "Command":
        """Acquire a lock for a dataset.

        Returns:
            Command: A ``DatasetLock`` builder wrapping this command.
        """
        from renku.command.command_builder.lock import DatasetLock

        return DatasetLock(self)

    @check_finalized
    def require_migration(self) -> "Command":
        """Check if a migration is needed.

        Returns:
            Command: A ``RequireMigration`` builder wrapping this command.
        """
        from renku.command.command_builder.migration import RequireMigration

        return RequireMigration(self)

    @check_finalized
    def require_clean(self) -> "Command":
        """Check that the repository is clean.

        Returns:
            Command: A ``RequireClean`` builder wrapping this command.
        """
        from renku.command.command_builder.repo import RequireClean

        return RequireClean(self)

    @check_finalized
    def require_login(self) -> "Command":
        """Check that the user is logged in.

        Returns:
            Command: A ``RequireLogin`` builder wrapping this command.
        """
        from renku.command.command_builder.repo import RequireLogin

        return RequireLogin(self)

    @check_finalized
    def with_communicator(self, communicator: CommunicationCallback) -> "Command":
        """Create a communicator.

        Args:
            communicator(CommunicationCallback): Communicator to use for writing to user.

        Returns:
            Command: A ``Communicator`` builder wrapping this command.
        """
        from renku.command.command_builder.communication import Communicator

        return Communicator(self, communicator)

    @check_finalized
    def with_database(self, write: bool = False, path: Optional[str] = None, create: bool = False) -> "Command":
        """Provide an object database connection.

        Args:
            write(bool, optional): Whether or not to persist changes to the database (Default value = False).
            path(str, optional): Location of the database (Default value = None).
            create(bool, optional): Whether the database should be created if it doesn't exist (Default value = False).

        Returns:
            Command: A ``DatabaseCommand`` builder wrapping this command.
        """
        from renku.command.command_builder.database import DatabaseCommand

        return DatabaseCommand(self, write, path, create)

    @check_finalized
    def with_gitlab_api(self) -> "Command":
        """Inject gitlab api client.

        Returns:
            Command: A ``GitlabApiCommand`` builder wrapping this command.
        """
        from renku.command.command_builder.gitlab import GitlabApiCommand

        return GitlabApiCommand(self)

    @check_finalized
    def with_storage_api(self) -> "Command":
        """Inject storage api client.

        Returns:
            Command: A ``StorageApiCommand`` builder wrapping this command.
        """
        from renku.command.command_builder.storage import StorageApiCommand

        return StorageApiCommand(self)
503

504

505
class CommandResult:
    """Outcome of running a command.

    ``.output`` holds the return value of the command, ``.error`` holds the error if there was one,
    and ``.status`` is either `CommandResult.SUCCESS` or `CommandResult.FAILURE`.
    """

    SUCCESS = 0

    FAILURE = 1

    def __init__(self, output, error, status) -> None:
        """Store the output, error and status of a command execution."""
        self.output, self.error, self.status = output, error, status
521

522

523
class RequireExecutable(Command):
    """Command builder wrapper whose pre-hook checks that an executable is installed."""

    HOOK_ORDER = 4

    def __init__(self, builder: Command, executable: str) -> None:
        """Wrap ``builder`` and remember the name of the executable to look for."""
        # ``_builder`` must be assigned first: ``Command.__setattr__`` mirrors later assignments onto it.
        self._builder = builder
        self._executable = executable

    def _pre_hook(self, builder: Command, context: dict, *args, **kwargs) -> None:
        """Fail unless the configured executable can be found on the system.

        Args:
            builder(Command): Current ``CommandBuilder``.
            context(dict): Current context.

        Raises:
            errors.ExecutableNotFound: If ``shutil.which`` cannot locate the executable.
        """
        located = shutil.which(self._executable)
        if located:
            return

        raise errors.ExecutableNotFound(
            f"Couldn't find the executable '{self._executable}' on this system. Please make sure it's installed"
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc