• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

codenotary / immudb / 12240680310

09 Dec 2024 05:11PM UTC coverage: 89.28%. Remained the same
12240680310

push

gh-ci

ostafen
chore(embedded/sql): add support for LEFT JOIN

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

25 of 29 new or added lines in 1 file covered. (86.21%)

1 existing line in 1 file now uncovered.

37543 of 42051 relevant lines covered (89.28%)

151006.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.2
/embedded/sql/joint_row_reader.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "fmt"
22

23
        "github.com/codenotary/immudb/embedded/multierr"
24
)
25

26
type jointRowReader struct {
27
        rowReader RowReader
28

29
        joins []*JoinSpec
30

31
        rowReaders                 []RowReader
32
        rowReadersValuesByPosition [][]TypedValue
33
        rowReadersValuesBySelector []map[string]TypedValue
34
}
35

36
func newJointRowReader(rowReader RowReader, joins []*JoinSpec) (*jointRowReader, error) {
23✔
37
        if rowReader == nil || len(joins) == 0 {
24✔
38
                return nil, ErrIllegalArguments
1✔
39
        }
1✔
40

41
        for _, jspec := range joins {
51✔
42
                switch jspec.joinType {
29✔
43
                case InnerJoin, LeftJoin:
27✔
44
                default:
2✔
45
                        return nil, ErrUnsupportedJoinType
2✔
46
                }
47
        }
48

49
        return &jointRowReader{
20✔
50
                rowReader:                  rowReader,
20✔
51
                joins:                      joins,
20✔
52
                rowReaders:                 []RowReader{rowReader},
20✔
53
                rowReadersValuesByPosition: make([][]TypedValue, 1+len(joins)),
20✔
54
                rowReadersValuesBySelector: make([]map[string]TypedValue, 1+len(joins)),
20✔
55
        }, nil
20✔
56
}
57

58
func (jointr *jointRowReader) onClose(callback func()) {
13✔
59
        jointr.rowReader.onClose(callback)
13✔
60
}
13✔
61

62
func (jointr *jointRowReader) Tx() *SQLTx {
1,008✔
63
        return jointr.rowReader.Tx()
1,008✔
64
}
1,008✔
65

66
func (jointr *jointRowReader) TableAlias() string {
1,902✔
67
        return jointr.rowReader.TableAlias()
1,902✔
68
}
1,902✔
69

70
func (jointr *jointRowReader) OrderBy() []ColDescriptor {
1✔
71
        return jointr.rowReader.OrderBy()
1✔
72
}
1✔
73

74
func (jointr *jointRowReader) ScanSpecs() *ScanSpecs {
1✔
75
        return jointr.rowReader.ScanSpecs()
1✔
76
}
1✔
77

78
func (jointr *jointRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
11✔
79
        return jointr.colsByPos(ctx)
11✔
80
}
11✔
81

82
func (jointr *jointRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
15✔
83
        colDescriptors, err := jointr.rowReader.colsBySelector(ctx)
15✔
84
        if err != nil {
17✔
85
                return nil, err
2✔
86
        }
2✔
87

88
        for _, jspec := range jointr.joins {
31✔
89

18✔
90
                // TODO (byo) optimize this by getting selector list only or opening all joint readers
18✔
91
                //            on jointRowReader creation,
18✔
92
                // Note: We're using a dummy ScanSpec object that is only used during read, we're only interested
18✔
93
                //       in column list though
18✔
94
                rr, err := jspec.ds.Resolve(ctx, jointr.Tx(), nil, &ScanSpecs{Index: &Index{}})
18✔
95
                if err != nil {
19✔
96
                        return nil, err
1✔
97
                }
1✔
98
                defer rr.Close()
17✔
99

17✔
100
                cd, err := rr.colsBySelector(ctx)
17✔
101
                if err != nil {
18✔
102
                        return nil, err
1✔
103
                }
1✔
104

105
                for sel, des := range cd {
60✔
106
                        if _, exists := colDescriptors[sel]; exists {
45✔
107
                                return nil, fmt.Errorf(
1✔
108
                                        "error resolving '%s' in a join: %w, "+
1✔
109
                                                "use aliasing to assign unique names "+
1✔
110
                                                "for all tables, sub-queries and columns",
1✔
111
                                        sel,
1✔
112
                                        ErrAmbiguousSelector,
1✔
113
                                )
1✔
114
                        }
1✔
115
                        colDescriptors[sel] = des
43✔
116
                }
117
        }
118
        return colDescriptors, nil
10✔
119
}
120

121
func (jointr *jointRowReader) colsByPos(ctx context.Context) ([]ColDescriptor, error) {
11✔
122
        colDescriptors, err := jointr.rowReader.Columns(ctx)
11✔
123
        if err != nil {
12✔
124
                return nil, err
1✔
125
        }
1✔
126

127
        for _, jspec := range jointr.joins {
24✔
128

14✔
129
                // TODO (byo) optimize this by getting selector list only or opening all joint readers
14✔
130
                //            on jointRowReader creation,
14✔
131
                // Note: We're using a dummy ScanSpec object that is only used during read, we're only interested
14✔
132
                //       in column list though
14✔
133
                rr, err := jspec.ds.Resolve(ctx, jointr.Tx(), nil, &ScanSpecs{Index: &Index{}})
14✔
134
                if err != nil {
14✔
135
                        return nil, err
×
136
                }
×
137
                defer rr.Close()
14✔
138

14✔
139
                cd, err := rr.Columns(ctx)
14✔
140
                if err != nil {
14✔
141
                        return nil, err
×
142
                }
×
143

144
                colDescriptors = append(colDescriptors, cd...)
14✔
145
        }
146

147
        return colDescriptors, nil
10✔
148
}
149

150
func (jointr *jointRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
3✔
151
        err := jointr.rowReader.InferParameters(ctx, params)
3✔
152
        if err != nil {
4✔
153
                return err
1✔
154
        }
1✔
155

156
        cols, err := jointr.colsBySelector(ctx)
2✔
157
        if err != nil {
3✔
158
                return err
1✔
159
        }
1✔
160

161
        for _, join := range jointr.joins {
2✔
162
                err = join.ds.inferParameters(ctx, jointr.Tx(), params)
1✔
163
                if err != nil {
1✔
164
                        return err
×
165
                }
×
166

167
                _, err = join.cond.inferType(cols, params, jointr.TableAlias())
1✔
168
                if err != nil {
1✔
169
                        return err
×
170
                }
×
171
        }
172

173
        return err
1✔
174
}
175

176
func (jointr *jointRowReader) Parameters() map[string]interface{} {
918✔
177
        return jointr.rowReader.Parameters()
918✔
178
}
918✔
179

180
func (jointr *jointRowReader) Read(ctx context.Context) (row *Row, err error) {
164✔
181
        for {
419✔
182
                row := &Row{
255✔
183
                        ValuesByPosition: make([]TypedValue, 0),
255✔
184
                        ValuesBySelector: make(map[string]TypedValue),
255✔
185
                }
255✔
186

255✔
187
                for len(jointr.rowReaders) > 0 {
687✔
188
                        lastReader := jointr.rowReaders[len(jointr.rowReaders)-1]
432✔
189

432✔
190
                        r, err := lastReader.Read(ctx)
432✔
191
                        if err == ErrNoMoreRows {
617✔
192
                                // previous reader will need to read next row
185✔
193
                                jointr.rowReaders = jointr.rowReaders[:len(jointr.rowReaders)-1]
185✔
194

185✔
195
                                err = lastReader.Close()
185✔
196
                                if err != nil {
185✔
197
                                        return nil, err
×
198
                                }
×
199

200
                                continue
185✔
201
                        }
202
                        if err != nil {
247✔
203
                                return nil, err
×
204
                        }
×
205

206
                        // override row data
207
                        jointr.rowReadersValuesByPosition[len(jointr.rowReaders)-1] = r.ValuesByPosition
247✔
208
                        jointr.rowReadersValuesBySelector[len(jointr.rowReaders)-1] = r.ValuesBySelector
247✔
209

247✔
210
                        break
247✔
211
                }
212

213
                if len(jointr.rowReaders) == 0 {
263✔
214
                        return nil, ErrNoMoreRows
8✔
215
                }
8✔
216

217
                // append values from readers
218
                for i := 0; i < len(jointr.rowReaders); i++ {
498✔
219
                        row.ValuesByPosition = append(row.ValuesByPosition, jointr.rowReadersValuesByPosition[i]...)
251✔
220

251✔
221
                        for c, v := range jointr.rowReadersValuesBySelector[i] {
811✔
222
                                row.ValuesBySelector[c] = v
560✔
223
                        }
560✔
224
                }
225

226
                unsolvedFK := false
247✔
227

247✔
228
                for i := len(jointr.rowReaders) - 1; i < len(jointr.joins); i++ {
523✔
229
                        jspec := jointr.joins[i]
276✔
230

276✔
231
                        jointq := &SelectStmt{
276✔
232
                                ds:      jspec.ds,
276✔
233
                                where:   jspec.cond.reduceSelectors(row, jointr.TableAlias()),
276✔
234
                                indexOn: jspec.indexOn,
276✔
235
                        }
276✔
236

276✔
237
                        reader, err := jointq.Resolve(ctx, jointr.Tx(), jointr.Parameters(), nil)
276✔
238
                        if err != nil {
277✔
239
                                return nil, err
1✔
240
                        }
1✔
241

242
                        r, err := reader.Read(ctx)
275✔
243
                        if err == ErrNoMoreRows {
367✔
244
                                if jspec.joinType == InnerJoin {
183✔
245
                                        // previous reader will need to read next row
91✔
246
                                        unsolvedFK = true
91✔
247

91✔
248
                                        err = reader.Close()
91✔
249
                                        if err != nil {
91✔
NEW
250
                                                return nil, err
×
NEW
251
                                        }
×
252

253
                                        break
91✔
254
                                } else { // LEFT JOIN: fill column values with NULLs
1✔
255
                                        cols, err := reader.Columns(ctx)
1✔
256
                                        if err != nil {
1✔
NEW
257
                                                return nil, err
×
NEW
258
                                        }
×
259

260
                                        r = &Row{
1✔
261
                                                ValuesByPosition: make([]TypedValue, len(cols)),
1✔
262
                                                ValuesBySelector: make(map[string]TypedValue, len(cols)),
1✔
263
                                        }
1✔
264

1✔
265
                                        for i, col := range cols {
4✔
266
                                                nullValue := NewNull(col.Type)
3✔
267

3✔
268
                                                r.ValuesByPosition[i] = nullValue
3✔
269
                                                r.ValuesBySelector[col.Selector()] = nullValue
3✔
270
                                        }
3✔
271
                                }
272
                        } else if err != nil {
183✔
UNCOV
273
                                reader.Close()
×
274
                                return nil, err
×
275
                        }
×
276

277
                        // progress with the joint readers
278
                        // append the reader and kept the values for following rows
279
                        jointr.rowReaders = append(jointr.rowReaders, reader)
184✔
280
                        jointr.rowReadersValuesByPosition[i+1] = r.ValuesByPosition
184✔
281
                        jointr.rowReadersValuesBySelector[i+1] = r.ValuesBySelector
184✔
282

184✔
283
                        row.ValuesByPosition = append(row.ValuesByPosition, r.ValuesByPosition...)
184✔
284

184✔
285
                        for c, v := range r.ValuesBySelector {
897✔
286
                                row.ValuesBySelector[c] = v
713✔
287
                        }
713✔
288
                }
289

290
                // all readers have a valid read
291
                if !unsolvedFK {
401✔
292
                        return row, nil
155✔
293
                }
155✔
294
        }
295
}
296

297
func (jointr *jointRowReader) Close() error {
14✔
298
        merr := multierr.NewMultiErr()
14✔
299

14✔
300
        // Closing joint readers backwards - the first reader executes the onClose callback
14✔
301
        // thus it must be closed at the end
14✔
302
        for i := len(jointr.rowReaders) - 1; i >= 0; i-- {
27✔
303
                err := jointr.rowReaders[i].Close()
13✔
304
                merr.Append(err)
13✔
305
        }
13✔
306

307
        return merr.Reduce()
14✔
308
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc