• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-python / 9058668052

13 May 2024 07:05AM UTC coverage: 77.713% (-8.4%) from 86.115%
9058668052

Pull #3727

github

web-flow
Merge 128d38387 into 050ed61bf
Pull Request #3727: fix: don't fail session launch when gitlab couldn't be reached

15 of 29 new or added lines in 3 files covered. (51.72%)

2594 existing lines in 125 files now uncovered.

23893 of 30745 relevant lines covered (77.71%)

3.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.15
/renku/infrastructure/database.py
1
# Copyright Swiss Data Science Center (SDSC). A partnership between
2
# École Polytechnique Fédérale de Lausanne (EPFL) and
3
# Eidgenössische Technische Hochschule Zürich (ETHZ).
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16
"""Custom database for store Persistent objects."""
7✔
17

18
import datetime
7✔
19
import hashlib
7✔
20
import importlib
7✔
21
import io
7✔
22
import json
7✔
23
from enum import Enum
7✔
24
from pathlib import Path
7✔
25
from types import BuiltinFunctionType, FunctionType
7✔
26
from typing import Any, Dict, List, Optional, Union, cast
7✔
27
from uuid import uuid4
7✔
28

29
import deal
7✔
30
import persistent
7✔
31
import zstandard as zstd
7✔
32
from BTrees.Length import Length
7✔
33
from BTrees.OOBTree import BTree, OOBucket, OOSet, OOTreeSet
7✔
34
from persistent import GHOST, UPTODATE
7✔
35
from persistent.interfaces import IPickleCache
7✔
36
from zc.relation.catalog import Catalog
7✔
37
from ZODB.utils import z64
7✔
38
from zope.interface import implementer
7✔
39
from zope.interface.interface import InterfaceClass
7✔
40

41
from renku.core import errors
7✔
42
from renku.domain_model.project import Project
7✔
43
from renku.infrastructure.immutable import Immutable
7✔
44
from renku.infrastructure.persistent import Persistent
7✔
45

46
# Type used for object ids throughout the database.
OID_TYPE = str

# Tag names for the kinds of values this module distinguishes.
TYPE_TYPE = "type"
FUNCTION_TYPE = "function"
REFERENCE_TYPE = "reference"
SET_TYPE = "set"
FROZEN_SET_TYPE = "frozenset"

# Unique sentinel used as the "no default was given" value of ``pop`` methods.
MARKER = object()

# The two values below are used as ``_p_serial`` to mark whether an object was read from storage or is new.
# NOTE: Do not change ``NEW`` since this is the default when a Persistent object is created.
NEW = z64
PERSISTED = b"1" * 8
7✔
57

58

59
def _is_module_allowed(module_name: str, type_name: str):
7✔
60
    """Checks whether it is allowed to import from the given module for security purposes.
61

62
    Args:
63
        module_name(str): The module name to check.
64
        type_name(str): The type within the module to check.
65

66
    Raises:
67
        TypeError: If the type is now allowed in the database.
68
    """
69

70
    if module_name not in ["BTrees", "builtins", "datetime", "persistent", "renku", "zc", "zope", "deal"]:
7✔
71
        raise TypeError(f"Objects of type '{type_name}' are not allowed")
×
72

73

74
def get_type_name(object) -> Optional[str]:
    """Build the dotted ``module.qualname`` name of a type.

    Args:
        object: Either a class or an instance; for an instance its class is used.

    Returns:
        Optional[str]: The fully qualified type name, or ``None`` when ``object`` is ``None``.
    """
    if object is None:
        return None

    if isinstance(object, type):
        klass = object
    else:
        klass = type(object)

    return f"{klass.__module__}.{klass.__qualname__}"
7✔
89

90

91
def get_class(type_name: Optional[str]) -> Optional[type]:
    """Resolve a fully-qualified type name to the object it names.

    The first dotted component is treated as the root module. It is checked against the module allow-list
    before it is imported, and the remaining components are resolved from that module one by one.

    Args:
        type_name(Optional[str]): The name of the class to get.

    Returns:
        Optional[type]: The class, or ``None`` when ``type_name`` is ``None``.

    Raises:
        TypeError: If the root module is not allowed to be imported.
    """
    if type_name is None:
        return None

    root_module_name, *attribute_path = type_name.split(".")

    # NOTE: The security check must happen before anything is imported
    _is_module_allowed(root_module_name, type_name)

    root_module = __import__(root_module_name)

    return get_attribute(root_module, attribute_path)
7✔
112

113

114
def get_attribute(object, name: Union[List[str], str]):
    """Walk a chain of attribute names starting from an object.

    When a component is not an attribute of the current object and no module with the matching dotted name
    is loaded yet, the component is imported as a sub-module (subject to the module allow-list) instead.

    Args:
        object: The object (or module) to start from.
        name(Union[List[str], str]): A dotted attribute path or a list of path components.

    Returns:
        The value found at the end of the path.

    Raises:
        AttributeError: If a component can neither be imported as a sub-module nor read as an attribute.
        TypeError: If a sub-module import is needed but the root module is not allowed.
    """
    import sys

    def _owner_module(target):
        # Objects that define ``__module__`` report it; anything else (e.g. a module) reports ``__name__``
        if hasattr(target, "__module__"):
            return target.__module__
        return target.__name__

    path = name.split(".") if isinstance(name, str) else name

    # The allow-list check is always made against the root module of the starting object
    root_module_name = _owner_module(object).split(".")[0]

    current = object
    for component in path:
        owner_module = _owner_module(current)
        is_unloaded_submodule = not hasattr(current, component) and f"{owner_module}.{component}" not in sys.modules

        if is_unloaded_submodule:
            try:
                _is_module_allowed(root_module_name, current.__name__)
                imported = importlib.import_module(f".{component}", package=owner_module)
            except ModuleNotFoundError:
                # Not a sub-module either: fall through so that ``getattr`` raises the usual error
                pass
            else:
                current = imported
                continue

        current = getattr(current, component)

    return current
7✔
147

148

149
class RenkuOOBTree(BTree):
    """A ``BTrees.OOBTree.BTree`` with customized node sizes."""

    # Size limits for leaf and internal nodes; these override the values of the base class
    max_leaf_size = 1000
    max_internal_size = 2000
7✔
154

155

156
class Database:
    """The Metadata Object Database.

    This class is equivalent to a ``persistent.DataManager`` and implements
    the ``persistent.interfaces.IPersistentDataManager`` interface.

    Objects are looked up by ``oid``. An object is searched for in the root object, then in the caches
    and finally it is loaded from the storage.
    """

    ROOT_OID = "root"

    def __init__(self, storage):
        """Create a database on top of a storage and load (or create) its root object.

        Args:
            storage: The ``Storage`` used to load and store serialized objects.
        """
        self._storage: Storage = storage
        self._cache = Cache()
        # The pre-cache is used by get to avoid infinite loops when objects load their state
        self._pre_cache: Dict[OID_TYPE, persistent.Persistent] = {}
        # Objects added explicitly by add() or when serializing other objects. After commit they are moved to _cache.
        self._objects_to_commit: Dict[OID_TYPE, persistent.Persistent] = {}
        self._reader: ObjectReader = ObjectReader(database=self)
        self._writer: ObjectWriter = ObjectWriter(database=self)
        self._root: RenkuOOBTree

        self._initialize_root()

    @classmethod
    def from_path(cls, path: Union[Path, str]) -> "Database":
        """Create a Storage and Database using the given path.

        Args:
            path(Union[pathlib.Path, str]): The path of the database.

        Returns:
            The database object.
        """
        storage = Storage(path)
        return Database(storage=storage)

    @staticmethod
    def generate_oid(object: persistent.Persistent) -> OID_TYPE:
        """Generate an ``oid`` for a ``persistent.Persistent`` object based on its id.

        An existing ``_p_oid`` is returned as is. Otherwise the oid is the hash of the object's ``id`` (or
        ``_id``) attribute, and a random oid is used when the object has neither.

        Args:
            object(persistent.Persistent): The object to create an oid for.

        Returns:
            An oid for the object.
        """
        oid = getattr(object, "_p_oid")
        if oid:
            assert isinstance(oid, OID_TYPE)
            return oid

        id: Optional[str] = getattr(object, "id", None) or getattr(object, "_id", None)
        if id:
            return Database.hash_id(id)

        return Database.new_oid()

    @staticmethod
    def hash_id(id: str) -> OID_TYPE:
        """Return ``oid`` from id.

        Args:
            id(str): The id to hash.

        Returns:
            OID_TYPE: The hashed id (hex digest of the SHA3-256 hash of the UTF-8 encoded id).
        """
        return hashlib.sha3_256(id.encode("utf-8")).hexdigest()

    @staticmethod
    def new_oid():
        """Generate a random ``oid``.

        Returns:
            Two concatenated random UUID hex strings; this has the same length as a hashed id.
        """
        return f"{uuid4().hex}{uuid4().hex}"

    @staticmethod
    def _get_filename_from_oid(oid: OID_TYPE) -> str:
        """Return the storage file name of an ``oid``, which is the lower-cased oid."""
        return oid.lower()

    @staticmethod
    def _should_compress(object: persistent.Persistent) -> bool:
        """Return whether the serialized data of an object is stored compressed.

        Catalogs, trees, buckets, the project and indexes are stored as plain JSON; everything else is compressed.

        NOTE: The type check is done at call time because ``Index`` is defined after this class.
        """
        return not isinstance(object, (Catalog, RenkuOOBTree, OOBucket, Project, Index))

    def __getitem__(self, key) -> "Index":
        """Return the root entry (e.g. an index) stored under ``key``."""
        return self._root[key]

    def clear(self):
        """Remove all objects and clear all caches. Objects won't be deleted in the storage."""
        self._cache.clear()
        self._pre_cache.clear()
        self._objects_to_commit.clear()
        # NOTE: Clear root at the end because it will be added to _objects_to_commit when `register` is called.
        self._root.clear()

    def _initialize_root(self):
        """Initialize root object.

        The root is loaded from the storage; if it doesn't exist there, a new empty root is created and registered.
        """
        try:
            self._root = cast(RenkuOOBTree, self.get(Database.ROOT_OID))
        except errors.ObjectNotFoundError:
            self._root = RenkuOOBTree()
            self._root._p_oid = Database.ROOT_OID
            self.register(self._root)

    def add_index(
        self, name: str, object_type: type, attribute: Optional[str] = None, key_type: Optional[type] = None
    ) -> "Index":
        """Add an index.

        Args:
            name(str): The name of the index.
            object_type(type): The type contained within the index.
            attribute(str, optional): The attribute of the contained object to create a key from (Default value = None).
            key_type(type, optional): The type of the key (Default value = None).

        Returns:
            Index: The created ``Index`` object.
        """
        assert name not in self._root, f"Index or object already exists: '{name}'"

        index = Index(name=name, object_type=object_type, attribute=attribute, key_type=key_type)
        index._p_jar = self

        self._root[name] = index

        return index

    def add_root_object(self, name: str, obj: Persistent):
        """Add an object to the DB root.

        The object's ``oid`` is set to ``name``.

        Args:
            name(str): The key of the object.
            obj(Persistent): The object to store.
        """
        assert name not in self._root, f"Index or object already exists: '{name}'"

        obj._p_jar = self
        obj._p_oid = name

        self._root[name] = obj

    def add(self, object: persistent.Persistent, oid: OID_TYPE):
        """Add a new object to the database.

        NOTE: Normally, we add objects to indexes but this method registers an object directly with the database.
        Use it only for singleton objects that have no Index defined for them (e.g. Project).

        Args:
            object(persistent.Persistent): The object to add.
            oid(OID_TYPE): The oid for the object. If it is ``None``, an oid is generated when the object is
                registered.
        """
        assert not oid or isinstance(oid, OID_TYPE), f"Invalid oid type: '{type(oid)}'"
        object._p_oid = oid

        self.register(object)

    def register(self, object: persistent.Persistent):
        """Register a persistent.Persistent object to be stored.

        NOTE: When a persistent.Persistent object is changed it calls this method.

        Args:
            object(persistent.Persistent): The object to register with the database.
        """
        assert isinstance(object, persistent.Persistent), f"Cannot add non-Persistent object: '{object}'"

        if object._p_oid is None:
            object._p_oid = self.generate_oid(object)
        elif isinstance(object, Persistent):
            # NOTE: A safety-net to make sure that all objects have correct p_oid
            id = getattr(object, "id")
            expected_oid = Database.hash_id(id)
            actual_oid = object._p_oid
            assert actual_oid == expected_oid, f"Object has wrong oid: {actual_oid} != {expected_oid}"

        object._p_jar = self
        # object._p_serial = NEW
        self._objects_to_commit[object._p_oid] = object

    def get(self, oid: OID_TYPE) -> persistent.Persistent:
        """Get the object by ``oid``.

        Root entries are returned from the root object, then the caches are consulted and only then the object is
        loaded from the storage (and added to the cache).

        Args:
            oid(OID_TYPE): The oid of the object to get.

        Returns:
            persistent.Persistent: The object.
        """
        if oid != Database.ROOT_OID and oid in self._root:  # NOTE: Avoid looping if getting "root"
            return self._root[oid]

        object = self.get_cached(oid)
        if object is not None:
            return object

        object = self.get_from_path(path=self._get_filename_from_oid(oid))

        if isinstance(object, Persistent):
            object.freeze()

        # NOTE: Avoid infinite loop if object tries to load its state before it is added to the cache
        self._pre_cache[oid] = object
        self._cache[oid] = object
        self._pre_cache.pop(oid)

        return object

    def get_from_path(
        self, path: str, absolute: bool = False, override_type: Optional[str] = None
    ) -> persistent.Persistent:
        """Load a database object from a path.

        Args:
            path(str): Path of the database object.
            absolute(bool): Whether the path is absolute or a filename inside the database (Default value = False).
            override_type(Optional[str]): load object as a different type than what is set inside `renku_data_type`
                (Default value = None).

        Raises:
            errors.IncompatibleParametersError: If ``override_type`` is given but the loaded data has no
                ``@renku_data_type`` entry.

        Returns:
            persistent.Persistent: The object.
        """
        deal.disable(warn=False)
        try:
            data = self._storage.load(filename=path, absolute=absolute)
            if override_type is not None:
                if "@renku_data_type" not in data:
                    raise errors.IncompatibleParametersError("Cannot override type on found data.")

                data["@renku_data_type"] = override_type
            object = self._reader.deserialize(data)
            object._p_changed = 0
            object._p_serial = PERSISTED
        finally:
            # NOTE: Always re-enable contracts, also when loading fails (e.g. the object doesn't exist); otherwise
            # they would silently stay disabled for the rest of the process.
            deal.enable()

        return object

    def get_by_id(self, id: str) -> persistent.Persistent:
        """Return an object by its id.

        Args:
            id(str): The id to look up.

        Returns:
            persistent.Persistent: The object with the given id.
        """
        oid = Database.hash_id(id)
        return self.get(oid)

    def get_cached(self, oid: OID_TYPE) -> Optional[persistent.Persistent]:
        """Return an object if it is in the cache or will be committed.

        Args:
            oid(OID_TYPE): The id of the object to look up.

        Returns:
            Optional[persistent.Persistent]: The cached object or ``None`` if it isn't cached.
        """
        object = self._cache.get(oid)
        if object is not None:
            return object

        object = self._pre_cache.get(oid)
        if object is not None:
            return object

        object = self._objects_to_commit.get(oid)
        if object is not None:
            return object

        return None

    def remove_root_object(self, name: str) -> None:
        """Remove a root object from the database.

        Args:
            name(str): The name of the root object to remove.
        """
        assert name in self._root, f"Index or object doesn't exist in root: '{name}'"

        obj = self.get(name)
        self.remove_from_cache(obj)

        del self._root[name]

    def new_ghost(self, oid: OID_TYPE, object: persistent.Persistent):
        """Create a new ghost object.

        Args:
            oid(OID_TYPE): The oid of the new ghost object.
            object(persistent.Persistent): The object to create a new ghost entry for.
        """
        object._p_jar = self
        self._cache.new_ghost(oid, object)

    def setstate(self, object: persistent.Persistent):
        """Load the state for a ghost object.

        Args:
            object(persistent.Persistent): The object to set the state on.
        """
        deal.disable(warn=False)
        try:
            data = self._storage.load(filename=self._get_filename_from_oid(object._p_oid))
            self._reader.set_ghost_state(object, data)
            object._p_serial = PERSISTED
            if isinstance(object, Persistent):
                object.freeze()
        finally:
            # NOTE: Always re-enable contracts, also when loading the state fails
            deal.enable()

    def commit(self):
        """Commit modified and new objects.

        NOTE: This loops until nothing is left because storing an object can register more objects to commit.
        """
        while self._objects_to_commit:
            _, object = self._objects_to_commit.popitem()
            if object._p_changed or object._p_serial == NEW:
                self._store_object(object)

    def _store_object(self, object: persistent.Persistent):
        """Serialize an object, write it to the storage, cache it and mark it as persisted."""
        data = self._writer.serialize(object)
        compress = self._should_compress(object)
        self._storage.store(filename=self._get_filename_from_oid(object._p_oid), data=data, compress=compress)

        self._cache[object._p_oid] = object

        object._p_changed = 0  # NOTE: transition from changed to up-to-date
        object._p_serial = PERSISTED

    def persist_to_path(self, object: persistent.Persistent, path: Path):
        """Store an object to path.

        Args:
            object(persistent.Persistent): The object to store.
            path(Path): The path to store the object at; it is passed to the storage as an absolute path.
        """
        data = self._writer.serialize(object)
        compress = self._should_compress(object)
        self._storage.store(filename=str(path), data=data, compress=compress, absolute=True)

    def remove_from_cache(self, object: persistent.Persistent):
        """Remove an object from cache.

        An entry is only removed if it is this very object, not another object with the same oid.

        Args:
            object(persistent.Persistent): The object to remove.
        """
        oid = object._p_oid

        def remove_from(cache):
            existing_entry = cache.get(oid)
            if existing_entry is object:
                cache.pop(oid)

        remove_from(self._cache)
        remove_from(self._pre_cache)
        remove_from(self._objects_to_commit)

    def readCurrent(self, object):
        """We don't use this method but some Persistent logic require its existence.

        Args:
            object: The object to read.
        """
        assert object._p_jar is self
        assert object._p_oid is not None

    def oldstate(self, object, tid):
        """See ``persistent.interfaces.IPersistentDataManager::oldstate``."""
        raise NotImplementedError
×
505

506

507
@implementer(IPickleCache)
class Cache:
    """In-memory mapping from ``oid`` to persistent object that backs the database."""

    def __init__(self):
        self._entries = {}

    def __len__(self):
        return len(self._entries)

    def __getitem__(self, oid):
        assert isinstance(oid, OID_TYPE), f"Invalid oid type: '{type(oid)}'"
        return self._entries[oid]

    def __setitem__(self, oid, object):
        assert isinstance(object, persistent.Persistent), f"Cannot cache non-Persistent objects: '{object}'"
        assert isinstance(oid, OID_TYPE), f"Invalid oid type: '{type(oid)}'"

        assert object._p_jar is not None, "Cached object jar missing"
        assert oid == object._p_oid, f"Cache key does not match oid: {oid} != {object._p_oid}"

        # Re-caching the very same object is fine; replacing it with a different one is an error
        cached = self._entries.get(oid, object)
        if cached is not object:
            raise ValueError(f"The same oid exists: {cached} != {object}")

        self._entries[oid] = object

    def __delitem__(self, oid):
        assert isinstance(oid, OID_TYPE), f"Invalid oid type: '{type(oid)}'"
        del self._entries[oid]

    def clear(self):
        """Drop every cached entry."""
        self._entries.clear()

    def pop(self, oid, default=MARKER):
        """Remove an entry from the cache and return it.

        Args:
            oid: The oid of the object to remove from the cache.
            default: Value to return when the oid is not cached (Default value = MARKER, i.e. no default).
        Raises:
            KeyError: If object wasn't found and no default was given.
        Returns:
            The removed object or the default value if it doesn't exist.
        """
        if default is MARKER:
            return self._entries.pop(oid)

        return self._entries.pop(oid, default)

    def get(self, oid, default=None):
        """See ``IPickleCache``.

        Args:
            oid: The oid of the object to get.
            default: Value to return when the oid is not cached (Default value = None).

        Returns:
            The cached object, or the default value if there is none.
        """
        assert isinstance(oid, OID_TYPE), f"Invalid oid type: '{type(oid)}'"
        return self._entries.get(oid, default)

    def new_ghost(self, oid, object):
        """See ``IPickleCache``.

        Assigns ``oid`` to the object, turns it into a ghost if it isn't one already and caches it.
        """
        assert object._p_oid is None, f"Object already has an oid: {object}"
        assert object._p_jar is not None, f"Object does not have a jar: {object}"
        assert oid not in self._entries, f"Duplicate oid: {oid}"

        object._p_oid = oid

        already_ghost = object._p_state == GHOST
        if not already_ghost:
            object._p_invalidate()

        self[oid] = object
7✔
580

581

582
class Index(persistent.Persistent):
    """A persistent, named mapping from keys to objects of a single type."""

    # NOTE: If this field isn't None, we use it as the index-attribute instead of ``_attribute``. This is used to avoid
    # creating a migration when the index-attribute changes.
    _v_main_attribute: Optional[str] = None

    def __init__(self, *, name: str, object_type, attribute: Optional[str], key_type=None):
        """Create an index where keys are extracted using ``attribute`` from an object or a key.

        Args:
            name (str): Index's name; it must be all lowercase.
            object_type: Type of objects that the index points to.
            attribute (Optional[str], optional): Name of an attribute to be used to automatically generate a key
                (e.g. `entity.path`).
            key_type: Type of keys. If not None then a key must be provided when updating the index
                (Default value = None).
        """
        assert name == name.lower(), f"Index name must be all lowercase: '{name}'."

        super().__init__()

        # The tree holding the entries is stored under the index's name, the index itself under "<name>-index"
        entries = RenkuOOBTree()
        entries._p_oid = name

        self._p_oid = f"{name}-index"
        self._name: str = name
        self._object_type = object_type
        self._key_type = key_type
        self._attribute: Optional[str] = attribute
        self._entries: RenkuOOBTree = entries

    def __len__(self):
        return len(self._entries)

    def __contains__(self, key):
        return key in self._entries

    def __getitem__(self, key):
        return self._entries[key]

    def __setitem__(self, key, value):
        # NOTE: if Index is using a key object then we cannot check if key is valid. It's safer to use `add` method
        # instead of setting values directly.
        self._verify_and_get_key(object=value, key_object=None, key=key, missing_key_object_ok=True)

        self._entries[key] = value

    def __getstate__(self):
        # Types are stored by their fully-qualified names
        object_type_name = get_type_name(self._object_type)
        key_type_name = get_type_name(self._key_type)

        return {
            "name": self._name,
            "object_type": object_type_name,
            "key_type": key_type_name,
            "attribute": self._attribute,
            "entries": self._entries,
        }

    def __setstate__(self, data):
        # NOTE: The values are popped, i.e. they are removed from ``data``
        self._name = data.pop("name")
        self._object_type = get_class(data.pop("object_type"))
        self._key_type = get_class(data.pop("key_type"))
        self._attribute = data.pop("attribute")
        self._entries = data.pop("entries")

    def __iter__(self):
        return iter(self._entries)

    def __repr__(self) -> str:
        attribute = self._v_main_attribute or self._attribute
        return f"<Index {self.name} on {self._object_type.__name__}.{attribute}>"

    @property
    def name(self) -> str:
        """The name of the index."""
        return self._name

    @property
    def object_type(self) -> type:
        """The type of the objects stored in the index."""
        return self._object_type

    def get(self, key, default=None):
        """Look up an entry by its key.

        Args:
            key: The key of the entry to get.
            default: Value to return if there is no entry for the key (Default value = None).

        Returns:
            The found entry or the default value if it wasn't found.
        """
        return self._entries.get(key, default)

    def pop(self, key, default=MARKER):
        """Remove an entry and return it.

        Args:
            key: The key of the entry to remove. Nothing is removed and ``None`` is returned for a falsy key.
            default: Value to return if there is no entry for the key (Default value = MARKER, i.e. no default).

        Returns:
            The removed entry or the default value if it wasn't found.
        """
        if not key:
            return None

        if default is MARKER:
            return self._entries.pop(key)

        return self._entries.pop(key, default)

    def keys(self, min=None, max=None, excludemin=False, excludemax=False):
        """Return the keys of the index; the range arguments are passed on to the underlying tree."""
        return self._entries.keys(min=min, max=max, excludemin=excludemin, excludemax=excludemax)

    def values(self):
        """Return the values of the index."""
        return self._entries.values()

    def items(self):
        """Return the key/value pairs of the index."""
        return self._entries.items()

    def add(self, object: persistent.Persistent, *, key: Optional[str] = None, key_object=None, verify=True):
        """Add an object to the index or replace the entry stored under its key.

        If `Index._attribute` is not None then key is automatically generated.
        Key is extracted from `key_object` if it is not None; otherwise, it's extracted from `object`.

        Args:
            object(persistent.Persistent): Object to add.
            key(Optional[str], optional): Key to use in the index (Default value = None).
            key_object: Object to use to extract a key from (Default value = None).
            verify: Whether to check if the key is valid (Default value = True).
        """
        assert isinstance(object, self._object_type), f"Cannot add objects of type '{type(object)}'"

        entry_key = self._verify_and_get_key(
            object=object, key_object=key_object, key=key, missing_key_object_ok=False, verify=verify
        )
        self._entries[entry_key] = object

    def remove(self, object: persistent.Persistent, *, key: Optional[str] = None, key_object=None, verify=True):
        """Delete the entry of an object from the index.

        If `Index._attribute` is not None then key is automatically generated.
        Key is extracted from `key_object` if it is not None; otherwise, it's extracted from `object`.

        Args:
            object(persistent.Persistent): Object to remove.
            key(Optional[str], optional): Key used in the index (Default value = None).
            key_object: Object to use to extract a key from (Default value = None).
            verify: Whether to check if the key is valid (Default value = True).
        """
        assert isinstance(object, self._object_type), f"Cannot remove objects of type '{type(object)}'"

        entry_key = self._verify_and_get_key(
            object=object, key_object=key_object, key=key, missing_key_object_ok=False, verify=verify
        )
        del self._entries[entry_key]

    def generate_key(self, object: persistent.Persistent, *, key_object=None):
        """Compute the key under which an object is stored in the index.

        Key is extracted from `key_object` if it is not None; otherwise, it's extracted from `object`.

        Args:
            object(persistent.Persistent): The object to generate a key for.
            key_object: The object to derive a key from (Default value = None).

        Returns:
            A key for object.
        """
        return self._verify_and_get_key(object=object, key_object=key_object, key=None, missing_key_object_ok=False)

    def _verify_and_get_key(
        self, *, object: persistent.Persistent, key_object, key, missing_key_object_ok, verify=True
    ):
        """Check the passed key and key object and return the key to use for ``object``."""
        if not self._key_type:
            assert key_object is None, f"Index '{self.name}' does not accept 'key_object'"
        elif not missing_key_object_ok:
            assert isinstance(key_object, self._key_type), f"Invalid key type: {type(key_object)} for '{self.name}'"

        attribute = self._v_main_attribute or self._attribute

        if not attribute:
            # Without an index-attribute the caller is the only source of the key
            assert key is not None, "No key is provided"
            return key

        derived_key = get_attribute(key_object or object, attribute)

        if key is None:
            return derived_key

        if not verify:
            # The caller's key wins when verification is turned off
            return key

        assert key == derived_key, f"Incorrect key for index '{self.name}': '{key}' != '{derived_key}'"

        return derived_key
6✔
772

773

774
class Storage:
    """Store Persistent objects on the disk."""

    # NOTE: File names with exactly this length are treated as OIDs and are stored in a two-level directory
    # structure built from their first four characters.
    OID_FILENAME_LENGTH = 64

    def __init__(self, path: Union[Path, str]):
        self.path = Path(path)
        self.zstd_compressor = zstd.ZstdCompressor()
        self.zstd_decompressor = zstd.ZstdDecompressor()

    def store(self, filename: str, data: Union[Dict, List], compress=False, absolute: bool = False):
        """Store object.

        Data is always written as UTF-8 encoded JSON, independent of the locale of the current process, so
        that files can be read back on any platform.

        Args:
            filename(str): Target file name to store data in.
            data(Union[Dict, List]): The data to store.
            compress(bool): Whether to compress the data or store it as plain json (Default value = False).
            absolute(bool): Whether filename is an absolute path (Default value = False).
        """
        assert isinstance(filename, str)

        if absolute:
            # NOTE: Absolute paths are used as is; no directories are created for them
            path = Path(filename)
        else:
            is_oid_path = len(filename) == Storage.OID_FILENAME_LENGTH
            if is_oid_path:
                path = self.path / filename[0:2] / filename[2:4] / filename
                path.parent.mkdir(parents=True, exist_ok=True)
            else:
                path = self.path / filename
                self.path.mkdir(parents=True, exist_ok=True)

        if compress:
            with open(path, "wb") as fb, self.zstd_compressor.stream_writer(fb) as compressor:
                # NOTE: ``ensure_ascii=False`` emits non-ASCII characters verbatim, so the encoding must be
                # pinned instead of relying on the locale default.
                with io.TextIOWrapper(compressor, encoding="utf-8") as out:
                    json.dump(data, out, ensure_ascii=False)
        else:
            with open(path, "w", encoding="utf-8") as ft:
                json.dump(data, ft, ensure_ascii=False, sort_keys=True, indent=2)

    def load(self, filename: str, absolute: bool = False):
        """Load data for object with object id oid.

        Args:
            filename(str): The file name of the data to load.
            absolute(bool): Whether the path is absolute or a filename inside the database (Default value: False).

        Returns:
            The loaded data in dictionary form.

        Raises:
            errors.ObjectNotFoundError: If the file doesn't exist.
            errors.MetadataCorruptError: If an uncompressed file doesn't contain valid JSON.
        """
        assert isinstance(filename, str)

        if absolute:
            path = Path(filename)
        else:
            is_oid_path = len(filename) == Storage.OID_FILENAME_LENGTH
            if is_oid_path:
                path = self.path / filename[0:2] / filename[2:4] / filename
            else:
                path = self.path / filename

        if not path.exists():
            raise errors.ObjectNotFoundError(filename)

        # NOTE: The file is read in binary mode; the first four bytes tell whether it is zstd-compressed
        with open(path, "rb") as file:
            header = int.from_bytes(file.read(4), "little")
            file.seek(0)
            if header == zstd.MAGIC_NUMBER:
                with self.zstd_decompressor.stream_reader(file) as zfile:
                    data = json.load(zfile)
            else:
                try:
                    data = json.load(file)
                except json.JSONDecodeError:
                    raise errors.MetadataCorruptError(path)
        return data
7✔
849

850

851
class ObjectWriter:
    """Serialize objects for storage in storage."""

    def __init__(self, database: Database):
        self._database: Database = database

    def serialize(self, object: persistent.Persistent):
        """Convert an object to JSON.

        Args:
            object(persistent.Persistent): Object to serialize; it must have an oid and be associated with a
                Database.

        Returns:
            dict: Dictionary containing serialized data.
        """
        assert isinstance(object, persistent.Persistent), f"Cannot serialize object of type '{type(object)}': {object}"
        assert object._p_oid, f"Object does not have an oid: '{object}'"
        assert object._p_jar is not None, f"Object is not associated with a Database: '{object}'"

        # NOTE: Start every top-level serialization with a fresh cache for repeated/circular references
        self._serialization_cache: Dict[int, Any] = {}

        raw_state = object.__getstate__()
        serialized = self._serialize_helper(raw_state)

        # NOTE: The metadata keys are only merged into the result if a dict state was serialized to a dict;
        # everything else is wrapped so that the metadata has a dict to live in.
        if not (isinstance(raw_state, dict) and isinstance(serialized, dict)):
            serialized = {"@renku_data_value": serialized}

        serialized["@renku_data_type"] = get_type_name(object)
        serialized["@renku_oid"] = object._p_oid

        return serialized

    def _serialize_helper(self, obj):
        # TODO: Raise an error if an unsupported object is being serialized
        # NOTE: The order of the checks below matters since some of the checked types overlap
        if obj is None or isinstance(obj, (int, float, str, bool)):
            return obj

        if isinstance(obj, list):
            return [self._serialize_helper(element) for element in obj]

        if isinstance(obj, set):
            return {
                "@renku_data_type": SET_TYPE,
                "@renku_data_value": [self._serialize_helper(element) for element in obj],
            }

        if isinstance(obj, frozenset):
            return {
                "@renku_data_type": FROZEN_SET_TYPE,
                "@renku_data_value": [self._serialize_helper(element) for element in obj],
            }

        if isinstance(obj, dict):
            # NOTE: Entries are emitted sorted by key
            ordered_entries = sorted(obj.items(), key=lambda entry: entry[0])
            return {name: self._serialize_helper(element) for name, element in ordered_entries}

        if isinstance(obj, Index):
            # NOTE: Index objects are not stored as references and are included in their parent object (i.e. root)
            index_state = self._serialize_helper(obj.__getstate__())
            return {"@renku_data_type": get_type_name(obj), "@renku_oid": obj._p_oid, **index_state}

        if isinstance(obj, (OOTreeSet, Length, OOSet)):
            tree_state = self._serialize_helper(obj.__getstate__())
            return {"@renku_data_type": get_type_name(obj), "@renku_data_value": tree_state}

        if isinstance(obj, persistent.Persistent):
            # NOTE: Other persistent objects are written as references to their oid
            if not obj._p_oid:
                obj._p_oid = Database.generate_oid(obj)

            is_unsaved = obj._p_state == UPTODATE and obj._p_serial == NEW
            if obj._p_state not in [GHOST, UPTODATE] or is_unsaved:
                self._database.register(obj)

            return {"@renku_data_type": get_type_name(obj), "@renku_oid": obj._p_oid, "@renku_reference": True}

        if isinstance(obj, datetime.datetime):
            iso_value = obj.isoformat()
            return {"@renku_data_type": get_type_name(obj), "@renku_data_value": iso_value}

        if isinstance(obj, tuple):
            elements = tuple(self._serialize_helper(element) for element in obj)
            return {"@renku_data_type": get_type_name(obj), "@renku_data_value": elements}

        if isinstance(obj, InterfaceClass):
            # NOTE: Zope interfaces are weird, they're a class with type InterfaceClass, but need to be deserialized
            # as the class (without instantiation)
            return {"@renku_data_type": TYPE_TYPE, "@renku_data_value": f"{obj.__module__}.{obj.__name__}"}

        if isinstance(obj, type):
            # NOTE: We're storing a type, not an instance
            return {"@renku_data_type": TYPE_TYPE, "@renku_data_value": get_type_name(obj)}

        if isinstance(obj, (FunctionType, BuiltinFunctionType)):
            function_name = obj.__name__
            function_module = getattr(obj, "__module__", None)
            return {"@renku_data_type": FUNCTION_TYPE, "@renku_data_value": f"{function_module}.{function_name}"}

        # NOTE: Anything else is a normal object that is serialized inline through its state
        cache = self._serialization_cache
        object_key = id(obj)

        if object_key in cache:
            # NOTE: We already serialized this -> circular/repeat reference
            return {"@renku_data_type": REFERENCE_TYPE, "@renku_data_value": cache[object_key]}

        # NOTE: The reference used for circular reference is just the position in the serialization cache,
        # as the order is deterministic. So the order in which objects are encountered is their id for referencing.
        cache[object_key] = len(cache)

        if hasattr(obj, "__getstate__"):
            # NOTE: On Python 3.11+ this just returns __dict__ if __getstate__ isn't implemented.
            attributes = obj.__getstate__().copy()
        else:
            attributes = obj.__dict__.copy()

        # NOTE: Attributes prefixed with ``_v_`` are not written to the storage
        attributes = {name: element for name, element in attributes.items() if not name.startswith("_v_")}
        serialized_attributes = self._serialize_helper(attributes)

        return {"@renku_data_type": get_type_name(obj), "@renku_data_value": serialized_attributes}
7✔
955

956

957
class ObjectReader:
    """Deserialize objects loaded from storage."""

    def __init__(self, database: Database):
        # NOTE: Memoized results of ``get_class`` by type name
        self._classes: Dict[str, type] = {}
        self._database = database

        # a cache for normal (non-persistent objects with an id) to deduplicate them on load
        self._normal_object_cache: Dict[str, Any] = {}
        # NOTE: Normal objects in the order they are created; reference entries point into this list by position
        self._deserialization_cache: List[Any] = []

    def _get_class(self, type_name: str) -> Optional[type]:
        """Return what ``get_class`` finds for a type name, remembering successful lookups.

        Args:
            type_name(str): The stored, fully-qualified name to look up.

        Returns:
            Optional[type]: The looked up object or ``None`` if ``get_class`` returned ``None``.
        """
        cls = self._classes.get(type_name)
        if cls:
            return cls

        cls = get_class(type_name)

        if cls is None:
            return None

        self._classes[type_name] = cls
        return cls

    def set_ghost_state(self, object: persistent.Persistent, data: Dict):
        """Set state of a Persistent ghost object.

        The state is deserialized with its own, empty reference cache; the cache that was active before the
        call is restored afterwards, also when deserialization or ``__setstate__`` fails.

        Args:
            object(persistent.Persistent): Object to set state on.
            data(Dict): State to set on the object.
        """
        previous_cache = self._deserialization_cache
        self._deserialization_cache = []

        try:
            state = self._deserialize_helper(data, create=False)
            object.__setstate__(state)
        finally:
            # NOTE: Always restore the outer cache; otherwise an error here would leave the reader with the
            # nested cache and break reference resolution of an enclosing deserialization.
            self._deserialization_cache = previous_cache

    def deserialize(self, data):
        """Convert JSON to Persistent object.

        Args:
            data: Data to deserialize; it must contain an ``@renku_oid`` entry.

        Returns:
            Deserialized object.
        """
        # NOTE: Read the oid first since the helper removes metadata keys from ``data``
        oid = data["@renku_oid"]

        self._deserialization_cache = []

        object = self._deserialize_helper(data)

        object._p_oid = oid
        object._p_jar = self._database

        return object

    def _deserialize_helper(self, data, create=True):
        # NOTE: ``create=False`` returns the deserialized state instead of an instance; it is used to get the
        # state that is passed to ``__setstate__`` of an already existing object.
        if data is None:
            return None
        elif isinstance(data, (int, float, str, bool)):
            return data
        elif isinstance(data, list):
            return [self._deserialize_helper(value) for value in data]
        else:
            assert isinstance(data, dict), f"Data must be a dict: '{type(data)}'"

            if "@renku_data_type" not in data:  # NOTE: A normal dict value
                assert "@renku_oid" not in data
                # NOTE: Values are replaced in-place, in sorted key order (the same order used when serializing)
                items = sorted(data.items(), key=lambda x: x[0])
                for key, value in items:
                    data[key] = self._deserialize_helper(value)
                return data

            object_type = data.pop("@renku_data_type")
            if object_type in (TYPE_TYPE, FUNCTION_TYPE):
                # NOTE: if we stored a type (not instance), return the type
                return self._get_class(data["@renku_data_value"])
            elif object_type == REFERENCE_TYPE:
                # NOTE: we had a circular reference, we return the (not yet finalized) class here
                return self._deserialization_cache[data["@renku_data_value"]]
            elif object_type == SET_TYPE:
                return {self._deserialize_helper(value) for value in data["@renku_data_value"]}
            elif object_type == FROZEN_SET_TYPE:
                return frozenset([self._deserialize_helper(value) for value in data["@renku_data_value"]])

            cls = self._get_class(object_type)

            if cls is None:
                raise TypeError(f"Couldn't find class '{object_type}'")

            if issubclass(cls, datetime.datetime):
                assert create
                data = data["@renku_data_value"]
                return datetime.datetime.fromisoformat(data)
            elif issubclass(cls, tuple):
                data = data["@renku_data_value"]
                return tuple(self._deserialize_helper(value) for value in data)

            oid: str = data.pop("@renku_oid", None)
            if oid:
                assert isinstance(oid, str)

                if "@renku_reference" in data and data["@renku_reference"]:  # A reference
                    assert create, f"Cannot deserialize a reference without creating an instance {data}"
                    new_object = self._database.get_cached(oid)
                    if new_object is not None:
                        return new_object
                    assert issubclass(cls, persistent.Persistent)
                    # NOTE: Create an empty instance and hand it to the database as a ghost for this oid
                    new_object = cls.__new__(cls)
                    self._database.new_ghost(oid, new_object)
                    return new_object
                elif issubclass(cls, Index):
                    new_object = self._database.get_cached(oid)
                    # NOTE: Check for None explicitly, like for references above: a truthiness check would
                    # treat a cached but falsy (e.g. empty) index as missing and create a duplicate.
                    if new_object is not None:
                        return new_object
                    new_object = cls.__new__(cls)
                    new_object._p_oid = oid
                    self.set_ghost_state(new_object, data)
                    return new_object

            if "@renku_data_value" in data:
                data = data["@renku_data_value"]

            if not create:
                data = self._deserialize_helper(data)
                return data

            if issubclass(cls, persistent.Persistent):
                new_object = cls.__new__(cls)
                new_object._p_oid = oid  # type: ignore[attr-defined]
                self.set_ghost_state(new_object, data)
            elif issubclass(cls, Enum):
                # NOTE: Enum replaces __new__ on classes with its own versions that validates entries
                new_object = cls.__new__(cls, data["_value_"])
                return new_object
            else:
                new_object = cls.__new__(cls)

                # NOTE: we deserialize in the same order as we serialized, so the two stacks here match
                self._deserialization_cache.append(new_object)
                cache_index = len(self._deserialization_cache) - 1

                data = self._deserialize_helper(data)
                assert isinstance(data, dict)

                if "id" in data and data["id"] in self._normal_object_cache:
                    existing_object = self._normal_object_cache[data["id"]]

                    # NOTE: replace uninitialized object in cache with actual object
                    self._deserialization_cache[cache_index] = existing_object
                    return existing_object

                if hasattr(new_object, "__setstate__"):
                    new_object.__setstate__(data)
                else:
                    for name, value in data.items():
                        object.__setattr__(new_object, name, value)

                if issubclass(cls, Immutable):
                    new_object = cls.make_instance(new_object)

                # NOTE: Only objects whose id is a string starting with "/" are remembered for deduplication
                if "id" in data and isinstance(data["id"], str) and data["id"].startswith("/"):
                    self._normal_object_cache[data["id"]] = new_object

            return new_object
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc