• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6507801635

13 Oct 2023 11:46AM UTC coverage: 89.621% (+0.008%) from 89.613%
6507801635

push

gh-ci

jeroiraz
removed unnecessary dependency

33511 of 37392 relevant lines covered (89.62%)

144056.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.5
/pkg/database/database.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package database
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26
        "os"
27
        "path/filepath"
28
        "sync"
29
        "time"
30

31
        "github.com/codenotary/immudb/embedded/document"
32
        "github.com/codenotary/immudb/embedded/sql"
33
        "github.com/codenotary/immudb/embedded/store"
34

35
        "github.com/codenotary/immudb/embedded/logger"
36
        "github.com/codenotary/immudb/pkg/api/schema"
37
)
38

39
const MaxKeyResolutionLimit = 1
40
const MaxKeyScanLimit = 1000
41

42
var ErrKeyResolutionLimitReached = errors.New("key resolution limit reached. It may be due to cyclic references")
43
var ErrResultSizeLimitExceeded = errors.New("result size limit exceeded")
44
var ErrResultSizeLimitReached = errors.New("result size limit reached")
45
var ErrIllegalArguments = store.ErrIllegalArguments
46
var ErrIllegalState = store.ErrIllegalState
47
var ErrIsReplica = errors.New("database is read-only because it's a replica")
48
var ErrNotReplica = errors.New("database is NOT a replica")
49
var ErrReplicaDivergedFromPrimary = errors.New("replica diverged from primary")
50
var ErrInvalidRevision = errors.New("invalid key revision number")
51

52
type DB interface {
53
        GetName() string
54

55
        // Setttings
56
        GetOptions() *Options
57

58
        Path() string
59

60
        AsReplica(asReplica, syncReplication bool, syncAcks int)
61
        IsReplica() bool
62

63
        IsSyncReplicationEnabled() bool
64
        SetSyncReplication(enabled bool)
65

66
        MaxResultSize() int
67

68
        // State
69
        Health() (waitingCount int, lastReleaseAt time.Time)
70
        CurrentState() (*schema.ImmutableState, error)
71

72
        Size() (uint64, error)
73

74
        // Key-Value
75
        Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error)
76
        VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error)
77

78
        Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error)
79
        VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error)
80
        GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error)
81

82
        Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error)
83

84
        SetReference(ctx context.Context, req *schema.ReferenceRequest) (*schema.TxHeader, error)
85
        VerifiableSetReference(ctx context.Context, req *schema.VerifiableReferenceRequest) (*schema.VerifiableTx, error)
86

87
        Scan(ctx context.Context, req *schema.ScanRequest) (*schema.Entries, error)
88

89
        History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error)
90

91
        ExecAll(ctx context.Context, operations *schema.ExecAllRequest) (*schema.TxHeader, error)
92

93
        Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error)
94
        CountAll(ctx context.Context) (*schema.EntryCount, error)
95

96
        ZAdd(ctx context.Context, req *schema.ZAddRequest) (*schema.TxHeader, error)
97
        VerifiableZAdd(ctx context.Context, req *schema.VerifiableZAddRequest) (*schema.VerifiableTx, error)
98
        ZScan(ctx context.Context, req *schema.ZScanRequest) (*schema.ZEntries, error)
99

100
        // SQL-related
101
        NewSQLTx(ctx context.Context, opts *sql.TxOptions) (*sql.SQLTx, error)
102

103
        SQLExec(ctx context.Context, tx *sql.SQLTx, req *schema.SQLExecRequest) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
104
        SQLExecPrepared(ctx context.Context, tx *sql.SQLTx, stmts []sql.SQLStmt, params map[string]interface{}) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
105

106
        InferParameters(ctx context.Context, tx *sql.SQLTx, sql string) (map[string]sql.SQLValueType, error)
107
        InferParametersPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.SQLStmt) (map[string]sql.SQLValueType, error)
108

109
        SQLQuery(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) (*schema.SQLQueryResult, error)
110
        SQLQueryPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.DataSource, namedParams []*schema.NamedParam) (*schema.SQLQueryResult, error)
111
        SQLQueryRowReader(ctx context.Context, tx *sql.SQLTx, stmt sql.DataSource, params map[string]interface{}) (sql.RowReader, error)
112

113
        VerifiableSQLGet(ctx context.Context, req *schema.VerifiableSQLGetRequest) (*schema.VerifiableSQLEntry, error)
114

115
        ListTables(ctx context.Context, tx *sql.SQLTx) (*schema.SQLQueryResult, error)
116
        DescribeTable(ctx context.Context, tx *sql.SQLTx, table string) (*schema.SQLQueryResult, error)
117

118
        // Transactional layer
119
        WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error
120
        WaitForIndexingUpto(ctx context.Context, txID uint64) error
121

122
        TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error)
123
        ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error)
124
        ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error)
125
        AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error
126
        DiscardPrecommittedTxsSince(txID uint64) error
127

128
        VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error)
129
        TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error)
130

131
        // Maintenance
132
        FlushIndex(req *schema.FlushIndexRequest) error
133
        CompactIndex() error
134

135
        IsClosed() bool
136
        Close() error
137

138
        DocumentDatabase
139
}
140

141
type replicaState struct {
142
        precommittedTxID uint64
143
        precommittedAlh  [sha256.Size]byte
144
}
145

146
// IDB database instance
147
type db struct {
148
        st *store.ImmuStore
149

150
        sqlEngine      *sql.Engine
151
        documentEngine *document.Engine
152

153
        mutex        *instrumentedRWMutex
154
        closingMutex sync.Mutex
155

156
        Logger  logger.Logger
157
        options *Options
158

159
        name string
160

161
        maxResultSize int
162

163
        txPool store.TxPool
164

165
        replicaStates      map[string]*replicaState
166
        replicaStatesMutex sync.Mutex
167
}
168

169
// OpenDB Opens an existing Database from disk
170
func OpenDB(dbName string, multidbHandler sql.MultiDBHandler, op *Options, log logger.Logger) (DB, error) {
37✔
171
        if dbName == "" {
38✔
172
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
1✔
173
        }
1✔
174

175
        log.Infof("opening database '%s' {replica = %v}...", dbName, op.replica)
36✔
176

36✔
177
        var replicaStates map[string]*replicaState
36✔
178
        // replica states are only managed in primary with synchronous replication
36✔
179
        if !op.replica && op.syncAcks > 0 {
37✔
180
                replicaStates = make(map[string]*replicaState, op.syncAcks)
1✔
181
        }
1✔
182

183
        dbi := &db{
36✔
184
                Logger:        log,
36✔
185
                options:       op,
36✔
186
                name:          dbName,
36✔
187
                replicaStates: replicaStates,
36✔
188
                maxResultSize: MaxKeyScanLimit,
36✔
189
                mutex:         &instrumentedRWMutex{},
36✔
190
        }
36✔
191

36✔
192
        dbDir := dbi.Path()
36✔
193
        _, err := os.Stat(dbDir)
36✔
194
        if os.IsNotExist(err) {
37✔
195
                return nil, fmt.Errorf("missing database directories: %s", dbDir)
1✔
196
        }
1✔
197

198
        stOpts := op.GetStoreOptions().
35✔
199
                WithLogger(log).
35✔
200
                WithMultiIndexing(true).
35✔
201
                WithExternalCommitAllowance(op.syncReplication)
35✔
202

35✔
203
        dbi.st, err = store.Open(dbDir, stOpts)
35✔
204
        if err != nil {
35✔
205
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
206
        }
×
207

208
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
105✔
209
                err := dbi.st.InitIndexing(&store.IndexSpec{
70✔
210
                        SourcePrefix: []byte{prefix},
70✔
211
                        TargetPrefix: []byte{prefix},
70✔
212
                })
70✔
213
                if err != nil {
70✔
214
                        dbi.st.Close()
×
215
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
216
                }
×
217
        }
218

219
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, op.replica)
35✔
220

35✔
221
        sqlOpts := sql.DefaultOptions().
35✔
222
                WithPrefix([]byte{SQLPrefix}).
35✔
223
                WithMultiDBHandler(multidbHandler)
35✔
224

35✔
225
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
35✔
226
        if err != nil {
35✔
227
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, op.replica, err)
×
228
                return nil, err
×
229
        }
×
230
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, op.replica)
35✔
231

35✔
232
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
35✔
233
        if err != nil {
35✔
234
                return nil, err
×
235
        }
×
236
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, op.replica)
35✔
237

35✔
238
        txPool, err := dbi.st.NewTxHolderPool(op.readTxPoolSize, false)
35✔
239
        if err != nil {
35✔
240
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
241
        }
×
242
        dbi.txPool = txPool
35✔
243

35✔
244
        if op.replica {
39✔
245
                dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, op.replica)
4✔
246
                return dbi, nil
4✔
247
        }
4✔
248

249
        dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, op.replica)
31✔
250

31✔
251
        return dbi, nil
31✔
252
}
253

254
func (d *db) Path() string {
38✔
255
        return filepath.Join(d.options.GetDBRootPath(), d.GetName())
38✔
256
}
38✔
257

258
func (d *db) allocTx() (*store.Tx, error) {
11,395✔
259
        tx, err := d.txPool.Alloc()
11,395✔
260
        if errors.Is(err, store.ErrTxPoolExhausted) {
11,395✔
261
                return nil, ErrTxReadPoolExhausted
×
262
        }
×
263
        return tx, err
11,395✔
264
}
265

266
func (d *db) releaseTx(tx *store.Tx) {
11,395✔
267
        d.txPool.Release(tx)
11,395✔
268
}
11,395✔
269

270
// NewDB Creates a new Database along with it's directories and files
271
func NewDB(dbName string, multidbHandler sql.MultiDBHandler, op *Options, log logger.Logger) (DB, error) {
640✔
272
        if dbName == "" {
640✔
273
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
×
274
        }
×
275

276
        log.Infof("creating database '%s' {replica = %v}...", dbName, op.replica)
640✔
277

640✔
278
        var replicaStates map[string]*replicaState
640✔
279
        // replica states are only managed in primary with synchronous replication
640✔
280
        if !op.replica && op.syncAcks > 0 {
651✔
281
                replicaStates = make(map[string]*replicaState, op.syncAcks)
11✔
282
        }
11✔
283

284
        dbi := &db{
640✔
285
                Logger:        log,
640✔
286
                options:       op,
640✔
287
                name:          dbName,
640✔
288
                replicaStates: replicaStates,
640✔
289
                maxResultSize: MaxKeyScanLimit,
640✔
290
                mutex:         &instrumentedRWMutex{},
640✔
291
        }
640✔
292

640✔
293
        dbDir := filepath.Join(op.GetDBRootPath(), dbName)
640✔
294

640✔
295
        _, err := os.Stat(dbDir)
640✔
296
        if err == nil {
641✔
297
                return nil, fmt.Errorf("database directories already exist: %s", dbDir)
1✔
298
        }
1✔
299

300
        if err = os.MkdirAll(dbDir, os.ModePerm); err != nil {
640✔
301
                return nil, logErr(dbi.Logger, "unable to create data folder: %s", err)
1✔
302
        }
1✔
303

304
        stOpts := op.GetStoreOptions().
638✔
305
                WithExternalCommitAllowance(op.syncReplication).
638✔
306
                WithMultiIndexing(true).
638✔
307
                WithLogger(log)
638✔
308

638✔
309
        dbi.st, err = store.Open(dbDir, stOpts)
638✔
310
        if err != nil {
638✔
311
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
312
        }
×
313

314
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
1,914✔
315
                err := dbi.st.InitIndexing(&store.IndexSpec{
1,276✔
316
                        SourcePrefix: []byte{prefix},
1,276✔
317
                        TargetPrefix: []byte{prefix},
1,276✔
318
                })
1,276✔
319
                if err != nil {
1,276✔
320
                        dbi.st.Close()
×
321
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
322
                }
×
323
        }
324

325
        txPool, err := dbi.st.NewTxHolderPool(op.readTxPoolSize, false)
638✔
326
        if err != nil {
638✔
327
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
328
        }
×
329
        dbi.txPool = txPool
638✔
330

638✔
331
        sqlOpts := sql.DefaultOptions().
638✔
332
                WithPrefix([]byte{SQLPrefix}).
638✔
333
                WithMultiDBHandler(multidbHandler)
638✔
334

638✔
335
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, op.replica)
638✔
336

638✔
337
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
638✔
338
        if err != nil {
638✔
339
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, op.replica, err)
×
340
                return nil, err
×
341
        }
×
342
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, op.replica)
638✔
343

638✔
344
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
638✔
345
        if err != nil {
638✔
346
                return nil, logErr(dbi.Logger, "Unable to open database: %s", err)
×
347
        }
×
348
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, op.replica)
638✔
349

638✔
350
        dbi.Logger.Infof("database '%s' successfully created {replica = %v}", dbName, op.replica)
638✔
351

638✔
352
        return dbi, nil
638✔
353
}
354

355
func (d *db) MaxResultSize() int {
24✔
356
        return d.maxResultSize
24✔
357
}
24✔
358

359
func (d *db) FlushIndex(req *schema.FlushIndexRequest) error {
6✔
360
        if req == nil {
7✔
361
                return store.ErrIllegalArguments
1✔
362
        }
1✔
363

364
        return d.st.FlushIndexes(req.CleanupPercentage, req.Synced)
5✔
365
}
366

367
// CompactIndex ...
368
func (d *db) CompactIndex() error {
4✔
369
        return d.st.CompactIndexes()
4✔
370
}
4✔
371

372
// Set ...
373
func (d *db) Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,672✔
374
        d.mutex.RLock()
4,672✔
375
        defer d.mutex.RUnlock()
4,672✔
376

4,672✔
377
        if d.isReplica() {
4,675✔
378
                return nil, ErrIsReplica
3✔
379
        }
3✔
380

381
        return d.set(ctx, req)
4,669✔
382
}
383

384
func (d *db) set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,669✔
385
        if req == nil {
4,670✔
386
                return nil, ErrIllegalArguments
1✔
387
        }
1✔
388

389
        tx, err := d.st.NewWriteOnlyTx(ctx)
4,668✔
390
        if err != nil {
4,668✔
391
                return nil, err
×
392
        }
×
393
        defer tx.Cancel()
4,668✔
394

4,668✔
395
        keys := make(map[[sha256.Size]byte]struct{}, len(req.KVs))
4,668✔
396

4,668✔
397
        for _, kv := range req.KVs {
10,631✔
398
                if len(kv.Key) == 0 {
5,965✔
399
                        return nil, ErrIllegalArguments
2✔
400
                }
2✔
401

402
                kid := sha256.Sum256(kv.Key)
5,961✔
403
                _, ok := keys[kid]
5,961✔
404
                if ok {
5,962✔
405
                        return nil, schema.ErrDuplicatedKeysNotSupported
1✔
406
                }
1✔
407
                keys[kid] = struct{}{}
5,960✔
408

5,960✔
409
                e := EncodeEntrySpec(
5,960✔
410
                        kv.Key,
5,960✔
411
                        schema.KVMetadataFromProto(kv.Metadata),
5,960✔
412
                        kv.Value,
5,960✔
413
                )
5,960✔
414

5,960✔
415
                err = tx.Set(e.Key, e.Metadata, e.Value)
5,960✔
416
                if err != nil {
5,961✔
417
                        return nil, err
1✔
418
                }
1✔
419
        }
420

421
        for i := range req.Preconditions {
5,735✔
422

1,071✔
423
                c, err := PreconditionFromProto(req.Preconditions[i])
1,071✔
424
                if err != nil {
1,072✔
425
                        return nil, err
1✔
426
                }
1✔
427

428
                err = tx.AddPrecondition(c)
1,070✔
429
                if err != nil {
1,071✔
430
                        return nil, fmt.Errorf("%w: %v", store.ErrInvalidPrecondition, err)
1✔
431
                }
1✔
432
        }
433

434
        var hdr *store.TxHeader
4,662✔
435

4,662✔
436
        if req.NoWait {
4,662✔
437
                hdr, err = tx.AsyncCommit(ctx)
×
438
        } else {
4,662✔
439
                hdr, err = tx.Commit(ctx)
4,662✔
440
        }
4,662✔
441
        if err != nil {
4,693✔
442
                return nil, err
31✔
443
        }
31✔
444

445
        return schema.TxHeaderToProto(hdr), nil
4,631✔
446
}
447

448
func checkKeyRequest(req *schema.KeyRequest) error {
3,072✔
449
        if req == nil {
3,073✔
450
                return fmt.Errorf(
1✔
451
                        "%w: empty request",
1✔
452
                        ErrIllegalArguments,
1✔
453
                )
1✔
454
        }
1✔
455

456
        if len(req.Key) == 0 {
3,073✔
457
                return fmt.Errorf(
2✔
458
                        "%w: empty key",
2✔
459
                        ErrIllegalArguments,
2✔
460
                )
2✔
461
        }
2✔
462

463
        if req.AtTx > 0 {
3,094✔
464
                if req.SinceTx > 0 {
27✔
465
                        return fmt.Errorf(
2✔
466
                                "%w: SinceTx should not be specified when AtTx is used",
2✔
467
                                ErrIllegalArguments,
2✔
468
                        )
2✔
469
                }
2✔
470

471
                if req.AtRevision != 0 {
24✔
472
                        return fmt.Errorf(
1✔
473
                                "%w: AtRevision should not be specified when AtTx is used",
1✔
474
                                ErrIllegalArguments,
1✔
475
                        )
1✔
476
                }
1✔
477
        }
478

479
        return nil
3,066✔
480
}
481

482
// Get ...
483
func (d *db) Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error) {
3,068✔
484
        err := checkKeyRequest(req)
3,068✔
485
        if err != nil {
3,070✔
486
                return nil, err
2✔
487
        }
2✔
488

489
        currTxID, _ := d.st.CommittedAlh()
3,066✔
490
        if req.SinceTx > currTxID {
3,067✔
491
                return nil, fmt.Errorf(
1✔
492
                        "%w: SinceTx must not be greater than the current transaction ID",
1✔
493
                        ErrIllegalArguments,
1✔
494
                )
1✔
495
        }
1✔
496

497
        if !req.NoWait && req.AtTx == 0 {
6,108✔
498
                waitUntilTx := req.SinceTx
3,043✔
499
                if waitUntilTx == 0 {
4,989✔
500
                        waitUntilTx = currTxID
1,946✔
501
                }
1,946✔
502

503
                err := d.WaitForIndexingUpto(ctx, waitUntilTx)
3,043✔
504
                if err != nil {
3,045✔
505
                        return nil, err
2✔
506
                }
2✔
507
        }
508

509
        if req.AtRevision != 0 {
3,094✔
510
                return d.getAtRevision(ctx, EncodeKey(req.Key), req.AtRevision, true)
31✔
511
        }
31✔
512

513
        return d.getAtTx(ctx, EncodeKey(req.Key), req.AtTx, 0, d.st, 0, true)
3,032✔
514
}
515

516
func (d *db) get(ctx context.Context, key []byte, index store.KeyIndex, skipIntegrityCheck bool) (*schema.Entry, error) {
10✔
517
        return d.getAtTx(ctx, key, 0, 0, index, 0, skipIntegrityCheck)
10✔
518
}
10✔
519

520
func (d *db) getAtTx(
521
        ctx context.Context,
522
        key []byte,
523
        atTx uint64,
524
        resolved int,
525
        index store.KeyIndex,
526
        revision uint64,
527
        skipIntegrityCheck bool,
528
) (entry *schema.Entry, err error) {
5,580✔
529

5,580✔
530
        var txID uint64
5,580✔
531
        var val []byte
5,580✔
532
        var md *store.KVMetadata
5,580✔
533

5,580✔
534
        if atTx == 0 {
9,268✔
535
                valRef, err := index.Get(ctx, key)
3,688✔
536
                if err != nil {
3,762✔
537
                        return nil, err
74✔
538
                }
74✔
539

540
                txID = valRef.Tx()
3,614✔
541
                revision = valRef.HC()
3,614✔
542
                md = valRef.KVMetadata()
3,614✔
543

3,614✔
544
                val, err = valRef.Resolve()
3,614✔
545
                if err != nil {
3,618✔
546
                        return nil, err
4✔
547
                }
4✔
548
        } else {
1,892✔
549
                txID = atTx
1,892✔
550

1,892✔
551
                md, val, err = d.readMetadataAndValue(key, atTx, skipIntegrityCheck)
1,892✔
552
                if err != nil {
1,987✔
553
                        return nil, err
95✔
554
                }
95✔
555
        }
556

557
        return d.resolveValue(ctx, key, val, resolved, txID, md, index, revision, skipIntegrityCheck)
5,407✔
558
}
559

560
func (d *db) readMetadataAndValue(key []byte, atTx uint64, skipIntegrityCheck bool) (*store.KVMetadata, []byte, error) {
1,892✔
561
        entry, _, err := d.st.ReadTxEntry(atTx, key, skipIntegrityCheck)
1,892✔
562
        if err != nil {
1,978✔
563
                return nil, nil, err
86✔
564
        }
86✔
565

566
        v, err := d.st.ReadValue(entry)
1,806✔
567
        if err != nil {
1,815✔
568
                return nil, nil, err
9✔
569
        }
9✔
570

571
        return entry.Metadata(), v, nil
1,797✔
572
}
573

574
func (d *db) getAtRevision(ctx context.Context, key []byte, atRevision int64, skipIntegrityCheck bool) (entry *schema.Entry, err error) {
31✔
575
        var offset uint64
31✔
576
        var desc bool
31✔
577

31✔
578
        if atRevision > 0 {
47✔
579
                offset = uint64(atRevision) - 1
16✔
580
                desc = false
16✔
581
        } else {
31✔
582
                offset = -uint64(atRevision)
15✔
583
                desc = true
15✔
584
        }
15✔
585

586
        valRefs, hCount, err := d.st.History(key, offset, desc, 1)
31✔
587
        if errors.Is(err, store.ErrNoMoreEntries) || errors.Is(err, store.ErrOffsetOutOfRange) {
35✔
588
                return nil, ErrInvalidRevision
4✔
589
        }
4✔
590
        if err != nil {
28✔
591
                return nil, err
1✔
592
        }
1✔
593

594
        if atRevision < 0 {
39✔
595
                atRevision = int64(hCount) + atRevision
13✔
596
        }
13✔
597

598
        entry, err = d.getAtTx(ctx, key, valRefs[0].Tx(), 0, d.st, uint64(atRevision), skipIntegrityCheck)
26✔
599
        if err != nil {
27✔
600
                return nil, err
1✔
601
        }
1✔
602

603
        return entry, err
25✔
604
}
605

606
func (d *db) resolveValue(
607
        ctx context.Context,
608
        key []byte,
609
        val []byte,
610
        resolved int,
611
        txID uint64,
612
        md *store.KVMetadata,
613
        index store.KeyIndex,
614
        revision uint64,
615
        skipIntegrityCheck bool,
616
) (entry *schema.Entry, err error) {
5,416✔
617
        if md != nil && md.Deleted() {
5,418✔
618
                return nil, store.ErrKeyNotFound
2✔
619
        }
2✔
620

621
        if len(val) < 1 {
5,414✔
622
                return nil, fmt.Errorf("%w: internal value consistency error - missing value prefix", store.ErrCorruptedData)
×
623
        }
×
624

625
        // Reference lookup
626
        if val[0] == ReferenceValuePrefix {
5,626✔
627
                if len(val) < 1+8 {
212✔
628
                        return nil, fmt.Errorf("%w: internal value consistency error - invalid reference", store.ErrCorruptedData)
×
629
                }
×
630

631
                if resolved == MaxKeyResolutionLimit {
214✔
632
                        return nil, ErrKeyResolutionLimitReached
2✔
633
                }
2✔
634

635
                atTx := binary.BigEndian.Uint64(TrimPrefix(val))
210✔
636
                refKey := make([]byte, len(val)-1-8)
210✔
637
                copy(refKey, val[1+8:])
210✔
638

210✔
639
                if index != nil {
419✔
640
                        entry, err = d.getAtTx(ctx, refKey, atTx, resolved+1, index, 0, skipIntegrityCheck)
209✔
641
                        if err != nil {
210✔
642
                                return nil, err
1✔
643
                        }
1✔
644
                } else {
1✔
645
                        entry = &schema.Entry{
1✔
646
                                Key: TrimPrefix(refKey),
1✔
647
                                Tx:  atTx,
1✔
648
                        }
1✔
649
                }
1✔
650

651
                entry.ReferencedBy = &schema.Reference{
209✔
652
                        Tx:       txID,
209✔
653
                        Key:      TrimPrefix(key),
209✔
654
                        Metadata: schema.KVMetadataToProto(md),
209✔
655
                        AtTx:     atTx,
209✔
656
                        Revision: revision,
209✔
657
                }
209✔
658

209✔
659
                return entry, nil
209✔
660
        }
661

662
        return &schema.Entry{
5,202✔
663
                Tx:       txID,
5,202✔
664
                Key:      TrimPrefix(key),
5,202✔
665
                Metadata: schema.KVMetadataToProto(md),
5,202✔
666
                Value:    TrimPrefix(val),
5,202✔
667
                Revision: revision,
5,202✔
668
        }, nil
5,202✔
669
}
670

671
func (d *db) Health() (waitingCount int, lastReleaseAt time.Time) {
1✔
672
        return d.mutex.State()
1✔
673
}
1✔
674

675
// CurrentState ...
676
func (d *db) CurrentState() (*schema.ImmutableState, error) {
16,220✔
677
        lastTxID, lastTxAlh := d.st.CommittedAlh()
16,220✔
678
        lastPreTxID, lastPreTxAlh := d.st.PrecommittedAlh()
16,220✔
679

16,220✔
680
        return &schema.ImmutableState{
16,220✔
681
                TxId:               lastTxID,
16,220✔
682
                TxHash:             lastTxAlh[:],
16,220✔
683
                PrecommittedTxId:   lastPreTxID,
16,220✔
684
                PrecommittedTxHash: lastPreTxAlh[:],
16,220✔
685
        }, nil
16,220✔
686
}
16,220✔
687

688
// WaitForTx blocks caller until specified tx
689
func (d *db) WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error {
8,877✔
690
        return d.st.WaitForTx(ctx, txID, allowPrecommitted)
8,877✔
691
}
8,877✔
692

693
// WaitForIndexingUpto blocks caller until specified tx gets indexed
694
func (d *db) WaitForIndexingUpto(ctx context.Context, txID uint64) error {
3,050✔
695
        return d.st.WaitForIndexingUpto(ctx, txID)
3,050✔
696
}
3,050✔
697

698
// VerifiableSet ...
699
func (d *db) VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error) {
48✔
700
        if req == nil {
49✔
701
                return nil, ErrIllegalArguments
1✔
702
        }
1✔
703

704
        lastTxID, _ := d.st.CommittedAlh()
47✔
705
        if lastTxID < req.ProveSinceTx {
48✔
706
                return nil, ErrIllegalState
1✔
707
        }
1✔
708

709
        // Preallocate tx buffers
710
        lastTx, err := d.allocTx()
46✔
711
        if err != nil {
46✔
712
                return nil, err
×
713
        }
×
714
        defer d.releaseTx(lastTx)
46✔
715

46✔
716
        txhdr, err := d.Set(ctx, req.SetRequest)
46✔
717
        if err != nil {
47✔
718
                return nil, err
1✔
719
        }
1✔
720

721
        err = d.st.ReadTx(uint64(txhdr.Id), false, lastTx)
45✔
722
        if err != nil {
45✔
723
                return nil, err
×
724
        }
×
725

726
        var prevTxHdr *store.TxHeader
45✔
727

45✔
728
        if req.ProveSinceTx == 0 {
65✔
729
                prevTxHdr = lastTx.Header()
20✔
730
        } else {
45✔
731
                prevTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
25✔
732
                if err != nil {
25✔
733
                        return nil, err
×
734
                }
×
735
        }
736

737
        dualProof, err := d.st.DualProof(prevTxHdr, lastTx.Header())
45✔
738
        if err != nil {
45✔
739
                return nil, err
×
740
        }
×
741

742
        return &schema.VerifiableTx{
45✔
743
                Tx:        schema.TxToProto(lastTx),
45✔
744
                DualProof: schema.DualProofToProto(dualProof),
45✔
745
        }, nil
45✔
746
}
747

748
// VerifiableGet ...
749
func (d *db) VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error) {
1,038✔
750
        if req == nil {
1,039✔
751
                return nil, ErrIllegalArguments
1✔
752
        }
1✔
753

754
        lastTxID, _ := d.st.CommittedAlh()
1,037✔
755
        if lastTxID < req.ProveSinceTx {
1,038✔
756
                return nil, ErrIllegalState
1✔
757
        }
1✔
758

759
        e, err := d.Get(ctx, req.KeyRequest)
1,036✔
760
        if err != nil {
1,040✔
761
                return nil, err
4✔
762
        }
4✔
763

764
        var vTxID uint64
1,032✔
765
        var vKey []byte
1,032✔
766

1,032✔
767
        if e.ReferencedBy == nil {
2,059✔
768
                vTxID = e.Tx
1,027✔
769
                vKey = e.Key
1,027✔
770
        } else {
1,032✔
771
                vTxID = e.ReferencedBy.Tx
5✔
772
                vKey = e.ReferencedBy.Key
5✔
773
        }
5✔
774

775
        // key-value inclusion proof
776
        tx, err := d.allocTx()
1,032✔
777
        if err != nil {
1,032✔
778
                return nil, err
×
779
        }
×
780
        defer d.releaseTx(tx)
1,032✔
781

1,032✔
782
        err = d.st.ReadTx(vTxID, false, tx)
1,032✔
783
        if err != nil {
1,032✔
784
                return nil, err
×
785
        }
×
786

787
        var rootTxHdr *store.TxHeader
1,032✔
788

1,032✔
789
        if req.ProveSinceTx == 0 {
2,043✔
790
                rootTxHdr = tx.Header()
1,011✔
791
        } else {
1,032✔
792
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
21✔
793
                if err != nil {
21✔
794
                        return nil, err
×
795
                }
×
796
        }
797

798
        inclusionProof, err := tx.Proof(EncodeKey(vKey))
1,032✔
799
        if err != nil {
1,032✔
800
                return nil, err
×
801
        }
×
802

803
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,032✔
804

1,032✔
805
        if req.ProveSinceTx <= vTxID {
2,058✔
806
                sourceTxHdr = rootTxHdr
1,026✔
807
                targetTxHdr = tx.Header()
1,026✔
808
        } else {
1,032✔
809
                sourceTxHdr = tx.Header()
6✔
810
                targetTxHdr = rootTxHdr
6✔
811
        }
6✔
812

813
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,032✔
814
        if err != nil {
1,032✔
815
                return nil, err
×
816
        }
×
817

818
        verifiableTx := &schema.VerifiableTx{
1,032✔
819
                Tx:        schema.TxToProto(tx),
1,032✔
820
                DualProof: schema.DualProofToProto(dualProof),
1,032✔
821
        }
1,032✔
822

1,032✔
823
        return &schema.VerifiableEntry{
1,032✔
824
                Entry:          e,
1,032✔
825
                VerifiableTx:   verifiableTx,
1,032✔
826
                InclusionProof: schema.InclusionProofToProto(inclusionProof),
1,032✔
827
        }, nil
1,032✔
828
}
829

830
func (d *db) Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error) {
9✔
831
        if req == nil {
10✔
832
                return nil, ErrIllegalArguments
1✔
833
        }
1✔
834

835
        d.mutex.RLock()
8✔
836
        defer d.mutex.RUnlock()
8✔
837

8✔
838
        if d.isReplica() {
8✔
839
                return nil, ErrIsReplica
×
840
        }
×
841

842
        opts := store.DefaultTxOptions()
8✔
843

8✔
844
        if req.SinceTx > 0 {
9✔
845
                opts.WithSnapshotMustIncludeTxID(func(_ uint64) uint64 {
2✔
846
                        return req.SinceTx
1✔
847
                })
1✔
848
        }
849

850
        tx, err := d.st.NewTx(ctx, opts)
8✔
851
        if err != nil {
8✔
852
                return nil, err
×
853
        }
×
854
        defer tx.Cancel()
8✔
855

8✔
856
        for _, k := range req.Keys {
15✔
857
                if len(k) == 0 {
7✔
858
                        return nil, ErrIllegalArguments
×
859
                }
×
860

861
                md := store.NewKVMetadata()
7✔
862

7✔
863
                md.AsDeleted(true)
7✔
864

7✔
865
                e := EncodeEntrySpec(k, md, nil)
7✔
866

7✔
867
                err = tx.Delete(ctx, e.Key)
7✔
868
                if err != nil {
9✔
869
                        return nil, err
2✔
870
                }
2✔
871
        }
872

873
        var hdr *store.TxHeader
6✔
874
        if req.NoWait {
6✔
875
                hdr, err = tx.AsyncCommit(ctx)
×
876
        } else {
6✔
877
                hdr, err = tx.Commit(ctx)
6✔
878
        }
6✔
879
        if err != nil {
7✔
880
                return nil, err
1✔
881
        }
1✔
882

883
        return schema.TxHeaderToProto(hdr), nil
5✔
884
}
885

886
// GetAll ...
887
func (d *db) GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error) {
4✔
888
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
4✔
889
        if err != nil {
4✔
890
                return nil, err
×
891
        }
×
892
        defer snap.Close()
4✔
893

4✔
894
        list := &schema.Entries{}
4✔
895

4✔
896
        for _, key := range req.Keys {
14✔
897
                e, err := d.get(ctx, EncodeKey(key), snap, true)
10✔
898
                if err == nil || errors.Is(err, store.ErrKeyNotFound) {
20✔
899
                        if e != nil {
19✔
900
                                list.Entries = append(list.Entries, e)
9✔
901
                        }
9✔
902
                } else {
×
903
                        return nil, err
×
904
                }
×
905
        }
906

907
        return list, nil
4✔
908
}
909

910
// Size ...
911
func (d *db) Size() (uint64, error) {
218✔
912
        return d.st.TxCount(), nil
218✔
913
}
218✔
914

915
// Count ...
916
func (d *db) Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error) {
8✔
917
        if prefix == nil {
10✔
918
                return nil, ErrIllegalArguments
2✔
919
        }
2✔
920

921
        tx, err := d.st.NewTx(ctx, store.DefaultTxOptions().WithMode(store.ReadOnlyTx))
6✔
922
        if err != nil {
6✔
923
                return nil, err
×
924
        }
×
925
        defer tx.Cancel()
6✔
926

6✔
927
        keyReader, err := tx.NewKeyReader(store.KeyReaderSpec{
6✔
928
                Prefix: WrapWithPrefix(prefix.Prefix, SetKeyPrefix),
6✔
929
        })
6✔
930
        if err != nil {
6✔
931
                return nil, err
×
932
        }
×
933
        defer keyReader.Close()
6✔
934

6✔
935
        count := 0
6✔
936

6✔
937
        for {
19✔
938
                _, _, err := keyReader.Read(ctx)
13✔
939
                if errors.Is(err, store.ErrNoMoreEntries) {
19✔
940
                        break
6✔
941
                }
942
                if err != nil {
7✔
943
                        return nil, err
×
944
                }
×
945

946
                count++
7✔
947
        }
948

949
        return &schema.EntryCount{Count: uint64(count)}, nil
6✔
950
}
951

952
// CountAll ...
953
func (d *db) CountAll(ctx context.Context) (*schema.EntryCount, error) {
4✔
954
        return d.Count(ctx, &schema.KeyPrefix{})
4✔
955
}
4✔
956

957
// TxByID ...
958
func (d *db) TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error) {
61✔
959
        if req == nil {
62✔
960
                return nil, ErrIllegalArguments
1✔
961
        }
1✔
962

963
        var snap *store.Snapshot
60✔
964
        var err error
60✔
965

60✔
966
        tx, err := d.allocTx()
60✔
967
        if err != nil {
60✔
968
                return nil, err
×
969
        }
×
970
        defer d.releaseTx(tx)
60✔
971

60✔
972
        if !req.KeepReferencesUnresolved {
118✔
973
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
58✔
974
                if err != nil {
58✔
975
                        return nil, err
×
976
                }
×
977
                defer snap.Close()
58✔
978
        }
979

980
        // key-value inclusion proof
981
        err = d.st.ReadTx(req.Tx, false, tx)
60✔
982
        if err != nil {
60✔
983
                return nil, err
×
984
        }
×
985

986
        return d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
60✔
987
}
988

989
func (d *db) snapshotSince(ctx context.Context, prefix []byte, txID uint64) (*store.Snapshot, error) {
1,694✔
990
        currTxID, _ := d.st.CommittedAlh()
1,694✔
991

1,694✔
992
        if txID > currTxID {
1,694✔
993
                return nil, ErrIllegalArguments
×
994
        }
×
995

996
        waitUntilTx := txID
1,694✔
997
        if waitUntilTx == 0 {
3,322✔
998
                waitUntilTx = currTxID
1,628✔
999
        }
1,628✔
1000

1001
        return d.st.SnapshotMustIncludeTxID(ctx, prefix, waitUntilTx)
1,694✔
1002
}
1003

1004
func (d *db) serializeTx(ctx context.Context, tx *store.Tx, spec *schema.EntriesSpec, snap *store.Snapshot, skipIntegrityCheck bool) (*schema.Tx, error) {
1,374✔
1005
        if spec == nil {
2,238✔
1006
                return schema.TxToProto(tx), nil
864✔
1007
        }
864✔
1008

1009
        stx := &schema.Tx{
510✔
1010
                Header: schema.TxHeaderToProto(tx.Header()),
510✔
1011
        }
510✔
1012

510✔
1013
        for _, e := range tx.Entries() {
1,048✔
1014
                switch e.Key()[0] {
538✔
1015
                case SetKeyPrefix:
39✔
1016
                        {
78✔
1017
                                if spec.KvEntriesSpec == nil || spec.KvEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
57✔
1018
                                        break
18✔
1019
                                }
1020

1021
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
24✔
1022
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
3✔
1023
                                        break
3✔
1024
                                }
1025

1026
                                v, err := d.st.ReadValue(e)
18✔
1027
                                if errors.Is(err, store.ErrExpiredEntry) {
18✔
1028
                                        break
×
1029
                                }
1030
                                if err != nil {
18✔
1031
                                        return nil, err
×
1032
                                }
×
1033

1034
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
27✔
1035
                                        kve := schema.TxEntryToProto(e)
9✔
1036
                                        kve.Value = v
9✔
1037
                                        stx.Entries = append(stx.Entries, kve)
9✔
1038
                                        break
9✔
1039
                                }
1040

1041
                                // resolve entry
1042
                                var index store.KeyIndex
9✔
1043
                                if snap != nil {
16✔
1044
                                        index = snap
7✔
1045
                                }
7✔
1046

1047
                                kve, err := d.resolveValue(ctx, e.Key(), v, 0, tx.Header().ID, e.Metadata(), index, 0, skipIntegrityCheck)
9✔
1048
                                if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
10✔
1049
                                        break // ignore deleted ones (referenced key may have been deleted)
1✔
1050
                                }
1051
                                if err != nil {
8✔
1052
                                        return nil, err
×
1053
                                }
×
1054

1055
                                stx.KvEntries = append(stx.KvEntries, kve)
8✔
1056
                        }
1057
                case SortedSetKeyPrefix:
14✔
1058
                        {
28✔
1059
                                if spec.ZEntriesSpec == nil || spec.ZEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
22✔
1060
                                        break
8✔
1061
                                }
1062

1063
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
7✔
1064
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1065
                                        break
1✔
1066
                                }
1067

1068
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
6✔
1069
                                        v, err := d.st.ReadValue(e)
1✔
1070
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1071
                                                break
×
1072
                                        }
1073
                                        if err != nil {
1✔
1074
                                                return nil, err
×
1075
                                        }
×
1076

1077
                                        kve := schema.TxEntryToProto(e)
1✔
1078
                                        kve.Value = v
1✔
1079
                                        stx.Entries = append(stx.Entries, kve)
1✔
1080
                                        break
1✔
1081
                                }
1082

1083
                                // zKey = [1+setLenLen+set+scoreLen+keyLenLen+1+key+txIDLen]
1084
                                zKey := e.Key()
4✔
1085

4✔
1086
                                setLen := int(binary.BigEndian.Uint64(zKey[1:]))
4✔
1087
                                set := make([]byte, setLen)
4✔
1088
                                copy(set, zKey[1+setLenLen:])
4✔
1089

4✔
1090
                                scoreOff := 1 + setLenLen + setLen
4✔
1091
                                scoreB := binary.BigEndian.Uint64(zKey[scoreOff:])
4✔
1092
                                score := math.Float64frombits(scoreB)
4✔
1093

4✔
1094
                                keyOff := scoreOff + scoreLen + keyLenLen
4✔
1095
                                key := make([]byte, len(zKey)-keyOff-txIDLen)
4✔
1096
                                copy(key, zKey[keyOff:])
4✔
1097

4✔
1098
                                atTx := binary.BigEndian.Uint64(zKey[keyOff+len(key):])
4✔
1099

4✔
1100
                                var entry *schema.Entry
4✔
1101
                                var err error
4✔
1102

4✔
1103
                                if snap != nil {
7✔
1104
                                        entry, err = d.getAtTx(ctx, key, atTx, 1, snap, 0, skipIntegrityCheck)
3✔
1105
                                        if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
3✔
1106
                                                break // ignore deleted ones (referenced key may have been deleted)
×
1107
                                        }
1108
                                        if err != nil {
3✔
1109
                                                return nil, err
×
1110
                                        }
×
1111
                                }
1112

1113
                                zentry := &schema.ZEntry{
4✔
1114
                                        Set:   set,
4✔
1115
                                        Key:   key[1:],
4✔
1116
                                        Entry: entry,
4✔
1117
                                        Score: score,
4✔
1118
                                        AtTx:  atTx,
4✔
1119
                                }
4✔
1120

4✔
1121
                                stx.ZEntries = append(stx.ZEntries, zentry)
4✔
1122
                        }
1123
                case SQLPrefix:
5✔
1124
                        {
10✔
1125
                                if spec.SqlEntriesSpec == nil || spec.SqlEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
7✔
1126
                                        break
2✔
1127
                                }
1128

1129
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
4✔
1130
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1131
                                        break
1✔
1132
                                }
1133

1134
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
3✔
1135
                                        v, err := d.st.ReadValue(e)
1✔
1136
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1137
                                                break
×
1138
                                        }
1139
                                        if err != nil {
1✔
1140
                                                return nil, err
×
1141
                                        }
×
1142

1143
                                        kve := schema.TxEntryToProto(e)
1✔
1144
                                        kve.Value = v
1✔
1145
                                        stx.Entries = append(stx.Entries, kve)
1✔
1146
                                        break
1✔
1147
                                }
1148

1149
                                return nil, fmt.Errorf("%w: sql entry resolution is not supported", ErrIllegalArguments)
1✔
1150
                        }
1151
                }
1152
        }
1153

1154
        return stx, nil
509✔
1155
}
1156

1157
func (d *db) mayUpdateReplicaState(committedTxID uint64, newReplicaState *schema.ReplicaState) error {
7,227✔
1158
        d.replicaStatesMutex.Lock()
7,227✔
1159
        defer d.replicaStatesMutex.Unlock()
7,227✔
1160

7,227✔
1161
        // clean up replicaStates
7,227✔
1162
        // it's safe to remove up to latest tx committed in primary
7,227✔
1163
        for uuid, st := range d.replicaStates {
9,038✔
1164
                if st.precommittedTxID <= committedTxID {
2,187✔
1165
                        delete(d.replicaStates, uuid)
376✔
1166
                }
376✔
1167
        }
1168

1169
        if newReplicaState.PrecommittedTxID <= committedTxID {
12,717✔
1170
                // as far as the primary is concerned, nothing really new has happened
5,490✔
1171
                return nil
5,490✔
1172
        }
5,490✔
1173

1174
        newReplicaAlh := schema.DigestFromProto(newReplicaState.PrecommittedAlh)
1,737✔
1175

1,737✔
1176
        replicaSt, ok := d.replicaStates[newReplicaState.UUID]
1,737✔
1177
        if ok {
3,093✔
1178
                if newReplicaState.PrecommittedTxID < replicaSt.precommittedTxID {
1,356✔
1179
                        return fmt.Errorf("%w: the newly informed replica state lags behind the previously informed one", ErrIllegalArguments)
×
1180
                }
×
1181

1182
                if newReplicaState.PrecommittedTxID == replicaSt.precommittedTxID {
2,710✔
1183
                        // as of the last informed replica status update, nothing has changed
1,354✔
1184
                        return nil
1,354✔
1185
                }
1,354✔
1186

1187
                // actual replication progress is informed by the replica
1188
                replicaSt.precommittedTxID = newReplicaState.PrecommittedTxID
2✔
1189
                replicaSt.precommittedAlh = newReplicaAlh
2✔
1190
        } else {
381✔
1191
                // replica informs first replication state
381✔
1192
                d.replicaStates[newReplicaState.UUID] = &replicaState{
381✔
1193
                        precommittedTxID: newReplicaState.PrecommittedTxID,
381✔
1194
                        precommittedAlh:  newReplicaAlh,
381✔
1195
                }
381✔
1196
        }
381✔
1197

1198
        // check up to which tx enough replicas ack replication and it's safe to commit
1199
        mayCommitUpToTxID := uint64(0)
383✔
1200
        if len(d.replicaStates) > 0 {
766✔
1201
                mayCommitUpToTxID = math.MaxUint64
383✔
1202
        }
383✔
1203

1204
        allowances := 0
383✔
1205

383✔
1206
        // we may clean up replicaStates from those who are lagging behind commit
383✔
1207
        for _, st := range d.replicaStates {
840✔
1208
                if st.precommittedTxID < mayCommitUpToTxID {
840✔
1209
                        mayCommitUpToTxID = st.precommittedTxID
383✔
1210
                }
383✔
1211
                allowances++
457✔
1212
        }
1213

1214
        if allowances >= d.options.syncAcks {
735✔
1215
                err := d.st.AllowCommitUpto(mayCommitUpToTxID)
352✔
1216
                if err != nil {
352✔
1217
                        return err
×
1218
                }
×
1219
        }
1220

1221
        return nil
383✔
1222
}
1223

1224
func (d *db) ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error) {
8,879✔
1225
        if req == nil {
8,879✔
1226
                return nil, 0, mayCommitUpToAlh, ErrIllegalArguments
×
1227
        }
×
1228

1229
        if d.replicaStates == nil && req.ReplicaState != nil {
8,879✔
1230
                return nil, 0, mayCommitUpToAlh, fmt.Errorf("%w: replica state was NOT expected", ErrIllegalState)
×
1231
        }
×
1232

1233
        tx, err := d.allocTx()
8,879✔
1234
        if err != nil {
8,879✔
1235
                return nil, 0, mayCommitUpToAlh, err
×
1236
        }
×
1237
        defer d.releaseTx(tx)
8,879✔
1238

8,879✔
1239
        committedTxID, committedAlh := d.st.CommittedAlh()
8,879✔
1240
        preCommittedTxID, _ := d.st.PrecommittedAlh()
8,879✔
1241

8,879✔
1242
        if req.ReplicaState != nil {
16,108✔
1243
                if req.ReplicaState.CommittedTxID > 0 {
13,896✔
1244
                        // validate replica commit state
6,667✔
1245
                        if req.ReplicaState.CommittedTxID > committedTxID {
6,667✔
1246
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1247
                        }
×
1248

1249
                        // integrityCheck is currently required to validate Alh
1250
                        expectedReplicaCommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.CommittedTxID, false, false)
6,667✔
1251
                        if err != nil {
6,667✔
1252
                                return nil, committedTxID, committedAlh, err
×
1253
                        }
×
1254

1255
                        replicaCommittedAlh := schema.DigestFromProto(req.ReplicaState.CommittedAlh)
6,667✔
1256

6,667✔
1257
                        if expectedReplicaCommitHdr.Alh() != replicaCommittedAlh {
6,667✔
1258
                                return nil, expectedReplicaCommitHdr.ID, expectedReplicaCommitHdr.Alh(), fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1259
                        }
×
1260
                }
1261

1262
                if req.ReplicaState.PrecommittedTxID > 0 {
14,064✔
1263
                        // validate replica precommit state
6,835✔
1264
                        if req.ReplicaState.PrecommittedTxID > preCommittedTxID {
6,837✔
1265
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
2✔
1266
                        }
2✔
1267

1268
                        // integrityCheck is currently required to validate Alh
1269
                        expectedReplicaPrecommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.PrecommittedTxID, true, false)
6,833✔
1270
                        if err != nil {
6,833✔
1271
                                return nil, committedTxID, committedAlh, err
×
1272
                        }
×
1273

1274
                        replicaPreCommittedAlh := schema.DigestFromProto(req.ReplicaState.PrecommittedAlh)
6,833✔
1275

6,833✔
1276
                        if expectedReplicaPrecommitHdr.Alh() != replicaPreCommittedAlh {
6,833✔
1277
                                return nil, expectedReplicaPrecommitHdr.ID, expectedReplicaPrecommitHdr.Alh(), fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1278
                        }
×
1279

1280
                        // primary will provide commit state to the replica so it can commit pre-committed transactions
1281
                        if req.ReplicaState.PrecommittedTxID < committedTxID {
8,508✔
1282
                                // if replica is behind current commit state in primary
1,675✔
1283
                                // return the alh up to the point known by the replica.
1,675✔
1284
                                // That way the replica is able to validate is following the right primary.
1,675✔
1285
                                mayCommitUpToTxID = req.ReplicaState.PrecommittedTxID
1,675✔
1286
                                mayCommitUpToAlh = replicaPreCommittedAlh
1,675✔
1287
                        } else {
6,833✔
1288
                                mayCommitUpToTxID = committedTxID
5,158✔
1289
                                mayCommitUpToAlh = committedAlh
5,158✔
1290
                        }
5,158✔
1291
                }
1292

1293
                err = d.mayUpdateReplicaState(committedTxID, req.ReplicaState)
7,227✔
1294
                if err != nil {
7,227✔
1295
                        return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1296
                }
×
1297
        }
1298

1299
        // it might be the case primary will commit some txs (even there could be inmem-precommitted txs)
1300
        // current timeout it's not a special value but at least a relative one
1301
        // note: primary might also be waiting ack from any replica (even this primary may do progress)
1302

1303
        // TODO: under some circumstances, replica might not be able to do further progress until primary
1304
        // has made changes, such wait doesn't need to have a timeout, reducing networking and CPU utilization
1305
        var cancel context.CancelFunc
8,877✔
1306

8,877✔
1307
        if req.ReplicaState != nil {
16,104✔
1308
                ctx, cancel = context.WithTimeout(ctx, d.options.storeOpts.SyncFrequency*4)
7,227✔
1309
                defer cancel()
7,227✔
1310
        }
7,227✔
1311

1312
        err = d.WaitForTx(ctx, req.Tx, req.AllowPreCommitted)
8,877✔
1313
        if ctx.Err() != nil {
9,497✔
1314
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, nil
620✔
1315
        }
620✔
1316
        if err != nil {
8,257✔
1317
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1318
        }
×
1319

1320
        txbs, err = d.st.ExportTx(req.Tx, req.AllowPreCommitted, req.SkipIntegrityCheck, tx)
8,257✔
1321
        if err != nil {
8,257✔
1322
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1323
        }
×
1324

1325
        return txbs, mayCommitUpToTxID, mayCommitUpToAlh, nil
8,257✔
1326
}
1327

1328
func (d *db) ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error) {
8,186✔
1329
        d.mutex.RLock()
8,186✔
1330
        defer d.mutex.RUnlock()
8,186✔
1331

8,186✔
1332
        if !d.isReplica() {
8,186✔
1333
                return nil, ErrNotReplica
×
1334
        }
×
1335

1336
        hdr, err := d.st.ReplicateTx(ctx, exportedTx, skipIntegrityCheck, waitForIndexing)
8,186✔
1337
        if err != nil {
8,215✔
1338
                return nil, err
29✔
1339
        }
29✔
1340

1341
        return schema.TxHeaderToProto(hdr), nil
8,157✔
1342
}
1343

1344
// AllowCommitUpto is used by replicas to commit transactions once committed in primary
1345
func (d *db) AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error {
6,548✔
1346
        d.mutex.RLock()
6,548✔
1347
        defer d.mutex.RUnlock()
6,548✔
1348

6,548✔
1349
        if !d.isReplica() {
6,548✔
1350
                return ErrNotReplica
×
1351
        }
×
1352

1353
        // replica pre-committed state must be consistent with primary
1354

1355
        committedTxID, committedAlh := d.st.CommittedAlh()
6,548✔
1356
        // handling a particular case in an optimized manner
6,548✔
1357
        if committedTxID == txID {
6,915✔
1358
                if committedAlh != alh {
367✔
1359
                        return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1360
                }
×
1361
                return nil
367✔
1362
        }
1363

1364
        hdr, err := d.st.ReadTxHeader(txID, true, false)
6,181✔
1365
        if err != nil {
6,181✔
1366
                return err
×
1367
        }
×
1368

1369
        if hdr.Alh() != alh {
6,181✔
1370
                return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1371
        }
×
1372

1373
        return d.st.AllowCommitUpto(txID)
6,181✔
1374
}
1375

1376
func (d *db) DiscardPrecommittedTxsSince(txID uint64) error {
2✔
1377
        d.mutex.RLock()
2✔
1378
        defer d.mutex.RUnlock()
2✔
1379

2✔
1380
        _, err := d.st.DiscardPrecommittedTxsSince(txID)
2✔
1381

2✔
1382
        return err
2✔
1383
}
2✔
1384

1385
// VerifiableTxByID ...
1386
func (d *db) VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error) {
1,297✔
1387
        if req == nil {
1,298✔
1388
                return nil, ErrIllegalArguments
1✔
1389
        }
1✔
1390

1391
        lastTxID, _ := d.st.CommittedAlh()
1,296✔
1392
        if lastTxID < req.ProveSinceTx {
1,296✔
1393
                return nil, fmt.Errorf("%w: latest txID=%d is lower than specified as initial tx=%d", ErrIllegalState, lastTxID, req.ProveSinceTx)
×
1394
        }
×
1395

1396
        var snap *store.Snapshot
1,296✔
1397
        var err error
1,296✔
1398

1,296✔
1399
        if !req.KeepReferencesUnresolved {
2,592✔
1400
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
1,296✔
1401
                if err != nil {
1,296✔
1402
                        return nil, err
×
1403
                }
×
1404
                defer snap.Close()
1,296✔
1405
        }
1406

1407
        reqTx, err := d.allocTx()
1,296✔
1408
        if err != nil {
1,296✔
1409
                return nil, err
×
1410
        }
×
1411
        defer d.releaseTx(reqTx)
1,296✔
1412

1,296✔
1413
        err = d.st.ReadTx(req.Tx, false, reqTx)
1,296✔
1414
        if err != nil {
1,296✔
1415
                return nil, err
×
1416
        }
×
1417

1418
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,296✔
1419
        var rootTxHdr *store.TxHeader
1,296✔
1420

1,296✔
1421
        if req.ProveSinceTx == 0 {
1,300✔
1422
                rootTxHdr = reqTx.Header()
4✔
1423
        } else {
1,296✔
1424
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
1,292✔
1425
                if err != nil {
1,292✔
1426
                        return nil, err
×
1427
                }
×
1428
        }
1429

1430
        if req.ProveSinceTx <= req.Tx {
2,591✔
1431
                sourceTxHdr = rootTxHdr
1,295✔
1432
                targetTxHdr = reqTx.Header()
1,295✔
1433
        } else {
1,296✔
1434
                sourceTxHdr = reqTx.Header()
1✔
1435
                targetTxHdr = rootTxHdr
1✔
1436
        }
1✔
1437

1438
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,296✔
1439
        if err != nil {
1,296✔
1440
                return nil, err
×
1441
        }
×
1442

1443
        sReqTx, err := d.serializeTx(ctx, reqTx, req.EntriesSpec, snap, true)
1,296✔
1444
        if err != nil {
1,296✔
1445
                return nil, err
×
1446
        }
×
1447

1448
        return &schema.VerifiableTx{
1,296✔
1449
                Tx:        sReqTx,
1,296✔
1450
                DualProof: schema.DualProofToProto(dualProof),
1,296✔
1451
        }, nil
1,296✔
1452
}
1453

1454
// TxScan ...
1455
func (d *db) TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error) {
10✔
1456
        if req == nil {
11✔
1457
                return nil, ErrIllegalArguments
1✔
1458
        }
1✔
1459

1460
        if int(req.Limit) > d.maxResultSize {
10✔
1461
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1462
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1463
        }
1✔
1464

1465
        tx, err := d.allocTx()
8✔
1466
        if err != nil {
8✔
1467
                return nil, err
×
1468
        }
×
1469
        defer d.releaseTx(tx)
8✔
1470

8✔
1471
        limit := int(req.Limit)
8✔
1472

8✔
1473
        if req.Limit == 0 {
13✔
1474
                limit = d.maxResultSize
5✔
1475
        }
5✔
1476

1477
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
8✔
1478
        if err != nil {
8✔
1479
                return nil, err
×
1480
        }
×
1481
        defer snap.Close()
8✔
1482

8✔
1483
        txReader, err := d.st.NewTxReader(req.InitialTx, req.Desc, tx)
8✔
1484
        if err != nil {
9✔
1485
                return nil, err
1✔
1486
        }
1✔
1487

1488
        txList := &schema.TxList{}
7✔
1489

7✔
1490
        for l := 1; l <= limit; l++ {
27✔
1491
                tx, err := txReader.Read()
20✔
1492
                if errors.Is(err, store.ErrNoMoreEntries) {
22✔
1493
                        break
2✔
1494
                }
1495
                if err != nil {
18✔
1496
                        return nil, err
×
1497
                }
×
1498

1499
                sTx, err := d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
18✔
1500
                if err != nil {
18✔
1501
                        return nil, err
×
1502
                }
×
1503

1504
                txList.Txs = append(txList.Txs, sTx)
18✔
1505

18✔
1506
                if l == d.maxResultSize {
20✔
1507
                        return txList,
2✔
1508
                                fmt.Errorf("%w: found at least %d entries (maximum limit). "+
2✔
1509
                                        "Pagination over large results can be achieved by using the limit and initialTx arguments",
2✔
1510
                                        ErrResultSizeLimitReached, d.maxResultSize)
2✔
1511
                }
2✔
1512
        }
1513

1514
        return txList, nil
5✔
1515
}
1516

1517
// History ...
1518
func (d *db) History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error) {
9✔
1519
        if req == nil {
10✔
1520
                return nil, ErrIllegalArguments
1✔
1521
        }
1✔
1522

1523
        if int(req.Limit) > d.maxResultSize {
9✔
1524
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1525
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1526
        }
1✔
1527

1528
        currTxID, _ := d.st.CommittedAlh()
7✔
1529

7✔
1530
        if req.SinceTx > currTxID {
7✔
1531
                return nil, ErrIllegalArguments
×
1532
        }
×
1533

1534
        waitUntilTx := req.SinceTx
7✔
1535
        if waitUntilTx == 0 {
9✔
1536
                waitUntilTx = currTxID
2✔
1537
        }
2✔
1538

1539
        err := d.WaitForIndexingUpto(ctx, waitUntilTx)
7✔
1540
        if err != nil {
7✔
1541
                return nil, err
×
1542
        }
×
1543

1544
        limit := int(req.Limit)
7✔
1545

7✔
1546
        if req.Limit == 0 {
14✔
1547
                limit = d.maxResultSize
7✔
1548
        }
7✔
1549

1550
        key := EncodeKey(req.Key)
7✔
1551

7✔
1552
        valRefs, hCount, err := d.st.History(key, req.Offset, req.Desc, limit)
7✔
1553
        if err != nil && err != store.ErrOffsetOutOfRange {
8✔
1554
                return nil, err
1✔
1555
        }
1✔
1556

1557
        list := &schema.Entries{
6✔
1558
                Entries: make([]*schema.Entry, len(valRefs)),
6✔
1559
        }
6✔
1560

6✔
1561
        for i, valRef := range valRefs {
114✔
1562
                val, err := valRef.Resolve()
108✔
1563
                if err != nil && err != store.ErrExpiredEntry {
108✔
1564
                        return nil, err
×
1565
                }
×
1566
                if len(val) > 0 {
214✔
1567
                        val = TrimPrefix(val)
106✔
1568
                }
106✔
1569

1570
                list.Entries[i] = &schema.Entry{
108✔
1571
                        Tx:       valRef.Tx(),
108✔
1572
                        Key:      req.Key,
108✔
1573
                        Metadata: schema.KVMetadataToProto(valRef.KVMetadata()),
108✔
1574
                        Value:    val,
108✔
1575
                        Expired:  errors.Is(err, store.ErrExpiredEntry),
108✔
1576
                        Revision: valRef.HC(),
108✔
1577
                }
108✔
1578
        }
1579

1580
        if limit == d.maxResultSize && hCount >= uint64(d.maxResultSize) {
8✔
1581
                return list,
2✔
1582
                        fmt.Errorf("%w: found at least %d entries (the maximum limit). "+
2✔
1583
                                "Pagination over large results can be achieved by using the limit and initialTx arguments",
2✔
1584
                                ErrResultSizeLimitReached, d.maxResultSize)
2✔
1585
        }
2✔
1586

1587
        return list, nil
4✔
1588
}
1589

1590
func (d *db) IsClosed() bool {
201✔
1591
        d.closingMutex.Lock()
201✔
1592
        defer d.closingMutex.Unlock()
201✔
1593

201✔
1594
        return d.st.IsClosed()
201✔
1595
}
201✔
1596

1597
// Close ...
1598
func (d *db) Close() (err error) {
700✔
1599
        d.closingMutex.Lock()
700✔
1600
        defer d.closingMutex.Unlock()
700✔
1601

700✔
1602
        d.Logger.Infof("closing database '%s'...", d.name)
700✔
1603

700✔
1604
        defer func() {
1,400✔
1605
                if err == nil {
1,312✔
1606
                        d.Logger.Infof("database '%s' succesfully closed", d.name)
612✔
1607
                } else {
700✔
1608
                        d.Logger.Infof("%v: while closing database '%s'", err, d.name)
88✔
1609
                }
88✔
1610
        }()
1611

1612
        return d.st.Close()
700✔
1613
}
1614

1615
// GetName ...
1616
func (d *db) GetName() string {
56,426✔
1617
        return d.name
56,426✔
1618
}
56,426✔
1619

1620
// GetOptions ...
1621
func (d *db) GetOptions() *Options {
3,082✔
1622
        d.mutex.RLock()
3,082✔
1623
        defer d.mutex.RUnlock()
3,082✔
1624

3,082✔
1625
        return d.options
3,082✔
1626
}
3,082✔
1627

1628
func (d *db) AsReplica(asReplica, syncReplication bool, syncAcks int) {
15✔
1629
        d.mutex.Lock()
15✔
1630
        defer d.mutex.Unlock()
15✔
1631

15✔
1632
        d.replicaStatesMutex.Lock()
15✔
1633
        defer d.replicaStatesMutex.Unlock()
15✔
1634

15✔
1635
        d.options.replica = asReplica
15✔
1636
        d.options.syncAcks = syncAcks
15✔
1637
        d.options.syncReplication = syncReplication
15✔
1638

15✔
1639
        if asReplica {
23✔
1640
                d.replicaStates = nil
8✔
1641
        } else if syncAcks > 0 {
18✔
1642
                d.replicaStates = make(map[string]*replicaState, syncAcks)
3✔
1643
        }
3✔
1644

1645
        d.st.SetExternalCommitAllowance(syncReplication)
15✔
1646
}
1647

1648
func (d *db) IsReplica() bool {
430✔
1649
        d.mutex.RLock()
430✔
1650
        defer d.mutex.RUnlock()
430✔
1651

430✔
1652
        return d.isReplica()
430✔
1653
}
430✔
1654

1655
func (d *db) isReplica() bool {
20,662✔
1656
        return d.options.replica
20,662✔
1657
}
20,662✔
1658

1659
func (d *db) IsSyncReplicationEnabled() bool {
8,877✔
1660
        d.mutex.RLock()
8,877✔
1661
        defer d.mutex.RUnlock()
8,877✔
1662

8,877✔
1663
        return d.options.syncReplication
8,877✔
1664
}
8,877✔
1665

1666
func (d *db) SetSyncReplication(enabled bool) {
208✔
1667
        d.mutex.Lock()
208✔
1668
        defer d.mutex.Unlock()
208✔
1669

208✔
1670
        d.st.SetExternalCommitAllowance(enabled)
208✔
1671

208✔
1672
        d.options.syncReplication = enabled
208✔
1673
}
208✔
1674

1675
func logErr(log logger.Logger, formattedMessage string, err error) error {
1✔
1676
        if err != nil {
2✔
1677
                log.Errorf(formattedMessage, err)
1✔
1678
        }
1✔
1679
        return err
1✔
1680
}
1681

1682
// CopyCatalog creates a copy of the sql catalog and returns a transaction
1683
// that can be used to commit the copy.
1684
func (d *db) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
13✔
1685
        // copy the sql catalog
13✔
1686
        err := d.sqlEngine.CopyCatalogToTx(ctx, tx)
13✔
1687
        if err != nil {
13✔
1688
                return err
×
1689
        }
×
1690

1691
        // copy the document store catalog
1692
        err = d.documentEngine.CopyCatalogToTx(ctx, tx)
13✔
1693
        if err != nil {
13✔
1694
                return err
×
1695
        }
×
1696

1697
        return nil
13✔
1698
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc