• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9195153262

22 May 2024 04:38PM UTC coverage: 89.479% (-0.04%) from 89.52%
9195153262

push

gh-ci

ostafen
Update version in Makefile

34749 of 38835 relevant lines covered (89.48%)

162213.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.82
/pkg/database/sorted_set.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package database
18

19
import (
20
        "context"
21
        "encoding/binary"
22
        "errors"
23
        "fmt"
24
        "io"
25
        "math"
26

27
        "github.com/codenotary/immudb/embedded/store"
28
        "github.com/codenotary/immudb/pkg/api/schema"
29
)
30

31
// Fixed widths, in bytes, of the components that make up an encoded
// sorted-set key.
const (
	setLenLen = 8 // big-endian length of the set name
	scoreLen  = 8 // score, stored as the bits of a float64
	keyLenLen = 8 // big-endian length of the encoded referenced key
	txIDLen   = 8 // big-endian transaction ID stored at the end of the key
)
35

36
// ZAdd adds a score for an existing key in a sorted set
37
// As a parameter of ZAddOptions is possible to provide the associated index of the provided key. In this way, when resolving reference, the specified version of the key will be returned.
38
// If the index is not provided the resolution will use only the key and last version of the item will be returned
39
// If ZAddOptions.index is provided key is optional
40
func (d *db) ZAdd(ctx context.Context, req *schema.ZAddRequest) (*schema.TxHeader, error) {
157✔
41
        if req == nil || len(req.Set) == 0 || len(req.Key) == 0 {
160✔
42
                return nil, store.ErrIllegalArguments
3✔
43
        }
3✔
44

45
        if (req.AtTx == 0 && req.BoundRef) || (req.AtTx > 0 && !req.BoundRef) {
155✔
46
                return nil, store.ErrIllegalArguments
1✔
47
        }
1✔
48

49
        d.mutex.Lock()
153✔
50
        defer d.mutex.Unlock()
153✔
51

153✔
52
        if d.isReplica() {
154✔
53
                return nil, ErrIsReplica
1✔
54
        }
1✔
55

56
        lastTxID, _ := d.st.CommittedAlh()
152✔
57
        err := d.st.WaitForIndexingUpto(ctx, lastTxID)
152✔
58
        if err != nil {
152✔
59
                return nil, err
×
60
        }
×
61

62
        // check referenced key exists and it's not a reference
63
        key := EncodeKey(req.Key)
152✔
64

152✔
65
        refEntry, err := d.getAtTx(ctx, key, req.AtTx, 0, d.st, 0, true)
152✔
66
        if err != nil {
153✔
67
                return nil, err
1✔
68
        }
1✔
69
        if refEntry.ReferencedBy != nil {
152✔
70
                return nil, ErrReferencedKeyCannotBeAReference
1✔
71
        }
1✔
72

73
        tx, err := d.st.NewWriteOnlyTx(ctx)
150✔
74
        if err != nil {
150✔
75
                return nil, err
×
76
        }
×
77
        defer tx.Cancel()
150✔
78

150✔
79
        e := EncodeZAdd(req.Set, req.Score, key, req.AtTx)
150✔
80

150✔
81
        err = tx.Set(e.Key, e.Metadata, e.Value)
150✔
82
        if err != nil {
150✔
83
                return nil, err
×
84
        }
×
85

86
        var hdr *store.TxHeader
150✔
87

150✔
88
        if req.NoWait {
150✔
89
                hdr, err = tx.AsyncCommit(ctx)
×
90
        } else {
150✔
91
                hdr, err = tx.Commit(ctx)
150✔
92
        }
150✔
93
        if err != nil {
150✔
94
                return nil, err
×
95
        }
×
96

97
        return schema.TxHeaderToProto(hdr), nil
150✔
98
}
99

100
// ZScan ...
101
func (d *db) ZScan(ctx context.Context, req *schema.ZScanRequest) (*schema.ZEntries, error) {
35✔
102
        if req == nil || len(req.Set) == 0 {
36✔
103
                return nil, store.ErrIllegalArguments
1✔
104
        }
1✔
105

106
        if req.Limit > uint64(d.maxResultSize) {
35✔
107
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
108
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
109
        }
1✔
110

111
        limit := int(req.Limit)
33✔
112
        if req.Limit == 0 {
60✔
113
                limit = d.maxResultSize
27✔
114
        }
27✔
115

116
        d.mutex.RLock()
33✔
117
        defer d.mutex.RUnlock()
33✔
118

33✔
119
        currTxID, _ := d.st.CommittedAlh()
33✔
120

33✔
121
        if req.SinceTx > currTxID {
33✔
122
                return nil, ErrIllegalArguments
×
123
        }
×
124

125
        prefix := make([]byte, 1+setLenLen+len(req.Set))
33✔
126
        prefix[0] = SortedSetKeyPrefix
33✔
127
        binary.BigEndian.PutUint64(prefix[1:], uint64(len(req.Set)))
33✔
128
        copy(prefix[1+setLenLen:], req.Set)
33✔
129

33✔
130
        var seekKey []byte
33✔
131

33✔
132
        if len(req.SeekKey) == 0 {
62✔
133
                seekKey = make([]byte, len(prefix)+scoreLen)
29✔
134
                copy(seekKey, prefix)
29✔
135
                // here we compose the offset if Min score filter is provided only if is not reversed order
29✔
136
                if req.MinScore != nil && !req.Desc {
31✔
137
                        binary.BigEndian.PutUint64(seekKey[len(prefix):], math.Float64bits(req.MinScore.Score))
2✔
138
                }
2✔
139
                // here we compose the offset if Max score filter is provided only if is reversed order
140
                if req.Desc {
30✔
141
                        var maxScore float64
1✔
142

1✔
143
                        if req.MaxScore == nil {
1✔
144
                                maxScore = math.MaxFloat64
×
145
                        } else {
1✔
146
                                maxScore = req.MaxScore.Score
1✔
147
                        }
1✔
148

149
                        binary.BigEndian.PutUint64(seekKey[len(prefix):], math.Float64bits(maxScore))
1✔
150
                }
151
        } else {
4✔
152
                seekKey = make([]byte, len(prefix)+scoreLen+keyLenLen+1+len(req.SeekKey)+txIDLen)
4✔
153
                copy(seekKey, prefix)
4✔
154
                binary.BigEndian.PutUint64(seekKey[len(prefix):], math.Float64bits(req.SeekScore))
4✔
155
                binary.BigEndian.PutUint64(seekKey[len(prefix)+scoreLen:], uint64(1+len(req.SeekKey)))
4✔
156
                copy(seekKey[len(prefix)+scoreLen+keyLenLen:], EncodeKey(req.SeekKey))
4✔
157
                binary.BigEndian.PutUint64(seekKey[len(prefix)+scoreLen+keyLenLen+1+len(req.SeekKey):], req.SeekAtTx)
4✔
158
        }
4✔
159

160
        zsnap, err := d.snapshotSince(ctx, []byte{SortedSetKeyPrefix}, req.SinceTx)
33✔
161
        if err != nil {
33✔
162
                return nil, err
×
163
        }
×
164
        defer zsnap.Close()
33✔
165

33✔
166
        r, err := zsnap.NewKeyReader(
33✔
167
                store.KeyReaderSpec{
33✔
168
                        SeekKey:       seekKey,
33✔
169
                        Prefix:        prefix,
33✔
170
                        InclusiveSeek: req.InclusiveSeek,
33✔
171
                        DescOrder:     req.Desc,
33✔
172
                        Filters:       []store.FilterFn{store.IgnoreExpired, store.IgnoreDeleted},
33✔
173
                        Offset:        req.Offset,
33✔
174
                })
33✔
175
        if err != nil {
33✔
176
                return nil, err
×
177
        }
×
178
        defer r.Close()
33✔
179

33✔
180
        kvsnap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
33✔
181
        if err != nil {
33✔
182
                return nil, err
×
183
        }
×
184
        defer kvsnap.Close()
33✔
185

33✔
186
        entries := &schema.ZEntries{}
33✔
187
        for l := 1; l <= limit; l++ {
1,307✔
188
                zKey, _, err := r.Read(ctx)
1,274✔
189
                if errors.Is(err, store.ErrNoMoreEntries) {
1,301✔
190
                        break
27✔
191
                }
192
                if err != nil {
1,247✔
193
                        return nil, err
×
194
                }
×
195

196
                // zKey = [1+setLenLen+len(req.Set)+scoreLen+keyLenLen+1+len(req.Key)+txIDLen]
197
                scoreOff := 1 + setLenLen + len(req.Set)
1,247✔
198
                scoreB := binary.BigEndian.Uint64(zKey[scoreOff:])
1,247✔
199
                score := math.Float64frombits(scoreB)
1,247✔
200

1,247✔
201
                // Guard to ensure that score match the filter range if filter is provided
1,247✔
202
                if req.MinScore != nil && score < req.MinScore.Score {
1,247✔
203
                        continue
×
204
                }
205
                if req.MaxScore != nil && score > req.MaxScore.Score {
1,247✔
206
                        continue
×
207
                }
208

209
                keyOff := scoreOff + scoreLen + keyLenLen
1,247✔
210
                key := make([]byte, len(zKey)-keyOff-txIDLen)
1,247✔
211
                copy(key, zKey[keyOff:])
1,247✔
212

1,247✔
213
                atTx := binary.BigEndian.Uint64(zKey[keyOff+len(key):])
1,247✔
214

1,247✔
215
                e, err := d.getAtTx(ctx, key, atTx, 1, kvsnap, 0, true)
1,247✔
216
                if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, io.EOF) {
1,247✔
217
                        continue // ignore deleted or truncated ones (referenced key may have been deleted or truncated)
×
218
                }
219
                if err != nil {
1,248✔
220
                        return nil, err
1✔
221
                }
1✔
222

223
                zentry := &schema.ZEntry{
1,246✔
224
                        Set:   req.Set,
1,246✔
225
                        Key:   key[1:],
1,246✔
226
                        Entry: e,
1,246✔
227
                        Score: score,
1,246✔
228
                        AtTx:  atTx,
1,246✔
229
                }
1,246✔
230

1,246✔
231
                entries.Entries = append(entries.Entries, zentry)
1,246✔
232
        }
233

234
        return entries, nil
32✔
235
}
236

237
// VerifiableZAdd ...
238
func (d *db) VerifiableZAdd(ctx context.Context, req *schema.VerifiableZAddRequest) (*schema.VerifiableTx, error) {
13✔
239
        if req == nil {
14✔
240
                return nil, store.ErrIllegalArguments
1✔
241
        }
1✔
242

243
        lastTxID, _ := d.st.CommittedAlh()
12✔
244
        if lastTxID < req.ProveSinceTx {
12✔
245
                return nil, store.ErrIllegalArguments
×
246
        }
×
247

248
        lastTx, err := d.allocTx()
12✔
249
        if err != nil {
12✔
250
                return nil, err
×
251
        }
×
252
        defer d.releaseTx(lastTx)
12✔
253

12✔
254
        txMetatadata, err := d.ZAdd(ctx, req.ZAddRequest)
12✔
255
        if err != nil {
14✔
256
                return nil, err
2✔
257
        }
2✔
258

259
        err = d.st.ReadTx(uint64(txMetatadata.Id), false, lastTx)
10✔
260
        if err != nil {
10✔
261
                return nil, err
×
262
        }
×
263

264
        var prevTxHdr *store.TxHeader
10✔
265
        if req.ProveSinceTx == 0 {
12✔
266
                prevTxHdr = lastTx.Header()
2✔
267
        } else {
10✔
268
                prevTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
8✔
269
                if err != nil {
8✔
270
                        return nil, err
×
271
                }
×
272
        }
273

274
        dualProof, err := d.st.DualProof(prevTxHdr, lastTx.Header())
10✔
275
        if err != nil {
10✔
276
                return nil, err
×
277
        }
×
278

279
        return &schema.VerifiableTx{
10✔
280
                Tx:        schema.TxToProto(lastTx),
10✔
281
                DualProof: schema.DualProofToProto(dualProof),
10✔
282
        }, nil
10✔
283
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc