• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 12258730221

10 Dec 2024 02:47PM UTC coverage: 89.138% (-0.1%) from 89.266%
12258730221

Pull #2036

gh-ci

ostafen
chore(embedded/sql): Add support for core pg_catalog tables (pg_class, pg_namespace, pg_roles)

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>
Pull Request #2036: chore(embedded/sql): Add support for core pg_catalog tables (pg_class…

101 of 183 new or added lines in 13 files covered. (55.19%)

1 existing line in 1 file now uncovered.

37586 of 42166 relevant lines covered (89.14%)

150871.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.04
/embedded/sql/sql_tx.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "os"
24
        "time"
25

26
        "github.com/codenotary/immudb/embedded/multierr"
27
        "github.com/codenotary/immudb/embedded/store"
28
)
29

30
// SQLTx (no-thread safe) represents an interactive or incremental transaction with support of RYOW
31
type SQLTx struct {
32
        engine *Engine
33

34
        opts *TxOptions
35

36
        tx        *store.OngoingTx
37
        tempFiles []*os.File
38

39
        catalog *Catalog // in-mem catalog
40

41
        mutatedCatalog bool // set when a DDL stmt was executed within the current tx
42

43
        updatedRows      int
44
        lastInsertedPKs  map[string]int64 // last inserted PK by table name
45
        firstInsertedPKs map[string]int64 // first inserted PK by table name
46

47
        txHeader *store.TxHeader // header is set once tx is committed
48

49
        onCommittedCallbacks []onCommittedCallback
50
}
51

52
type onCommittedCallback = func(sqlTx *SQLTx) error
53

54
func (sqlTx *SQLTx) Catalog() *Catalog {
304✔
55
        return sqlTx.catalog
304✔
56
}
304✔
57

58
func (sqlTx *SQLTx) IsExplicitCloseRequired() bool {
5,286✔
59
        return sqlTx.opts.ExplicitClose
5,286✔
60
}
5,286✔
61

62
func (sqlTx *SQLTx) RequireExplicitClose() error {
60✔
63
        if sqlTx.updatedRows != 0 {
61✔
64
                return store.ErrIllegalState
1✔
65
        }
1✔
66

67
        sqlTx.opts.ExplicitClose = true
59✔
68

59✔
69
        return nil
59✔
70
}
71

72
func (sqlTx *SQLTx) Timestamp() time.Time {
165✔
73
        return sqlTx.tx.Timestamp()
165✔
74
}
165✔
75

76
func (sqlTx *SQLTx) UpdatedRows() int {
226✔
77
        return sqlTx.updatedRows
226✔
78
}
226✔
79

80
func (sqlTx *SQLTx) LastInsertedPKs() map[string]int64 {
423✔
81
        return sqlTx.lastInsertedPKs
423✔
82
}
423✔
83

84
func (sqlTx *SQLTx) FirstInsertedPKs() map[string]int64 {
205✔
85
        return sqlTx.firstInsertedPKs
205✔
86
}
205✔
87

88
func (sqlTx *SQLTx) TxHeader() *store.TxHeader {
344✔
89
        return sqlTx.txHeader
344✔
90
}
344✔
91

92
func (sqlTx *SQLTx) sqlPrefix() []byte {
6,982✔
93
        return sqlTx.engine.prefix
6,982✔
94
}
6,982✔
95

96
func (sqlTx *SQLTx) distinctLimit() int {
74✔
97
        return sqlTx.engine.distinctLimit
74✔
98
}
74✔
99

100
func (sqlTx *SQLTx) newKeyReader(rSpec store.KeyReaderSpec) (store.KeyReader, error) {
961✔
101
        return sqlTx.tx.NewKeyReader(rSpec)
961✔
102
}
961✔
103

104
func (sqlTx *SQLTx) get(ctx context.Context, key []byte) (store.ValueRef, error) {
2,311✔
105
        return sqlTx.tx.Get(ctx, key)
2,311✔
106
}
2,311✔
107

108
func (sqlTx *SQLTx) set(key []byte, metadata *store.KVMetadata, value []byte) error {
3,736✔
109
        return sqlTx.tx.Set(key, metadata, value)
3,736✔
110
}
3,736✔
111

112
func (sqlTx *SQLTx) setTransient(key []byte, metadata *store.KVMetadata, value []byte) error {
840✔
113
        return sqlTx.tx.SetTransient(key, metadata, value)
840✔
114
}
840✔
115

116
func (sqlTx *SQLTx) getWithPrefix(ctx context.Context, prefix, neq []byte) (key []byte, valRef store.ValueRef, err error) {
97✔
117
        return sqlTx.tx.GetWithPrefix(ctx, prefix, neq)
97✔
118
}
97✔
119

120
func (sqlTx *SQLTx) Cancel() error {
1,210✔
121
        defer sqlTx.removeTempFiles()
1,210✔
122

1,210✔
123
        return sqlTx.tx.Cancel()
1,210✔
124
}
1,210✔
125

126
func (sqlTx *SQLTx) Commit(ctx context.Context) error {
2,464✔
127
        defer sqlTx.removeTempFiles()
2,464✔
128

2,464✔
129
        err := sqlTx.tx.RequireMVCCOnFollowingTxs(sqlTx.mutatedCatalog)
2,464✔
130
        if err != nil {
2,464✔
131
                return err
×
132
        }
×
133

134
        // no need to wait for indexing to be up to date during commit phase
135
        sqlTx.txHeader, err = sqlTx.tx.AsyncCommit(ctx)
2,464✔
136
        if err != nil && !errors.Is(err, store.ErrNoEntriesProvided) {
2,474✔
137
                return err
10✔
138
        }
10✔
139

140
        merr := multierr.NewMultiErr()
2,454✔
141

2,454✔
142
        for _, onCommitCallback := range sqlTx.onCommittedCallbacks {
2,465✔
143
                err := onCommitCallback(sqlTx)
11✔
144
                merr.Append(err)
11✔
145
        }
11✔
146

147
        return merr.Reduce()
2,454✔
148
}
149

150
func (sqlTx *SQLTx) Closed() bool {
11,207✔
151
        return sqlTx.tx.Closed()
11,207✔
152
}
11,207✔
153

154
func (sqlTx *SQLTx) delete(ctx context.Context, key []byte) error {
50✔
155
        return sqlTx.tx.Delete(ctx, key)
50✔
156
}
50✔
157

158
func (sqlTx *SQLTx) addOnCommittedCallback(callback onCommittedCallback) error {
12✔
159
        if callback == nil {
12✔
160
                return ErrIllegalArguments
×
161
        }
×
162

163
        sqlTx.onCommittedCallbacks = append(sqlTx.onCommittedCallbacks, callback)
12✔
164

12✔
165
        return nil
12✔
166
}
167

168
func (sqlTx *SQLTx) createTempFile() (*os.File, error) {
30✔
169
        tempFile, err := os.CreateTemp("", "immudb")
30✔
170
        if err == nil {
60✔
171
                sqlTx.tempFiles = append(sqlTx.tempFiles, tempFile)
30✔
172
        }
30✔
173
        return tempFile, err
30✔
174
}
175

176
func (sqlTx *SQLTx) removeTempFiles() error {
3,674✔
177
        for _, file := range sqlTx.tempFiles {
3,704✔
178
                err := file.Close()
30✔
179
                if err != nil {
30✔
180
                        return err
×
181
                }
×
182

183
                err = os.Remove(file.Name())
30✔
184
                if err != nil {
30✔
185
                        return err
×
186
                }
×
187
        }
188
        return nil
3,674✔
189
}
190

NEW
191
func (sqlTx *SQLTx) ListUsers() ([]User, error) {
×
NEW
192
        if sqlTx.engine.multidbHandler == nil {
×
NEW
193
                return nil,
×
NEW
194
                        fmt.Errorf("list users requires multi db handler to be specified")
×
NEW
195
        }
×
NEW
196
        return sqlTx.engine.multidbHandler.ListUsers(sqlTx.tx.Context())
×
197
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc