• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9367064922

04 Jun 2024 12:27PM UTC coverage: 89.43% (-0.02%) from 89.451%
9367064922

push

gh-ci

ostafen
Add support for JSON type

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

521 of 575 new or added lines in 14 files covered. (90.61%)

12 existing lines in 5 files now uncovered.

35172 of 39329 relevant lines covered (89.43%)

160547.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.94
/embedded/sql/file_sort.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"sort"
)
27

28
// sortedChunk locates one run of sorted, encoded rows inside a temporary file.
type sortedChunk struct {
	// offset is the byte position at which the run starts;
	// size is the length of the run in bytes.
	offset, size uint64
}
32

33
type fileSorter struct {
34
        colPosBySelector map[string]int
35
        colTypes         []string
36
        cmp              func(r1, r2 *Row) (int, error)
37

38
        tx          *SQLTx
39
        sortBufSize int
40
        sortBuf     []*Row
41
        nextIdx     int
42

43
        tempFile     *os.File
44
        writer       *bufio.Writer
45
        tempFileSize uint64
46

47
        chunksToMerge []sortedChunk
48
}
49

50
func (s *fileSorter) update(r *Row) error {
8,877✔
51
        if s.nextIdx == s.sortBufSize {
9,612✔
52
                err := s.sortAndFlushBuffer()
735✔
53
                if err != nil {
735✔
54
                        return err
×
55
                }
×
56
                s.nextIdx = 0
735✔
57
        }
58

59
        s.sortBuf[s.nextIdx] = r
8,877✔
60
        s.nextIdx++
8,877✔
61

8,877✔
62
        return nil
8,877✔
63
}
64

65
func (s *fileSorter) finalize() (resultReader, error) {
29✔
66
        if s.nextIdx > 0 {
57✔
67
                if err := s.sortBuffer(); err != nil {
29✔
68
                        return nil, err
1✔
69
                }
1✔
70
        }
71

72
        // result rows are all in memory
73
        if len(s.chunksToMerge) == 0 {
49✔
74
                return &bufferResultReader{
21✔
75
                        sortBuf: s.sortBuf[:s.nextIdx],
21✔
76
                }, nil
21✔
77
        }
21✔
78

79
        err := s.flushBuffer()
7✔
80
        if err != nil {
7✔
81
                return nil, err
×
82
        }
×
83

84
        err = s.writer.Flush()
7✔
85
        if err != nil {
7✔
86
                return nil, err
×
87
        }
×
88
        return s.mergeAllChunks()
7✔
89
}
90

91
func (s *fileSorter) mergeAllChunks() (resultReader, error) {
7✔
92
        currFile := s.tempFile
7✔
93

7✔
94
        outFile, err := s.tx.createTempFile()
7✔
95
        if err != nil {
7✔
96
                return nil, err
×
97
        }
×
98

99
        lbuf := &bufio.Reader{}
7✔
100
        rbuf := &bufio.Reader{}
7✔
101

7✔
102
        lr := &fileRowReader{
7✔
103
                colPosBySelector: s.colPosBySelector,
7✔
104
                colTypes:         s.colTypes,
7✔
105
                reader:           lbuf,
7✔
106
                reuseRow:         true,
7✔
107
        }
7✔
108
        rr := &fileRowReader{
7✔
109
                colPosBySelector: s.colPosBySelector,
7✔
110
                colTypes:         s.colTypes,
7✔
111
                reader:           rbuf,
7✔
112
                reuseRow:         true,
7✔
113
        }
7✔
114

7✔
115
        chunks := s.chunksToMerge
7✔
116
        for len(chunks) > 1 {
56✔
117
                s.writer.Reset(outFile)
49✔
118

49✔
119
                var offset uint64
49✔
120

49✔
121
                newChunks := make([]sortedChunk, (len(chunks)+1)/2)
49✔
122
                for i := 0; i < len(chunks)/2; i++ {
784✔
123
                        c1 := chunks[i*2]
735✔
124
                        c2 := chunks[i*2+1]
735✔
125

735✔
126
                        lbuf.Reset(io.NewSectionReader(currFile, int64(c1.offset), int64(c1.size)))
735✔
127
                        rbuf.Reset(io.NewSectionReader(currFile, int64(c2.offset), int64(c2.size)))
735✔
128

735✔
129
                        err := s.mergeChunks(lr, rr, s.writer)
735✔
130
                        if err != nil {
735✔
131
                                return nil, err
×
132
                        }
×
133

134
                        newChunks[i] = sortedChunk{
735✔
135
                                offset: offset,
735✔
136
                                size:   c1.size + c2.size,
735✔
137
                        }
735✔
138
                        offset += c1.size + c2.size
735✔
139
                }
140

141
                err := s.writer.Flush()
49✔
142
                if err != nil {
49✔
143
                        return nil, err
×
144
                }
×
145

146
                if len(chunks)%2 != 0 { // copy last sorted chunk
70✔
147
                        lastChunk := chunks[len(chunks)-1]
21✔
148

21✔
149
                        _, err := io.Copy(outFile, io.NewSectionReader(currFile, int64(lastChunk.offset), int64(lastChunk.size)))
21✔
150
                        if err != nil {
21✔
151
                                return nil, err
×
152
                        }
×
153
                        newChunks[len(chunks)/2] = lastChunk
21✔
154
                }
155

156
                temp := currFile
49✔
157
                currFile = outFile
49✔
158
                outFile = temp
49✔
159

49✔
160
                _, err = outFile.Seek(0, io.SeekStart)
49✔
161
                if err != nil {
49✔
162
                        return nil, err
×
163
                }
×
164

165
                chunks = newChunks
49✔
166
        }
167

168
        return &fileRowReader{
7✔
169
                colTypes:         s.colTypes,
7✔
170
                colPosBySelector: s.colPosBySelector,
7✔
171
                reader:           bufio.NewReader(io.NewSectionReader(currFile, 0, int64(s.tempFileSize))),
7✔
172
        }, nil
7✔
173
}
174

175
func (s *fileSorter) mergeChunks(lr, rr *fileRowReader, writer io.Writer) error {
735✔
176
        var err error
735✔
177
        var lrAtEOF bool
735✔
178
        var r1, r2 *Row
735✔
179

735✔
180
        for {
33,808✔
181
                if r1 == nil {
52,213✔
182
                        r1, err = lr.Read()
19,140✔
183
                        if err == ErrNoMoreRows {
19,550✔
184
                                lrAtEOF = true
410✔
185
                                break
410✔
186
                        }
187

188
                        if err != nil {
18,730✔
189
                                return err
×
190
                        }
×
191
                }
192

193
                if r2 == nil {
47,331✔
194
                        r2, err = rr.Read()
14,668✔
195
                        if err == ErrNoMoreRows {
14,993✔
196
                                break
325✔
197
                        }
198

199
                        if err != nil {
14,343✔
200
                                return err
×
201
                        }
×
202
                }
203

204
                var rawData []byte
32,338✔
205
                res, err := s.cmp(r1, r2)
32,338✔
206
                if err != nil {
32,338✔
NEW
207
                        return err
×
NEW
208
                }
×
209

210
                if res < 0 {
50,743✔
211
                        rawData = lr.rowBuf.Bytes()
18,405✔
212
                        r1 = nil
18,405✔
213
                } else {
32,338✔
214
                        rawData = rr.rowBuf.Bytes()
13,933✔
215
                        r2 = nil
13,933✔
216
                }
13,933✔
217

218
                _, err = writer.Write(rawData)
32,338✔
219
                if err != nil {
32,338✔
220
                        return err
×
221
                }
×
222
        }
223

224
        readerToCopy := lr
735✔
225
        if lrAtEOF {
1,145✔
226
                readerToCopy = rr
410✔
227
        }
410✔
228

229
        _, err = writer.Write(readerToCopy.rowBuf.Bytes())
735✔
230
        if err != nil {
735✔
231
                return err
×
232
        }
×
233

234
        _, err = io.Copy(writer, readerToCopy.reader)
735✔
235
        return err
735✔
236
}
237

238
type resultReader interface {
239
        Read() (*Row, error)
240
}
241

242
type bufferResultReader struct {
243
        sortBuf []*Row
244
        nextIdx int
245
}
246

247
func (r *bufferResultReader) Read() (*Row, error) {
3,628✔
248
        if r.nextIdx == len(r.sortBuf) {
3,654✔
249
                return nil, ErrNoMoreRows
26✔
250
        }
26✔
251

252
        row := r.sortBuf[r.nextIdx]
3,602✔
253
        r.nextIdx++
3,602✔
254
        return row, nil
3,602✔
255
}
256

257
type fileRowReader struct {
258
        colPosBySelector map[string]int
259
        colTypes         []SQLValueType
260
        reader           io.Reader
261
        rowBuf           bytes.Buffer
262
        row              *Row
263
        reuseRow         bool
264
}
265

266
func (r *fileRowReader) readValues(out []TypedValue) error {
38,990✔
267
        var size uint16
38,990✔
268
        err := binary.Read(r.reader, binary.BigEndian, &size)
38,990✔
269
        if err != nil {
39,732✔
270
                return err
742✔
271
        }
742✔
272

273
        r.rowBuf.Reset()
38,248✔
274

38,248✔
275
        binary.Write(&r.rowBuf, binary.BigEndian, &size)
38,248✔
276

38,248✔
277
        _, err = io.CopyN(&r.rowBuf, r.reader, int64(size))
38,248✔
278
        if err != nil {
38,248✔
NEW
279
                return err
×
280
        }
×
281

282
        data := r.rowBuf.Bytes()
38,248✔
283
        return decodeValues(data[2:], r.colTypes, out)
38,248✔
284
}
285

286
func (r *fileRowReader) Read() (*Row, error) {
38,990✔
287
        row := r.getRow()
38,990✔
288

38,990✔
289
        err := r.readValues(row.ValuesByPosition)
38,990✔
290
        if err == io.EOF {
39,732✔
291
                return nil, ErrNoMoreRows
742✔
292
        }
742✔
293
        if err != nil {
38,248✔
294
                return nil, err
×
295
        }
×
296

297
        for sel, pos := range r.colPosBySelector {
217,972✔
298
                row.ValuesBySelector[sel] = row.ValuesByPosition[pos]
179,724✔
299
        }
179,724✔
300
        return row, nil
38,248✔
301
}
302

303
func (r *fileRowReader) getRow() *Row {
38,990✔
304
        row := r.row
38,990✔
305
        if row == nil || !r.reuseRow {
44,186✔
306
                row = &Row{
5,196✔
307
                        ValuesByPosition: make([]TypedValue, len(r.colPosBySelector)),
5,196✔
308
                        ValuesBySelector: make(map[string]TypedValue, len(r.colPosBySelector)),
5,196✔
309
                }
5,196✔
310
                r.row = row
5,196✔
311
        }
5,196✔
312
        return row
38,990✔
313
}
314

315
func decodeValues(data []byte, colTypes []SQLValueType, out []TypedValue) error {
38,248✔
316
        var voff int
38,248✔
317
        for i, col := range colTypes {
217,972✔
318
                v, n, err := DecodeNullableValue(data[voff:], col)
179,724✔
319
                if err != nil {
179,724✔
NEW
320
                        return err
×
321
                }
×
322
                voff += n
179,724✔
323

179,724✔
324
                out[i] = v
179,724✔
325
        }
326
        return nil
38,248✔
327
}
328

329
func (s *fileSorter) sortAndFlushBuffer() error {
735✔
330
        if err := s.sortBuffer(); err != nil {
735✔
NEW
331
                return err
×
NEW
332
        }
×
333
        return s.flushBuffer()
735✔
334
}
335

336
func (s *fileSorter) sortBuffer() error {
763✔
337
        buf := s.sortBuf[:s.nextIdx]
763✔
338

763✔
339
        var outErr error
763✔
340
        sort.Slice(buf, func(i, j int) bool {
42,153✔
341
                r1 := buf[i]
41,390✔
342
                r2 := buf[j]
41,390✔
343

41,390✔
344
                res, err := s.cmp(r1, r2)
41,390✔
345
                if err != nil {
41,595✔
346
                        outErr = err
205✔
347
                }
205✔
348
                return res < 0
41,390✔
349
        })
350
        return outErr
763✔
351
}
352

353
func (s *fileSorter) flushBuffer() error {
742✔
354
        writer, err := s.tempFileWriter()
742✔
355
        if err != nil {
742✔
356
                return err
×
357
        }
×
358

359
        var chunkSize uint64
742✔
360
        for _, row := range s.sortBuf[:s.nextIdx] {
5,917✔
361
                data, err := encodeRow(row)
5,175✔
362
                if err != nil {
5,175✔
363
                        return err
×
364
                }
×
365

366
                _, err = writer.Write(data)
5,175✔
367
                if err != nil {
5,175✔
368
                        return err
×
369
                }
×
370

371
                chunkSize += uint64(len(data))
5,175✔
372
        }
373

374
        s.chunksToMerge = append(s.chunksToMerge, sortedChunk{
742✔
375
                offset: s.tempFileSize,
742✔
376
                size:   chunkSize,
742✔
377
        })
742✔
378
        s.tempFileSize += chunkSize
742✔
379
        return nil
742✔
380
}
381

382
func (s *fileSorter) tempFileWriter() (*bufio.Writer, error) {
742✔
383
        if s.writer != nil {
1,477✔
384
                return s.writer, nil
735✔
385
        }
735✔
386
        file, err := s.tx.createTempFile()
7✔
387
        if err != nil {
7✔
388
                return nil, err
×
389
        }
×
390
        s.tempFile = file
7✔
391
        s.writer = bufio.NewWriter(file)
7✔
392
        return s.writer, nil
7✔
393
}
394

395
func encodeRow(r *Row) ([]byte, error) {
5,175✔
396
        var buf bytes.Buffer
5,175✔
397
        buf.Write([]byte{0, 0}) // make room for size field
5,175✔
398

5,175✔
399
        for _, v := range r.ValuesByPosition {
29,572✔
400
                rawValue, err := EncodeNullableValue(v, v.Type(), -1)
24,397✔
401
                if err != nil {
24,397✔
402
                        return nil, err
×
403
                }
×
404
                buf.Write(rawValue)
24,397✔
405
        }
406

407
        data := buf.Bytes()
5,175✔
408
        size := uint16(len(data) - 2)
5,175✔
409
        binary.BigEndian.PutUint16(data, size)
5,175✔
410

5,175✔
411
        return data, nil
5,175✔
412
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc