
codenotary / immudb, build 9367064922 (push, gh-ci, ostafen)

04 Jun 2024 12:27PM UTC. Coverage: 89.43% (-0.02%) from 89.451%

Add support for JSON type

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

521 of 575 new or added lines in 14 files covered (90.61%).
12 existing lines in 5 files are now uncovered.
35172 of 39329 relevant lines covered (89.43%).
160547.56 hits per line.

Source file: /pkg/database/database.go (85.75% covered)
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package database
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26
        "os"
27
        "path/filepath"
28
        "sync"
29
        "time"
30

31
        "github.com/codenotary/immudb/embedded/document"
32
        "github.com/codenotary/immudb/embedded/sql"
33
        "github.com/codenotary/immudb/embedded/store"
34

35
        "github.com/codenotary/immudb/embedded/logger"
36
        "github.com/codenotary/immudb/pkg/api/schema"
37
)
38

39
const (
40
        MaxKeyResolutionLimit = 1
41
        MaxKeyScanLimit       = 2500
42
)
43

44
var (
45
        ErrKeyResolutionLimitReached  = errors.New("key resolution limit reached. It may be due to cyclic references")
46
        ErrResultSizeLimitExceeded    = errors.New("result size limit exceeded")
47
        ErrResultSizeLimitReached     = errors.New("result size limit reached")
48
        ErrIllegalArguments           = store.ErrIllegalArguments
49
        ErrIllegalState               = store.ErrIllegalState
50
        ErrIsReplica                  = errors.New("database is read-only because it's a replica")
51
        ErrNotReplica                 = errors.New("database is NOT a replica")
52
        ErrReplicaDivergedFromPrimary = errors.New("replica diverged from primary")
53
        ErrInvalidRevision            = errors.New("invalid key revision number")
54
)
55

56
type DB interface {
57
        GetName() string
58

59
        // Settings
60
        GetOptions() *Options
61

62
        Path() string
63

64
        AsReplica(asReplica, syncReplication bool, syncAcks int)
65
        IsReplica() bool
66

67
        IsSyncReplicationEnabled() bool
68
        SetSyncReplication(enabled bool)
69

70
        MaxResultSize() int
71

72
        // State
73
        Health() (waitingCount int, lastReleaseAt time.Time)
74
        CurrentState() (*schema.ImmutableState, error)
75

76
        Size() (uint64, error)
77

78
        TxCount() (uint64, error)
79

80
        // Key-Value
81
        Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error)
82
        VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error)
83

84
        Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error)
85
        VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error)
86
        GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error)
87

88
        Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error)
89

90
        SetReference(ctx context.Context, req *schema.ReferenceRequest) (*schema.TxHeader, error)
91
        VerifiableSetReference(ctx context.Context, req *schema.VerifiableReferenceRequest) (*schema.VerifiableTx, error)
92

93
        Scan(ctx context.Context, req *schema.ScanRequest) (*schema.Entries, error)
94

95
        History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error)
96

97
        ExecAll(ctx context.Context, operations *schema.ExecAllRequest) (*schema.TxHeader, error)
98

99
        Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error)
100
        CountAll(ctx context.Context) (*schema.EntryCount, error)
101

102
        ZAdd(ctx context.Context, req *schema.ZAddRequest) (*schema.TxHeader, error)
103
        VerifiableZAdd(ctx context.Context, req *schema.VerifiableZAddRequest) (*schema.VerifiableTx, error)
104
        ZScan(ctx context.Context, req *schema.ZScanRequest) (*schema.ZEntries, error)
105

106
        // SQL-related
107
        NewSQLTx(ctx context.Context, opts *sql.TxOptions) (*sql.SQLTx, error)
108

109
        SQLExec(ctx context.Context, tx *sql.SQLTx, req *schema.SQLExecRequest) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
110
        SQLExecPrepared(ctx context.Context, tx *sql.SQLTx, stmts []sql.SQLStmt, params map[string]interface{}) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
111

112
        InferParameters(ctx context.Context, tx *sql.SQLTx, sql string) (map[string]sql.SQLValueType, error)
113
        InferParametersPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.SQLStmt) (map[string]sql.SQLValueType, error)
114

115
        SQLQuery(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) (sql.RowReader, error)
116
        SQLQueryAll(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) ([]*sql.Row, error)
117
        SQLQueryPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.DataSource, params map[string]interface{}) (sql.RowReader, error)
118

119
        VerifiableSQLGet(ctx context.Context, req *schema.VerifiableSQLGetRequest) (*schema.VerifiableSQLEntry, error)
120

121
        ListTables(ctx context.Context, tx *sql.SQLTx) (*schema.SQLQueryResult, error)
122
        DescribeTable(ctx context.Context, tx *sql.SQLTx, table string) (*schema.SQLQueryResult, error)
123

124
        // Transactional layer
125
        WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error
126
        WaitForIndexingUpto(ctx context.Context, txID uint64) error
127

128
        TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error)
129
        ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error)
130
        ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error)
131
        AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error
132
        DiscardPrecommittedTxsSince(txID uint64) error
133

134
        VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error)
135
        TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error)
136

137
        // Maintenance
138
        FlushIndex(req *schema.FlushIndexRequest) error
139
        CompactIndex() error
140

141
        IsClosed() bool
142
        Close() error
143

144
        DocumentDatabase
145
}
146

147
type replicaState struct {
148
        precommittedTxID uint64
149
        precommittedAlh  [sha256.Size]byte
150
}
151

152
// db is a database instance implementing the DB interface
153
type db struct {
154
        st *store.ImmuStore
155

156
        sqlEngine      *sql.Engine
157
        documentEngine *document.Engine
158

159
        mutex        *instrumentedRWMutex
160
        closingMutex sync.Mutex
161

162
        Logger  logger.Logger
163
        options *Options
164

165
        name string
166

167
        maxResultSize int
168

169
        txPool store.TxPool
170

171
        replicaStates      map[string]*replicaState
172
        replicaStatesMutex sync.Mutex
173
}
174

175
// OpenDB opens an existing database from disk
176
func OpenDB(dbName string, multidbHandler sql.MultiDBHandler, opts *Options, log logger.Logger) (DB, error) {
37✔
177
        if dbName == "" {
38✔
178
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
1✔
179
        }
1✔
180

181
        log.Infof("opening database '%s' {replica = %v}...", dbName, opts.replica)
36✔
182

36✔
183
        var replicaStates map[string]*replicaState
36✔
184
        // replica states are only managed in primary with synchronous replication
36✔
185
        if !opts.replica && opts.syncAcks > 0 {
37✔
186
                replicaStates = make(map[string]*replicaState, opts.syncAcks)
1✔
187
        }
1✔
188

189
        dbi := &db{
36✔
190
                Logger:        log,
36✔
191
                options:       opts,
36✔
192
                name:          dbName,
36✔
193
                replicaStates: replicaStates,
36✔
194
                maxResultSize: opts.maxResultSize,
36✔
195
                mutex:         &instrumentedRWMutex{},
36✔
196
        }
36✔
197

36✔
198
        dbDir := dbi.Path()
36✔
199
        _, err := os.Stat(dbDir)
36✔
200
        if os.IsNotExist(err) {
37✔
201
                return nil, fmt.Errorf("missing database directories: %s", dbDir)
1✔
202
        }
1✔
203

204
        stOpts := opts.GetStoreOptions().
35✔
205
                WithLogger(log).
35✔
206
                WithMultiIndexing(true).
35✔
207
                WithExternalCommitAllowance(opts.syncReplication)
35✔
208

35✔
209
        dbi.st, err = store.Open(dbDir, stOpts)
35✔
210
        if err != nil {
35✔
211
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
212
        }
×
213

214
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
105✔
215
                err := dbi.st.InitIndexing(&store.IndexSpec{
70✔
216
                        SourcePrefix: []byte{prefix},
70✔
217
                        TargetPrefix: []byte{prefix},
70✔
218
                })
70✔
219
                if err != nil {
70✔
220
                        dbi.st.Close()
×
221
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
222
                }
×
223
        }
224

225
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, opts.replica)
35✔
226

35✔
227
        sqlOpts := sql.DefaultOptions().
35✔
228
                WithPrefix([]byte{SQLPrefix}).
35✔
229
                WithMultiDBHandler(multidbHandler).
35✔
230
                WithParseTxMetadataFunc(parseTxMetadata)
35✔
231

35✔
232
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
35✔
233
        if err != nil {
35✔
234
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, opts.replica, err)
×
235
                return nil, err
×
236
        }
×
237
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
35✔
238

35✔
239
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
35✔
240
        if err != nil {
35✔
241
                return nil, err
×
242
        }
×
243
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
35✔
244

35✔
245
        txPool, err := dbi.st.NewTxHolderPool(opts.readTxPoolSize, false)
35✔
246
        if err != nil {
35✔
247
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
248
        }
×
249
        dbi.txPool = txPool
35✔
250

35✔
251
        if opts.replica {
39✔
252
                dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, opts.replica)
4✔
253
                return dbi, nil
4✔
254
        }
4✔
255

256
        dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, opts.replica)
31✔
257

31✔
258
        return dbi, nil
31✔
259
}
260

261
func parseTxMetadata(data []byte) (map[string]interface{}, error) {
1✔
262
        md := schema.Metadata{}
1✔
263
        if err := md.Unmarshal(data); err != nil {
1✔
264
                return nil, err
×
265
        }
×
266

267
        meta := make(map[string]interface{}, len(md))
1✔
268
        for k, v := range md {
3✔
269
                meta[k] = v
2✔
270
        }
2✔
271
        return meta, nil
1✔
272
}
273

274
func (d *db) Path() string {
38✔
275
        return filepath.Join(d.options.GetDBRootPath(), d.GetName())
38✔
276
}
38✔
277

278
func (d *db) allocTx() (*store.Tx, error) {
10,922✔
279
        tx, err := d.txPool.Alloc()
10,922✔
280
        if errors.Is(err, store.ErrTxPoolExhausted) {
10,922✔
281
                return nil, ErrTxReadPoolExhausted
×
282
        }
×
283
        return tx, err
10,922✔
284
}
285

286
func (d *db) releaseTx(tx *store.Tx) {
10,922✔
287
        d.txPool.Release(tx)
10,922✔
288
}
10,922✔
289

290
// NewDB creates a new database along with its directories and files
291
func NewDB(dbName string, multidbHandler sql.MultiDBHandler, opts *Options, log logger.Logger) (DB, error) {
662✔
292
        if dbName == "" {
662✔
293
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
×
294
        }
×
295

296
        log.Infof("creating database '%s' {replica = %v}...", dbName, opts.replica)
662✔
297

662✔
298
        var replicaStates map[string]*replicaState
662✔
299
        // replica states are only managed in primary with synchronous replication
662✔
300
        if !opts.replica && opts.syncAcks > 0 {
673✔
301
                replicaStates = make(map[string]*replicaState, opts.syncAcks)
11✔
302
        }
11✔
303

304
        dbi := &db{
662✔
305
                Logger:        log,
662✔
306
                options:       opts,
662✔
307
                name:          dbName,
662✔
308
                replicaStates: replicaStates,
662✔
309
                maxResultSize: opts.maxResultSize,
662✔
310
                mutex:         &instrumentedRWMutex{},
662✔
311
        }
662✔
312

662✔
313
        dbDir := filepath.Join(opts.GetDBRootPath(), dbName)
662✔
314

662✔
315
        _, err := os.Stat(dbDir)
662✔
316
        if err == nil {
663✔
317
                return nil, fmt.Errorf("database directories already exist: %s", dbDir)
1✔
318
        }
1✔
319

320
        if err = os.MkdirAll(dbDir, os.ModePerm); err != nil {
662✔
321
                return nil, logErr(dbi.Logger, "unable to create data folder: %s", err)
1✔
322
        }
1✔
323

324
        stOpts := opts.GetStoreOptions().
660✔
325
                WithExternalCommitAllowance(opts.syncReplication).
660✔
326
                WithMultiIndexing(true).
660✔
327
                WithLogger(log)
660✔
328

660✔
329
        dbi.st, err = store.Open(dbDir, stOpts)
660✔
330
        if err != nil {
660✔
331
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
332
        }
×
333

334
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
1,980✔
335
                err := dbi.st.InitIndexing(&store.IndexSpec{
1,320✔
336
                        SourcePrefix: []byte{prefix},
1,320✔
337
                        TargetPrefix: []byte{prefix},
1,320✔
338
                })
1,320✔
339
                if err != nil {
1,320✔
340
                        dbi.st.Close()
×
341
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
342
                }
×
343
        }
344

345
        txPool, err := dbi.st.NewTxHolderPool(opts.readTxPoolSize, false)
660✔
346
        if err != nil {
660✔
347
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
348
        }
×
349
        dbi.txPool = txPool
660✔
350

660✔
351
        sqlOpts := sql.DefaultOptions().
660✔
352
                WithPrefix([]byte{SQLPrefix}).
660✔
353
                WithMultiDBHandler(multidbHandler).
660✔
354
                WithParseTxMetadataFunc(parseTxMetadata)
660✔
355

660✔
356
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, opts.replica)
660✔
357

660✔
358
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
660✔
359
        if err != nil {
660✔
360
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, opts.replica, err)
×
361
                return nil, err
×
362
        }
×
363
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
660✔
364

660✔
365
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
660✔
366
        if err != nil {
660✔
367
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
368
        }
×
369
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
660✔
370

660✔
371
        dbi.Logger.Infof("database '%s' successfully created {replica = %v}", dbName, opts.replica)
660✔
372

660✔
373
        return dbi, nil
660✔
374
}
375

376
func (d *db) MaxResultSize() int {
79✔
377
        return d.maxResultSize
79✔
378
}
79✔
379

380
func (d *db) FlushIndex(req *schema.FlushIndexRequest) error {
6✔
381
        if req == nil {
7✔
382
                return store.ErrIllegalArguments
1✔
383
        }
1✔
384

385
        return d.st.FlushIndexes(req.CleanupPercentage, req.Synced)
5✔
386
}
387

388
// CompactIndex ...
389
func (d *db) CompactIndex() error {
4✔
390
        return d.st.CompactIndexes()
4✔
391
}
4✔
392

393
// Set ...
394
func (d *db) Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,455✔
395
        d.mutex.RLock()
4,455✔
396
        defer d.mutex.RUnlock()
4,455✔
397

4,455✔
398
        if d.isReplica() {
4,458✔
399
                return nil, ErrIsReplica
3✔
400
        }
3✔
401

402
        return d.set(ctx, req)
4,452✔
403
}
404

405
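// set writes all KVs in the request within a single write-only transaction, rejecting duplicated
// keys and applying any preconditions; the commit is asynchronous when NoWait is set.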
func (d *db) set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,452✔
406
        if req == nil {
4,453✔
407
                return nil, ErrIllegalArguments
1✔
408
        }
1✔
409

410
        tx, err := d.newWriteOnlyTx(ctx)
4,451✔
411
        if err != nil {
4,451✔
412
                return nil, err
×
413
        }
×
414
        defer tx.Cancel()
4,451✔
415

4,451✔
416
        keys := make(map[[sha256.Size]byte]struct{}, len(req.KVs))
4,451✔
417

4,451✔
418
        for _, kv := range req.KVs {
10,197✔
419
                if len(kv.Key) == 0 {
5,748✔
420
                        return nil, ErrIllegalArguments
2✔
421
                }
2✔
422

423
                kid := sha256.Sum256(kv.Key)
5,744✔
424
                _, ok := keys[kid]
5,744✔
425
                if ok {
5,745✔
426
                        return nil, schema.ErrDuplicatedKeysNotSupported
1✔
427
                }
1✔
428
                keys[kid] = struct{}{}
5,743✔
429

5,743✔
430
                e := EncodeEntrySpec(
5,743✔
431
                        kv.Key,
5,743✔
432
                        schema.KVMetadataFromProto(kv.Metadata),
5,743✔
433
                        kv.Value,
5,743✔
434
                )
5,743✔
435

5,743✔
436
                err = tx.Set(e.Key, e.Metadata, e.Value)
5,743✔
437
                if err != nil {
5,744✔
438
                        return nil, err
1✔
439
                }
1✔
440
        }
441

442
        for i := range req.Preconditions {
5,518✔
443
                c, err := PreconditionFromProto(req.Preconditions[i])
1,071✔
444
                if err != nil {
1,072✔
445
                        return nil, err
1✔
446
                }
1✔
447

448
                err = tx.AddPrecondition(c)
1,070✔
449
                if err != nil {
1,071✔
450
                        return nil, fmt.Errorf("%w: %v", store.ErrInvalidPrecondition, err)
1✔
451
                }
1✔
452
        }
453

454
        var hdr *store.TxHeader
4,445✔
455

4,445✔
456
        if req.NoWait {
4,445✔
457
                hdr, err = tx.AsyncCommit(ctx)
×
458
        } else {
4,445✔
459
                hdr, err = tx.Commit(ctx)
4,445✔
460
        }
4,445✔
461
        if err != nil {
4,476✔
462
                return nil, err
31✔
463
        }
31✔
464

465
        return schema.TxHeaderToProto(hdr), nil
4,414✔
466
}
467

468
func (d *db) newWriteOnlyTx(ctx context.Context) (*store.OngoingTx, error) {
4,451✔
469
        tx, err := d.st.NewWriteOnlyTx(ctx)
4,451✔
470
        if err != nil {
4,451✔
471
                return nil, err
×
472
        }
×
473
        return d.txWithMetadata(ctx, tx)
4,451✔
474
}
475

476
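// newTx starts a read-write store transaction with the given options and attaches metadata from the context, if any.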
func (d *db) newTx(ctx context.Context, opts *store.TxOptions) (*store.OngoingTx, error) {
7✔
477
        tx, err := d.st.NewTx(ctx, opts)
7✔
478
        if err != nil {
7✔
479
                return nil, err
×
480
        }
×
481
        return d.txWithMetadata(ctx, tx)
7✔
482
}
483

484
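// txWithMetadata marshals any metadata carried in the context and attaches it to the transaction as extra metadata.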
func (d *db) txWithMetadata(ctx context.Context, tx *store.OngoingTx) (*store.OngoingTx, error) {
4,458✔
485
        meta := schema.MetadataFromContext(ctx)
4,458✔
486
        if len(meta) > 0 {
4,518✔
487
                txmd := store.NewTxMetadata()
60✔
488

60✔
489
                data, err := meta.Marshal()
60✔
490
                if err != nil {
60✔
491
                        return nil, err
×
492
                }
×
493

494
                if err := txmd.WithExtra(data); err != nil {
60✔
495
                        return nil, err
×
496
                }
×
497
                return tx.WithMetadata(txmd), nil
60✔
498
        }
499
        return tx, nil
4,398✔
500
}
501

502
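// checkKeyRequest validates a KeyRequest: a non-empty key is required, and SinceTx/AtRevision cannot be combined with AtTx.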
func checkKeyRequest(req *schema.KeyRequest) error {
3,103✔
503
        if req == nil {
3,104✔
504
                return fmt.Errorf(
1✔
505
                        "%w: empty request",
1✔
506
                        ErrIllegalArguments,
1✔
507
                )
1✔
508
        }
1✔
509

510
        if len(req.Key) == 0 {
3,104✔
511
                return fmt.Errorf(
2✔
512
                        "%w: empty key",
2✔
513
                        ErrIllegalArguments,
2✔
514
                )
2✔
515
        }
2✔
516

517
        if req.AtTx > 0 {
3,125✔
518
                if req.SinceTx > 0 {
27✔
519
                        return fmt.Errorf(
2✔
520
                                "%w: SinceTx should not be specified when AtTx is used",
2✔
521
                                ErrIllegalArguments,
2✔
522
                        )
2✔
523
                }
2✔
524

525
                if req.AtRevision != 0 {
24✔
526
                        return fmt.Errorf(
1✔
527
                                "%w: AtRevision should not be specified when AtTx is used",
1✔
528
                                ErrIllegalArguments,
1✔
529
                        )
1✔
530
                }
1✔
531
        }
532

533
        return nil
3,097✔
534
}
535

536
// Get ...
537
func (d *db) Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error) {
3,099✔
538
        err := checkKeyRequest(req)
3,099✔
539
        if err != nil {
3,101✔
540
                return nil, err
2✔
541
        }
2✔
542

543
        currTxID, _ := d.st.CommittedAlh()
3,097✔
544
        if req.SinceTx > currTxID {
3,098✔
545
                return nil, fmt.Errorf(
1✔
546
                        "%w: SinceTx must not be greater than the current transaction ID",
1✔
547
                        ErrIllegalArguments,
1✔
548
                )
1✔
549
        }
1✔
550

551
        if !req.NoWait && req.AtTx == 0 {
6,170✔
552
                waitUntilTx := req.SinceTx
3,074✔
553
                if waitUntilTx == 0 {
5,051✔
554
                        waitUntilTx = currTxID
1,977✔
555
                }
1,977✔
556

557
                err := d.WaitForIndexingUpto(ctx, waitUntilTx)
3,074✔
558
                if err != nil {
3,076✔
559
                        return nil, err
2✔
560
                }
2✔
561
        }
562

563
        if req.AtRevision != 0 {
3,125✔
564
                return d.getAtRevision(ctx, EncodeKey(req.Key), req.AtRevision, true)
31✔
565
        }
31✔
566

567
        return d.getAtTx(ctx, EncodeKey(req.Key), req.AtTx, 0, d.st, 0, true)
3,063✔
568
}
569

570
func (d *db) get(ctx context.Context, key []byte, index store.KeyIndex, skipIntegrityCheck bool) (*schema.Entry, error) {
10✔
571
        return d.getAtTx(ctx, key, 0, 0, index, 0, skipIntegrityCheck)
10✔
572
}
10✔
573

574
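// getAtTx retrieves the entry for a key, either from the given index (when atTx is 0) or from a
// specific transaction, delegating reference resolution to resolveValue.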
func (d *db) getAtTx(
575
        ctx context.Context,
576
        key []byte,
577
        atTx uint64,
578
        resolved int,
579
        index store.KeyIndex,
580
        revision uint64,
581
        skipIntegrityCheck bool,
582
) (entry *schema.Entry, err error) {
5,638✔
583

5,638✔
584
        var txID uint64
5,638✔
585
        var val []byte
5,638✔
586
        var md *store.KVMetadata
5,638✔
587

5,638✔
588
        if atTx == 0 {
9,357✔
589
                valRef, err := index.Get(ctx, key)
3,719✔
590
                if err != nil {
3,800✔
591
                        return nil, err
81✔
592
                }
81✔
593

594
                txID = valRef.Tx()
3,638✔
595
                revision = valRef.HC()
3,638✔
596
                md = valRef.KVMetadata()
3,638✔
597

3,638✔
598
                val, err = valRef.Resolve()
3,638✔
599
                if err != nil {
3,642✔
600
                        return nil, err
4✔
601
                }
4✔
602
        } else {
1,919✔
603
                txID = atTx
1,919✔
604

1,919✔
605
                md, val, err = d.readMetadataAndValue(key, atTx, skipIntegrityCheck)
1,919✔
606
                if err != nil {
2,014✔
607
                        return nil, err
95✔
608
                }
95✔
609
        }
610

611
        return d.resolveValue(ctx, key, val, resolved, txID, md, index, revision, skipIntegrityCheck)
5,458✔
612
}
613

614
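// readMetadataAndValue reads the KV metadata and value of a key from the transaction it was written in.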
func (d *db) readMetadataAndValue(key []byte, atTx uint64, skipIntegrityCheck bool) (*store.KVMetadata, []byte, error) {
1,919✔
615
        entry, _, err := d.st.ReadTxEntry(atTx, key, skipIntegrityCheck)
1,919✔
616
        if err != nil {
2,005✔
617
                return nil, nil, err
86✔
618
        }
86✔
619

620
        v, err := d.st.ReadValue(entry)
1,833✔
621
        if err != nil {
1,842✔
622
                return nil, nil, err
9✔
623
        }
9✔
624

625
        return entry.Metadata(), v, nil
1,824✔
626
}
627

628
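// getAtRevision resolves the transaction holding the requested revision of a key; positive revisions
// count from the oldest entry, negative ones from the most recent.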
func (d *db) getAtRevision(ctx context.Context, key []byte, atRevision int64, skipIntegrityCheck bool) (entry *schema.Entry, err error) {
31✔
629
        var offset uint64
31✔
630
        var desc bool
31✔
631

31✔
632
        if atRevision > 0 {
47✔
633
                offset = uint64(atRevision) - 1
16✔
634
                desc = false
16✔
635
        } else {
31✔
636
                offset = -uint64(atRevision)
15✔
637
                desc = true
15✔
638
        }
15✔
639

640
        valRefs, hCount, err := d.st.History(key, offset, desc, 1)
31✔
641
        if errors.Is(err, store.ErrNoMoreEntries) || errors.Is(err, store.ErrOffsetOutOfRange) {
35✔
642
                return nil, ErrInvalidRevision
4✔
643
        }
4✔
644
        if err != nil {
28✔
645
                return nil, err
1✔
646
        }
1✔
647

648
        if atRevision < 0 {
39✔
649
                atRevision = int64(hCount) + atRevision
13✔
650
        }
13✔
651

652
        entry, err = d.getAtTx(ctx, key, valRefs[0].Tx(), 0, d.st, uint64(atRevision), skipIntegrityCheck)
26✔
653
        if err != nil {
27✔
654
                return nil, err
1✔
655
        }
1✔
656

657
        return entry, err
25✔
658
}
659

660
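// resolveValue builds a schema.Entry from a raw stored value, following reference entries up to
// MaxKeyResolutionLimit and reporting deleted keys as not found.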
func (d *db) resolveValue(
661
        ctx context.Context,
662
        key []byte,
663
        val []byte,
664
        resolved int,
665
        txID uint64,
666
        md *store.KVMetadata,
667
        index store.KeyIndex,
668
        revision uint64,
669
        skipIntegrityCheck bool,
670
) (entry *schema.Entry, err error) {
5,467✔
671
        if md != nil && md.Deleted() {
5,469✔
672
                return nil, store.ErrKeyNotFound
2✔
673
        }
2✔
674

675
        if len(val) < 1 {
5,465✔
676
                return nil, fmt.Errorf("%w: internal value consistency error - missing value prefix", store.ErrCorruptedData)
×
677
        }
×
678

679
        // Reference lookup
680
        if val[0] == ReferenceValuePrefix {
5,677✔
681
                if len(val) < 1+8 {
212✔
682
                        return nil, fmt.Errorf("%w: internal value consistency error - invalid reference", store.ErrCorruptedData)
×
683
                }
×
684

685
                if resolved == MaxKeyResolutionLimit {
214✔
686
                        return nil, ErrKeyResolutionLimitReached
2✔
687
                }
2✔
688

689
                atTx := binary.BigEndian.Uint64(TrimPrefix(val))
210✔
690
                refKey := make([]byte, len(val)-1-8)
210✔
691
                copy(refKey, val[1+8:])
210✔
692

210✔
693
                if index != nil {
419✔
694
                        entry, err = d.getAtTx(ctx, refKey, atTx, resolved+1, index, 0, skipIntegrityCheck)
209✔
695
                        if err != nil {
210✔
696
                                return nil, err
1✔
697
                        }
1✔
698
                } else {
1✔
699
                        entry = &schema.Entry{
1✔
700
                                Key: TrimPrefix(refKey),
1✔
701
                                Tx:  atTx,
1✔
702
                        }
1✔
703
                }
1✔
704

705
                entry.ReferencedBy = &schema.Reference{
209✔
706
                        Tx:       txID,
209✔
707
                        Key:      TrimPrefix(key),
209✔
708
                        Metadata: schema.KVMetadataToProto(md),
209✔
709
                        AtTx:     atTx,
209✔
710
                        Revision: revision,
209✔
711
                }
209✔
712

209✔
713
                return entry, nil
209✔
714
        }
715

716
        return &schema.Entry{
5,253✔
717
                Tx:       txID,
5,253✔
718
                Key:      TrimPrefix(key),
5,253✔
719
                Metadata: schema.KVMetadataToProto(md),
5,253✔
720
                Value:    TrimPrefix(val),
5,253✔
721
                Revision: revision,
5,253✔
722
        }, nil
5,253✔
723
}
724

725
func (d *db) Health() (waitingCount int, lastReleaseAt time.Time) {
1✔
726
        return d.mutex.State()
1✔
727
}
1✔
728

729
// CurrentState ...
730
func (d *db) CurrentState() (*schema.ImmutableState, error) {
15,272✔
731
        lastTxID, lastTxAlh := d.st.CommittedAlh()
15,272✔
732
        lastPreTxID, lastPreTxAlh := d.st.PrecommittedAlh()
15,272✔
733

15,272✔
734
        return &schema.ImmutableState{
15,272✔
735
                TxId:               lastTxID,
15,272✔
736
                TxHash:             lastTxAlh[:],
15,272✔
737
                PrecommittedTxId:   lastPreTxID,
15,272✔
738
                PrecommittedTxHash: lastPreTxAlh[:],
15,272✔
739
        }, nil
15,272✔
740
}
15,272✔
741

742
// WaitForTx blocks the caller until the specified tx is committed (or precommitted, when allowed)
743
func (d *db) WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error {
8,405✔
744
        return d.st.WaitForTx(ctx, txID, allowPrecommitted)
8,405✔
745
}
8,405✔
746

747
// WaitForIndexingUpto blocks the caller until the specified tx gets indexed
748
func (d *db) WaitForIndexingUpto(ctx context.Context, txID uint64) error {
3,081✔
749
        return d.st.WaitForIndexingUpto(ctx, txID)
3,081✔
750
}
3,081✔
751

752
// VerifiableSet ...
753
func (d *db) VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error) {
49✔
754
        if req == nil {
50✔
755
                return nil, ErrIllegalArguments
1✔
756
        }
1✔
757

758
        lastTxID, _ := d.st.CommittedAlh()
48✔
759
        if lastTxID < req.ProveSinceTx {
49✔
760
                return nil, ErrIllegalState
1✔
761
        }
1✔
762

763
        // Preallocate tx buffers
764
        lastTx, err := d.allocTx()
47✔
765
        if err != nil {
47✔
766
                return nil, err
×
767
        }
×
768
        defer d.releaseTx(lastTx)
47✔
769

47✔
770
        txhdr, err := d.Set(ctx, req.SetRequest)
47✔
771
        if err != nil {
48✔
772
                return nil, err
1✔
773
        }
1✔
774

775
        err = d.st.ReadTx(uint64(txhdr.Id), false, lastTx)
46✔
776
        if err != nil {
46✔
777
                return nil, err
×
778
        }
×
779

780
        var prevTxHdr *store.TxHeader
46✔
781

46✔
782
        if req.ProveSinceTx == 0 {
66✔
783
                prevTxHdr = lastTx.Header()
20✔
784
        } else {
46✔
785
                prevTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
26✔
786
                if err != nil {
26✔
787
                        return nil, err
×
788
                }
×
789
        }
790

791
        dualProof, err := d.st.DualProof(prevTxHdr, lastTx.Header())
46✔
792
        if err != nil {
46✔
793
                return nil, err
×
794
        }
×
795

796
        return &schema.VerifiableTx{
46✔
797
                Tx:        schema.TxToProto(lastTx),
46✔
798
                DualProof: schema.DualProofToProto(dualProof),
46✔
799
        }, nil
46✔
800
}
801

802
// VerifiableGet ...
803
func (d *db) VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error) {
1,038✔
804
        if req == nil {
1,039✔
805
                return nil, ErrIllegalArguments
1✔
806
        }
1✔
807

808
        lastTxID, _ := d.st.CommittedAlh()
1,037✔
809
        if lastTxID < req.ProveSinceTx {
1,038✔
810
                return nil, ErrIllegalState
1✔
811
        }
1✔
812

813
        e, err := d.Get(ctx, req.KeyRequest)
1,036✔
814
        if err != nil {
1,040✔
815
                return nil, err
4✔
816
        }
4✔
817

818
        var vTxID uint64
1,032✔
819
        var vKey []byte
1,032✔
820

1,032✔
821
        if e.ReferencedBy == nil {
2,059✔
822
                vTxID = e.Tx
1,027✔
823
                vKey = e.Key
1,027✔
824
        } else {
1,032✔
825
                vTxID = e.ReferencedBy.Tx
5✔
826
                vKey = e.ReferencedBy.Key
5✔
827
        }
5✔
828

829
        // key-value inclusion proof
830
        tx, err := d.allocTx()
1,032✔
831
        if err != nil {
1,032✔
832
                return nil, err
×
833
        }
×
834
        defer d.releaseTx(tx)
1,032✔
835

1,032✔
836
        err = d.st.ReadTx(vTxID, false, tx)
1,032✔
837
        if err != nil {
1,032✔
838
                return nil, err
×
839
        }
×
840

841
        var rootTxHdr *store.TxHeader
1,032✔
842

1,032✔
843
        if req.ProveSinceTx == 0 {
2,043✔
844
                rootTxHdr = tx.Header()
1,011✔
845
        } else {
1,032✔
846
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
21✔
847
                if err != nil {
21✔
848
                        return nil, err
×
849
                }
×
850
        }
851

852
        inclusionProof, err := tx.Proof(EncodeKey(vKey))
1,032✔
853
        if err != nil {
1,032✔
854
                return nil, err
×
855
        }
×
856

857
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,032✔
858

1,032✔
859
        if req.ProveSinceTx <= vTxID {
2,058✔
860
                sourceTxHdr = rootTxHdr
1,026✔
861
                targetTxHdr = tx.Header()
1,026✔
862
        } else {
1,032✔
863
                sourceTxHdr = tx.Header()
6✔
864
                targetTxHdr = rootTxHdr
6✔
865
        }
6✔
866

867
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,032✔
868
        if err != nil {
1,032✔
869
                return nil, err
×
870
        }
×
871

872
        verifiableTx := &schema.VerifiableTx{
1,032✔
873
                Tx:        schema.TxToProto(tx),
1,032✔
874
                DualProof: schema.DualProofToProto(dualProof),
1,032✔
875
        }
1,032✔
876

1,032✔
877
        return &schema.VerifiableEntry{
1,032✔
878
                Entry:          e,
1,032✔
879
                VerifiableTx:   verifiableTx,
1,032✔
880
                InclusionProof: schema.InclusionProofToProto(inclusionProof),
1,032✔
881
        }, nil
1,032✔
882
}
883

884
func (d *db) Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error) {
9✔
885
        if req == nil {
10✔
886
                return nil, ErrIllegalArguments
1✔
887
        }
1✔
888

889
        d.mutex.RLock()
8✔
890
        defer d.mutex.RUnlock()
8✔
891

8✔
892
        if d.isReplica() {
8✔
893
                return nil, ErrIsReplica
×
894
        }
×
895

896
        opts := store.DefaultTxOptions()
8✔
897

8✔
898
        if req.SinceTx > 0 {
9✔
899
                if req.SinceTx > d.st.LastPrecommittedTxID() {
2✔
900
                        return nil, store.ErrTxNotFound
1✔
901
                }
1✔
902

903
                opts.WithSnapshotMustIncludeTxID(func(_ uint64) uint64 {
×
904
                        return req.SinceTx
×
905
                })
×
906
        }
907

908
        tx, err := d.newTx(ctx, opts)
7✔
909
        if err != nil {
7✔
910
                return nil, err
×
911
        }
×
912
        defer tx.Cancel()
7✔
913

7✔
914
        for _, k := range req.Keys {
13✔
915
                if len(k) == 0 {
6✔
916
                        return nil, ErrIllegalArguments
×
917
                }
×
918

919
                md := store.NewKVMetadata()
6✔
920

6✔
921
                md.AsDeleted(true)
6✔
922

6✔
923
                e := EncodeEntrySpec(k, md, nil)
6✔
924

6✔
925
                err = tx.Delete(ctx, e.Key)
6✔
926
                if err != nil {
7✔
927
                        return nil, err
1✔
928
                }
1✔
929
        }
930

931
        var hdr *store.TxHeader
6✔
932
        if req.NoWait {
6✔
933
                hdr, err = tx.AsyncCommit(ctx)
×
934
        } else {
6✔
935
                hdr, err = tx.Commit(ctx)
6✔
936
        }
6✔
937
        if err != nil {
7✔
938
                return nil, err
1✔
939
        }
1✔
940

941
        return schema.TxHeaderToProto(hdr), nil
5✔
942
}
943

944
// GetAll ...
945
func (d *db) GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error) {
4✔
946
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
4✔
947
        if err != nil {
4✔
948
                return nil, err
×
949
        }
×
950
        defer snap.Close()
4✔
951

4✔
952
        list := &schema.Entries{}
4✔
953

4✔
954
        for _, key := range req.Keys {
14✔
955
                e, err := d.get(ctx, EncodeKey(key), snap, true)
10✔
956
                if err == nil || errors.Is(err, store.ErrKeyNotFound) {
20✔
957
                        if e != nil {
19✔
958
                                list.Entries = append(list.Entries, e)
9✔
959
                        }
9✔
960
                } else {
×
961
                        return nil, err
×
962
                }
×
963
        }
964

965
        return list, nil
4✔
966
}
967

968
func (d *db) Size() (uint64, error) {
103✔
969
        return d.st.Size()
103✔
970
}
103✔
971

972
// TxCount ...
973
func (d *db) TxCount() (uint64, error) {
330✔
974
        return d.st.TxCount(), nil
330✔
975
}
330✔
976

977
// Count ...
978
func (d *db) Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error) {
8✔
979
        if prefix == nil {
10✔
980
                return nil, ErrIllegalArguments
2✔
981
        }
2✔
982

983
        tx, err := d.st.NewTx(ctx, store.DefaultTxOptions().WithMode(store.ReadOnlyTx))
6✔
984
        if err != nil {
6✔
985
                return nil, err
×
986
        }
×
987
        defer tx.Cancel()
6✔
988

6✔
989
        keyReader, err := tx.NewKeyReader(store.KeyReaderSpec{
6✔
990
                Prefix: WrapWithPrefix(prefix.Prefix, SetKeyPrefix),
6✔
991
        })
6✔
992
        if err != nil {
6✔
993
                return nil, err
×
994
        }
×
995
        defer keyReader.Close()
6✔
996

6✔
997
        count := 0
6✔
998

6✔
999
        for {
19✔
1000
                _, _, err := keyReader.Read(ctx)
13✔
1001
                if errors.Is(err, store.ErrNoMoreEntries) {
19✔
1002
                        break
6✔
1003
                }
1004
                if err != nil {
7✔
1005
                        return nil, err
×
1006
                }
×
1007

1008
                count++
7✔
1009
        }
1010

1011
        return &schema.EntryCount{Count: uint64(count)}, nil
6✔
1012
}
1013

1014
// CountAll ...
1015
func (d *db) CountAll(ctx context.Context) (*schema.EntryCount, error) {
4✔
1016
        return d.Count(ctx, &schema.KeyPrefix{})
4✔
1017
}
4✔
1018

1019
// TxByID ...
1020
func (d *db) TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error) {
62✔
1021
        if req == nil {
63✔
1022
                return nil, ErrIllegalArguments
1✔
1023
        }
1✔
1024

1025
        var snap *store.Snapshot
61✔
1026
        var err error
61✔
1027

61✔
1028
        tx, err := d.allocTx()
61✔
1029
        if err != nil {
61✔
1030
                return nil, err
×
1031
        }
×
1032
        defer d.releaseTx(tx)
61✔
1033

61✔
1034
        if !req.KeepReferencesUnresolved {
120✔
1035
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
59✔
1036
                if err != nil {
59✔
1037
                        return nil, err
×
1038
                }
×
1039
                defer snap.Close()
59✔
1040
        }
1041

1042
        // read the requested transaction
1043
        err = d.st.ReadTx(req.Tx, false, tx)
61✔
1044
        if err != nil {
61✔
1045
                return nil, err
×
1046
        }
×
1047

1048
        return d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
61✔
1049
}
1050

1051
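// snapshotSince returns an index snapshot over the given prefix that includes at least the given
// transaction (or the currently committed one when txID is 0).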
func (d *db) snapshotSince(ctx context.Context, prefix []byte, txID uint64) (*store.Snapshot, error) {
1,706✔
1052
        currTxID, _ := d.st.CommittedAlh()
1,706✔
1053

1,706✔
1054
        if txID > currTxID {
1,706✔
1055
                return nil, ErrIllegalArguments
×
1056
        }
×
1057

1058
        waitUntilTx := txID
1,706✔
1059
        if waitUntilTx == 0 {
3,346✔
1060
                waitUntilTx = currTxID
1,640✔
1061
        }
1,640✔
1062

1063
        return d.st.SnapshotMustIncludeTxID(ctx, prefix, waitUntilTx)
1,706✔
1064
}
1065

1066
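// serializeTx converts a raw transaction into its protobuf form, materializing KV, sorted-set and
// SQL entries according to the provided EntriesSpec.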
func (d *db) serializeTx(ctx context.Context, tx *store.Tx, spec *schema.EntriesSpec, snap *store.Snapshot, skipIntegrityCheck bool) (*schema.Tx, error) {
1,372✔
1067
        if spec == nil {
2,234✔
1068
                return schema.TxToProto(tx), nil
862✔
1069
        }
862✔
1070

1071
        stx := &schema.Tx{
510✔
1072
                Header: schema.TxHeaderToProto(tx.Header()),
510✔
1073
        }
510✔
1074

510✔
1075
        for _, e := range tx.Entries() {
1,048✔
1076
                switch e.Key()[0] {
538✔
1077
                case SetKeyPrefix:
39✔
1078
                        {
78✔
1079
                                if spec.KvEntriesSpec == nil || spec.KvEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
57✔
1080
                                        break
18✔
1081
                                }
1082

1083
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
24✔
1084
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
3✔
1085
                                        break
3✔
1086
                                }
1087

1088
                                v, err := d.st.ReadValue(e)
18✔
1089
                                if errors.Is(err, store.ErrExpiredEntry) {
18✔
1090
                                        break
×
1091
                                }
1092
                                if err != nil {
18✔
1093
                                        return nil, err
×
1094
                                }
×
1095

1096
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
27✔
1097
                                        kve := schema.TxEntryToProto(e)
9✔
1098
                                        kve.Value = v
9✔
1099
                                        stx.Entries = append(stx.Entries, kve)
9✔
1100
                                        break
9✔
1101
                                }
1102

1103
                                // resolve entry
1104
                                var index store.KeyIndex
9✔
1105
                                if snap != nil {
16✔
1106
                                        index = snap
7✔
1107
                                }
7✔
1108

1109
                                kve, err := d.resolveValue(ctx, e.Key(), v, 0, tx.Header().ID, e.Metadata(), index, 0, skipIntegrityCheck)
9✔
1110
                                if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
10✔
1111
                                        break // ignore deleted ones (referenced key may have been deleted)
1✔
1112
                                }
1113
                                if err != nil {
8✔
1114
                                        return nil, err
×
1115
                                }
×
1116

1117
                                stx.KvEntries = append(stx.KvEntries, kve)
8✔
1118
                        }
1119
                case SortedSetKeyPrefix:
14✔
1120
                        {
28✔
1121
                                if spec.ZEntriesSpec == nil || spec.ZEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
22✔
1122
                                        break
8✔
1123
                                }
1124

1125
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
7✔
1126
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1127
                                        break
1✔
1128
                                }
1129

1130
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
6✔
1131
                                        v, err := d.st.ReadValue(e)
1✔
1132
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1133
                                                break
×
1134
                                        }
1135
                                        if err != nil {
1✔
1136
                                                return nil, err
×
1137
                                        }
×
1138

1139
                                        kve := schema.TxEntryToProto(e)
1✔
1140
                                        kve.Value = v
1✔
1141
                                        stx.Entries = append(stx.Entries, kve)
1✔
1142
                                        break
1✔
1143
                                }
1144

1145
                                // zKey = [1+setLenLen+set+scoreLen+keyLenLen+1+key+txIDLen]
1146
                                zKey := e.Key()
4✔
1147

4✔
1148
                                setLen := int(binary.BigEndian.Uint64(zKey[1:]))
4✔
1149
                                set := make([]byte, setLen)
4✔
1150
                                copy(set, zKey[1+setLenLen:])
4✔
1151

4✔
1152
                                scoreOff := 1 + setLenLen + setLen
4✔
1153
                                scoreB := binary.BigEndian.Uint64(zKey[scoreOff:])
4✔
1154
                                score := math.Float64frombits(scoreB)
4✔
1155

4✔
1156
                                keyOff := scoreOff + scoreLen + keyLenLen
4✔
1157
                                key := make([]byte, len(zKey)-keyOff-txIDLen)
4✔
1158
                                copy(key, zKey[keyOff:])
4✔
1159

4✔
1160
                                atTx := binary.BigEndian.Uint64(zKey[keyOff+len(key):])
4✔
1161

4✔
1162
                                var entry *schema.Entry
4✔
1163
                                var err error
4✔
1164

4✔
1165
                                if snap != nil {
7✔
1166
                                        entry, err = d.getAtTx(ctx, key, atTx, 1, snap, 0, skipIntegrityCheck)
3✔
1167
                                        if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
3✔
1168
                                                break // ignore deleted ones (referenced key may have been deleted)
×
1169
                                        }
1170
                                        if err != nil {
3✔
1171
                                                return nil, err
×
1172
                                        }
×
1173
                                }
1174

1175
                                zentry := &schema.ZEntry{
4✔
1176
                                        Set:   set,
4✔
1177
                                        Key:   key[1:],
4✔
1178
                                        Entry: entry,
4✔
1179
                                        Score: score,
4✔
1180
                                        AtTx:  atTx,
4✔
1181
                                }
4✔
1182

4✔
1183
                                stx.ZEntries = append(stx.ZEntries, zentry)
4✔
1184
                        }
1185
                case SQLPrefix:
5✔
1186
                        {
10✔
1187
                                if spec.SqlEntriesSpec == nil || spec.SqlEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
7✔
1188
                                        break
2✔
1189
                                }
1190

1191
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
4✔
1192
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1193
                                        break
1✔
1194
                                }
1195

1196
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
3✔
1197
                                        v, err := d.st.ReadValue(e)
1✔
1198
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1199
                                                break
×
1200
                                        }
1201
                                        if err != nil {
1✔
1202
                                                return nil, err
×
1203
                                        }
×
1204

1205
                                        kve := schema.TxEntryToProto(e)
1✔
1206
                                        kve.Value = v
1✔
1207
                                        stx.Entries = append(stx.Entries, kve)
1✔
1208
                                        break
1✔
1209
                                }
1210

1211
                                return nil, fmt.Errorf("%w: sql entry resolution is not supported", ErrIllegalArguments)
1✔
1212
                        }
1213
                }
1214
        }
1215

1216
        return stx, nil
509✔
1217
}
1218

1219
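// mayUpdateReplicaState records the precommit state reported by a replica and, once at least syncAcks
// replicas have acknowledged, allows the store to commit up to the lowest acknowledged transaction.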
func (d *db) mayUpdateReplicaState(committedTxID uint64, newReplicaState *schema.ReplicaState) error {
6,725✔
1220
        d.replicaStatesMutex.Lock()
6,725✔
1221
        defer d.replicaStatesMutex.Unlock()
6,725✔
1222

6,725✔
1223
        // clean up replicaStates
6,725✔
1224
        // it's safe to remove entries up to the latest tx committed in the primary
6,725✔
1225
        for uuid, st := range d.replicaStates {
7,159✔
1226
                if st.precommittedTxID <= committedTxID {
744✔
1227
                        delete(d.replicaStates, uuid)
310✔
1228
                }
310✔
1229
        }
1230

1231
        if newReplicaState.PrecommittedTxID <= committedTxID {
13,096✔
1232
                // as far as the primary is concerned, nothing really new has happened
6,371✔
1233
                return nil
6,371✔
1234
        }
6,371✔
1235

1236
        newReplicaAlh := schema.DigestFromProto(newReplicaState.PrecommittedAlh)
354✔
1237

354✔
1238
        replicaSt, ok := d.replicaStates[newReplicaState.UUID]
354✔
1239
        if ok {
392✔
1240
                if newReplicaState.PrecommittedTxID < replicaSt.precommittedTxID {
38✔
1241
                        return fmt.Errorf("%w: the newly informed replica state lags behind the previously informed one", ErrIllegalArguments)
×
1242
                }
×
1243

1244
                if newReplicaState.PrecommittedTxID == replicaSt.precommittedTxID {
75✔
1245
                        // as of the last informed replica status update, nothing has changed
37✔
1246
                        return nil
37✔
1247
                }
37✔
1248

1249
                // actual replication progress is informed by the replica
1250
                replicaSt.precommittedTxID = newReplicaState.PrecommittedTxID
1✔
1251
                replicaSt.precommittedAlh = newReplicaAlh
1✔
1252
        } else {
316✔
1253
                // replica informs first replication state
316✔
1254
                d.replicaStates[newReplicaState.UUID] = &replicaState{
316✔
1255
                        precommittedTxID: newReplicaState.PrecommittedTxID,
316✔
1256
                        precommittedAlh:  newReplicaAlh,
316✔
1257
                }
316✔
1258
        }
316✔
1259

1260
        // determine up to which tx enough replicas have acknowledged replication, so that it's safe to commit
1261
        mayCommitUpToTxID := uint64(0)
317✔
1262
        if len(d.replicaStates) > 0 {
634✔
1263
                mayCommitUpToTxID = math.MaxUint64
317✔
1264
        }
317✔
1265

1266
        allowances := 0
317✔
1267

317✔
1268
        // we may clean up replicaStates from those that are lagging behind the commit
317✔
1269
        for _, st := range d.replicaStates {
715✔
1270
                if st.precommittedTxID < mayCommitUpToTxID {
715✔
1271
                        mayCommitUpToTxID = st.precommittedTxID
317✔
1272
                }
317✔
1273
                allowances++
398✔
1274
        }
1275

1276
        if allowances >= d.options.syncAcks {
603✔
1277
                err := d.st.AllowCommitUpto(mayCommitUpToTxID)
286✔
1278
                if err != nil {
286✔
1279
                        return err
×
1280
                }
×
1281
        }
1282

1283
        return nil
317✔
1284
}
1285

1286
func (d *db) ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error) {
8,407✔
1287
        if req == nil {
8,407✔
1288
                return nil, 0, mayCommitUpToAlh, ErrIllegalArguments
×
1289
        }
×
1290

1291
        if d.replicaStates == nil && req.ReplicaState != nil {
8,407✔
1292
                return nil, 0, mayCommitUpToAlh, fmt.Errorf("%w: replica state was NOT expected", ErrIllegalState)
×
1293
        }
×
1294

1295
        tx, err := d.allocTx()
8,407✔
1296
        if err != nil {
8,407✔
1297
                return nil, 0, mayCommitUpToAlh, err
×
1298
        }
×
1299
        defer d.releaseTx(tx)
8,407✔
1300

8,407✔
1301
        committedTxID, committedAlh := d.st.CommittedAlh()
8,407✔
1302
        preCommittedTxID, _ := d.st.PrecommittedAlh()
8,407✔
1303

8,407✔
1304
        if req.ReplicaState != nil {
15,134✔
1305
                if req.ReplicaState.CommittedTxID > 0 {
12,924✔
1306
                        // validate replica commit state
6,197✔
1307
                        if req.ReplicaState.CommittedTxID > committedTxID {
6,197✔
1308
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1309
                        }
×
1310

1311
                        // integrityCheck is currently required to validate Alh
1312
                        expectedReplicaCommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.CommittedTxID, false, false)
6,197✔
1313
                        if err != nil {
6,197✔
1314
                                return nil, committedTxID, committedAlh, err
×
1315
                        }
×
1316

1317
                        replicaCommittedAlh := schema.DigestFromProto(req.ReplicaState.CommittedAlh)
6,197✔
1318

6,197✔
1319
                        if expectedReplicaCommitHdr.Alh() != replicaCommittedAlh {
6,197✔
1320
                                return nil, expectedReplicaCommitHdr.ID, expectedReplicaCommitHdr.Alh(), fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1321
                        }
×
1322
                }
1323

1324
                if req.ReplicaState.PrecommittedTxID > 0 {
13,117✔
1325
                        // validate replica precommit state
6,390✔
1326
                        if req.ReplicaState.PrecommittedTxID > preCommittedTxID {
6,392✔
1327
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
2✔
1328
                        }
2✔
1329

1330
                        // integrityCheck is currently required to validate Alh
1331
                        expectedReplicaPrecommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.PrecommittedTxID, true, false)
6,388✔
1332
                        if err != nil {
6,388✔
1333
                                return nil, committedTxID, committedAlh, err
×
1334
                        }
×
1335

1336
                        replicaPreCommittedAlh := schema.DigestFromProto(req.ReplicaState.PrecommittedAlh)
6,388✔
1337

6,388✔
1338
                        if expectedReplicaPrecommitHdr.Alh() != replicaPreCommittedAlh {
6,388✔
1339
                                return nil, expectedReplicaPrecommitHdr.ID, expectedReplicaPrecommitHdr.Alh(), fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1340
                        }
×
1341

1342
                        // the primary provides its commit state to the replica so it can commit pre-committed transactions
1343
                        if req.ReplicaState.PrecommittedTxID < committedTxID {
7,813✔
1344
                                // if replica is behind current commit state in primary
1,425✔
1345
                                // return the alh up to the point known by the replica.
1,425✔
1346
                                // That way the replica is able to validate that it is following the right primary.
1,425✔
1347
                                mayCommitUpToTxID = req.ReplicaState.PrecommittedTxID
1,425✔
1348
                                mayCommitUpToAlh = replicaPreCommittedAlh
1,425✔
1349
                        } else {
6,388✔
1350
                                mayCommitUpToTxID = committedTxID
4,963✔
1351
                                mayCommitUpToAlh = committedAlh
4,963✔
1352
                        }
4,963✔
1353
                }
1354

1355
                err = d.mayUpdateReplicaState(committedTxID, req.ReplicaState)
6,725✔
1356
                if err != nil {
6,725✔
1357
                        return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1358
                }
×
1359
        }
1360

1361
        // the primary may commit some txs in the meantime (there may even be in-memory pre-committed txs)
1362
        // the current timeout is not a special value, but at least it is relative to the sync frequency
1363
        // note: the primary might also be waiting for an ack from some replica (even so, this primary may still make progress)
1364

1365
        // TODO: under some circumstances, the replica might not be able to make further progress until the primary
1366
        // has made changes; such a wait doesn't need a timeout, reducing network and CPU utilization
1367
        var cancel context.CancelFunc
8,405✔
1368

8,405✔
1369
        if req.ReplicaState != nil {
15,130✔
1370
                ctx, cancel = context.WithTimeout(ctx, d.options.storeOpts.SyncFrequency*4)
6,725✔
1371
                defer cancel()
6,725✔
1372
        }
6,725✔
1373

1374
        err = d.WaitForTx(ctx, req.Tx, req.AllowPreCommitted)
8,405✔
1375
        if ctx.Err() != nil {
9,015✔
1376
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, nil
610✔
1377
        }
610✔
1378
        if err != nil {
7,795✔
1379
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1380
        }
×
1381

1382
        txbs, err = d.st.ExportTx(req.Tx, req.AllowPreCommitted, req.SkipIntegrityCheck, tx)
7,795✔
1383
        if err != nil {
7,795✔
1384
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1385
        }
×
1386

1387
        return txbs, mayCommitUpToTxID, mayCommitUpToAlh, nil
7,795✔
1388
}
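// Hedged usage sketch (not part of the original file): how the primary side of the
// replication loop might ask for the next transaction to ship to a replica. Field
// names follow the schema fields referenced above; the *schema.ReplicaState type
// name and the surrounding replication service are assumptions.
func exportNextTx(ctx context.Context, d *db, nextTx uint64, replicaState *schema.ReplicaState) ([]byte, uint64, [sha256.Size]byte, error) {
        req := &schema.ExportTxRequest{
                Tx:                 nextTx,
                AllowPreCommitted:  true,         // ship pre-committed txs when sync replication is enabled
                SkipIntegrityCheck: false,
                ReplicaState:       replicaState, // lets the primary validate and track the replica's state
        }
        // txbs is the serialized transaction; mayCommitUpToTxID/Alh tell the replica
        // how far it may commit its pre-committed transactions.
        return d.ExportTxByID(ctx, req)
}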
1389

1390
func (d *db) ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error) {
7,725✔
1391
        d.mutex.RLock()
7,725✔
1392
        defer d.mutex.RUnlock()
7,725✔
1393

7,725✔
1394
        if !d.isReplica() {
7,725✔
1395
                return nil, ErrNotReplica
×
1396
        }
×
1397

1398
        hdr, err := d.st.ReplicateTx(ctx, exportedTx, skipIntegrityCheck, waitForIndexing)
7,725✔
1399
        if err != nil {
7,755✔
1400
                return nil, err
30✔
1401
        }
30✔
1402

1403
        return schema.TxHeaderToProto(hdr), nil
7,695✔
1404
}
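// Hedged sketch (not part of the original file): the replica-side counterpart of
// ExportTxByID. The exported bytes produced by the primary are applied locally
// through ReplicateTx; integrity checking and indexing behaviour are left to the caller.
func applyExportedTx(ctx context.Context, d *db, exportedTx []byte) (*schema.TxHeader, error) {
        // skipIntegrityCheck=false keeps the store-level integrity validation,
        // waitForIndexing=false returns as soon as the tx is persisted
        return d.ReplicateTx(ctx, exportedTx, false, false)
}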
1405

1406
// AllowCommitUpto is used by replicas to commit transactions once they have been committed by the primary
1407
func (d *db) AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error {
6,096✔
1408
        d.mutex.RLock()
6,096✔
1409
        defer d.mutex.RUnlock()
6,096✔
1410

6,096✔
1411
        if !d.isReplica() {
6,096✔
1412
                return ErrNotReplica
×
1413
        }
×
1414

1415
        // the replica's pre-committed state must be consistent with the primary's
1416

1417
        committedTxID, committedAlh := d.st.CommittedAlh()
6,096✔
1418
        // optimized handling of the case where the replica has already committed up to txID
6,096✔
1419
        if committedTxID == txID {
6,398✔
1420
                if committedAlh != alh {
302✔
1421
                        return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1422
                }
×
1423
                return nil
302✔
1424
        }
1425

1426
        hdr, err := d.st.ReadTxHeader(txID, true, false)
5,794✔
1427
        if err != nil {
5,794✔
1428
                return err
×
1429
        }
×
1430

1431
        if hdr.Alh() != alh {
5,794✔
1432
                return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1433
        }
×
1434

1435
        return d.st.AllowCommitUpto(txID)
5,794✔
1436
}
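// Hedged sketch (not part of the original file): with sync replication, a replica
// uses the mayCommitUpTo values returned by the primary's ExportTxByID to move its
// durable commit point forward via AllowCommitUpto.
func commitUpTo(d *db, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte) error {
        if mayCommitUpToTxID == 0 {
                return nil // nothing acknowledged by the primary yet
        }
        // AllowCommitUpto verifies that the locally pre-committed Alh at that tx id
        // matches the primary's before allowing the commit to proceed.
        return d.AllowCommitUpto(mayCommitUpToTxID, mayCommitUpToAlh)
}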
1437

1438
func (d *db) DiscardPrecommittedTxsSince(txID uint64) error {
2✔
1439
        d.mutex.RLock()
2✔
1440
        defer d.mutex.RUnlock()
2✔
1441

2✔
1442
        _, err := d.st.DiscardPrecommittedTxsSince(txID)
2✔
1443

2✔
1444
        return err
2✔
1445
}
2✔
1446

1447
// VerifiableTxByID ...
1448
func (d *db) VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error) {
1,294✔
1449
        if req == nil {
1,295✔
1450
                return nil, ErrIllegalArguments
1✔
1451
        }
1✔
1452

1453
        lastTxID, _ := d.st.CommittedAlh()
1,293✔
1454
        if lastTxID < req.ProveSinceTx {
1,293✔
1455
                return nil, fmt.Errorf("%w: latest txID=%d is lower than specified as initial tx=%d", ErrIllegalState, lastTxID, req.ProveSinceTx)
×
1456
        }
×
1457

1458
        var snap *store.Snapshot
1,293✔
1459
        var err error
1,293✔
1460

1,293✔
1461
        if !req.KeepReferencesUnresolved {
2,586✔
1462
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
1,293✔
1463
                if err != nil {
1,293✔
1464
                        return nil, err
×
1465
                }
×
1466
                defer snap.Close()
1,293✔
1467
        }
1468

1469
        reqTx, err := d.allocTx()
1,293✔
1470
        if err != nil {
1,293✔
1471
                return nil, err
×
1472
        }
×
1473
        defer d.releaseTx(reqTx)
1,293✔
1474

1,293✔
1475
        err = d.st.ReadTx(req.Tx, false, reqTx)
1,293✔
1476
        if err != nil {
1,293✔
1477
                return nil, err
×
1478
        }
×
1479

1480
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,293✔
1481
        var rootTxHdr *store.TxHeader
1,293✔
1482

1,293✔
1483
        if req.ProveSinceTx == 0 {
1,297✔
1484
                rootTxHdr = reqTx.Header()
4✔
1485
        } else {
1,293✔
1486
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
1,289✔
1487
                if err != nil {
1,289✔
1488
                        return nil, err
×
1489
                }
×
1490
        }
1491

1492
        if req.ProveSinceTx <= req.Tx {
2,585✔
1493
                sourceTxHdr = rootTxHdr
1,292✔
1494
                targetTxHdr = reqTx.Header()
1,292✔
1495
        } else {
1,293✔
1496
                sourceTxHdr = reqTx.Header()
1✔
1497
                targetTxHdr = rootTxHdr
1✔
1498
        }
1✔
1499

1500
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,293✔
1501
        if err != nil {
1,293✔
1502
                return nil, err
×
1503
        }
×
1504

1505
        sReqTx, err := d.serializeTx(ctx, reqTx, req.EntriesSpec, snap, true)
1,293✔
1506
        if err != nil {
1,293✔
1507
                return nil, err
×
1508
        }
×
1509

1510
        return &schema.VerifiableTx{
1,293✔
1511
                Tx:        sReqTx,
1,293✔
1512
                DualProof: schema.DualProofToProto(dualProof),
1,293✔
1513
        }, nil
1,293✔
1514
}
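// Hedged sketch (not part of the original file): requesting a transaction together
// with a dual proof. As the branches above show, when ProveSinceTx <= Tx the proof
// goes from the already-trusted state towards the requested tx, otherwise the
// direction is reversed; ProveSinceTx == 0 makes the requested tx itself the root.
func verifiableTx(ctx context.Context, d *db, txID, trustedTxID uint64) (*schema.VerifiableTx, error) {
        return d.VerifiableTxByID(ctx, &schema.VerifiableTxRequest{
                Tx:           txID,
                ProveSinceTx: trustedTxID,
        })
}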
1515

1516
// TxScan ...
1517
func (d *db) TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error) {
10✔
1518
        if req == nil {
11✔
1519
                return nil, ErrIllegalArguments
1✔
1520
        }
1✔
1521

1522
        if int(req.Limit) > d.maxResultSize {
10✔
1523
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1524
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1525
        }
1✔
1526

1527
        tx, err := d.allocTx()
8✔
1528
        if err != nil {
8✔
1529
                return nil, err
×
1530
        }
×
1531
        defer d.releaseTx(tx)
8✔
1532

8✔
1533
        limit := int(req.Limit)
8✔
1534
        if req.Limit == 0 {
13✔
1535
                limit = d.maxResultSize
5✔
1536
        }
5✔
1537

1538
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
8✔
1539
        if err != nil {
8✔
1540
                return nil, err
×
1541
        }
×
1542
        defer snap.Close()
8✔
1543

8✔
1544
        txReader, err := d.st.NewTxReader(req.InitialTx, req.Desc, tx)
8✔
1545
        if err != nil {
9✔
1546
                return nil, err
1✔
1547
        }
1✔
1548

1549
        txList := &schema.TxList{}
7✔
1550

7✔
1551
        for l := 1; l <= limit; l++ {
27✔
1552
                tx, err := txReader.Read()
20✔
1553
                if errors.Is(err, store.ErrNoMoreEntries) {
22✔
1554
                        break
2✔
1555
                }
1556
                if err != nil {
18✔
1557
                        return nil, err
×
1558
                }
×
1559

1560
                sTx, err := d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
18✔
1561
                if err != nil {
18✔
1562
                        return nil, err
×
1563
                }
×
1564

1565
                txList.Txs = append(txList.Txs, sTx)
18✔
1566
        }
1567

1568
        return txList, nil
7✔
1569
}
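// Hedged sketch (not part of the original file): scanning transactions starting at a
// given tx id. A zero Limit falls back to the server-side maximum result size, as the
// code above shows, while limits larger than that maximum are rejected.
func scanTxsFrom(ctx context.Context, d *db, fromTx uint64) (*schema.TxList, error) {
        return d.TxScan(ctx, &schema.TxScanRequest{
                InitialTx: fromTx,
                Desc:      false, // ascending order
                Limit:     0,     // 0 means "use maxResultSize"
        })
}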
1570

1571
// History ...
1572
func (d *db) History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error) {
9✔
1573
        if req == nil {
10✔
1574
                return nil, ErrIllegalArguments
1✔
1575
        }
1✔
1576

1577
        if int(req.Limit) > d.maxResultSize {
9✔
1578
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1579
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1580
        }
1✔
1581

1582
        currTxID, _ := d.st.CommittedAlh()
7✔
1583

7✔
1584
        if req.SinceTx > currTxID {
7✔
1585
                return nil, ErrIllegalArguments
×
1586
        }
×
1587

1588
        waitUntilTx := req.SinceTx
7✔
1589
        if waitUntilTx == 0 {
9✔
1590
                waitUntilTx = currTxID
2✔
1591
        }
2✔
1592

1593
        err := d.WaitForIndexingUpto(ctx, waitUntilTx)
7✔
1594
        if err != nil {
8✔
1595
                return nil, err
1✔
1596
        }
1✔
1597

1598
        limit := int(req.Limit)
6✔
1599
        if limit == 0 {
12✔
1600
                limit = d.maxResultSize
6✔
1601
        }
6✔
1602

1603
        key := EncodeKey(req.Key)
6✔
1604

6✔
1605
        valRefs, _, err := d.st.History(key, req.Offset, req.Desc, limit)
6✔
1606
        if err != nil && err != store.ErrOffsetOutOfRange {
6✔
1607
                return nil, err
×
1608
        }
×
1609

1610
        list := &schema.Entries{
6✔
1611
                Entries: make([]*schema.Entry, len(valRefs)),
6✔
1612
        }
6✔
1613

6✔
1614
        for i, valRef := range valRefs {
114✔
1615
                val, err := valRef.Resolve()
108✔
1616
                if err != nil && err != store.ErrExpiredEntry {
108✔
1617
                        return nil, err
×
1618
                }
×
1619
                if len(val) > 0 {
214✔
1620
                        val = TrimPrefix(val)
106✔
1621
                }
106✔
1622

1623
                list.Entries[i] = &schema.Entry{
108✔
1624
                        Tx:       valRef.Tx(),
108✔
1625
                        Key:      req.Key,
108✔
1626
                        Metadata: schema.KVMetadataToProto(valRef.KVMetadata()),
108✔
1627
                        Value:    val,
108✔
1628
                        Expired:  errors.Is(err, store.ErrExpiredEntry),
108✔
1629
                        Revision: valRef.HC(),
108✔
1630
                }
108✔
1631
        }
1632
        return list, nil
6✔
1633
}
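// Hedged sketch (not part of the original file): retrieving the revision history of a
// key. Each returned entry carries the revision number, the tx id and an Expired flag
// for values that are no longer resolvable, as populated above.
func keyHistory(ctx context.Context, d *db, key []byte) (*schema.Entries, error) {
        return d.History(ctx, &schema.HistoryRequest{
                Key:   key,
                Desc:  true, // newest revision first
                Limit: 0,    // 0 falls back to the configured maximum result size
        })
}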
1634

1635
func (d *db) IsClosed() bool {
196✔
1636
        d.closingMutex.Lock()
196✔
1637
        defer d.closingMutex.Unlock()
196✔
1638

196✔
1639
        return d.st.IsClosed()
196✔
1640
}
196✔
1641

1642
// Close ...
1643
func (d *db) Close() (err error) {
721✔
1644
        d.closingMutex.Lock()
721✔
1645
        defer d.closingMutex.Unlock()
721✔
1646

721✔
1647
        d.Logger.Infof("closing database '%s'...", d.name)
721✔
1648

721✔
1649
        defer func() {
1,442✔
1650
                if err == nil {
1,348✔
1651
                        d.Logger.Infof("database '%s' successfully closed", d.name)
627✔
1652
                } else {
721✔
1653
                        d.Logger.Infof("%v: while closing database '%s'", err, d.name)
94✔
1654
                }
94✔
1655
        }()
1656

1657
        return d.st.Close()
721✔
1658
}
1659

1660
// GetName ...
1661
func (d *db) GetName() string {
54,963✔
1662
        return d.name
54,963✔
1663
}
54,963✔
1664

1665
// GetOptions ...
1666
func (d *db) GetOptions() *Options {
3,082✔
1667
        d.mutex.RLock()
3,082✔
1668
        defer d.mutex.RUnlock()
3,082✔
1669

3,082✔
1670
        return d.options
3,082✔
1671
}
3,082✔
1672

1673
func (d *db) AsReplica(asReplica, syncReplication bool, syncAcks int) {
15✔
1674
        d.mutex.Lock()
15✔
1675
        defer d.mutex.Unlock()
15✔
1676

15✔
1677
        d.replicaStatesMutex.Lock()
15✔
1678
        defer d.replicaStatesMutex.Unlock()
15✔
1679

15✔
1680
        d.options.replica = asReplica
15✔
1681
        d.options.syncAcks = syncAcks
15✔
1682
        d.options.syncReplication = syncReplication
15✔
1683

15✔
1684
        if asReplica {
23✔
1685
                d.replicaStates = nil
8✔
1686
        } else if syncAcks > 0 {
18✔
1687
                d.replicaStates = make(map[string]*replicaState, syncAcks)
3✔
1688
        }
3✔
1689

1690
        d.st.SetExternalCommitAllowance(syncReplication)
15✔
1691
}
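// Hedged sketch (not part of the original file): switching replication roles at
// runtime through the DB interface. Promoting a database to a synchronous primary
// with syncAcks > 0 allocates the replica-state tracking used by the ack-counting
// logic shown earlier; demoting it to a replica clears that state.
func promoteToSyncPrimary(d DB, syncAcks int) {
        // asReplica=false, syncReplication=true: act as a primary that waits for
        // syncAcks replica acknowledgements before allowing commits
        d.AsReplica(false, true, syncAcks)
}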
1692

1693
func (d *db) IsReplica() bool {
448✔
1694
        d.mutex.RLock()
448✔
1695
        defer d.mutex.RUnlock()
448✔
1696

448✔
1697
        return d.isReplica()
448✔
1698
}
448✔
1699

1700
func (d *db) isReplica() bool {
19,585✔
1701
        return d.options.replica
19,585✔
1702
}
19,585✔
1703

1704
func (d *db) IsSyncReplicationEnabled() bool {
8,405✔
1705
        d.mutex.RLock()
8,405✔
1706
        defer d.mutex.RUnlock()
8,405✔
1707

8,405✔
1708
        return d.options.syncReplication
8,405✔
1709
}
8,405✔
1710

1711
func (d *db) SetSyncReplication(enabled bool) {
217✔
1712
        d.mutex.Lock()
217✔
1713
        defer d.mutex.Unlock()
217✔
1714

217✔
1715
        d.st.SetExternalCommitAllowance(enabled)
217✔
1716

217✔
1717
        d.options.syncReplication = enabled
217✔
1718
}
217✔
1719

1720
func logErr(log logger.Logger, formattedMessage string, err error) error {
1✔
1721
        if err != nil {
2✔
1722
                log.Errorf(formattedMessage, err)
1✔
1723
        }
1✔
1724
        return err
1✔
1725
}
1726

1727
// CopyCatalogToTx copies both the SQL catalog and the document store catalog
1728
// into the provided transaction, so that they are committed along with it.
1729
func (d *db) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
13✔
1730
        // copy the sql catalog
13✔
1731
        err := d.sqlEngine.CopyCatalogToTx(ctx, tx)
13✔
1732
        if err != nil {
13✔
1733
                return err
×
1734
        }
×
1735

1736
        // copy the document store catalog
1737
        err = d.documentEngine.CopyCatalogToTx(ctx, tx)
13✔
1738
        if err != nil {
13✔
1739
                return err
×
1740
        }
×
1741

1742
        return nil
13✔
1743
}