• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / ZODB / 5595761575

pending completion
5595761575

push

github

web-flow
Drop support for Python < 3.7 (#386)

* Bumped version for breaking release.

* Drop support for Python 2.7, 3.5, 3.6.

---------

Co-authored-by: Jens Vagelpohl <jens@plyp.com>

2877 of 4050 branches covered (71.04%)

554 of 554 new or added lines in 89 files covered. (100.0%)

13323 of 15914 relevant lines covered (83.72%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.55
/src/ZODB/tests/testFileStorage.py
1
##############################################################################
2
#
3
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
import os
1✔
15

16

17
if os.environ.get('USE_ZOPE_TESTING_DOCTEST'):
1!
18
    from zope.testing import doctest
×
19
else:
20
    import doctest
1✔
21

22
import sys
1✔
23
import unittest
1✔
24

25
import transaction
1✔
26
import zope.testing.setupstack
1✔
27

28
import ZODB.FileStorage
1✔
29
import ZODB.tests.hexstorage
1✔
30
import ZODB.tests.testblob
1✔
31
from ZODB import DB
1✔
32
from ZODB import POSException
1✔
33
from ZODB._compat import _protocol
1✔
34
from ZODB._compat import dump
1✔
35
from ZODB._compat import dumps
1✔
36
from ZODB.Connection import TransactionMetaData
1✔
37
from ZODB.fsIndex import fsIndex
1✔
38
from ZODB.tests import BasicStorage
1✔
39
from ZODB.tests import ConflictResolution
1✔
40
from ZODB.tests import Corruption
1✔
41
from ZODB.tests import HistoryStorage
1✔
42
from ZODB.tests import IteratorStorage
1✔
43
from ZODB.tests import MTStorage
1✔
44
from ZODB.tests import PackableStorage
1✔
45
from ZODB.tests import PersistentStorage
1✔
46
from ZODB.tests import ReadOnlyStorage
1✔
47
from ZODB.tests import RecoveryStorage
1✔
48
from ZODB.tests import RevisionStorage
1✔
49
from ZODB.tests import StorageTestBase
1✔
50
from ZODB.tests import Synchronization
1✔
51
from ZODB.tests import TransactionalUndoStorage
1✔
52
from ZODB.tests.StorageTestBase import MinPO
1✔
53
from ZODB.tests.StorageTestBase import zodb_pickle
1✔
54
from ZODB.utils import U64
1✔
55
from ZODB.utils import load_current
1✔
56
from ZODB.utils import p64
1✔
57
from ZODB.utils import z64
1✔
58

59
from . import util
1✔
60

61

62
class FileStorageTests(
1✔
63
    StorageTestBase.StorageTestBase,
64
    BasicStorage.BasicStorage,
65
    TransactionalUndoStorage.TransactionalUndoStorage,
66
    RevisionStorage.RevisionStorage,
67
    PackableStorage.PackableStorageWithOptionalGC,
68
    PackableStorage.PackableUndoStorage,
69
    Synchronization.SynchronizedStorage,
70
    ConflictResolution.ConflictResolvingStorage,
71
    ConflictResolution.ConflictResolvingTransUndoStorage,
72
    HistoryStorage.HistoryStorage,
73
    IteratorStorage.IteratorStorage,
74
    IteratorStorage.ExtendedIteratorStorage,
75
    PersistentStorage.PersistentStorage,
76
    MTStorage.MTStorage,
77
    ReadOnlyStorage.ReadOnlyStorage
78
):
79

80
    use_extension_bytes = True
1✔
81

82
    def open(self, **kwargs):
1✔
83
        self._storage = ZODB.FileStorage.FileStorage('FileStorageTests.fs',
1✔
84
                                                     **kwargs)
85

86
    def setUp(self):
1✔
87
        StorageTestBase.StorageTestBase.setUp(self)
1✔
88
        self.open(create=1)
1✔
89

90
    def testLongMetadata(self):
1✔
91
        s = "X" * 75000
1✔
92
        try:
1✔
93
            self._dostore(user=s)
1✔
94
        except POSException.StorageError:
1✔
95
            pass
1✔
96
        else:
97
            self.fail("expect long user field to raise error")
98
        try:
1✔
99
            self._dostore(description=s)
1✔
100
        except POSException.StorageError:
1✔
101
            pass
1✔
102
        else:
103
            self.fail("expect long description field to raise error")
104
        try:
1✔
105
            self._dostore(extension={s: 1})
1✔
106
        except POSException.StorageError:
1✔
107
            pass
1✔
108
        else:
109
            self.fail("expect long extension field to raise error")
110

111
    def test_use_fsIndex(self):
1✔
112

113
        self.assertEqual(self._storage._index.__class__, fsIndex)
1✔
114

115
    # A helper for checking that when an .index contains a dict for the
116
    # index, it's converted to an fsIndex when the file is opened.
117
    def convert_index_to_dict(self):
1✔
118
        # Convert the index in the current .index file to a Python dict.
119
        # Return the index originally found.
120
        data = fsIndex.load('FileStorageTests.fs.index')
1✔
121
        index = data['index']
1✔
122

123
        newindex = dict(index)
1✔
124
        data['index'] = newindex
1✔
125

126
        with open('FileStorageTests.fs.index', 'wb') as fp:
1✔
127
            dump(data, fp, _protocol)
1✔
128
        return index
1✔
129

130
    def test_conversion_to_fsIndex(self, read_only=False):
1✔
131
        from ZODB.fsIndex import fsIndex
1✔
132

133
        # Create some data, and remember the index.
134
        for i in range(10):
1✔
135
            self._dostore()
1✔
136
        oldindex_as_dict = dict(self._storage._index)
1✔
137

138
        # Save the index.
139
        self._storage.close()
1✔
140

141
        # Convert it to a dict.
142
        old_index = self.convert_index_to_dict()
1✔
143
        self.assertTrue(isinstance(old_index, fsIndex))
1✔
144
        new_index = self.convert_index_to_dict()
1✔
145
        self.assertTrue(isinstance(new_index, dict))
1✔
146

147
        # Verify it's converted to fsIndex in memory upon open.
148
        self.open(read_only=read_only)
1✔
149
        self.assertTrue(isinstance(self._storage._index, fsIndex))
1✔
150

151
        # Verify it has the right content.
152
        newindex_as_dict = dict(self._storage._index)
1✔
153
        self.assertEqual(oldindex_as_dict, newindex_as_dict)
1✔
154

155
        # Check that the type on disk has changed iff read_only is False.
156
        self._storage.close()
1✔
157
        current_index = self.convert_index_to_dict()
1✔
158
        if read_only:
1✔
159
            self.assertTrue(isinstance(current_index, dict))
1✔
160
        else:
161
            self.assertTrue(isinstance(current_index, fsIndex))
1✔
162

163
    def test_conversion_to_fsIndex_readonly(self):
1✔
164
        # Same thing, but the disk .index should continue to hold a
165
        # Python dict.
166
        self.test_conversion_to_fsIndex(read_only=True)
1✔
167

168
    def test_conversion_from_dict_to_btree_data_in_fsIndex(self):
1✔
169
        # To support efficient range searches on its keys as part of
170
        # implementing a record iteration protocol in FileStorage, we
171
        # converted the fsIndex class from using a dictionary as its
172
        # self._data attribute to using an OOBTree in its stead.
173

174
        from BTrees.OOBTree import OOBTree
1✔
175

176
        from ZODB.fsIndex import fsIndex
1✔
177

178
        # Create some data, and remember the index.
179
        for i in range(10):
1✔
180
            self._dostore()
1✔
181
        data_dict = dict(self._storage._index._data)
1✔
182

183
        # Replace the OOBTree with a dictionary and commit it.
184
        self._storage._index._data = data_dict
1✔
185
        transaction.commit()
1✔
186

187
        # Save the index.
188
        self._storage.close()
1✔
189

190
        # Verify it's converted to fsIndex in memory upon open.
191
        self.open()
1✔
192
        self.assertTrue(isinstance(self._storage._index, fsIndex))
1✔
193
        self.assertTrue(isinstance(self._storage._index._data, OOBTree))
1✔
194

195
        # Verify it has the right content.
196
        new_data_dict = dict(self._storage._index._data)
1✔
197
        self.assertEqual(len(data_dict), len(new_data_dict))
1✔
198

199
        for k in data_dict:
1✔
200
            old_tree = data_dict[k]
1✔
201
            new_tree = new_data_dict[k]
1✔
202
            self.assertEqual(list(old_tree.items()), list(new_tree.items()))
1✔
203

204
    def test_save_after_load_with_no_index(self):
1✔
205
        for i in range(10):
1✔
206
            self._dostore()
1✔
207
        self._storage.close()
1✔
208
        os.remove('FileStorageTests.fs.index')
1✔
209
        self.open()
1✔
210
        self.assertEqual(self._storage._saved, 1)
1✔
211

212
    def testStoreBumpsOid(self):
1✔
213
        # If .store() is handed an oid bigger than the storage knows
214
        # about already, it's crucial that the storage bump its notion
215
        # of the largest oid in use.
216
        t = TransactionMetaData()
1✔
217
        self._storage.tpc_begin(t)
1✔
218
        giant_oid = b'\xee' * 8
1✔
219
        # Store an object.
220
        # oid, serial, data, version, transaction
221
        self._storage.store(giant_oid, b'\0'*8, b'data', b'', t)
1✔
222
        # Finish the transaction.
223
        self._storage.tpc_vote(t)
1✔
224
        self._storage.tpc_finish(t)
1✔
225
        # Before ZODB 3.2.6, this failed, with ._oid == z64.
226
        self.assertEqual(self._storage._oid, giant_oid)
1✔
227

228
    def testRestoreBumpsOid(self):
1✔
229
        # As above, if .restore() is handed an oid bigger than the storage
230
        # knows about already, it's crucial that the storage bump its notion
231
        # of the largest oid in use.  Because copyTransactionsFrom(), and
232
        # ZRS recovery, use the .restore() method, this is plain critical.
233
        t = TransactionMetaData()
1✔
234
        self._storage.tpc_begin(t)
1✔
235
        giant_oid = b'\xee' * 8
1✔
236
        # Store an object.
237
        # oid, serial, data, version, prev_txn, transaction
238
        self._storage.restore(giant_oid, b'\0'*8, b'data', b'', None, t)
1✔
239
        # Finish the transaction.
240
        self._storage.tpc_vote(t)
1✔
241
        self._storage.tpc_finish(t)
1✔
242
        # Before ZODB 3.2.6, this failed, with ._oid == z64.
243
        self.assertEqual(self._storage._oid, giant_oid)
1✔
244

245
    def testCorruptionInPack(self):
1✔
246
        # This sets up a corrupt .fs file, with a redundant transaction
247
        # length mismatch.  The implementation of pack in many releases of
248
        # ZODB blew up if the .fs file had such damage:  it detected the
249
        # damage, but the code to raise CorruptedError referenced an undefined
250
        # global.
251
        import time
1✔
252

253
        from ZODB.FileStorage.format import CorruptedError
1✔
254
        from ZODB.serialize import referencesf
1✔
255

256
        db = DB(self._storage)
1✔
257
        conn = db.open()
1✔
258
        conn.root()['xyz'] = 1
1✔
259
        transaction.commit()
1✔
260

261
        # Ensure it's all on disk.
262
        db.close()
1✔
263
        self._storage.close()
1✔
264

265
        # Reopen before damaging.
266
        self.open()
1✔
267

268
        # Open .fs directly, and damage content.
269
        with open('FileStorageTests.fs', 'r+b') as f:
1✔
270
            f.seek(0, 2)
1✔
271
            pos2 = f.tell() - 8
1✔
272
            f.seek(pos2)
1✔
273
            tlen2 = U64(f.read(8))  # length-8 of the last transaction
1✔
274
            pos1 = pos2 - tlen2 + 8  # skip over the tid at the start
1✔
275
            f.seek(pos1)
1✔
276
            tlen1 = U64(f.read(8))  # should be redundant length-8
1✔
277
            self.assertEqual(tlen1, tlen2)  # verify that it is redundant
1✔
278

279
            # Now damage the second copy.
280
            f.seek(pos2)
1✔
281
            f.write(p64(tlen2 - 1))
1✔
282

283
        # Try to pack.  This used to yield
284
        #     NameError: global name 's' is not defined
285
        try:
1✔
286
            self._storage.pack(time.time(), referencesf)
1✔
287
        except CorruptedError as detail:
1✔
288
            self.assertTrue("redundant transaction length does not match "
1✔
289
                            "initial transaction length" in str(detail))
290
        else:
291
            self.fail("expected CorruptedError")
292

293
    def test_record_iternext(self):
1✔
294

295
        db = DB(self._storage)
1✔
296
        conn = db.open()
1✔
297
        conn.root()['abc'] = MinPO('abc')
1✔
298
        conn.root()['xyz'] = MinPO('xyz')
1✔
299
        transaction.commit()
1✔
300

301
        # Ensure it's all on disk.
302
        db.close()
1✔
303
        self._storage.close()
1✔
304

305
        self.open()
1✔
306

307
        key = None
1✔
308
        for x in (b'\000', b'\001', b'\002'):
1✔
309
            oid, tid, data, next_oid = self._storage.record_iternext(key)
1✔
310
            self.assertEqual(oid, (b'\000' * 7) + x)
1✔
311
            key = next_oid
1✔
312
            expected_data, expected_tid = load_current(self._storage, oid)
1✔
313
            self.assertEqual(expected_data, data)
1✔
314
            self.assertEqual(expected_tid, tid)
1✔
315
            if x == b'\002':
1✔
316
                self.assertEqual(next_oid, None)
1✔
317
            else:
318
                self.assertNotEqual(next_oid, None)
1✔
319

320
    def testFlushAfterTruncate(self, fail=False):
1✔
321
        r0 = self._dostore(z64)
1✔
322
        storage = self._storage
1✔
323
        t = TransactionMetaData()
1✔
324
        storage.tpc_begin(t)
1✔
325
        storage.store(z64, r0, b'foo', b'', t)
1✔
326
        storage.tpc_vote(t)
1✔
327
        # Read operations are done with separate 'file' objects with their
328
        # own buffers: here, the buffer also includes voted data.
329
        load_current(storage, z64)
1✔
330
        # This must invalidate all read buffers.
331
        storage.tpc_abort(t)
1✔
332
        self._dostore(z64, r0, b'bar', 1)
1✔
333
        # In the case that read buffers were not invalidated, return value
334
        # is based on what was cached during the first load.
335
        self.assertEqual(load_current(storage, z64)[0],
1✔
336
                         b'foo' if fail else b'bar')
337

338
    # We want to be sure that the above test detects any regression
339
    # in the code it checks, because any bug here is like a time bomb: not
340
    # obvious, hard to reproduce, with possible data corruption.
341
    # It's even more important that FilePool.flush() is quite aggressive and
342
    # we'd like to optimize it when Python gets an API to flush read buffers.
343
    # Therefore, 'testFlushAfterTruncate' is tested in turn by another unit
344
    # test.
345
    # On Windows, flushing explicitely is not (always?) necessary.
346
    if sys.platform != 'win32':
1!
347
        def testFlushNeededAfterTruncate(self):
1✔
348
            self._storage._files.flush = lambda: None
1✔
349
            self.testFlushAfterTruncate(True)
1✔
350

351
    def testCommitWithEmptyData(self):
1✔
352
        """
353
        Verify that transaction is persisted even if it has no data, or even
354
        both no data and empty metadata.
355
        """
356

357
        # verify:
358
        # - commit with empty data but non-empty metadata
359
        # - commit with empty data and empty metadata
360
        #   (the fact of commit carries information by itself)
361
        stor = self._storage
1✔
362
        for description in ('commit with empty data', ''):
1✔
363
            t = TransactionMetaData(description=description)
1✔
364
            stor.tpc_begin(t)
1✔
365
            stor.tpc_vote(t)
1✔
366
            head = stor.tpc_finish(t)
1✔
367
            self.assertEqual(head, stor.lastTransaction())
1✔
368

369
            v = list(stor.iterator(start=head, stop=head))
1✔
370
            self.assertEqual(len(v), 1)
1✔
371
            # FileStorage.TransactionRecord or hexstorage.Transaction
372
            trec = v[0]
1✔
373
            self.assertEqual(trec.tid, head)
1✔
374
            self.assertEqual(trec.user,          b'')
1✔
375
            self.assertEqual(trec.description,   description.encode('utf-8'))
1✔
376
            self.assertEqual(trec.extension,     {})
1✔
377
            drecv = list(trec)
1✔
378
            self.assertEqual(drecv, [])
1✔
379

380

381
class FileStorageHexTests(FileStorageTests):
1✔
382

383
    def open(self, **kwargs):
1✔
384
        self._storage = ZODB.tests.hexstorage.HexStorage(
1✔
385
            ZODB.FileStorage.FileStorage('FileStorageTests.fs', **kwargs))
386

387

388
class FileStorageTestsWithBlobsEnabled(FileStorageTests):
1✔
389

390
    def open(self, **kwargs):
1✔
391
        if 'blob_dir' not in kwargs:
1!
392
            kwargs = kwargs.copy()
1✔
393
            kwargs['blob_dir'] = 'blobs'
1✔
394
        FileStorageTests.open(self, **kwargs)
1✔
395

396

397
class FileStorageHexTestsWithBlobsEnabled(FileStorageTests):
1✔
398

399
    def open(self, **kwargs):
1✔
400
        if 'blob_dir' not in kwargs:
1!
401
            kwargs = kwargs.copy()
1✔
402
            kwargs['blob_dir'] = 'blobs'
1✔
403
        FileStorageTests.open(self, **kwargs)
1✔
404
        self._storage = ZODB.tests.hexstorage.HexStorage(self._storage)
1✔
405

406

407
class FileStorageRecoveryTest(
1✔
408
    StorageTestBase.StorageTestBase,
409
    RecoveryStorage.RecoveryStorage,
410
):
411

412
    def setUp(self):
1✔
413
        StorageTestBase.StorageTestBase.setUp(self)
1✔
414
        self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
1✔
415
        self._dst = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
1✔
416

417
    def tearDown(self):
1✔
418
        self._dst.close()
1✔
419
        StorageTestBase.StorageTestBase.tearDown(self)
1✔
420

421
    def new_dest(self):
1✔
422
        return ZODB.FileStorage.FileStorage('Dest.fs')
×
423

424

425
class FileStorageHexRecoveryTest(FileStorageRecoveryTest):
1✔
426

427
    def setUp(self):
1✔
428
        StorageTestBase.StorageTestBase.setUp(self)
1✔
429
        self._storage = ZODB.tests.hexstorage.HexStorage(
1✔
430
            ZODB.FileStorage.FileStorage("Source.fs", create=True))
431
        self._dst = ZODB.tests.hexstorage.HexStorage(
1✔
432
            ZODB.FileStorage.FileStorage("Dest.fs", create=True))
433

434

435
class FileStorageNoRestore(ZODB.FileStorage.FileStorage):
1✔
436

437
    @property
1✔
438
    def restore(self):
1✔
439
        raise Exception
1✔
440

441

442
class FileStorageNoRestoreRecoveryTest(FileStorageRecoveryTest):
1✔
443
    # This test actually verifies a code path of
444
    # BaseStorage.copyTransactionsFrom. For simplicity of implementation, we
445
    # use a FileStorage deprived of its restore method.
446

447
    def setUp(self):
1✔
448
        StorageTestBase.StorageTestBase.setUp(self)
1✔
449
        self._storage = FileStorageNoRestore("Source.fs", create=True)
1✔
450
        self._dst = FileStorageNoRestore("Dest.fs", create=True)
1✔
451

452
    def new_dest(self):
1✔
453
        return FileStorageNoRestore('Dest.fs')
×
454

455
    def testRestoreAcrossPack(self):
1✔
456
        # Skip this check as it calls restore directly.
457
        pass
1✔
458

459

460
class AnalyzeDotPyTest(StorageTestBase.StorageTestBase):
1✔
461

462
    def setUp(self):
1✔
463
        StorageTestBase.StorageTestBase.setUp(self)
1✔
464
        self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
1✔
465

466
    def testanalyze(self):
1✔
467
        import types
1✔
468

469
        from BTrees.OOBTree import OOBTree
1✔
470

471
        from ZODB.scripts import analyze
1✔
472

473
        # Set up a module to act as a broken import
474
        module_name = 'brokenmodule'
1✔
475
        module = types.ModuleType(module_name)
1✔
476
        sys.modules[module_name] = module
1✔
477

478
        class Broken(MinPO):
1✔
479
            __module__ = module_name
1✔
480
        module.Broken = Broken
1✔
481

482
        oids = [[self._storage.new_oid(), None] for i in range(3)]
1✔
483

484
        def store(i, data):
1✔
485
            oid, revid = oids[i]
1✔
486
            self._storage.store(oid, revid, data, "", t)
1✔
487

488
        for i in range(2):
1✔
489
            t = TransactionMetaData()
1✔
490
            self._storage.tpc_begin(t)
1✔
491

492
            # sometimes data is in this format
493
            store(0, dumps(OOBTree, _protocol))
1✔
494
            # and it could be from a broken module
495
            store(1, dumps(Broken, _protocol))
1✔
496
            # but mostly it looks like this
497
            store(2, zodb_pickle(MinPO(2)))
1✔
498

499
            self._storage.tpc_vote(t)
1✔
500
            tid = self._storage.tpc_finish(t)
1✔
501
            for oid_revid in oids:
1✔
502
                oid_revid[1] = tid
1✔
503

504
        # now break the import of the Broken class
505
        del sys.modules[module_name]
1✔
506

507
        # from ZODB.scripts.analyze.analyze
508
        fsi = self._storage.iterator()
1✔
509
        rep = analyze.Report()
1✔
510
        for txn in fsi:
1✔
511
            analyze.analyze_trans(rep, txn)
1✔
512

513
        # from ZODB.scripts.analyze.report
514
        typemap = sorted(rep.TYPEMAP.keys())
1✔
515
        cumpct = 0.0
1✔
516
        for t in typemap:
1✔
517
            pct = rep.TYPESIZE[t] * 100.0 / rep.DBYTES
1✔
518
            cumpct += pct
1✔
519

520
        self.assertAlmostEqual(cumpct, 100.0, 0,
1✔
521
                               "Failed to analyze some records")
522

523

524
def checkIncreasingTids(fs):
1✔
525
    # Raise an exception if the tids in FileStorage fs aren't
526
    # strictly increasing.
527
    lasttid = b'\0' * 8
1✔
528
    for txn in fs.iterator():
1✔
529
        if lasttid >= txn.tid:
1!
530
            raise ValueError(f"tids out of order {lasttid!r} >= {txn.tid!r}")
×
531
        lasttid = txn.tid
1✔
532

533

534
def timestamp(minutes):
1✔
535
    """Return a TimeStamp object 'minutes' minutes in the future."""
536
    import time
1✔
537

538
    from persistent.TimeStamp import TimeStamp
1✔
539

540
    t = time.time() + 60 * minutes
1✔
541
    return TimeStamp(*time.gmtime(t)[:5] + (t % 60,))
1✔
542

543

544
def testTimeTravelOnOpen():
1✔
545
    """
546
    >>> from ZODB.FileStorage import FileStorage
547
    >>> from zope.testing.loggingsupport import InstalledHandler
548

549
    Arrange to capture log messages -- they're an important part of
550
    this test!
551

552
    >>> handler = InstalledHandler('ZODB.FileStorage')
553

554
    Create a new file storage.
555

556
    >>> st = FileStorage('temp.fs', create=True)
557
    >>> db = DB(st)
558
    >>> db.close()
559

560
    First check the normal case:  transactions are recorded with
561
    increasing tids, and time doesn't run backwards.
562

563
    >>> st = FileStorage('temp.fs')
564
    >>> db = DB(st)
565
    >>> conn = db.open()
566
    >>> conn.root()['xyz'] = 1
567
    >>> transaction.get().commit()
568
    >>> checkIncreasingTids(st)
569
    >>> db.close()
570
    >>> st.cleanup() # remove .fs, .index, etc files
571
    >>> handler.records   # i.e., no log messages
572
    []
573

574
    Now force the database to have transaction records with tids from
575
    the future.
576

577
    >>> st = FileStorage('temp.fs', create=True)
578
    >>> st._ts = timestamp(15)  # 15 minutes in the future
579
    >>> db = DB(st)
580
    >>> db.close()
581

582
    >>> st = FileStorage('temp.fs') # this should log a warning
583
    >>> db = DB(st)
584
    >>> conn = db.open()
585
    >>> conn.root()['xyz'] = 1
586
    >>> transaction.get().commit()
587
    >>> checkIncreasingTids(st)
588
    >>> db.close()
589
    >>> st.cleanup()
590

591
    >>> [record.levelname for record in handler.records]
592
    ['WARNING']
593
    >>> handler.clear()
594

595
    And one more time, with transaction records far in the future.
596
    We expect to log a critical error then, as a time so far in the
597
    future probably indicates a real problem with the system.  Shorter
598
    spans may be due to clock drift.
599

600
    >>> st = FileStorage('temp.fs', create=True)
601
    >>> st._ts = timestamp(60)  # an hour in the future
602
    >>> db = DB(st)
603
    >>> db.close()
604

605
    >>> st = FileStorage('temp.fs') # this should log a critical error
606
    >>> db = DB(st)
607
    >>> conn = db.open()
608
    >>> conn.root()['xyz'] = 1
609
    >>> transaction.get().commit()
610
    >>> checkIncreasingTids(st)
611
    >>> db.close()
612
    >>> st.cleanup()
613

614
    >>> [record.levelname for record in handler.records]
615
    ['CRITICAL']
616
    >>> handler.clear()
617
    >>> handler.uninstall()
618
    """
619

620

621
def lastInvalidations():
1✔
622
    """
623

624
The last invalidations method is used by a storage server to populate
625
it's data structure of recent invalidations.  The lastInvalidations
626
method is passed a count and must return up to count number of the
627
most recent transactions.
628

629
We'll create a FileStorage and populate it with some data, keeping
630
track of the transactions along the way:
631

632
    >>> fs = ZODB.FileStorage.FileStorage('t.fs', create=True)
633
    >>> db = DB(fs)
634
    >>> conn = db.open()
635
    >>> from persistent.mapping import PersistentMapping
636
    >>> last = []
637
    >>> for i in range(100):
638
    ...     conn.root()[i] = PersistentMapping()
639
    ...     transaction.commit()
640
    ...     last.append(fs.lastTransaction())
641

642
Now, we can call lastInvalidations on it:
643

644
    >>> invalidations = fs.lastInvalidations(10)
645
    >>> [t for (t, oids) in invalidations] == last[-10:]
646
    True
647

648
    >>> from ZODB.utils import u64
649
    >>> [[int(u64(oid)) for oid in oids]
650
    ...  for (i, oids) in invalidations]
651
    ... # doctest: +NORMALIZE_WHITESPACE
652
    [[0, 91], [0, 92], [0, 93], [0, 94], [0, 95],
653
     [0, 96], [0, 97], [0, 98], [0, 99], [0, 100]]
654

655
If we ask for more transactions than there are, we'll get as many as
656
there are:
657

658
    >>> len(fs.lastInvalidations(1000))
659
    101
660

661
Of course, calling lastInvalidations on an empty storage refturns no data:
662

663
    >>> db.close()
664
    >>> fs = ZODB.FileStorage.FileStorage('t.fs', create=True)
665
    >>> list(fs.lastInvalidations(10))
666
    []
667

668
    >>> fs.close()
669
    """
670

671

672
def deal_with_finish_failures():
1✔
673
    r"""
674

675
    It's really bad to get errors in FileStorage's _finish method, as
676
    that can cause the file storage to be in an inconsistent
677
    state. The data file will be fine, but the internal data
678
    structures might be hosed. For this reason, FileStorage will close
679
    if there is an error after it has finished writing transaction
680
    data.  It bothers to do very little after writing this data, so
681
    this should rarely, if ever, happen.
682

683
    >>> fs = ZODB.FileStorage.FileStorage('data.fs')
684
    >>> db = DB(fs)
685
    >>> conn = db.open()
686
    >>> conn.root()[1] = 1
687
    >>> transaction.commit()
688

689
    Now, we'll indentially break the file storage. It provides a hook
690
    for this purpose. :)
691

692
    >>> fs._finish_finish = lambda : None
693
    >>> conn.root()[1] = 1
694

695
    >>> import zope.testing.loggingsupport
696
    >>> handler = zope.testing.loggingsupport.InstalledHandler(
697
    ...     'ZODB.FileStorage')
698
    >>> transaction.commit() # doctest: +ELLIPSIS
699
    Traceback (most recent call last):
700
    ...
701
    TypeError: <lambda>() takes ...
702

703

704
    >>> print(handler)
705
    ZODB.FileStorage CRITICAL
706
      Failure in _finish. Closing.
707

708
    >>> handler.uninstall()
709

710
    >>> load_current(fs, b'\0'*8) # doctest: +ELLIPSIS
711
    Traceback (most recent call last):
712
    ...
713
    ValueError: ...
714

715
    >>> db.close()
716
    >>> fs = ZODB.FileStorage.FileStorage('data.fs')
717
    >>> db = DB(fs)
718
    >>> conn = db.open()
719
    >>> conn.root()
720
    {1: 1}
721

722
    >>> transaction.abort()
723
    >>> db.close()
724
    """
725

726

727
def pack_with_open_blob_files():
1✔
728
    """
729
    Make sure packing works while there are open blob files.
730

731
    >>> fs = ZODB.FileStorage.FileStorage('data.fs', blob_dir='blobs')
732
    >>> db = ZODB.DB(fs)
733
    >>> tm1 = transaction.TransactionManager()
734
    >>> conn1 = db.open(tm1)
735
    >>> import ZODB.blob
736
    >>> conn1.root()[1] = ZODB.blob.Blob()
737
    >>> conn1.add(conn1.root()[1])
738
    >>> with conn1.root()[1].open('w') as file:
739
    ...     _ = file.write(b'some data')
740
    >>> tm1.commit()
741

742
    >>> tm2 = transaction.TransactionManager()
743
    >>> conn2 = db.open(tm2)
744
    >>> f = conn1.root()[1].open()
745
    >>> conn1.root()[2] = ZODB.blob.Blob()
746
    >>> conn1.add(conn1.root()[2])
747
    >>> with conn1.root()[2].open('w') as file:
748
    ...     _ = file.write(b'some more data')
749

750
    >>> db.pack()
751
    >>> f.read()
752
    b'some data'
753
    >>> f.close()
754

755
    >>> tm1.commit()
756
    >>> conn2.sync()
757
    >>> with conn2.root()[2].open() as fp: fp.read()
758
    b'some more data'
759

760
    >>> db.close()
761
    """
762

763

764
def readonly_open_nonexistent_file():
1✔
765
    """
766
    Make sure error is reported when non-existent file is tried to be opened
767
    read-only.
768

769
    >>> try:
770
    ...     fs = ZODB.FileStorage.FileStorage('nonexistent.fs', read_only=True)
771
    ... except Exception as e:
772
    ...     # Python2 raises IOError; Python3 - FileNotFoundError
773
    ...     print("error: %s" % str(e)) # doctest: +ELLIPSIS
774
    error: ... No such file or directory: 'nonexistent.fs'
775
    """
776

777

778
def test_suite():
1✔
779
    suite = unittest.TestSuite()
1✔
780
    for klass in [
1✔
781
        FileStorageTests, FileStorageHexTests,
782
        Corruption.FileStorageCorruptTests,
783
        FileStorageRecoveryTest, FileStorageHexRecoveryTest,
784
        FileStorageNoRestoreRecoveryTest,
785
        FileStorageTestsWithBlobsEnabled, FileStorageHexTestsWithBlobsEnabled,
786
        AnalyzeDotPyTest,
787
    ]:
788
        suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(klass))
1✔
789
    suite.addTest(doctest.DocTestSuite(
1✔
790
        setUp=zope.testing.setupstack.setUpDirectory,
791
        tearDown=util.tearDown,
792
        checker=util.checker))
793
    suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
1✔
794
        'BlobFileStorage',
795
        lambda name, blob_dir:
796
        ZODB.FileStorage.FileStorage('%s.fs' % name, blob_dir=blob_dir),
797
        test_blob_storage_recovery=True,
798
        test_packing=True,
799
    ))
800
    suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
1✔
801
        'BlobFileHexStorage',
802
        lambda name, blob_dir:
803
        ZODB.tests.hexstorage.HexStorage(
804
            ZODB.FileStorage.FileStorage('%s.fs' % name, blob_dir=blob_dir)),
805
        test_blob_storage_recovery=True,
806
        test_packing=True,
807
    ))
808
    suite.addTest(PackableStorage.IExternalGC_suite(
1✔
809
        lambda: ZODB.FileStorage.FileStorage(
810
            'data.fs', blob_dir='blobs', pack_gc=False)))
811
    suite.layer = util.MininalTestLayer('testFileStorage')
1✔
812
    return suite
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc