• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 12240404451

09 Dec 2024 04:54PM UTC coverage: 89.284% (+0.004%) from 89.28%
12240404451

Pull #2034

gh-ci

ostafen
chore(embedded/sql): add support for LEFT JOIN

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>
Pull Request #2034: Feature/left join

25 of 29 new or added lines in 1 file covered. (86.21%)

1 existing line in 1 file now uncovered.

37545 of 42051 relevant lines covered (89.28%)

302309.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.2
/embedded/sql/joint_row_reader.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package sql
18

19
import (
20
        "context"
21
        "fmt"
22

23
        "github.com/codenotary/immudb/embedded/multierr"
24
)
25

26
type jointRowReader struct {
27
        rowReader RowReader
28

29
        joins []*JoinSpec
30

31
        rowReaders                 []RowReader
32
        rowReadersValuesByPosition [][]TypedValue
33
        rowReadersValuesBySelector []map[string]TypedValue
34
}
35

36
func newJointRowReader(rowReader RowReader, joins []*JoinSpec) (*jointRowReader, error) {
46✔
37
        if rowReader == nil || len(joins) == 0 {
48✔
38
                return nil, ErrIllegalArguments
2✔
39
        }
2✔
40

41
        for _, jspec := range joins {
102✔
42
                switch jspec.joinType {
58✔
43
                case InnerJoin, LeftJoin:
54✔
44
                default:
4✔
45
                        return nil, ErrUnsupportedJoinType
4✔
46
                }
47
        }
48

49
        return &jointRowReader{
40✔
50
                rowReader:                  rowReader,
40✔
51
                joins:                      joins,
40✔
52
                rowReaders:                 []RowReader{rowReader},
40✔
53
                rowReadersValuesByPosition: make([][]TypedValue, 1+len(joins)),
40✔
54
                rowReadersValuesBySelector: make([]map[string]TypedValue, 1+len(joins)),
40✔
55
        }, nil
40✔
56
}
57

58
func (jointr *jointRowReader) onClose(callback func()) {
26✔
59
        jointr.rowReader.onClose(callback)
26✔
60
}
26✔
61

62
func (jointr *jointRowReader) Tx() *SQLTx {
2,016✔
63
        return jointr.rowReader.Tx()
2,016✔
64
}
2,016✔
65

66
func (jointr *jointRowReader) TableAlias() string {
3,804✔
67
        return jointr.rowReader.TableAlias()
3,804✔
68
}
3,804✔
69

70
func (jointr *jointRowReader) OrderBy() []ColDescriptor {
2✔
71
        return jointr.rowReader.OrderBy()
2✔
72
}
2✔
73

74
func (jointr *jointRowReader) ScanSpecs() *ScanSpecs {
2✔
75
        return jointr.rowReader.ScanSpecs()
2✔
76
}
2✔
77

78
func (jointr *jointRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) {
22✔
79
        return jointr.colsByPos(ctx)
22✔
80
}
22✔
81

82
func (jointr *jointRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) {
30✔
83
        colDescriptors, err := jointr.rowReader.colsBySelector(ctx)
30✔
84
        if err != nil {
34✔
85
                return nil, err
4✔
86
        }
4✔
87

88
        for _, jspec := range jointr.joins {
62✔
89

36✔
90
                // TODO (byo) optimize this by getting selector list only or opening all joint readers
36✔
91
                //            on jointRowReader creation,
36✔
92
                // Note: We're using a dummy ScanSpec object that is only used during read, we're only interested
36✔
93
                //       in column list though
36✔
94
                rr, err := jspec.ds.Resolve(ctx, jointr.Tx(), nil, &ScanSpecs{Index: &Index{}})
36✔
95
                if err != nil {
38✔
96
                        return nil, err
2✔
97
                }
2✔
98
                defer rr.Close()
34✔
99

34✔
100
                cd, err := rr.colsBySelector(ctx)
34✔
101
                if err != nil {
36✔
102
                        return nil, err
2✔
103
                }
2✔
104

105
                for sel, des := range cd {
120✔
106
                        if _, exists := colDescriptors[sel]; exists {
90✔
107
                                return nil, fmt.Errorf(
2✔
108
                                        "error resolving '%s' in a join: %w, "+
2✔
109
                                                "use aliasing to assign unique names "+
2✔
110
                                                "for all tables, sub-queries and columns",
2✔
111
                                        sel,
2✔
112
                                        ErrAmbiguousSelector,
2✔
113
                                )
2✔
114
                        }
2✔
115
                        colDescriptors[sel] = des
86✔
116
                }
117
        }
118
        return colDescriptors, nil
20✔
119
}
120

121
func (jointr *jointRowReader) colsByPos(ctx context.Context) ([]ColDescriptor, error) {
22✔
122
        colDescriptors, err := jointr.rowReader.Columns(ctx)
22✔
123
        if err != nil {
24✔
124
                return nil, err
2✔
125
        }
2✔
126

127
        for _, jspec := range jointr.joins {
48✔
128

28✔
129
                // TODO (byo) optimize this by getting selector list only or opening all joint readers
28✔
130
                //            on jointRowReader creation,
28✔
131
                // Note: We're using a dummy ScanSpec object that is only used during read, we're only interested
28✔
132
                //       in column list though
28✔
133
                rr, err := jspec.ds.Resolve(ctx, jointr.Tx(), nil, &ScanSpecs{Index: &Index{}})
28✔
134
                if err != nil {
28✔
135
                        return nil, err
×
136
                }
×
137
                defer rr.Close()
28✔
138

28✔
139
                cd, err := rr.Columns(ctx)
28✔
140
                if err != nil {
28✔
141
                        return nil, err
×
142
                }
×
143

144
                colDescriptors = append(colDescriptors, cd...)
28✔
145
        }
146

147
        return colDescriptors, nil
20✔
148
}
149

150
func (jointr *jointRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error {
6✔
151
        err := jointr.rowReader.InferParameters(ctx, params)
6✔
152
        if err != nil {
8✔
153
                return err
2✔
154
        }
2✔
155

156
        cols, err := jointr.colsBySelector(ctx)
4✔
157
        if err != nil {
6✔
158
                return err
2✔
159
        }
2✔
160

161
        for _, join := range jointr.joins {
4✔
162
                err = join.ds.inferParameters(ctx, jointr.Tx(), params)
2✔
163
                if err != nil {
2✔
164
                        return err
×
165
                }
×
166

167
                _, err = join.cond.inferType(cols, params, jointr.TableAlias())
2✔
168
                if err != nil {
2✔
169
                        return err
×
170
                }
×
171
        }
172

173
        return err
2✔
174
}
175

176
func (jointr *jointRowReader) Parameters() map[string]interface{} {
1,836✔
177
        return jointr.rowReader.Parameters()
1,836✔
178
}
1,836✔
179

180
func (jointr *jointRowReader) Read(ctx context.Context) (row *Row, err error) {
328✔
181
        for {
838✔
182
                row := &Row{
510✔
183
                        ValuesByPosition: make([]TypedValue, 0),
510✔
184
                        ValuesBySelector: make(map[string]TypedValue),
510✔
185
                }
510✔
186

510✔
187
                for len(jointr.rowReaders) > 0 {
1,374✔
188
                        lastReader := jointr.rowReaders[len(jointr.rowReaders)-1]
864✔
189

864✔
190
                        r, err := lastReader.Read(ctx)
864✔
191
                        if err == ErrNoMoreRows {
1,234✔
192
                                // previous reader will need to read next row
370✔
193
                                jointr.rowReaders = jointr.rowReaders[:len(jointr.rowReaders)-1]
370✔
194

370✔
195
                                err = lastReader.Close()
370✔
196
                                if err != nil {
370✔
197
                                        return nil, err
×
198
                                }
×
199

200
                                continue
370✔
201
                        }
202
                        if err != nil {
494✔
203
                                return nil, err
×
204
                        }
×
205

206
                        // override row data
207
                        jointr.rowReadersValuesByPosition[len(jointr.rowReaders)-1] = r.ValuesByPosition
494✔
208
                        jointr.rowReadersValuesBySelector[len(jointr.rowReaders)-1] = r.ValuesBySelector
494✔
209

494✔
210
                        break
494✔
211
                }
212

213
                if len(jointr.rowReaders) == 0 {
526✔
214
                        return nil, ErrNoMoreRows
16✔
215
                }
16✔
216

217
                // append values from readers
218
                for i := 0; i < len(jointr.rowReaders); i++ {
996✔
219
                        row.ValuesByPosition = append(row.ValuesByPosition, jointr.rowReadersValuesByPosition[i]...)
502✔
220

502✔
221
                        for c, v := range jointr.rowReadersValuesBySelector[i] {
1,622✔
222
                                row.ValuesBySelector[c] = v
1,120✔
223
                        }
1,120✔
224
                }
225

226
                unsolvedFK := false
494✔
227

494✔
228
                for i := len(jointr.rowReaders) - 1; i < len(jointr.joins); i++ {
1,046✔
229
                        jspec := jointr.joins[i]
552✔
230

552✔
231
                        jointq := &SelectStmt{
552✔
232
                                ds:      jspec.ds,
552✔
233
                                where:   jspec.cond.reduceSelectors(row, jointr.TableAlias()),
552✔
234
                                indexOn: jspec.indexOn,
552✔
235
                        }
552✔
236

552✔
237
                        reader, err := jointq.Resolve(ctx, jointr.Tx(), jointr.Parameters(), nil)
552✔
238
                        if err != nil {
554✔
239
                                return nil, err
2✔
240
                        }
2✔
241

242
                        r, err := reader.Read(ctx)
550✔
243
                        if err == ErrNoMoreRows {
734✔
244
                                if jspec.joinType == InnerJoin {
366✔
245
                                        // previous reader will need to read next row
182✔
246
                                        unsolvedFK = true
182✔
247

182✔
248
                                        err = reader.Close()
182✔
249
                                        if err != nil {
182✔
NEW
250
                                                return nil, err
×
NEW
251
                                        }
×
252

253
                                        break
182✔
254
                                } else { // LEFT JOIN: fill column values with NULLs
2✔
255
                                        cols, err := reader.Columns(ctx)
2✔
256
                                        if err != nil {
2✔
NEW
257
                                                return nil, err
×
NEW
258
                                        }
×
259

260
                                        r = &Row{
2✔
261
                                                ValuesByPosition: make([]TypedValue, len(cols)),
2✔
262
                                                ValuesBySelector: make(map[string]TypedValue, len(cols)),
2✔
263
                                        }
2✔
264

2✔
265
                                        for i, col := range cols {
8✔
266
                                                nullValue := NewNull(col.Type)
6✔
267

6✔
268
                                                r.ValuesByPosition[i] = nullValue
6✔
269
                                                r.ValuesBySelector[col.Selector()] = nullValue
6✔
270
                                        }
6✔
271
                                }
272
                        } else if err != nil {
366✔
UNCOV
273
                                reader.Close()
×
274
                                return nil, err
×
275
                        }
×
276

277
                        // progress with the joint readers
278
                        // append the reader and kept the values for following rows
279
                        jointr.rowReaders = append(jointr.rowReaders, reader)
368✔
280
                        jointr.rowReadersValuesByPosition[i+1] = r.ValuesByPosition
368✔
281
                        jointr.rowReadersValuesBySelector[i+1] = r.ValuesBySelector
368✔
282

368✔
283
                        row.ValuesByPosition = append(row.ValuesByPosition, r.ValuesByPosition...)
368✔
284

368✔
285
                        for c, v := range r.ValuesBySelector {
1,794✔
286
                                row.ValuesBySelector[c] = v
1,426✔
287
                        }
1,426✔
288
                }
289

290
                // all readers have a valid read
291
                if !unsolvedFK {
802✔
292
                        return row, nil
310✔
293
                }
310✔
294
        }
295
}
296

297
func (jointr *jointRowReader) Close() error {
28✔
298
        merr := multierr.NewMultiErr()
28✔
299

28✔
300
        // Closing joint readers backwards - the first reader executes the onClose callback
28✔
301
        // thus it must be closed at the end
28✔
302
        for i := len(jointr.rowReaders) - 1; i >= 0; i-- {
54✔
303
                err := jointr.rowReaders[i].Close()
26✔
304
                merr.Append(err)
26✔
305
        }
26✔
306

307
        return merr.Reduce()
28✔
308
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc