• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 12274925552

11 Dec 2024 10:57AM UTC coverage: 89.257% (-0.009%) from 89.266%
12274925552

push

gh-ci

ostafen
chore(embedded/sql): Add support for core pg_catalog tables (pg_class, pg_namespace, pg_roles)

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

160 of 184 new or added lines in 12 files covered. (86.96%)

7 existing lines in 4 files now uncovered.

37629 of 42158 relevant lines covered (89.26%)

151353.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.65
/embedded/sql/sql_tx.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "errors"
22
        "os"
23
        "time"
24

25
        "github.com/codenotary/immudb/embedded/multierr"
26
        "github.com/codenotary/immudb/embedded/store"
27
)
28

29
// SQLTx (no-thread safe) represents an interactive or incremental transaction with support of RYOW
30
type SQLTx struct {
31
        engine *Engine
32

33
        opts *TxOptions
34

35
        tx        *store.OngoingTx
36
        tempFiles []*os.File
37

38
        catalog *Catalog // in-mem catalog
39

40
        mutatedCatalog bool // set when a DDL stmt was executed within the current tx
41

42
        updatedRows      int
43
        lastInsertedPKs  map[string]int64 // last inserted PK by table name
44
        firstInsertedPKs map[string]int64 // first inserted PK by table name
45

46
        txHeader *store.TxHeader // header is set once tx is committed
47

48
        onCommittedCallbacks []onCommittedCallback
49
}
50

51
type onCommittedCallback = func(sqlTx *SQLTx) error
52

53
func (sqlTx *SQLTx) Catalog() *Catalog {
304✔
54
        return sqlTx.catalog
304✔
55
}
304✔
56

57
func (sqlTx *SQLTx) IsExplicitCloseRequired() bool {
5,286✔
58
        return sqlTx.opts.ExplicitClose
5,286✔
59
}
5,286✔
60

61
func (sqlTx *SQLTx) RequireExplicitClose() error {
60✔
62
        if sqlTx.updatedRows != 0 {
61✔
63
                return store.ErrIllegalState
1✔
64
        }
1✔
65

66
        sqlTx.opts.ExplicitClose = true
59✔
67

59✔
68
        return nil
59✔
69
}
70

71
func (sqlTx *SQLTx) Timestamp() time.Time {
165✔
72
        return sqlTx.tx.Timestamp()
165✔
73
}
165✔
74

75
func (sqlTx *SQLTx) UpdatedRows() int {
226✔
76
        return sqlTx.updatedRows
226✔
77
}
226✔
78

79
func (sqlTx *SQLTx) LastInsertedPKs() map[string]int64 {
423✔
80
        return sqlTx.lastInsertedPKs
423✔
81
}
423✔
82

83
func (sqlTx *SQLTx) FirstInsertedPKs() map[string]int64 {
205✔
84
        return sqlTx.firstInsertedPKs
205✔
85
}
205✔
86

87
func (sqlTx *SQLTx) TxHeader() *store.TxHeader {
344✔
88
        return sqlTx.txHeader
344✔
89
}
344✔
90

91
func (sqlTx *SQLTx) sqlPrefix() []byte {
6,990✔
92
        return sqlTx.engine.prefix
6,990✔
93
}
6,990✔
94

95
func (sqlTx *SQLTx) distinctLimit() int {
74✔
96
        return sqlTx.engine.distinctLimit
74✔
97
}
74✔
98

99
func (sqlTx *SQLTx) newKeyReader(rSpec store.KeyReaderSpec) (store.KeyReader, error) {
961✔
100
        return sqlTx.tx.NewKeyReader(rSpec)
961✔
101
}
961✔
102

103
func (sqlTx *SQLTx) get(ctx context.Context, key []byte) (store.ValueRef, error) {
2,315✔
104
        return sqlTx.tx.Get(ctx, key)
2,315✔
105
}
2,315✔
106

107
func (sqlTx *SQLTx) set(key []byte, metadata *store.KVMetadata, value []byte) error {
3,740✔
108
        return sqlTx.tx.Set(key, metadata, value)
3,740✔
109
}
3,740✔
110

111
func (sqlTx *SQLTx) setTransient(key []byte, metadata *store.KVMetadata, value []byte) error {
840✔
112
        return sqlTx.tx.SetTransient(key, metadata, value)
840✔
113
}
840✔
114

115
func (sqlTx *SQLTx) getWithPrefix(ctx context.Context, prefix, neq []byte) (key []byte, valRef store.ValueRef, err error) {
97✔
116
        return sqlTx.tx.GetWithPrefix(ctx, prefix, neq)
97✔
117
}
97✔
118

119
func (sqlTx *SQLTx) Cancel() error {
1,212✔
120
        defer sqlTx.removeTempFiles()
1,212✔
121

1,212✔
122
        return sqlTx.tx.Cancel()
1,212✔
123
}
1,212✔
124

125
func (sqlTx *SQLTx) Commit(ctx context.Context) error {
2,464✔
126
        defer sqlTx.removeTempFiles()
2,464✔
127

2,464✔
128
        err := sqlTx.tx.RequireMVCCOnFollowingTxs(sqlTx.mutatedCatalog)
2,464✔
129
        if err != nil {
2,464✔
130
                return err
×
131
        }
×
132

133
        // no need to wait for indexing to be up to date during commit phase
134
        sqlTx.txHeader, err = sqlTx.tx.AsyncCommit(ctx)
2,464✔
135
        if err != nil && !errors.Is(err, store.ErrNoEntriesProvided) {
2,474✔
136
                return err
10✔
137
        }
10✔
138

139
        merr := multierr.NewMultiErr()
2,454✔
140

2,454✔
141
        for _, onCommitCallback := range sqlTx.onCommittedCallbacks {
2,465✔
142
                err := onCommitCallback(sqlTx)
11✔
143
                merr.Append(err)
11✔
144
        }
11✔
145

146
        return merr.Reduce()
2,454✔
147
}
148

149
func (sqlTx *SQLTx) Closed() bool {
11,207✔
150
        return sqlTx.tx.Closed()
11,207✔
151
}
11,207✔
152

153
func (sqlTx *SQLTx) delete(ctx context.Context, key []byte) error {
50✔
154
        return sqlTx.tx.Delete(ctx, key)
50✔
155
}
50✔
156

157
func (sqlTx *SQLTx) addOnCommittedCallback(callback onCommittedCallback) error {
12✔
158
        if callback == nil {
12✔
159
                return ErrIllegalArguments
×
160
        }
×
161

162
        sqlTx.onCommittedCallbacks = append(sqlTx.onCommittedCallbacks, callback)
12✔
163

12✔
164
        return nil
12✔
165
}
166

167
func (sqlTx *SQLTx) createTempFile() (*os.File, error) {
30✔
168
        tempFile, err := os.CreateTemp("", "immudb")
30✔
169
        if err == nil {
60✔
170
                sqlTx.tempFiles = append(sqlTx.tempFiles, tempFile)
30✔
171
        }
30✔
172
        return tempFile, err
30✔
173
}
174

175
func (sqlTx *SQLTx) removeTempFiles() error {
3,676✔
176
        for _, file := range sqlTx.tempFiles {
3,706✔
177
                err := file.Close()
30✔
178
                if err != nil {
30✔
179
                        return err
×
180
                }
×
181

182
                err = os.Remove(file.Name())
30✔
183
                if err != nil {
30✔
184
                        return err
×
185
                }
×
186
        }
187
        return nil
3,676✔
188
}
189

190
func (sqlTx *SQLTx) ListUsers(ctx context.Context) ([]User, error) {
7✔
191
        if sqlTx.engine.multidbHandler == nil {
7✔
NEW
192
                return nil, ErrUnspecifiedMultiDBHandler
×
NEW
193
        }
×
194
        return sqlTx.engine.multidbHandler.ListUsers(ctx)
7✔
195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc