• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6862042725

14 Nov 2023 10:04AM UTC coverage: 89.246% (+0.01%) from 89.235%
6862042725

push

gh-ci

jeroiraz
release: v1.9DOM.1-RC1

33842 of 37920 relevant lines covered (89.25%)

144337.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.29
/pkg/database/database.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package database
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26
        "os"
27
        "path/filepath"
28
        "sync"
29
        "time"
30

31
        "github.com/codenotary/immudb/embedded/document"
32
        "github.com/codenotary/immudb/embedded/sql"
33
        "github.com/codenotary/immudb/embedded/store"
34

35
        "github.com/codenotary/immudb/embedded/logger"
36
        "github.com/codenotary/immudb/pkg/api/schema"
37
)
38

39
const MaxKeyResolutionLimit = 1
40
const MaxKeyScanLimit = 1000
41

42
var ErrKeyResolutionLimitReached = errors.New("key resolution limit reached. It may be due to cyclic references")
43
var ErrResultSizeLimitExceeded = errors.New("result size limit exceeded")
44
var ErrResultSizeLimitReached = errors.New("result size limit reached")
45
var ErrIllegalArguments = store.ErrIllegalArguments
46
var ErrIllegalState = store.ErrIllegalState
47
var ErrIsReplica = errors.New("database is read-only because it's a replica")
48
var ErrNotReplica = errors.New("database is NOT a replica")
49
var ErrReplicaDivergedFromPrimary = errors.New("replica diverged from primary")
50
var ErrInvalidRevision = errors.New("invalid key revision number")
51

52
type DB interface {
53
        GetName() string
54

55
        // Setttings
56
        GetOptions() *Options
57

58
        Path() string
59

60
        AsReplica(asReplica, syncReplication bool, syncAcks int)
61
        IsReplica() bool
62

63
        IsSyncReplicationEnabled() bool
64
        SetSyncReplication(enabled bool)
65

66
        MaxResultSize() int
67

68
        // State
69
        Health() (waitingCount int, lastReleaseAt time.Time)
70
        CurrentState() (*schema.ImmutableState, error)
71

72
        Size() (uint64, error)
73

74
        // Key-Value
75
        Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error)
76
        VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error)
77

78
        Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error)
79
        VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error)
80
        GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error)
81

82
        Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error)
83

84
        SetReference(ctx context.Context, req *schema.ReferenceRequest) (*schema.TxHeader, error)
85
        VerifiableSetReference(ctx context.Context, req *schema.VerifiableReferenceRequest) (*schema.VerifiableTx, error)
86

87
        Scan(ctx context.Context, req *schema.ScanRequest) (*schema.Entries, error)
88

89
        History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error)
90

91
        ExecAll(ctx context.Context, operations *schema.ExecAllRequest) (*schema.TxHeader, error)
92

93
        Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error)
94
        CountAll(ctx context.Context) (*schema.EntryCount, error)
95

96
        ZAdd(ctx context.Context, req *schema.ZAddRequest) (*schema.TxHeader, error)
97
        VerifiableZAdd(ctx context.Context, req *schema.VerifiableZAddRequest) (*schema.VerifiableTx, error)
98
        ZScan(ctx context.Context, req *schema.ZScanRequest) (*schema.ZEntries, error)
99

100
        // SQL-related
101
        NewSQLTx(ctx context.Context, opts *sql.TxOptions) (*sql.SQLTx, error)
102

103
        SQLExec(ctx context.Context, tx *sql.SQLTx, req *schema.SQLExecRequest) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
104
        SQLExecPrepared(ctx context.Context, tx *sql.SQLTx, stmts []sql.SQLStmt, params map[string]interface{}) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
105

106
        InferParameters(ctx context.Context, tx *sql.SQLTx, sql string) (map[string]sql.SQLValueType, error)
107
        InferParametersPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.SQLStmt) (map[string]sql.SQLValueType, error)
108

109
        SQLQuery(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) (*schema.SQLQueryResult, error)
110
        SQLQueryPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.DataSource, namedParams []*schema.NamedParam) (*schema.SQLQueryResult, error)
111
        SQLQueryRowReader(ctx context.Context, tx *sql.SQLTx, stmt sql.DataSource, params map[string]interface{}) (sql.RowReader, error)
112

113
        VerifiableSQLGet(ctx context.Context, req *schema.VerifiableSQLGetRequest) (*schema.VerifiableSQLEntry, error)
114

115
        ListTables(ctx context.Context, tx *sql.SQLTx) (*schema.SQLQueryResult, error)
116
        DescribeTable(ctx context.Context, tx *sql.SQLTx, table string) (*schema.SQLQueryResult, error)
117

118
        // Transactional layer
119
        WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error
120
        WaitForIndexingUpto(ctx context.Context, txID uint64) error
121

122
        TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error)
123
        ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error)
124
        ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error)
125
        AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error
126
        DiscardPrecommittedTxsSince(txID uint64) error
127

128
        VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error)
129
        TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error)
130

131
        // Maintenance
132
        FlushIndex(req *schema.FlushIndexRequest) error
133
        CompactIndex() error
134

135
        IsClosed() bool
136
        Close() error
137

138
        DocumentDatabase
139
}
140

141
type replicaState struct {
142
        precommittedTxID uint64
143
        precommittedAlh  [sha256.Size]byte
144
}
145

146
// IDB database instance
147
type db struct {
148
        st *store.ImmuStore
149

150
        sqlEngine      *sql.Engine
151
        documentEngine *document.Engine
152

153
        mutex        *instrumentedRWMutex
154
        closingMutex sync.Mutex
155

156
        Logger  logger.Logger
157
        options *Options
158

159
        name string
160

161
        maxResultSize int
162

163
        txPool store.TxPool
164

165
        replicaStates      map[string]*replicaState
166
        replicaStatesMutex sync.Mutex
167
}
168

169
// OpenDB Opens an existing Database from disk
170
func OpenDB(dbName string, multidbHandler sql.MultiDBHandler, op *Options, log logger.Logger) (DB, error) {
37✔
171
        if dbName == "" {
38✔
172
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
1✔
173
        }
1✔
174

175
        log.Infof("opening database '%s' {replica = %v}...", dbName, op.replica)
36✔
176

36✔
177
        var replicaStates map[string]*replicaState
36✔
178
        // replica states are only managed in primary with synchronous replication
36✔
179
        if !op.replica && op.syncAcks > 0 {
37✔
180
                replicaStates = make(map[string]*replicaState, op.syncAcks)
1✔
181
        }
1✔
182

183
        dbi := &db{
36✔
184
                Logger:        log,
36✔
185
                options:       op,
36✔
186
                name:          dbName,
36✔
187
                replicaStates: replicaStates,
36✔
188
                maxResultSize: MaxKeyScanLimit,
36✔
189
                mutex:         &instrumentedRWMutex{},
36✔
190
        }
36✔
191

36✔
192
        dbDir := dbi.Path()
36✔
193
        _, err := os.Stat(dbDir)
36✔
194
        if os.IsNotExist(err) {
37✔
195
                return nil, fmt.Errorf("missing database directories: %s", dbDir)
1✔
196
        }
1✔
197

198
        stOpts := op.GetStoreOptions().
35✔
199
                WithLogger(log).
35✔
200
                WithMultiIndexing(true).
35✔
201
                WithExternalCommitAllowance(op.syncReplication)
35✔
202

35✔
203
        dbi.st, err = store.Open(dbDir, stOpts)
35✔
204
        if err != nil {
35✔
205
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
206
        }
×
207

208
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
105✔
209
                err := dbi.st.InitIndexing(&store.IndexSpec{
70✔
210
                        SourcePrefix: []byte{prefix},
70✔
211
                        TargetPrefix: []byte{prefix},
70✔
212
                })
70✔
213
                if err != nil {
70✔
214
                        dbi.st.Close()
×
215
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
216
                }
×
217
        }
218

219
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, op.replica)
35✔
220

35✔
221
        sqlOpts := sql.DefaultOptions().
35✔
222
                WithPrefix([]byte{SQLPrefix}).
35✔
223
                WithMultiDBHandler(multidbHandler)
35✔
224

35✔
225
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
35✔
226
        if err != nil {
35✔
227
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, op.replica, err)
×
228
                return nil, err
×
229
        }
×
230
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, op.replica)
35✔
231

35✔
232
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
35✔
233
        if err != nil {
35✔
234
                return nil, err
×
235
        }
×
236
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, op.replica)
35✔
237

35✔
238
        txPool, err := dbi.st.NewTxHolderPool(op.readTxPoolSize, false)
35✔
239
        if err != nil {
35✔
240
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
241
        }
×
242
        dbi.txPool = txPool
35✔
243

35✔
244
        if op.replica {
39✔
245
                dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, op.replica)
4✔
246
                return dbi, nil
4✔
247
        }
4✔
248

249
        dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, op.replica)
31✔
250

31✔
251
        return dbi, nil
31✔
252
}
253

254
func (d *db) Path() string {
38✔
255
        return filepath.Join(d.options.GetDBRootPath(), d.GetName())
38✔
256
}
38✔
257

258
func (d *db) allocTx() (*store.Tx, error) {
10,957✔
259
        tx, err := d.txPool.Alloc()
10,957✔
260
        if errors.Is(err, store.ErrTxPoolExhausted) {
10,957✔
261
                return nil, ErrTxReadPoolExhausted
×
262
        }
×
263
        return tx, err
10,957✔
264
}
265

266
func (d *db) releaseTx(tx *store.Tx) {
10,957✔
267
        d.txPool.Release(tx)
10,957✔
268
}
10,957✔
269

270
// NewDB Creates a new Database along with it's directories and files
271
func NewDB(dbName string, multidbHandler sql.MultiDBHandler, op *Options, log logger.Logger) (DB, error) {
649✔
272
        if dbName == "" {
649✔
273
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
×
274
        }
×
275

276
        log.Infof("creating database '%s' {replica = %v}...", dbName, op.replica)
649✔
277

649✔
278
        var replicaStates map[string]*replicaState
649✔
279
        // replica states are only managed in primary with synchronous replication
649✔
280
        if !op.replica && op.syncAcks > 0 {
660✔
281
                replicaStates = make(map[string]*replicaState, op.syncAcks)
11✔
282
        }
11✔
283

284
        dbi := &db{
649✔
285
                Logger:        log,
649✔
286
                options:       op,
649✔
287
                name:          dbName,
649✔
288
                replicaStates: replicaStates,
649✔
289
                maxResultSize: MaxKeyScanLimit,
649✔
290
                mutex:         &instrumentedRWMutex{},
649✔
291
        }
649✔
292

649✔
293
        dbDir := filepath.Join(op.GetDBRootPath(), dbName)
649✔
294

649✔
295
        _, err := os.Stat(dbDir)
649✔
296
        if err == nil {
650✔
297
                return nil, fmt.Errorf("database directories already exist: %s", dbDir)
1✔
298
        }
1✔
299

300
        if err = os.MkdirAll(dbDir, os.ModePerm); err != nil {
649✔
301
                return nil, logErr(dbi.Logger, "unable to create data folder: %s", err)
1✔
302
        }
1✔
303

304
        stOpts := op.GetStoreOptions().
647✔
305
                WithExternalCommitAllowance(op.syncReplication).
647✔
306
                WithMultiIndexing(true).
647✔
307
                WithLogger(log)
647✔
308

647✔
309
        dbi.st, err = store.Open(dbDir, stOpts)
647✔
310
        if err != nil {
647✔
311
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
312
        }
×
313

314
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
1,941✔
315
                err := dbi.st.InitIndexing(&store.IndexSpec{
1,294✔
316
                        SourcePrefix: []byte{prefix},
1,294✔
317
                        TargetPrefix: []byte{prefix},
1,294✔
318
                })
1,294✔
319
                if err != nil {
1,294✔
320
                        dbi.st.Close()
×
321
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
322
                }
×
323
        }
324

325
        txPool, err := dbi.st.NewTxHolderPool(op.readTxPoolSize, false)
647✔
326
        if err != nil {
647✔
327
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
328
        }
×
329
        dbi.txPool = txPool
647✔
330

647✔
331
        sqlOpts := sql.DefaultOptions().
647✔
332
                WithPrefix([]byte{SQLPrefix}).
647✔
333
                WithMultiDBHandler(multidbHandler)
647✔
334

647✔
335
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, op.replica)
647✔
336

647✔
337
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
647✔
338
        if err != nil {
647✔
339
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, op.replica, err)
×
340
                return nil, err
×
341
        }
×
342
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, op.replica)
647✔
343

647✔
344
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
647✔
345
        if err != nil {
647✔
346
                return nil, logErr(dbi.Logger, "Unable to open database: %s", err)
×
347
        }
×
348
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, op.replica)
647✔
349

647✔
350
        dbi.Logger.Infof("database '%s' successfully created {replica = %v}", dbName, op.replica)
647✔
351

647✔
352
        return dbi, nil
647✔
353
}
354

355
func (d *db) MaxResultSize() int {
24✔
356
        return d.maxResultSize
24✔
357
}
24✔
358

359
func (d *db) FlushIndex(req *schema.FlushIndexRequest) error {
6✔
360
        if req == nil {
7✔
361
                return store.ErrIllegalArguments
1✔
362
        }
1✔
363

364
        return d.st.FlushIndexes(req.CleanupPercentage, req.Synced)
5✔
365
}
366

367
// CompactIndex ...
368
func (d *db) CompactIndex() error {
4✔
369
        return d.st.CompactIndexes()
4✔
370
}
4✔
371

372
// Set ...
373
func (d *db) Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,443✔
374
        d.mutex.RLock()
4,443✔
375
        defer d.mutex.RUnlock()
4,443✔
376

4,443✔
377
        if d.isReplica() {
4,446✔
378
                return nil, ErrIsReplica
3✔
379
        }
3✔
380

381
        return d.set(ctx, req)
4,440✔
382
}
383

384
func (d *db) set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,440✔
385
        if req == nil {
4,441✔
386
                return nil, ErrIllegalArguments
1✔
387
        }
1✔
388

389
        tx, err := d.st.NewWriteOnlyTx(ctx)
4,439✔
390
        if err != nil {
4,439✔
391
                return nil, err
×
392
        }
×
393
        defer tx.Cancel()
4,439✔
394

4,439✔
395
        keys := make(map[[sha256.Size]byte]struct{}, len(req.KVs))
4,439✔
396

4,439✔
397
        for _, kv := range req.KVs {
10,173✔
398
                if len(kv.Key) == 0 {
5,736✔
399
                        return nil, ErrIllegalArguments
2✔
400
                }
2✔
401

402
                kid := sha256.Sum256(kv.Key)
5,732✔
403
                _, ok := keys[kid]
5,732✔
404
                if ok {
5,733✔
405
                        return nil, schema.ErrDuplicatedKeysNotSupported
1✔
406
                }
1✔
407
                keys[kid] = struct{}{}
5,731✔
408

5,731✔
409
                e := EncodeEntrySpec(
5,731✔
410
                        kv.Key,
5,731✔
411
                        schema.KVMetadataFromProto(kv.Metadata),
5,731✔
412
                        kv.Value,
5,731✔
413
                )
5,731✔
414

5,731✔
415
                err = tx.Set(e.Key, e.Metadata, e.Value)
5,731✔
416
                if err != nil {
5,732✔
417
                        return nil, err
1✔
418
                }
1✔
419
        }
420

421
        for i := range req.Preconditions {
5,506✔
422

1,071✔
423
                c, err := PreconditionFromProto(req.Preconditions[i])
1,071✔
424
                if err != nil {
1,072✔
425
                        return nil, err
1✔
426
                }
1✔
427

428
                err = tx.AddPrecondition(c)
1,070✔
429
                if err != nil {
1,071✔
430
                        return nil, fmt.Errorf("%w: %v", store.ErrInvalidPrecondition, err)
1✔
431
                }
1✔
432
        }
433

434
        var hdr *store.TxHeader
4,433✔
435

4,433✔
436
        if req.NoWait {
4,433✔
437
                hdr, err = tx.AsyncCommit(ctx)
×
438
        } else {
4,433✔
439
                hdr, err = tx.Commit(ctx)
4,433✔
440
        }
4,433✔
441
        if err != nil {
4,464✔
442
                return nil, err
31✔
443
        }
31✔
444

445
        return schema.TxHeaderToProto(hdr), nil
4,402✔
446
}
447

448
func checkKeyRequest(req *schema.KeyRequest) error {
3,089✔
449
        if req == nil {
3,090✔
450
                return fmt.Errorf(
1✔
451
                        "%w: empty request",
1✔
452
                        ErrIllegalArguments,
1✔
453
                )
1✔
454
        }
1✔
455

456
        if len(req.Key) == 0 {
3,090✔
457
                return fmt.Errorf(
2✔
458
                        "%w: empty key",
2✔
459
                        ErrIllegalArguments,
2✔
460
                )
2✔
461
        }
2✔
462

463
        if req.AtTx > 0 {
3,111✔
464
                if req.SinceTx > 0 {
27✔
465
                        return fmt.Errorf(
2✔
466
                                "%w: SinceTx should not be specified when AtTx is used",
2✔
467
                                ErrIllegalArguments,
2✔
468
                        )
2✔
469
                }
2✔
470

471
                if req.AtRevision != 0 {
24✔
472
                        return fmt.Errorf(
1✔
473
                                "%w: AtRevision should not be specified when AtTx is used",
1✔
474
                                ErrIllegalArguments,
1✔
475
                        )
1✔
476
                }
1✔
477
        }
478

479
        return nil
3,083✔
480
}
481

482
// Get ...
483
func (d *db) Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error) {
3,085✔
484
        err := checkKeyRequest(req)
3,085✔
485
        if err != nil {
3,087✔
486
                return nil, err
2✔
487
        }
2✔
488

489
        currTxID, _ := d.st.CommittedAlh()
3,083✔
490
        if req.SinceTx > currTxID {
3,084✔
491
                return nil, fmt.Errorf(
1✔
492
                        "%w: SinceTx must not be greater than the current transaction ID",
1✔
493
                        ErrIllegalArguments,
1✔
494
                )
1✔
495
        }
1✔
496

497
        if !req.NoWait && req.AtTx == 0 {
6,142✔
498
                waitUntilTx := req.SinceTx
3,060✔
499
                if waitUntilTx == 0 {
5,023✔
500
                        waitUntilTx = currTxID
1,963✔
501
                }
1,963✔
502

503
                err := d.WaitForIndexingUpto(ctx, waitUntilTx)
3,060✔
504
                if err != nil {
3,062✔
505
                        return nil, err
2✔
506
                }
2✔
507
        }
508

509
        if req.AtRevision != 0 {
3,111✔
510
                return d.getAtRevision(ctx, EncodeKey(req.Key), req.AtRevision, true)
31✔
511
        }
31✔
512

513
        return d.getAtTx(ctx, EncodeKey(req.Key), req.AtTx, 0, d.st, 0, true)
3,049✔
514
}
515

516
func (d *db) get(ctx context.Context, key []byte, index store.KeyIndex, skipIntegrityCheck bool) (*schema.Entry, error) {
10✔
517
        return d.getAtTx(ctx, key, 0, 0, index, 0, skipIntegrityCheck)
10✔
518
}
10✔
519

520
func (d *db) getAtTx(
521
        ctx context.Context,
522
        key []byte,
523
        atTx uint64,
524
        resolved int,
525
        index store.KeyIndex,
526
        revision uint64,
527
        skipIntegrityCheck bool,
528
) (entry *schema.Entry, err error) {
5,617✔
529

5,617✔
530
        var txID uint64
5,617✔
531
        var val []byte
5,617✔
532
        var md *store.KVMetadata
5,617✔
533

5,617✔
534
        if atTx == 0 {
9,322✔
535
                valRef, err := index.Get(ctx, key)
3,705✔
536
                if err != nil {
3,786✔
537
                        return nil, err
81✔
538
                }
81✔
539

540
                txID = valRef.Tx()
3,624✔
541
                revision = valRef.HC()
3,624✔
542
                md = valRef.KVMetadata()
3,624✔
543

3,624✔
544
                val, err = valRef.Resolve()
3,624✔
545
                if err != nil {
3,628✔
546
                        return nil, err
4✔
547
                }
4✔
548
        } else {
1,912✔
549
                txID = atTx
1,912✔
550

1,912✔
551
                md, val, err = d.readMetadataAndValue(key, atTx, skipIntegrityCheck)
1,912✔
552
                if err != nil {
2,007✔
553
                        return nil, err
95✔
554
                }
95✔
555
        }
556

557
        return d.resolveValue(ctx, key, val, resolved, txID, md, index, revision, skipIntegrityCheck)
5,437✔
558
}
559

560
func (d *db) readMetadataAndValue(key []byte, atTx uint64, skipIntegrityCheck bool) (*store.KVMetadata, []byte, error) {
1,912✔
561
        entry, _, err := d.st.ReadTxEntry(atTx, key, skipIntegrityCheck)
1,912✔
562
        if err != nil {
1,998✔
563
                return nil, nil, err
86✔
564
        }
86✔
565

566
        v, err := d.st.ReadValue(entry)
1,826✔
567
        if err != nil {
1,835✔
568
                return nil, nil, err
9✔
569
        }
9✔
570

571
        return entry.Metadata(), v, nil
1,817✔
572
}
573

574
func (d *db) getAtRevision(ctx context.Context, key []byte, atRevision int64, skipIntegrityCheck bool) (entry *schema.Entry, err error) {
31✔
575
        var offset uint64
31✔
576
        var desc bool
31✔
577

31✔
578
        if atRevision > 0 {
47✔
579
                offset = uint64(atRevision) - 1
16✔
580
                desc = false
16✔
581
        } else {
31✔
582
                offset = -uint64(atRevision)
15✔
583
                desc = true
15✔
584
        }
15✔
585

586
        valRefs, hCount, err := d.st.History(key, offset, desc, 1)
31✔
587
        if errors.Is(err, store.ErrNoMoreEntries) || errors.Is(err, store.ErrOffsetOutOfRange) {
35✔
588
                return nil, ErrInvalidRevision
4✔
589
        }
4✔
590
        if err != nil {
28✔
591
                return nil, err
1✔
592
        }
1✔
593

594
        if atRevision < 0 {
39✔
595
                atRevision = int64(hCount) + atRevision
13✔
596
        }
13✔
597

598
        entry, err = d.getAtTx(ctx, key, valRefs[0].Tx(), 0, d.st, uint64(atRevision), skipIntegrityCheck)
26✔
599
        if err != nil {
27✔
600
                return nil, err
1✔
601
        }
1✔
602

603
        return entry, err
25✔
604
}
605

606
func (d *db) resolveValue(
607
        ctx context.Context,
608
        key []byte,
609
        val []byte,
610
        resolved int,
611
        txID uint64,
612
        md *store.KVMetadata,
613
        index store.KeyIndex,
614
        revision uint64,
615
        skipIntegrityCheck bool,
616
) (entry *schema.Entry, err error) {
5,446✔
617
        if md != nil && md.Deleted() {
5,448✔
618
                return nil, store.ErrKeyNotFound
2✔
619
        }
2✔
620

621
        if len(val) < 1 {
5,444✔
622
                return nil, fmt.Errorf("%w: internal value consistency error - missing value prefix", store.ErrCorruptedData)
×
623
        }
×
624

625
        // Reference lookup
626
        if val[0] == ReferenceValuePrefix {
5,656✔
627
                if len(val) < 1+8 {
212✔
628
                        return nil, fmt.Errorf("%w: internal value consistency error - invalid reference", store.ErrCorruptedData)
×
629
                }
×
630

631
                if resolved == MaxKeyResolutionLimit {
214✔
632
                        return nil, ErrKeyResolutionLimitReached
2✔
633
                }
2✔
634

635
                atTx := binary.BigEndian.Uint64(TrimPrefix(val))
210✔
636
                refKey := make([]byte, len(val)-1-8)
210✔
637
                copy(refKey, val[1+8:])
210✔
638

210✔
639
                if index != nil {
419✔
640
                        entry, err = d.getAtTx(ctx, refKey, atTx, resolved+1, index, 0, skipIntegrityCheck)
209✔
641
                        if err != nil {
210✔
642
                                return nil, err
1✔
643
                        }
1✔
644
                } else {
1✔
645
                        entry = &schema.Entry{
1✔
646
                                Key: TrimPrefix(refKey),
1✔
647
                                Tx:  atTx,
1✔
648
                        }
1✔
649
                }
1✔
650

651
                entry.ReferencedBy = &schema.Reference{
209✔
652
                        Tx:       txID,
209✔
653
                        Key:      TrimPrefix(key),
209✔
654
                        Metadata: schema.KVMetadataToProto(md),
209✔
655
                        AtTx:     atTx,
209✔
656
                        Revision: revision,
209✔
657
                }
209✔
658

209✔
659
                return entry, nil
209✔
660
        }
661

662
        return &schema.Entry{
5,232✔
663
                Tx:       txID,
5,232✔
664
                Key:      TrimPrefix(key),
5,232✔
665
                Metadata: schema.KVMetadataToProto(md),
5,232✔
666
                Value:    TrimPrefix(val),
5,232✔
667
                Revision: revision,
5,232✔
668
        }, nil
5,232✔
669
}
670

671
func (d *db) Health() (waitingCount int, lastReleaseAt time.Time) {
1✔
672
        return d.mutex.State()
1✔
673
}
1✔
674

675
// CurrentState ...
676
func (d *db) CurrentState() (*schema.ImmutableState, error) {
15,341✔
677
        lastTxID, lastTxAlh := d.st.CommittedAlh()
15,341✔
678
        lastPreTxID, lastPreTxAlh := d.st.PrecommittedAlh()
15,341✔
679

15,341✔
680
        return &schema.ImmutableState{
15,341✔
681
                TxId:               lastTxID,
15,341✔
682
                TxHash:             lastTxAlh[:],
15,341✔
683
                PrecommittedTxId:   lastPreTxID,
15,341✔
684
                PrecommittedTxHash: lastPreTxAlh[:],
15,341✔
685
        }, nil
15,341✔
686
}
15,341✔
687

688
// WaitForTx blocks caller until specified tx
689
func (d *db) WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error {
8,439✔
690
        return d.st.WaitForTx(ctx, txID, allowPrecommitted)
8,439✔
691
}
8,439✔
692

693
// WaitForIndexingUpto blocks caller until specified tx gets indexed
694
func (d *db) WaitForIndexingUpto(ctx context.Context, txID uint64) error {
3,067✔
695
        return d.st.WaitForIndexingUpto(ctx, txID)
3,067✔
696
}
3,067✔
697

698
// VerifiableSet ...
699
func (d *db) VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error) {
48✔
700
        if req == nil {
49✔
701
                return nil, ErrIllegalArguments
1✔
702
        }
1✔
703

704
        lastTxID, _ := d.st.CommittedAlh()
47✔
705
        if lastTxID < req.ProveSinceTx {
48✔
706
                return nil, ErrIllegalState
1✔
707
        }
1✔
708

709
        // Preallocate tx buffers
710
        lastTx, err := d.allocTx()
46✔
711
        if err != nil {
46✔
712
                return nil, err
×
713
        }
×
714
        defer d.releaseTx(lastTx)
46✔
715

46✔
716
        txhdr, err := d.Set(ctx, req.SetRequest)
46✔
717
        if err != nil {
47✔
718
                return nil, err
1✔
719
        }
1✔
720

721
        err = d.st.ReadTx(uint64(txhdr.Id), false, lastTx)
45✔
722
        if err != nil {
45✔
723
                return nil, err
×
724
        }
×
725

726
        var prevTxHdr *store.TxHeader
45✔
727

45✔
728
        if req.ProveSinceTx == 0 {
65✔
729
                prevTxHdr = lastTx.Header()
20✔
730
        } else {
45✔
731
                prevTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
25✔
732
                if err != nil {
25✔
733
                        return nil, err
×
734
                }
×
735
        }
736

737
        dualProof, err := d.st.DualProof(prevTxHdr, lastTx.Header())
45✔
738
        if err != nil {
45✔
739
                return nil, err
×
740
        }
×
741

742
        return &schema.VerifiableTx{
45✔
743
                Tx:        schema.TxToProto(lastTx),
45✔
744
                DualProof: schema.DualProofToProto(dualProof),
45✔
745
        }, nil
45✔
746
}
747

748
// VerifiableGet ...
749
func (d *db) VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error) {
1,038✔
750
        if req == nil {
1,039✔
751
                return nil, ErrIllegalArguments
1✔
752
        }
1✔
753

754
        lastTxID, _ := d.st.CommittedAlh()
1,037✔
755
        if lastTxID < req.ProveSinceTx {
1,038✔
756
                return nil, ErrIllegalState
1✔
757
        }
1✔
758

759
        e, err := d.Get(ctx, req.KeyRequest)
1,036✔
760
        if err != nil {
1,040✔
761
                return nil, err
4✔
762
        }
4✔
763

764
        var vTxID uint64
1,032✔
765
        var vKey []byte
1,032✔
766

1,032✔
767
        if e.ReferencedBy == nil {
2,059✔
768
                vTxID = e.Tx
1,027✔
769
                vKey = e.Key
1,027✔
770
        } else {
1,032✔
771
                vTxID = e.ReferencedBy.Tx
5✔
772
                vKey = e.ReferencedBy.Key
5✔
773
        }
5✔
774

775
        // key-value inclusion proof
776
        tx, err := d.allocTx()
1,032✔
777
        if err != nil {
1,032✔
778
                return nil, err
×
779
        }
×
780
        defer d.releaseTx(tx)
1,032✔
781

1,032✔
782
        err = d.st.ReadTx(vTxID, false, tx)
1,032✔
783
        if err != nil {
1,032✔
784
                return nil, err
×
785
        }
×
786

787
        var rootTxHdr *store.TxHeader
1,032✔
788

1,032✔
789
        if req.ProveSinceTx == 0 {
2,043✔
790
                rootTxHdr = tx.Header()
1,011✔
791
        } else {
1,032✔
792
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
21✔
793
                if err != nil {
21✔
794
                        return nil, err
×
795
                }
×
796
        }
797

798
        inclusionProof, err := tx.Proof(EncodeKey(vKey))
1,032✔
799
        if err != nil {
1,032✔
800
                return nil, err
×
801
        }
×
802

803
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,032✔
804

1,032✔
805
        if req.ProveSinceTx <= vTxID {
2,058✔
806
                sourceTxHdr = rootTxHdr
1,026✔
807
                targetTxHdr = tx.Header()
1,026✔
808
        } else {
1,032✔
809
                sourceTxHdr = tx.Header()
6✔
810
                targetTxHdr = rootTxHdr
6✔
811
        }
6✔
812

813
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,032✔
814
        if err != nil {
1,032✔
815
                return nil, err
×
816
        }
×
817

818
        verifiableTx := &schema.VerifiableTx{
1,032✔
819
                Tx:        schema.TxToProto(tx),
1,032✔
820
                DualProof: schema.DualProofToProto(dualProof),
1,032✔
821
        }
1,032✔
822

1,032✔
823
        return &schema.VerifiableEntry{
1,032✔
824
                Entry:          e,
1,032✔
825
                VerifiableTx:   verifiableTx,
1,032✔
826
                InclusionProof: schema.InclusionProofToProto(inclusionProof),
1,032✔
827
        }, nil
1,032✔
828
}
829

830
func (d *db) Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error) {
9✔
831
        if req == nil {
10✔
832
                return nil, ErrIllegalArguments
1✔
833
        }
1✔
834

835
        d.mutex.RLock()
8✔
836
        defer d.mutex.RUnlock()
8✔
837

8✔
838
        if d.isReplica() {
8✔
839
                return nil, ErrIsReplica
×
840
        }
×
841

842
        opts := store.DefaultTxOptions()
8✔
843

8✔
844
        if req.SinceTx > 0 {
9✔
845
                if req.SinceTx > d.st.LastPrecommittedTxID() {
2✔
846
                        return nil, store.ErrTxNotFound
1✔
847
                }
1✔
848

849
                opts.WithSnapshotMustIncludeTxID(func(_ uint64) uint64 {
×
850
                        return req.SinceTx
×
851
                })
×
852
        }
853

854
        tx, err := d.st.NewTx(ctx, opts)
7✔
855
        if err != nil {
7✔
856
                return nil, err
×
857
        }
×
858
        defer tx.Cancel()
7✔
859

7✔
860
        for _, k := range req.Keys {
13✔
861
                if len(k) == 0 {
6✔
862
                        return nil, ErrIllegalArguments
×
863
                }
×
864

865
                md := store.NewKVMetadata()
6✔
866

6✔
867
                md.AsDeleted(true)
6✔
868

6✔
869
                e := EncodeEntrySpec(k, md, nil)
6✔
870

6✔
871
                err = tx.Delete(ctx, e.Key)
6✔
872
                if err != nil {
7✔
873
                        return nil, err
1✔
874
                }
1✔
875
        }
876

877
        var hdr *store.TxHeader
6✔
878
        if req.NoWait {
6✔
879
                hdr, err = tx.AsyncCommit(ctx)
×
880
        } else {
6✔
881
                hdr, err = tx.Commit(ctx)
6✔
882
        }
6✔
883
        if err != nil {
7✔
884
                return nil, err
1✔
885
        }
1✔
886

887
        return schema.TxHeaderToProto(hdr), nil
5✔
888
}
889

890
// GetAll ...
891
func (d *db) GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error) {
4✔
892
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
4✔
893
        if err != nil {
4✔
894
                return nil, err
×
895
        }
×
896
        defer snap.Close()
4✔
897

4✔
898
        list := &schema.Entries{}
4✔
899

4✔
900
        for _, key := range req.Keys {
14✔
901
                e, err := d.get(ctx, EncodeKey(key), snap, true)
10✔
902
                if err == nil || errors.Is(err, store.ErrKeyNotFound) {
20✔
903
                        if e != nil {
19✔
904
                                list.Entries = append(list.Entries, e)
9✔
905
                        }
9✔
906
                } else {
×
907
                        return nil, err
×
908
                }
×
909
        }
910

911
        return list, nil
4✔
912
}
913

914
// Size ...
915
func (d *db) Size() (uint64, error) {
221✔
916
        return d.st.TxCount(), nil
221✔
917
}
221✔
918

919
// Count ...
920
func (d *db) Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error) {
8✔
921
        if prefix == nil {
10✔
922
                return nil, ErrIllegalArguments
2✔
923
        }
2✔
924

925
        tx, err := d.st.NewTx(ctx, store.DefaultTxOptions().WithMode(store.ReadOnlyTx))
6✔
926
        if err != nil {
6✔
927
                return nil, err
×
928
        }
×
929
        defer tx.Cancel()
6✔
930

6✔
931
        keyReader, err := tx.NewKeyReader(store.KeyReaderSpec{
6✔
932
                Prefix: WrapWithPrefix(prefix.Prefix, SetKeyPrefix),
6✔
933
        })
6✔
934
        if err != nil {
6✔
935
                return nil, err
×
936
        }
×
937
        defer keyReader.Close()
6✔
938

6✔
939
        count := 0
6✔
940

6✔
941
        for {
19✔
942
                _, _, err := keyReader.Read(ctx)
13✔
943
                if errors.Is(err, store.ErrNoMoreEntries) {
19✔
944
                        break
6✔
945
                }
946
                if err != nil {
7✔
947
                        return nil, err
×
948
                }
×
949

950
                count++
7✔
951
        }
952

953
        return &schema.EntryCount{Count: uint64(count)}, nil
6✔
954
}
955

956
// CountAll ...
957
func (d *db) CountAll(ctx context.Context) (*schema.EntryCount, error) {
4✔
958
        return d.Count(ctx, &schema.KeyPrefix{})
4✔
959
}
4✔
960

961
// TxByID ...
962
func (d *db) TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error) {
61✔
963
        if req == nil {
62✔
964
                return nil, ErrIllegalArguments
1✔
965
        }
1✔
966

967
        var snap *store.Snapshot
60✔
968
        var err error
60✔
969

60✔
970
        tx, err := d.allocTx()
60✔
971
        if err != nil {
60✔
972
                return nil, err
×
973
        }
×
974
        defer d.releaseTx(tx)
60✔
975

60✔
976
        if !req.KeepReferencesUnresolved {
118✔
977
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
58✔
978
                if err != nil {
58✔
979
                        return nil, err
×
980
                }
×
981
                defer snap.Close()
58✔
982
        }
983

984
        // key-value inclusion proof
985
        err = d.st.ReadTx(req.Tx, false, tx)
60✔
986
        if err != nil {
60✔
987
                return nil, err
×
988
        }
×
989

990
        return d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
60✔
991
}
992

993
func (d *db) snapshotSince(ctx context.Context, prefix []byte, txID uint64) (*store.Snapshot, error) {
1,702✔
994
        currTxID, _ := d.st.CommittedAlh()
1,702✔
995

1,702✔
996
        if txID > currTxID {
1,702✔
997
                return nil, ErrIllegalArguments
×
998
        }
×
999

1000
        waitUntilTx := txID
1,702✔
1001
        if waitUntilTx == 0 {
3,338✔
1002
                waitUntilTx = currTxID
1,636✔
1003
        }
1,636✔
1004

1005
        return d.st.SnapshotMustIncludeTxID(ctx, prefix, waitUntilTx)
1,702✔
1006
}
1007

1008
func (d *db) serializeTx(ctx context.Context, tx *store.Tx, spec *schema.EntriesSpec, snap *store.Snapshot, skipIntegrityCheck bool) (*schema.Tx, error) {
1,374✔
1009
        if spec == nil {
2,238✔
1010
                return schema.TxToProto(tx), nil
864✔
1011
        }
864✔
1012

1013
        stx := &schema.Tx{
510✔
1014
                Header: schema.TxHeaderToProto(tx.Header()),
510✔
1015
        }
510✔
1016

510✔
1017
        for _, e := range tx.Entries() {
1,048✔
1018
                switch e.Key()[0] {
538✔
1019
                case SetKeyPrefix:
39✔
1020
                        {
78✔
1021
                                if spec.KvEntriesSpec == nil || spec.KvEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
57✔
1022
                                        break
18✔
1023
                                }
1024

1025
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
24✔
1026
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
3✔
1027
                                        break
3✔
1028
                                }
1029

1030
                                v, err := d.st.ReadValue(e)
18✔
1031
                                if errors.Is(err, store.ErrExpiredEntry) {
18✔
1032
                                        break
×
1033
                                }
1034
                                if err != nil {
18✔
1035
                                        return nil, err
×
1036
                                }
×
1037

1038
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
27✔
1039
                                        kve := schema.TxEntryToProto(e)
9✔
1040
                                        kve.Value = v
9✔
1041
                                        stx.Entries = append(stx.Entries, kve)
9✔
1042
                                        break
9✔
1043
                                }
1044

1045
                                // resolve entry
1046
                                var index store.KeyIndex
9✔
1047
                                if snap != nil {
16✔
1048
                                        index = snap
7✔
1049
                                }
7✔
1050

1051
                                kve, err := d.resolveValue(ctx, e.Key(), v, 0, tx.Header().ID, e.Metadata(), index, 0, skipIntegrityCheck)
9✔
1052
                                if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
10✔
1053
                                        break // ignore deleted ones (referenced key may have been deleted)
1✔
1054
                                }
1055
                                if err != nil {
8✔
1056
                                        return nil, err
×
1057
                                }
×
1058

1059
                                stx.KvEntries = append(stx.KvEntries, kve)
8✔
1060
                        }
1061
                case SortedSetKeyPrefix:
14✔
1062
                        {
28✔
1063
                                if spec.ZEntriesSpec == nil || spec.ZEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
22✔
1064
                                        break
8✔
1065
                                }
1066

1067
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
7✔
1068
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1069
                                        break
1✔
1070
                                }
1071

1072
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
6✔
1073
                                        v, err := d.st.ReadValue(e)
1✔
1074
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1075
                                                break
×
1076
                                        }
1077
                                        if err != nil {
1✔
1078
                                                return nil, err
×
1079
                                        }
×
1080

1081
                                        kve := schema.TxEntryToProto(e)
1✔
1082
                                        kve.Value = v
1✔
1083
                                        stx.Entries = append(stx.Entries, kve)
1✔
1084
                                        break
1✔
1085
                                }
1086

1087
                                // zKey = [1+setLenLen+set+scoreLen+keyLenLen+1+key+txIDLen]
1088
                                zKey := e.Key()
4✔
1089

4✔
1090
                                setLen := int(binary.BigEndian.Uint64(zKey[1:]))
4✔
1091
                                set := make([]byte, setLen)
4✔
1092
                                copy(set, zKey[1+setLenLen:])
4✔
1093

4✔
1094
                                scoreOff := 1 + setLenLen + setLen
4✔
1095
                                scoreB := binary.BigEndian.Uint64(zKey[scoreOff:])
4✔
1096
                                score := math.Float64frombits(scoreB)
4✔
1097

4✔
1098
                                keyOff := scoreOff + scoreLen + keyLenLen
4✔
1099
                                key := make([]byte, len(zKey)-keyOff-txIDLen)
4✔
1100
                                copy(key, zKey[keyOff:])
4✔
1101

4✔
1102
                                atTx := binary.BigEndian.Uint64(zKey[keyOff+len(key):])
4✔
1103

4✔
1104
                                var entry *schema.Entry
4✔
1105
                                var err error
4✔
1106

4✔
1107
                                if snap != nil {
7✔
1108
                                        entry, err = d.getAtTx(ctx, key, atTx, 1, snap, 0, skipIntegrityCheck)
3✔
1109
                                        if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
3✔
1110
                                                break // ignore deleted ones (referenced key may have been deleted)
×
1111
                                        }
1112
                                        if err != nil {
3✔
1113
                                                return nil, err
×
1114
                                        }
×
1115
                                }
1116

1117
                                zentry := &schema.ZEntry{
4✔
1118
                                        Set:   set,
4✔
1119
                                        Key:   key[1:],
4✔
1120
                                        Entry: entry,
4✔
1121
                                        Score: score,
4✔
1122
                                        AtTx:  atTx,
4✔
1123
                                }
4✔
1124

4✔
1125
                                stx.ZEntries = append(stx.ZEntries, zentry)
4✔
1126
                        }
1127
                case SQLPrefix:
5✔
1128
                        {
10✔
1129
                                if spec.SqlEntriesSpec == nil || spec.SqlEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
7✔
1130
                                        break
2✔
1131
                                }
1132

1133
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
4✔
1134
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1135
                                        break
1✔
1136
                                }
1137

1138
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
3✔
1139
                                        v, err := d.st.ReadValue(e)
1✔
1140
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1141
                                                break
×
1142
                                        }
1143
                                        if err != nil {
1✔
1144
                                                return nil, err
×
1145
                                        }
×
1146

1147
                                        kve := schema.TxEntryToProto(e)
1✔
1148
                                        kve.Value = v
1✔
1149
                                        stx.Entries = append(stx.Entries, kve)
1✔
1150
                                        break
1✔
1151
                                }
1152

1153
                                return nil, fmt.Errorf("%w: sql entry resolution is not supported", ErrIllegalArguments)
1✔
1154
                        }
1155
                }
1156
        }
1157

1158
        return stx, nil
509✔
1159
}
1160

1161
func (d *db) mayUpdateReplicaState(committedTxID uint64, newReplicaState *schema.ReplicaState) error {
6,759✔
1162
        d.replicaStatesMutex.Lock()
6,759✔
1163
        defer d.replicaStatesMutex.Unlock()
6,759✔
1164

6,759✔
1165
        // clean up replicaStates
6,759✔
1166
        // it's safe to remove up to latest tx committed in primary
6,759✔
1167
        for uuid, st := range d.replicaStates {
7,197✔
1168
                if st.precommittedTxID <= committedTxID {
746✔
1169
                        delete(d.replicaStates, uuid)
308✔
1170
                }
308✔
1171
        }
1172

1173
        if newReplicaState.PrecommittedTxID <= committedTxID {
13,164✔
1174
                // as far as the primary is concerned, nothing really new has happened
6,405✔
1175
                return nil
6,405✔
1176
        }
6,405✔
1177

1178
        newReplicaAlh := schema.DigestFromProto(newReplicaState.PrecommittedAlh)
354✔
1179

354✔
1180
        replicaSt, ok := d.replicaStates[newReplicaState.UUID]
354✔
1181
        if ok {
395✔
1182
                if newReplicaState.PrecommittedTxID < replicaSt.precommittedTxID {
41✔
1183
                        return fmt.Errorf("%w: the newly informed replica state lags behind the previously informed one", ErrIllegalArguments)
×
1184
                }
×
1185

1186
                if newReplicaState.PrecommittedTxID == replicaSt.precommittedTxID {
81✔
1187
                        // as of the last informed replica status update, nothing has changed
40✔
1188
                        return nil
40✔
1189
                }
40✔
1190

1191
                // actual replication progress is informed by the replica
1192
                replicaSt.precommittedTxID = newReplicaState.PrecommittedTxID
1✔
1193
                replicaSt.precommittedAlh = newReplicaAlh
1✔
1194
        } else {
313✔
1195
                // replica informs first replication state
313✔
1196
                d.replicaStates[newReplicaState.UUID] = &replicaState{
313✔
1197
                        precommittedTxID: newReplicaState.PrecommittedTxID,
313✔
1198
                        precommittedAlh:  newReplicaAlh,
313✔
1199
                }
313✔
1200
        }
313✔
1201

1202
        // check up to which tx enough replicas ack replication and it's safe to commit
1203
        mayCommitUpToTxID := uint64(0)
314✔
1204
        if len(d.replicaStates) > 0 {
628✔
1205
                mayCommitUpToTxID = math.MaxUint64
314✔
1206
        }
314✔
1207

1208
        allowances := 0
314✔
1209

314✔
1210
        // we may clean up replicaStates from those who are lagging behind commit
314✔
1211
        for _, st := range d.replicaStates {
706✔
1212
                if st.precommittedTxID < mayCommitUpToTxID {
706✔
1213
                        mayCommitUpToTxID = st.precommittedTxID
314✔
1214
                }
314✔
1215
                allowances++
392✔
1216
        }
1217

1218
        if allowances >= d.options.syncAcks {
597✔
1219
                err := d.st.AllowCommitUpto(mayCommitUpToTxID)
283✔
1220
                if err != nil {
283✔
1221
                        return err
×
1222
                }
×
1223
        }
1224

1225
        return nil
314✔
1226
}
1227

1228
func (d *db) ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error) {
8,441✔
1229
        if req == nil {
8,441✔
1230
                return nil, 0, mayCommitUpToAlh, ErrIllegalArguments
×
1231
        }
×
1232

1233
        if d.replicaStates == nil && req.ReplicaState != nil {
8,441✔
1234
                return nil, 0, mayCommitUpToAlh, fmt.Errorf("%w: replica state was NOT expected", ErrIllegalState)
×
1235
        }
×
1236

1237
        tx, err := d.allocTx()
8,441✔
1238
        if err != nil {
8,441✔
1239
                return nil, 0, mayCommitUpToAlh, err
×
1240
        }
×
1241
        defer d.releaseTx(tx)
8,441✔
1242

8,441✔
1243
        committedTxID, committedAlh := d.st.CommittedAlh()
8,441✔
1244
        preCommittedTxID, _ := d.st.PrecommittedAlh()
8,441✔
1245

8,441✔
1246
        if req.ReplicaState != nil {
15,202✔
1247
                if req.ReplicaState.CommittedTxID > 0 {
12,967✔
1248
                        // validate replica commit state
6,206✔
1249
                        if req.ReplicaState.CommittedTxID > committedTxID {
6,206✔
1250
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1251
                        }
×
1252

1253
                        // integrityCheck is currently required to validate Alh
1254
                        expectedReplicaCommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.CommittedTxID, false, false)
6,206✔
1255
                        if err != nil {
6,206✔
1256
                                return nil, committedTxID, committedAlh, err
×
1257
                        }
×
1258

1259
                        replicaCommittedAlh := schema.DigestFromProto(req.ReplicaState.CommittedAlh)
6,206✔
1260

6,206✔
1261
                        if expectedReplicaCommitHdr.Alh() != replicaCommittedAlh {
6,206✔
1262
                                return nil, expectedReplicaCommitHdr.ID, expectedReplicaCommitHdr.Alh(), fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1263
                        }
×
1264
                }
1265

1266
                if req.ReplicaState.PrecommittedTxID > 0 {
13,130✔
1267
                        // validate replica precommit state
6,369✔
1268
                        if req.ReplicaState.PrecommittedTxID > preCommittedTxID {
6,371✔
1269
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
2✔
1270
                        }
2✔
1271

1272
                        // integrityCheck is currently required to validate Alh
1273
                        expectedReplicaPrecommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.PrecommittedTxID, true, false)
6,367✔
1274
                        if err != nil {
6,367✔
1275
                                return nil, committedTxID, committedAlh, err
×
1276
                        }
×
1277

1278
                        replicaPreCommittedAlh := schema.DigestFromProto(req.ReplicaState.PrecommittedAlh)
6,367✔
1279

6,367✔
1280
                        if expectedReplicaPrecommitHdr.Alh() != replicaPreCommittedAlh {
6,367✔
1281
                                return nil, expectedReplicaPrecommitHdr.ID, expectedReplicaPrecommitHdr.Alh(), fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1282
                        }
×
1283

1284
                        // primary will provide commit state to the replica so it can commit pre-committed transactions
1285
                        if req.ReplicaState.PrecommittedTxID < committedTxID {
7,761✔
1286
                                // if replica is behind current commit state in primary
1,394✔
1287
                                // return the alh up to the point known by the replica.
1,394✔
1288
                                // That way the replica is able to validate is following the right primary.
1,394✔
1289
                                mayCommitUpToTxID = req.ReplicaState.PrecommittedTxID
1,394✔
1290
                                mayCommitUpToAlh = replicaPreCommittedAlh
1,394✔
1291
                        } else {
6,367✔
1292
                                mayCommitUpToTxID = committedTxID
4,973✔
1293
                                mayCommitUpToAlh = committedAlh
4,973✔
1294
                        }
4,973✔
1295
                }
1296

1297
                err = d.mayUpdateReplicaState(committedTxID, req.ReplicaState)
6,759✔
1298
                if err != nil {
6,759✔
1299
                        return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1300
                }
×
1301
        }
1302

1303
        // it might be the case primary will commit some txs (even there could be inmem-precommitted txs)
1304
        // current timeout it's not a special value but at least a relative one
1305
        // note: primary might also be waiting ack from any replica (even this primary may do progress)
1306

1307
        // TODO: under some circumstances, replica might not be able to do further progress until primary
1308
        // has made changes, such wait doesn't need to have a timeout, reducing networking and CPU utilization
1309
        var cancel context.CancelFunc
8,439✔
1310

8,439✔
1311
        if req.ReplicaState != nil {
15,198✔
1312
                ctx, cancel = context.WithTimeout(ctx, d.options.storeOpts.SyncFrequency*4)
6,759✔
1313
                defer cancel()
6,759✔
1314
        }
6,759✔
1315

1316
        err = d.WaitForTx(ctx, req.Tx, req.AllowPreCommitted)
8,439✔
1317
        if ctx.Err() != nil {
9,083✔
1318
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, nil
644✔
1319
        }
644✔
1320
        if err != nil {
7,795✔
1321
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1322
        }
×
1323

1324
        txbs, err = d.st.ExportTx(req.Tx, req.AllowPreCommitted, req.SkipIntegrityCheck, tx)
7,795✔
1325
        if err != nil {
7,795✔
1326
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1327
        }
×
1328

1329
        return txbs, mayCommitUpToTxID, mayCommitUpToAlh, nil
7,795✔
1330
}
1331

1332
func (d *db) ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error) {
7,725✔
1333
        d.mutex.RLock()
7,725✔
1334
        defer d.mutex.RUnlock()
7,725✔
1335

7,725✔
1336
        if !d.isReplica() {
7,725✔
1337
                return nil, ErrNotReplica
×
1338
        }
×
1339

1340
        hdr, err := d.st.ReplicateTx(ctx, exportedTx, skipIntegrityCheck, waitForIndexing)
7,725✔
1341
        if err != nil {
7,755✔
1342
                return nil, err
30✔
1343
        }
30✔
1344

1345
        return schema.TxHeaderToProto(hdr), nil
7,695✔
1346
}
1347

1348
// AllowCommitUpto is used by replicas to commit transactions once committed in primary
1349
func (d *db) AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error {
6,072✔
1350
        d.mutex.RLock()
6,072✔
1351
        defer d.mutex.RUnlock()
6,072✔
1352

6,072✔
1353
        if !d.isReplica() {
6,072✔
1354
                return ErrNotReplica
×
1355
        }
×
1356

1357
        // replica pre-committed state must be consistent with primary
1358

1359
        committedTxID, committedAlh := d.st.CommittedAlh()
6,072✔
1360
        // handling a particular case in an optimized manner
6,072✔
1361
        if committedTxID == txID {
6,377✔
1362
                if committedAlh != alh {
305✔
1363
                        return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1364
                }
×
1365
                return nil
305✔
1366
        }
1367

1368
        hdr, err := d.st.ReadTxHeader(txID, true, false)
5,767✔
1369
        if err != nil {
5,767✔
1370
                return err
×
1371
        }
×
1372

1373
        if hdr.Alh() != alh {
5,767✔
1374
                return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1375
        }
×
1376

1377
        return d.st.AllowCommitUpto(txID)
5,767✔
1378
}
1379

1380
func (d *db) DiscardPrecommittedTxsSince(txID uint64) error {
2✔
1381
        d.mutex.RLock()
2✔
1382
        defer d.mutex.RUnlock()
2✔
1383

2✔
1384
        _, err := d.st.DiscardPrecommittedTxsSince(txID)
2✔
1385

2✔
1386
        return err
2✔
1387
}
2✔
1388

1389
// VerifiableTxByID ...
1390
func (d *db) VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error) {
1,297✔
1391
        if req == nil {
1,298✔
1392
                return nil, ErrIllegalArguments
1✔
1393
        }
1✔
1394

1395
        lastTxID, _ := d.st.CommittedAlh()
1,296✔
1396
        if lastTxID < req.ProveSinceTx {
1,296✔
1397
                return nil, fmt.Errorf("%w: latest txID=%d is lower than specified as initial tx=%d", ErrIllegalState, lastTxID, req.ProveSinceTx)
×
1398
        }
×
1399

1400
        var snap *store.Snapshot
1,296✔
1401
        var err error
1,296✔
1402

1,296✔
1403
        if !req.KeepReferencesUnresolved {
2,592✔
1404
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
1,296✔
1405
                if err != nil {
1,296✔
1406
                        return nil, err
×
1407
                }
×
1408
                defer snap.Close()
1,296✔
1409
        }
1410

1411
        reqTx, err := d.allocTx()
1,296✔
1412
        if err != nil {
1,296✔
1413
                return nil, err
×
1414
        }
×
1415
        defer d.releaseTx(reqTx)
1,296✔
1416

1,296✔
1417
        err = d.st.ReadTx(req.Tx, false, reqTx)
1,296✔
1418
        if err != nil {
1,296✔
1419
                return nil, err
×
1420
        }
×
1421

1422
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,296✔
1423
        var rootTxHdr *store.TxHeader
1,296✔
1424

1,296✔
1425
        if req.ProveSinceTx == 0 {
1,300✔
1426
                rootTxHdr = reqTx.Header()
4✔
1427
        } else {
1,296✔
1428
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
1,292✔
1429
                if err != nil {
1,292✔
1430
                        return nil, err
×
1431
                }
×
1432
        }
1433

1434
        if req.ProveSinceTx <= req.Tx {
2,591✔
1435
                sourceTxHdr = rootTxHdr
1,295✔
1436
                targetTxHdr = reqTx.Header()
1,295✔
1437
        } else {
1,296✔
1438
                sourceTxHdr = reqTx.Header()
1✔
1439
                targetTxHdr = rootTxHdr
1✔
1440
        }
1✔
1441

1442
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,296✔
1443
        if err != nil {
1,296✔
1444
                return nil, err
×
1445
        }
×
1446

1447
        sReqTx, err := d.serializeTx(ctx, reqTx, req.EntriesSpec, snap, true)
1,296✔
1448
        if err != nil {
1,296✔
1449
                return nil, err
×
1450
        }
×
1451

1452
        return &schema.VerifiableTx{
1,296✔
1453
                Tx:        sReqTx,
1,296✔
1454
                DualProof: schema.DualProofToProto(dualProof),
1,296✔
1455
        }, nil
1,296✔
1456
}
1457

1458
// TxScan ...
1459
func (d *db) TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error) {
10✔
1460
        if req == nil {
11✔
1461
                return nil, ErrIllegalArguments
1✔
1462
        }
1✔
1463

1464
        if int(req.Limit) > d.maxResultSize {
10✔
1465
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1466
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1467
        }
1✔
1468

1469
        tx, err := d.allocTx()
8✔
1470
        if err != nil {
8✔
1471
                return nil, err
×
1472
        }
×
1473
        defer d.releaseTx(tx)
8✔
1474

8✔
1475
        limit := int(req.Limit)
8✔
1476

8✔
1477
        if req.Limit == 0 {
13✔
1478
                limit = d.maxResultSize
5✔
1479
        }
5✔
1480

1481
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
8✔
1482
        if err != nil {
8✔
1483
                return nil, err
×
1484
        }
×
1485
        defer snap.Close()
8✔
1486

8✔
1487
        txReader, err := d.st.NewTxReader(req.InitialTx, req.Desc, tx)
8✔
1488
        if err != nil {
9✔
1489
                return nil, err
1✔
1490
        }
1✔
1491

1492
        txList := &schema.TxList{}
7✔
1493

7✔
1494
        for l := 1; l <= limit; l++ {
27✔
1495
                tx, err := txReader.Read()
20✔
1496
                if errors.Is(err, store.ErrNoMoreEntries) {
22✔
1497
                        break
2✔
1498
                }
1499
                if err != nil {
18✔
1500
                        return nil, err
×
1501
                }
×
1502

1503
                sTx, err := d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
18✔
1504
                if err != nil {
18✔
1505
                        return nil, err
×
1506
                }
×
1507

1508
                txList.Txs = append(txList.Txs, sTx)
18✔
1509

18✔
1510
                if l == d.maxResultSize {
20✔
1511
                        return txList,
2✔
1512
                                fmt.Errorf("%w: found at least %d entries (maximum limit). "+
2✔
1513
                                        "Pagination over large results can be achieved by using the limit and initialTx arguments",
2✔
1514
                                        ErrResultSizeLimitReached, d.maxResultSize)
2✔
1515
                }
2✔
1516
        }
1517

1518
        return txList, nil
5✔
1519
}
1520

1521
// History ...
1522
func (d *db) History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error) {
9✔
1523
        if req == nil {
10✔
1524
                return nil, ErrIllegalArguments
1✔
1525
        }
1✔
1526

1527
        if int(req.Limit) > d.maxResultSize {
9✔
1528
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1529
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1530
        }
1✔
1531

1532
        currTxID, _ := d.st.CommittedAlh()
7✔
1533

7✔
1534
        if req.SinceTx > currTxID {
7✔
1535
                return nil, ErrIllegalArguments
×
1536
        }
×
1537

1538
        waitUntilTx := req.SinceTx
7✔
1539
        if waitUntilTx == 0 {
9✔
1540
                waitUntilTx = currTxID
2✔
1541
        }
2✔
1542

1543
        err := d.WaitForIndexingUpto(ctx, waitUntilTx)
7✔
1544
        if err != nil {
7✔
1545
                return nil, err
×
1546
        }
×
1547

1548
        limit := int(req.Limit)
7✔
1549

7✔
1550
        if req.Limit == 0 {
14✔
1551
                limit = d.maxResultSize
7✔
1552
        }
7✔
1553

1554
        key := EncodeKey(req.Key)
7✔
1555

7✔
1556
        valRefs, hCount, err := d.st.History(key, req.Offset, req.Desc, limit)
7✔
1557
        if err != nil && err != store.ErrOffsetOutOfRange {
8✔
1558
                return nil, err
1✔
1559
        }
1✔
1560

1561
        list := &schema.Entries{
6✔
1562
                Entries: make([]*schema.Entry, len(valRefs)),
6✔
1563
        }
6✔
1564

6✔
1565
        for i, valRef := range valRefs {
114✔
1566
                val, err := valRef.Resolve()
108✔
1567
                if err != nil && err != store.ErrExpiredEntry {
108✔
1568
                        return nil, err
×
1569
                }
×
1570
                if len(val) > 0 {
214✔
1571
                        val = TrimPrefix(val)
106✔
1572
                }
106✔
1573

1574
                list.Entries[i] = &schema.Entry{
108✔
1575
                        Tx:       valRef.Tx(),
108✔
1576
                        Key:      req.Key,
108✔
1577
                        Metadata: schema.KVMetadataToProto(valRef.KVMetadata()),
108✔
1578
                        Value:    val,
108✔
1579
                        Expired:  errors.Is(err, store.ErrExpiredEntry),
108✔
1580
                        Revision: valRef.HC(),
108✔
1581
                }
108✔
1582
        }
1583

1584
        if limit == d.maxResultSize && hCount >= uint64(d.maxResultSize) {
8✔
1585
                return list,
2✔
1586
                        fmt.Errorf("%w: found at least %d entries (the maximum limit). "+
2✔
1587
                                "Pagination over large results can be achieved by using the limit and initialTx arguments",
2✔
1588
                                ErrResultSizeLimitReached, d.maxResultSize)
2✔
1589
        }
2✔
1590

1591
        return list, nil
4✔
1592
}
1593

1594
func (d *db) IsClosed() bool {
202✔
1595
        d.closingMutex.Lock()
202✔
1596
        defer d.closingMutex.Unlock()
202✔
1597

202✔
1598
        return d.st.IsClosed()
202✔
1599
}
202✔
1600

1601
// Close ...
1602
func (d *db) Close() (err error) {
709✔
1603
        d.closingMutex.Lock()
709✔
1604
        defer d.closingMutex.Unlock()
709✔
1605

709✔
1606
        d.Logger.Infof("closing database '%s'...", d.name)
709✔
1607

709✔
1608
        defer func() {
1,418✔
1609
                if err == nil {
1,329✔
1610
                        d.Logger.Infof("database '%s' succesfully closed", d.name)
620✔
1611
                } else {
709✔
1612
                        d.Logger.Infof("%v: while closing database '%s'", err, d.name)
89✔
1613
                }
89✔
1614
        }()
1615

1616
        return d.st.Close()
709✔
1617
}
1618

1619
// GetName ...
1620
func (d *db) GetName() string {
54,196✔
1621
        return d.name
54,196✔
1622
}
54,196✔
1623

1624
// GetOptions ...
1625
func (d *db) GetOptions() *Options {
3,082✔
1626
        d.mutex.RLock()
3,082✔
1627
        defer d.mutex.RUnlock()
3,082✔
1628

3,082✔
1629
        return d.options
3,082✔
1630
}
3,082✔
1631

1632
func (d *db) AsReplica(asReplica, syncReplication bool, syncAcks int) {
15✔
1633
        d.mutex.Lock()
15✔
1634
        defer d.mutex.Unlock()
15✔
1635

15✔
1636
        d.replicaStatesMutex.Lock()
15✔
1637
        defer d.replicaStatesMutex.Unlock()
15✔
1638

15✔
1639
        d.options.replica = asReplica
15✔
1640
        d.options.syncAcks = syncAcks
15✔
1641
        d.options.syncReplication = syncReplication
15✔
1642

15✔
1643
        if asReplica {
23✔
1644
                d.replicaStates = nil
8✔
1645
        } else if syncAcks > 0 {
18✔
1646
                d.replicaStates = make(map[string]*replicaState, syncAcks)
3✔
1647
        }
3✔
1648

1649
        d.st.SetExternalCommitAllowance(syncReplication)
15✔
1650
}
1651

1652
func (d *db) IsReplica() bool {
436✔
1653
        d.mutex.RLock()
436✔
1654
        defer d.mutex.RUnlock()
436✔
1655

436✔
1656
        return d.isReplica()
436✔
1657
}
436✔
1658

1659
func (d *db) isReplica() bool {
19,511✔
1660
        return d.options.replica
19,511✔
1661
}
19,511✔
1662

1663
func (d *db) IsSyncReplicationEnabled() bool {
8,439✔
1664
        d.mutex.RLock()
8,439✔
1665
        defer d.mutex.RUnlock()
8,439✔
1666

8,439✔
1667
        return d.options.syncReplication
8,439✔
1668
}
8,439✔
1669

1670
func (d *db) SetSyncReplication(enabled bool) {
211✔
1671
        d.mutex.Lock()
211✔
1672
        defer d.mutex.Unlock()
211✔
1673

211✔
1674
        d.st.SetExternalCommitAllowance(enabled)
211✔
1675

211✔
1676
        d.options.syncReplication = enabled
211✔
1677
}
211✔
1678

1679
func logErr(log logger.Logger, formattedMessage string, err error) error {
1✔
1680
        if err != nil {
2✔
1681
                log.Errorf(formattedMessage, err)
1✔
1682
        }
1✔
1683
        return err
1✔
1684
}
1685

1686
// CopyCatalog creates a copy of the sql catalog and returns a transaction
1687
// that can be used to commit the copy.
1688
func (d *db) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
13✔
1689
        // copy the sql catalog
13✔
1690
        err := d.sqlEngine.CopyCatalogToTx(ctx, tx)
13✔
1691
        if err != nil {
13✔
1692
                return err
×
1693
        }
×
1694

1695
        // copy the document store catalog
1696
        err = d.documentEngine.CopyCatalogToTx(ctx, tx)
13✔
1697
        if err != nil {
13✔
1698
                return err
×
1699
        }
×
1700

1701
        return nil
13✔
1702
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc