• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9299226861

30 May 2024 08:12AM UTC coverage: 89.451% (-0.04%) from 89.49%
9299226861

push

gh-ci

ostafen
Log request information as transaction metadata

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

120 of 152 new or added lines in 16 files covered. (78.95%)

6 existing lines in 2 files now uncovered.

34859 of 38970 relevant lines covered (89.45%)

161745.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.46
/pkg/database/sql.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package database
18

19
import (
20
        "bytes"
21
        "context"
22
        "fmt"
23
        "strings"
24

25
        "github.com/codenotary/immudb/embedded/sql"
26
        "github.com/codenotary/immudb/embedded/store"
27
        "github.com/codenotary/immudb/pkg/api/schema"
28
)
29

30
func (d *db) VerifiableSQLGet(ctx context.Context, req *schema.VerifiableSQLGetRequest) (*schema.VerifiableSQLEntry, error) {
60✔
31
        if req == nil || req.SqlGetRequest == nil {
61✔
32
                return nil, ErrIllegalArguments
1✔
33
        }
1✔
34

35
        lastTxID, _ := d.st.CommittedAlh()
59✔
36
        if lastTxID < req.ProveSinceTx {
61✔
37
                return nil, ErrIllegalState
2✔
38
        }
2✔
39

40
        d.mutex.Lock()
57✔
41
        defer d.mutex.Unlock()
57✔
42

57✔
43
        sqlTx, err := d.sqlEngine.NewTx(ctx, sql.DefaultTxOptions().WithReadOnly(true))
57✔
44
        if err != nil {
57✔
45
                return nil, err
×
46
        }
×
47
        defer sqlTx.Cancel()
57✔
48

57✔
49
        table, err := sqlTx.Catalog().GetTableByName(req.SqlGetRequest.Table)
57✔
50
        if err != nil {
59✔
51
                return nil, err
2✔
52
        }
2✔
53

54
        valbuf := bytes.Buffer{}
55✔
55

55✔
56
        if len(req.SqlGetRequest.PkValues) != len(table.PrimaryIndex().Cols()) {
57✔
57
                return nil, fmt.Errorf(
2✔
58
                        "%w: incorrect number of primary key values, expected %d, got %d",
2✔
59
                        ErrIllegalArguments,
2✔
60
                        len(table.PrimaryIndex().Cols()),
2✔
61
                        len(req.SqlGetRequest.PkValues),
2✔
62
                )
2✔
63
        }
2✔
64

65
        for i, pkCol := range table.PrimaryIndex().Cols() {
106✔
66
                pkEncVal, _, err := sql.EncodeRawValueAsKey(schema.RawValue(req.SqlGetRequest.PkValues[i]), pkCol.Type(), pkCol.MaxLen())
53✔
67
                if err != nil {
54✔
68
                        return nil, err
1✔
69
                }
1✔
70

71
                _, err = valbuf.Write(pkEncVal)
52✔
72
                if err != nil {
52✔
73
                        return nil, err
×
74
                }
×
75
        }
76

77
        // build the encoded key for the pk
78
        pkKey := sql.MapKey(
52✔
79
                []byte{SQLPrefix},
52✔
80
                sql.MappedPrefix,
52✔
81
                sql.EncodeID(table.ID()),
52✔
82
                sql.EncodeID(sql.PKIndexID),
52✔
83
                valbuf.Bytes(),
52✔
84
                valbuf.Bytes())
52✔
85

52✔
86
        e, err := d.sqlGetAt(ctx, pkKey, req.SqlGetRequest.AtTx, d.st, true)
52✔
87
        if err != nil {
54✔
88
                return nil, err
2✔
89
        }
2✔
90

91
        tx, err := d.allocTx()
50✔
92
        if err != nil {
50✔
93
                return nil, err
×
94
        }
×
95
        defer d.releaseTx(tx)
50✔
96

50✔
97
        // key-value inclusion proof
50✔
98
        err = d.st.ReadTx(e.Tx, false, tx)
50✔
99
        if err != nil {
50✔
100
                return nil, err
×
101
        }
×
102

103
        sourceKey := sql.MapKey(
50✔
104
                []byte{SQLPrefix},
50✔
105
                sql.RowPrefix,
50✔
106
                sql.EncodeID(1), // fixed database identifier
50✔
107
                sql.EncodeID(table.ID()),
50✔
108
                sql.EncodeID(sql.PKIndexID),
50✔
109
                valbuf.Bytes())
50✔
110

50✔
111
        inclusionProof, err := tx.Proof(sourceKey)
50✔
112
        if err != nil {
50✔
113
                return nil, err
×
114
        }
×
115

116
        var rootTxHdr *store.TxHeader
50✔
117

50✔
118
        if req.ProveSinceTx == 0 {
54✔
119
                rootTxHdr = tx.Header()
4✔
120
        } else {
50✔
121
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
46✔
122
                if err != nil {
46✔
123
                        return nil, err
×
124
                }
×
125
        }
126

127
        var sourceTxHdr, targetTxHdr *store.TxHeader
50✔
128

50✔
129
        if req.ProveSinceTx <= e.Tx {
91✔
130
                sourceTxHdr = rootTxHdr
41✔
131
                targetTxHdr = tx.Header()
41✔
132
        } else {
50✔
133
                sourceTxHdr = tx.Header()
9✔
134
                targetTxHdr = rootTxHdr
9✔
135
        }
9✔
136

137
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
50✔
138
        if err != nil {
50✔
139
                return nil, err
×
140
        }
×
141

142
        verifiableTx := &schema.VerifiableTx{
50✔
143
                Tx:        schema.TxToProto(tx),
50✔
144
                DualProof: schema.DualProofToProto(dualProof),
50✔
145
        }
50✔
146

50✔
147
        colNamesByID := make(map[uint32]string, len(table.Cols()))
50✔
148
        colIdsByName := make(map[string]uint32, len(table.ColsByName()))
50✔
149
        colTypesByID := make(map[uint32]string, len(table.Cols()))
50✔
150
        colLenByID := make(map[uint32]int32, len(table.Cols()))
50✔
151

50✔
152
        for _, col := range table.Cols() {
201✔
153
                colNamesByID[col.ID()] = col.Name()
151✔
154
                colIdsByName[sql.EncodeSelector("", table.Name(), col.Name())] = col.ID()
151✔
155
                colTypesByID[col.ID()] = col.Type()
151✔
156
                colLenByID[col.ID()] = int32(col.MaxLen())
151✔
157
        }
151✔
158

159
        pkIDs := make([]uint32, len(table.PrimaryIndex().Cols()))
50✔
160

50✔
161
        for i, col := range table.PrimaryIndex().Cols() {
100✔
162
                pkIDs[i] = col.ID()
50✔
163
        }
50✔
164

165
        return &schema.VerifiableSQLEntry{
50✔
166
                SqlEntry:       e,
50✔
167
                VerifiableTx:   verifiableTx,
50✔
168
                InclusionProof: schema.InclusionProofToProto(inclusionProof),
50✔
169
                DatabaseId:     1,
50✔
170
                TableId:        table.ID(),
50✔
171
                PKIDs:          pkIDs,
50✔
172
                ColNamesById:   colNamesByID,
50✔
173
                ColIdsByName:   colIdsByName,
50✔
174
                ColTypesById:   colTypesByID,
50✔
175
                ColLenById:     colLenByID,
50✔
176
                MaxColId:       table.GetMaxColID(),
50✔
177
        }, nil
50✔
178
}
179

180
func (d *db) sqlGetAt(ctx context.Context, key []byte, atTx uint64, index store.KeyIndex, skipIntegrityCheck bool) (entry *schema.SQLEntry, err error) {
52✔
181
        var valRef store.ValueRef
52✔
182

52✔
183
        if atTx == 0 {
103✔
184
                valRef, err = index.Get(ctx, key)
51✔
185
        } else {
52✔
186
                valRef, err = index.GetBetween(ctx, key, atTx, atTx)
1✔
187
        }
1✔
188
        if err != nil {
54✔
189
                return nil, err
2✔
190
        }
2✔
191

192
        val, err := valRef.Resolve()
50✔
193
        if err != nil {
50✔
194
                return nil, err
×
195
        }
×
196

197
        return &schema.SQLEntry{
50✔
198
                Tx:       valRef.Tx(),
50✔
199
                Key:      key,
50✔
200
                Metadata: schema.KVMetadataToProto(valRef.KVMetadata()),
50✔
201
                Value:    val,
50✔
202
        }, err
50✔
203
}
204

205
func (d *db) ListTables(ctx context.Context, tx *sql.SQLTx) (*schema.SQLQueryResult, error) {
5✔
206
        d.mutex.RLock()
5✔
207
        defer d.mutex.RUnlock()
5✔
208

5✔
209
        catalog, err := d.sqlEngine.Catalog(ctx, tx)
5✔
210
        if err != nil {
5✔
211
                return nil, err
×
212
        }
×
213

214
        res := &schema.SQLQueryResult{Columns: []*schema.Column{{Name: "TABLE", Type: sql.VarcharType}}}
5✔
215

5✔
216
        for _, t := range catalog.GetTables() {
8✔
217
                res.Rows = append(res.Rows, &schema.Row{Values: []*schema.SQLValue{{Value: &schema.SQLValue_S{S: t.Name()}}}})
3✔
218
        }
3✔
219

220
        return res, nil
5✔
221
}
222

223
func (d *db) DescribeTable(ctx context.Context, tx *sql.SQLTx, tableName string) (*schema.SQLQueryResult, error) {
6✔
224
        d.mutex.RLock()
6✔
225
        defer d.mutex.RUnlock()
6✔
226

6✔
227
        catalog, err := d.sqlEngine.Catalog(ctx, tx)
6✔
228
        if err != nil {
6✔
229
                return nil, err
×
230
        }
×
231

232
        table, err := catalog.GetTableByName(tableName)
6✔
233
        if err != nil {
8✔
234
                return nil, err
2✔
235
        }
2✔
236

237
        res := &schema.SQLQueryResult{Columns: []*schema.Column{
4✔
238
                {Name: "COLUMN", Type: sql.VarcharType},
4✔
239
                {Name: "TYPE", Type: sql.VarcharType},
4✔
240
                {Name: "NULLABLE", Type: sql.BooleanType},
4✔
241
                {Name: "INDEX", Type: sql.VarcharType},
4✔
242
                {Name: "AUTO_INCREMENT", Type: sql.BooleanType},
4✔
243
                {Name: "UNIQUE", Type: sql.BooleanType},
4✔
244
        }}
4✔
245

4✔
246
        for _, c := range table.Cols() {
15✔
247
                index := "NO"
11✔
248

11✔
249
                indexed, err := table.IsIndexed(c.Name())
11✔
250
                if err != nil {
11✔
251
                        return nil, err
×
252
                }
×
253
                if indexed {
15✔
254
                        index = "YES"
4✔
255
                }
4✔
256

257
                if table.PrimaryIndex().IncludesCol(c.ID()) {
15✔
258
                        index = "PRIMARY KEY"
4✔
259
                }
4✔
260

261
                var unique bool
11✔
262
                for _, index := range table.GetIndexesByColID(c.ID()) {
15✔
263
                        if index.IsUnique() && len(index.Cols()) == 1 {
8✔
264
                                unique = true
4✔
265
                                break
4✔
266
                        }
267
                }
268

269
                var maxLen string
11✔
270

11✔
271
                if c.MaxLen() > 0 && (c.Type() == sql.VarcharType || c.Type() == sql.BLOBType) {
11✔
272
                        maxLen = fmt.Sprintf("(%d)", c.MaxLen())
×
273
                }
×
274

275
                res.Rows = append(res.Rows, &schema.Row{
11✔
276
                        Values: []*schema.SQLValue{
11✔
277
                                {Value: &schema.SQLValue_S{S: c.Name()}},
11✔
278
                                {Value: &schema.SQLValue_S{S: c.Type() + maxLen}},
11✔
279
                                {Value: &schema.SQLValue_B{B: c.IsNullable()}},
11✔
280
                                {Value: &schema.SQLValue_S{S: index}},
11✔
281
                                {Value: &schema.SQLValue_B{B: c.IsAutoIncremental()}},
11✔
282
                                {Value: &schema.SQLValue_B{B: unique}},
11✔
283
                        },
11✔
284
                })
11✔
285
        }
286

287
        return res, nil
4✔
288
}
289

290
func (d *db) NewSQLTx(ctx context.Context, opts *sql.TxOptions) (tx *sql.SQLTx, err error) {
283✔
291
        txCtx, txCancel := context.WithCancel(context.Background())
283✔
292

283✔
293
        txChan := make(chan *sql.SQLTx)
283✔
294
        errChan := make(chan error)
283✔
295

283✔
296
        defer func() {
566✔
297
                if err != nil {
283✔
298
                        txCancel()
×
299

×
300
                        if tx != nil {
×
301
                                tx.Cancel()
×
302
                        }
×
303
                }
304
        }()
305

306
        go func() {
566✔
307
                md := schema.MetadataFromContext(ctx)
283✔
308
                if len(md) > 0 {
336✔
309
                        data, err := md.Marshal()
53✔
310
                        if err != nil {
53✔
NEW
311
                                errChan <- err
×
NEW
312
                                return
×
NEW
313
                        }
×
314
                        opts = opts.WithExtra(data)
53✔
315
                }
316

317
                tx, err = d.sqlEngine.NewTx(txCtx, opts)
283✔
318
                if err != nil {
283✔
319
                        errChan <- err
×
320
                } else {
283✔
321
                        txChan <- tx
283✔
322
                }
283✔
323
        }()
324

325
        select {
283✔
326
        case <-ctx.Done():
×
327
                {
×
328
                        return nil, ctx.Err()
×
329
                }
×
330
        case tx = <-txChan:
283✔
331
                {
566✔
332
                        return tx, nil
283✔
333
                }
283✔
334
        case err = <-errChan:
×
335
                {
×
336
                        return nil, err
×
337
                }
×
338
        }
339
}
340

341
func (d *db) SQLExec(ctx context.Context, tx *sql.SQLTx, req *schema.SQLExecRequest) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error) {
275✔
342
        if req == nil {
276✔
343
                return nil, nil, ErrIllegalArguments
1✔
344
        }
1✔
345

346
        stmts, err := sql.Parse(strings.NewReader(req.Sql))
274✔
347
        if err != nil {
276✔
348
                return nil, nil, err
2✔
349
        }
2✔
350

351
        params := make(map[string]interface{})
272✔
352

272✔
353
        for _, p := range req.Params {
367✔
354
                params[p.Name] = schema.RawValue(p.Value)
95✔
355
        }
95✔
356

357
        return d.SQLExecPrepared(ctx, tx, stmts, params)
272✔
358
}
359

360
func (d *db) SQLExecPrepared(ctx context.Context, tx *sql.SQLTx, stmts []sql.SQLStmt, params map[string]interface{}) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error) {
307✔
361
        if len(stmts) == 0 {
308✔
362
                return nil, nil, ErrIllegalArguments
1✔
363
        }
1✔
364

365
        d.mutex.RLock()
306✔
366
        defer d.mutex.RUnlock()
306✔
367

306✔
368
        if d.isReplica() {
307✔
369
                return nil, nil, ErrIsReplica
1✔
370
        }
1✔
371

372
        return d.sqlEngine.ExecPreparedStmts(ctx, tx, stmts, params)
305✔
373
}
374

375
func (d *db) SQLQuery(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) (sql.RowReader, error) {
72✔
376
        if req == nil {
74✔
377
                return nil, ErrIllegalArguments
2✔
378
        }
2✔
379

380
        stmts, err := sql.Parse(strings.NewReader(req.Sql))
70✔
381
        if err != nil {
72✔
382
                return nil, err
2✔
383
        }
2✔
384

385
        stmt, ok := stmts[0].(sql.DataSource)
68✔
386
        if !ok {
69✔
387
                return nil, sql.ErrExpectingDQLStmt
1✔
388
        }
1✔
389
        reader, err := d.SQLQueryPrepared(ctx, tx, stmt, schema.NamedParamsFromProto(req.Params))
67✔
390
        if !req.AcceptStream {
119✔
391
                reader = &limitRowReader{RowReader: reader, maxRows: d.maxResultSize}
52✔
392
        }
52✔
393
        return reader, err
67✔
394
}
395

396
func (d *db) SQLQueryAll(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) ([]*sql.Row, error) {
12✔
397
        reader, err := d.SQLQuery(ctx, tx, req)
12✔
398
        if err != nil {
16✔
399
                return nil, err
4✔
400
        }
4✔
401
        defer reader.Close()
8✔
402
        return sql.ReadAllRows(ctx, reader)
8✔
403
}
404

405
func (d *db) SQLQueryPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.DataSource, params map[string]interface{}) (sql.RowReader, error) {
85✔
406
        if stmt == nil {
85✔
407
                return nil, ErrIllegalArguments
×
408
        }
×
409

410
        d.mutex.RLock()
85✔
411
        defer d.mutex.RUnlock()
85✔
412

85✔
413
        return d.sqlEngine.QueryPreparedStmt(ctx, tx, stmt, params)
85✔
414
}
415

416
func (d *db) InferParameters(ctx context.Context, tx *sql.SQLTx, sql string) (map[string]sql.SQLValueType, error) {
1✔
417
        d.mutex.RLock()
1✔
418
        defer d.mutex.RUnlock()
1✔
419

1✔
420
        return d.sqlEngine.InferParameters(ctx, tx, sql)
1✔
421
}
1✔
422

423
func (d *db) InferParametersPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.SQLStmt) (map[string]sql.SQLValueType, error) {
8✔
424
        d.mutex.RLock()
8✔
425
        defer d.mutex.RUnlock()
8✔
426

8✔
427
        return d.sqlEngine.InferParametersPreparedStmts(ctx, tx, []sql.SQLStmt{stmt})
8✔
428
}
8✔
429

430
type limitRowReader struct {
431
        sql.RowReader
432
        nRead   int
433
        maxRows int
434
}
435

436
func (r *limitRowReader) Read(ctx context.Context) (*sql.Row, error) {
140✔
437
        row, err := r.RowReader.Read(ctx)
140✔
438
        if err != nil {
186✔
439
                return nil, err
46✔
440
        }
46✔
441

442
        if r.nRead == r.maxRows {
97✔
443
                return nil, fmt.Errorf("%w: found more than %d rows (the maximum limit). "+
3✔
444
                        "Query constraints can be applied using the LIMIT clause",
3✔
445
                        ErrResultSizeLimitReached, r.maxRows)
3✔
446
        }
3✔
447

448
        r.nRead++
91✔
449
        return row, nil
91✔
450
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc