• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 6891668741

16 Nov 2023 01:50PM UTC coverage: 89.507% (+0.02%) from 89.485%
6891668741

push

gh-ci

jeroiraz
Update README.md

33923 of 37900 relevant lines covered (89.51%)

143515.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.49
/pkg/database/sorted_set.go
1
/*
2
Copyright 2022 Codenotary Inc. All rights reserved.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package database
18

19
import (
20
        "context"
21
        "encoding/binary"
22
        "errors"
23
        "fmt"
24
        "io"
25
        "math"
26

27
        "github.com/codenotary/immudb/embedded/store"
28
        "github.com/codenotary/immudb/pkg/api/schema"
29
)
30

31
const setLenLen = 8
32
const scoreLen = 8
33
const keyLenLen = 8
34
const txIDLen = 8
35

36
// ZAdd adds a score for an existing key in a sorted set
37
// As a parameter of ZAddOptions is possible to provide the associated index of the provided key. In this way, when resolving reference, the specified version of the key will be returned.
38
// If the index is not provided the resolution will use only the key and last version of the item will be returned
39
// If ZAddOptions.index is provided key is optional
40
func (d *db) ZAdd(ctx context.Context, req *schema.ZAddRequest) (*schema.TxHeader, error) {
157✔
41
        if req == nil || len(req.Set) == 0 || len(req.Key) == 0 {
160✔
42
                return nil, store.ErrIllegalArguments
3✔
43
        }
3✔
44

45
        if (req.AtTx == 0 && req.BoundRef) || (req.AtTx > 0 && !req.BoundRef) {
155✔
46
                return nil, store.ErrIllegalArguments
1✔
47
        }
1✔
48

49
        d.mutex.Lock()
153✔
50
        defer d.mutex.Unlock()
153✔
51

153✔
52
        if d.isReplica() {
154✔
53
                return nil, ErrIsReplica
1✔
54
        }
1✔
55

56
        lastTxID, _ := d.st.CommittedAlh()
152✔
57
        err := d.st.WaitForIndexingUpto(ctx, lastTxID)
152✔
58
        if err != nil {
152✔
59
                return nil, err
×
60
        }
×
61

62
        // check referenced key exists and it's not a reference
63
        key := EncodeKey(req.Key)
152✔
64

152✔
65
        refEntry, err := d.getAtTx(ctx, key, req.AtTx, 0, d.st, 0, true)
152✔
66
        if err != nil {
153✔
67
                return nil, err
1✔
68
        }
1✔
69
        if refEntry.ReferencedBy != nil {
152✔
70
                return nil, ErrReferencedKeyCannotBeAReference
1✔
71
        }
1✔
72

73
        tx, err := d.st.NewWriteOnlyTx(ctx)
150✔
74
        if err != nil {
150✔
75
                return nil, err
×
76
        }
×
77
        defer tx.Cancel()
150✔
78

150✔
79
        e := EncodeZAdd(req.Set, req.Score, key, req.AtTx)
150✔
80

150✔
81
        err = tx.Set(e.Key, e.Metadata, e.Value)
150✔
82
        if err != nil {
150✔
83
                return nil, err
×
84
        }
×
85

86
        var hdr *store.TxHeader
150✔
87

150✔
88
        if req.NoWait {
150✔
89
                hdr, err = tx.AsyncCommit(ctx)
×
90
        } else {
150✔
91
                hdr, err = tx.Commit(ctx)
150✔
92
        }
150✔
93
        if err != nil {
150✔
94
                return nil, err
×
95
        }
×
96

97
        return schema.TxHeaderToProto(hdr), nil
150✔
98
}
99

100
// ZScan ...
101
func (d *db) ZScan(ctx context.Context, req *schema.ZScanRequest) (*schema.ZEntries, error) {
35✔
102
        if req == nil || len(req.Set) == 0 {
36✔
103
                return nil, store.ErrIllegalArguments
1✔
104
        }
1✔
105

106
        if req.Limit > uint64(d.maxResultSize) {
35✔
107
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
108
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
109
        }
1✔
110

111
        limit := int(req.Limit)
33✔
112

33✔
113
        if req.Limit == 0 {
60✔
114
                limit = d.maxResultSize
27✔
115
        }
27✔
116

117
        d.mutex.RLock()
33✔
118
        defer d.mutex.RUnlock()
33✔
119

33✔
120
        currTxID, _ := d.st.CommittedAlh()
33✔
121

33✔
122
        if req.SinceTx > currTxID {
33✔
123
                return nil, ErrIllegalArguments
×
124
        }
×
125

126
        prefix := make([]byte, 1+setLenLen+len(req.Set))
33✔
127
        prefix[0] = SortedSetKeyPrefix
33✔
128
        binary.BigEndian.PutUint64(prefix[1:], uint64(len(req.Set)))
33✔
129
        copy(prefix[1+setLenLen:], req.Set)
33✔
130

33✔
131
        var seekKey []byte
33✔
132

33✔
133
        if len(req.SeekKey) == 0 {
62✔
134
                seekKey = make([]byte, len(prefix)+scoreLen)
29✔
135
                copy(seekKey, prefix)
29✔
136
                // here we compose the offset if Min score filter is provided only if is not reversed order
29✔
137
                if req.MinScore != nil && !req.Desc {
31✔
138
                        binary.BigEndian.PutUint64(seekKey[len(prefix):], math.Float64bits(req.MinScore.Score))
2✔
139
                }
2✔
140
                // here we compose the offset if Max score filter is provided only if is reversed order
141
                if req.Desc {
30✔
142
                        var maxScore float64
1✔
143

1✔
144
                        if req.MaxScore == nil {
1✔
145
                                maxScore = math.MaxFloat64
×
146
                        } else {
1✔
147
                                maxScore = req.MaxScore.Score
1✔
148
                        }
1✔
149

150
                        binary.BigEndian.PutUint64(seekKey[len(prefix):], math.Float64bits(maxScore))
1✔
151
                }
152
        } else {
4✔
153
                seekKey = make([]byte, len(prefix)+scoreLen+keyLenLen+1+len(req.SeekKey)+txIDLen)
4✔
154
                copy(seekKey, prefix)
4✔
155
                binary.BigEndian.PutUint64(seekKey[len(prefix):], math.Float64bits(req.SeekScore))
4✔
156
                binary.BigEndian.PutUint64(seekKey[len(prefix)+scoreLen:], uint64(1+len(req.SeekKey)))
4✔
157
                copy(seekKey[len(prefix)+scoreLen+keyLenLen:], EncodeKey(req.SeekKey))
4✔
158
                binary.BigEndian.PutUint64(seekKey[len(prefix)+scoreLen+keyLenLen+1+len(req.SeekKey):], req.SeekAtTx)
4✔
159
        }
4✔
160

161
        zsnap, err := d.snapshotSince(ctx, []byte{SortedSetKeyPrefix}, req.SinceTx)
33✔
162
        if err != nil {
34✔
163
                return nil, err
1✔
164
        }
1✔
165
        defer zsnap.Close()
32✔
166

32✔
167
        r, err := zsnap.NewKeyReader(
32✔
168
                store.KeyReaderSpec{
32✔
169
                        SeekKey:       seekKey,
32✔
170
                        Prefix:        prefix,
32✔
171
                        InclusiveSeek: req.InclusiveSeek,
32✔
172
                        DescOrder:     req.Desc,
32✔
173
                        Filters:       []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
32✔
174
                        Offset:        req.Offset,
32✔
175
                })
32✔
176
        if err != nil {
32✔
177
                return nil, err
×
178
        }
×
179
        defer r.Close()
32✔
180

32✔
181
        kvsnap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
32✔
182
        if err != nil {
32✔
183
                return nil, err
×
184
        }
×
185
        defer kvsnap.Close()
32✔
186

32✔
187
        entries := &schema.ZEntries{}
32✔
188

32✔
189
        for l := 1; l <= limit; l++ {
1,305✔
190
                zKey, _, err := r.Read(ctx)
1,273✔
191
                if errors.Is(err, store.ErrNoMoreEntries) {
1,299✔
192
                        break
26✔
193
                }
194
                if err != nil {
1,247✔
195
                        return nil, err
×
196
                }
×
197

198
                // zKey = [1+setLenLen+len(req.Set)+scoreLen+keyLenLen+1+len(req.Key)+txIDLen]
199
                scoreOff := 1 + setLenLen + len(req.Set)
1,247✔
200
                scoreB := binary.BigEndian.Uint64(zKey[scoreOff:])
1,247✔
201
                score := math.Float64frombits(scoreB)
1,247✔
202

1,247✔
203
                // Guard to ensure that score match the filter range if filter is provided
1,247✔
204
                if req.MinScore != nil && score < req.MinScore.Score {
1,247✔
205
                        continue
×
206
                }
207
                if req.MaxScore != nil && score > req.MaxScore.Score {
1,247✔
208
                        continue
×
209
                }
210

211
                keyOff := scoreOff + scoreLen + keyLenLen
1,247✔
212
                key := make([]byte, len(zKey)-keyOff-txIDLen)
1,247✔
213
                copy(key, zKey[keyOff:])
1,247✔
214

1,247✔
215
                atTx := binary.BigEndian.Uint64(zKey[keyOff+len(key):])
1,247✔
216

1,247✔
217
                e, err := d.getAtTx(ctx, key, atTx, 1, kvsnap, 0, true)
1,247✔
218
                if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, io.EOF) {
1,247✔
219
                        continue // ignore deleted or truncated ones (referenced key may have been deleted or truncated)
×
220
                }
221
                if err != nil {
1,248✔
222
                        return nil, err
1✔
223
                }
1✔
224

225
                zentry := &schema.ZEntry{
1,246✔
226
                        Set:   req.Set,
1,246✔
227
                        Key:   key[1:],
1,246✔
228
                        Entry: e,
1,246✔
229
                        Score: score,
1,246✔
230
                        AtTx:  atTx,
1,246✔
231
                }
1,246✔
232

1,246✔
233
                entries.Entries = append(entries.Entries, zentry)
1,246✔
234

1,246✔
235
                if l == d.maxResultSize {
1,246✔
236
                        return entries, fmt.Errorf("%w: found at least %d entries (the maximum limit). "+
×
237
                                "Pagination over large results can be achieved by using the limit, seekKey, seekScore and seekAtTx arguments",
×
238
                                ErrResultSizeLimitReached, d.maxResultSize)
×
239
                }
×
240
        }
241

242
        return entries, nil
31✔
243
}
244

245
// VerifiableZAdd ...
246
func (d *db) VerifiableZAdd(ctx context.Context, req *schema.VerifiableZAddRequest) (*schema.VerifiableTx, error) {
13✔
247
        if req == nil {
14✔
248
                return nil, store.ErrIllegalArguments
1✔
249
        }
1✔
250

251
        lastTxID, _ := d.st.CommittedAlh()
12✔
252
        if lastTxID < req.ProveSinceTx {
12✔
253
                return nil, store.ErrIllegalArguments
×
254
        }
×
255

256
        lastTx, err := d.allocTx()
12✔
257
        if err != nil {
12✔
258
                return nil, err
×
259
        }
×
260
        defer d.releaseTx(lastTx)
12✔
261

12✔
262
        txMetatadata, err := d.ZAdd(ctx, req.ZAddRequest)
12✔
263
        if err != nil {
14✔
264
                return nil, err
2✔
265
        }
2✔
266

267
        err = d.st.ReadTx(uint64(txMetatadata.Id), false, lastTx)
10✔
268
        if err != nil {
10✔
269
                return nil, err
×
270
        }
×
271

272
        var prevTxHdr *store.TxHeader
10✔
273
        if req.ProveSinceTx == 0 {
12✔
274
                prevTxHdr = lastTx.Header()
2✔
275
        } else {
10✔
276
                prevTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
8✔
277
                if err != nil {
8✔
278
                        return nil, err
×
279
                }
×
280
        }
281

282
        dualProof, err := d.st.DualProof(prevTxHdr, lastTx.Header())
10✔
283
        if err != nil {
10✔
284
                return nil, err
×
285
        }
×
286

287
        return &schema.VerifiableTx{
10✔
288
                Tx:        schema.TxToProto(lastTx),
10✔
289
                DualProof: schema.DualProofToProto(dualProof),
10✔
290
        }, nil
10✔
291
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc