• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geniusrabbit / eventstream / 3953294224

pending completion
3953294224

push

github

Dmitry Ponomarev
Configs fixes and improvements

147 of 219 new or added lines in 13 files covered. (67.12%)

20 existing lines in 1 file now uncovered.

718 of 1495 relevant lines covered (48.03%)

1.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.71
/storage/sql/query.go
1
//
2
// @project geniusrabbit::eventstream 2017, 2019 - 2023
3
// @author Dmitry Ponomarev <demdxx@gmail.com> 2017, 2019 - 2023
4
//
5

6
package sql
7

8
import (
9
        "errors"
10
        "reflect"
11
        "regexp"
12
        "strconv"
13
        "strings"
14

15
        "github.com/demdxx/gocast/v2"
16
        "github.com/geniusrabbit/eventstream"
17
)
18

19
var (
	// paramsSearch finds `{{key}}`, `{{key:type}}` and `{{key:type|format}}`
	// placeholders inside a query. Submatches are: key, type, format
	// (unmatched optional groups are reported as empty strings).
	paramsSearch = regexp.MustCompile(`\{\{([^}:]+)(?::([^}|]+))?(?:\|([^}]+))?\}\}`)

	// paramParser parses the same `key:type|format` notation without the
	// surrounding double braces. It is not anchored, so it reports the first
	// parsable fragment of the input.
	paramParser = regexp.MustCompile(`([^}:]+)(?::([^}|]+))?(?:\|([^}]+))?`)

	// errInvalidQueryFieldsValue is returned when a fields definition cannot
	// be turned into a consistent set of values, field names and inserts.
	errInvalidQueryFieldsValue = errors.New(`[stream::query] invalid fields build value`)
)
27

28
// Value item
29
type Value struct {
30
        Key       string // Field key in the object
31
        TargetKey string // Target Key in the database
32
        Type      eventstream.FieldType
33
        Length    int
34
        Format    string
35
}
36

37
// vector of options contains of: {key, typeName, [format]}
38
func valueFromArray(target string, a []string) Value {
2✔
39
        var (
2✔
40
                length   int64
2✔
41
                typeName string
2✔
42
        )
2✔
43

2✔
44
        if len(a) > 1 {
4✔
45
                vals := strings.Split(a[1], "*")
2✔
46
                if len(vals) == 2 {
4✔
47
                        typeName = vals[0]
2✔
48
                        length, _ = strconv.ParseInt(vals[1], 10, 64)
2✔
49
                } else {
4✔
50
                        typeName = a[1]
2✔
51
                }
2✔
52
        }
53

54
        if len(a) > 2 {
4✔
55
                return Value{
2✔
56
                        Key:       a[0],
2✔
57
                        TargetKey: target,
2✔
58
                        Type:      eventstream.TypeByString(typeName),
2✔
59
                        Length:    int(length),
2✔
60
                        Format:    a[2],
2✔
61
                }
2✔
62
        } else if len(a) > 1 {
6✔
63
                return Value{
2✔
64
                        Key:       a[0],
2✔
65
                        TargetKey: target,
2✔
66
                        Type:      eventstream.TypeByString(typeName),
2✔
67
                        Length:    int(length),
2✔
68
                }
2✔
69
        }
2✔
70
        return Value{Key: a[0], TargetKey: target}
×
71
}
72

73
// Query extractor
74
type Query struct {
75
        query     string
76
        iterateBy string
77
        values    []Value
78
}
79

80
// NewQuery returns query object from SQL query
81
func NewQuery(query string, opts ...QueryOption) (*Query, error) {
2✔
82
        var (
2✔
83
                err             error
2✔
84
                queryConfig     QueryConfig
2✔
85
                values          []Value
2✔
86
                fields, inserts []string
2✔
87
        )
2✔
88
        for _, opt := range opts {
4✔
89
                opt(&queryConfig)
2✔
90
        }
2✔
91
        query = strings.ReplaceAll(query, "{{target}}", queryConfig.Target)
2✔
92

2✔
93
        if isEmptyFields(queryConfig.FieldObject) {
4✔
94
                if args := paramsSearch.FindAllStringSubmatch(query, -1); len(args) > 0 {
4✔
95
                        query = paramsSearch.ReplaceAllString(query, "?")
2✔
96
                        for _, arg := range args {
4✔
97
                                values = append(values, valueFromArray(arg[1], arg[1:]))
2✔
98
                        }
2✔
99
                }
100
        } else if values, fields, inserts, err = PrepareFields(queryConfig.FieldObject); err != nil {
2✔
101
                return nil, err
×
102
        }
×
103

104
        return &Query{
2✔
105
                query: strings.NewReplacer(
2✔
106
                        "{{fields}}", strings.Join(fields, ", "),
2✔
107
                        "{{values}}", strings.Join(inserts, ", "),
2✔
108
                ).Replace(query),
2✔
109
                iterateBy: queryConfig.IterateBy,
2✔
110
                values:    values,
2✔
111
        }, nil
2✔
112
}
113

114
// QueryString - returns the SQL query string
115
func (q *Query) QueryString() string {
×
116
        return q.query
×
117
}
×
118

119
// ParamsBy by message
120
func (q *Query) ParamsBy(msg eventstream.Message) paramsResult {
2✔
121
        var (
2✔
122
                iterated      = q.iterateBy != ``
2✔
123
                iteratorSlice = gocast.Cast[[]any](msg.Item(q.iterateBy, nil))
2✔
124
                countSlice    = tMax(len(iteratorSlice), 1)
2✔
125
                params        = acquireResultParams()
2✔
126
        )
2✔
127
        for i := 0; i < countSlice; i++ {
4✔
128
                subParams := make([]any, 0, len(q.values))
2✔
129
                for _, v := range q.values {
4✔
130
                        var val any
2✔
131
                        if iterated && strings.HasPrefix(v.Key, "$iter") {
4✔
132
                                subItem := iteratorSlice
2✔
133
                                if strings.HasPrefix(v.Key, "$iter.") {
4✔
134
                                        subItem = gocast.Cast[[]any](msg.Item(v.Key[6:], nil))
2✔
135
                                }
2✔
136
                                val = v.Type.CastExt(arrValue(i, subItem), v.Length, v.Format)
2✔
137
                        } else {
2✔
138
                                val = msg.ItemCast(v.Key, v.Type, v.Length, v.Format)
2✔
139
                        }
2✔
140
                        subParams = append(subParams, val)
2✔
141
                }
142
                params = append(params, subParams)
2✔
143
        }
144
        return params
2✔
145
}
146

147
// Extract message by special fields and types
148
func (q *Query) Extract(msg eventstream.Message) []map[string]any {
2✔
149
        var (
2✔
150
                iterated      = q.iterateBy != ``
2✔
151
                iteratorSlice = gocast.Cast[[]any](msg.Item(q.iterateBy, nil))
2✔
152
                countSlice    = tMax(len(iteratorSlice), 1)
2✔
153
                resp          = make([]map[string]any, 0, countSlice)
2✔
154
        )
2✔
155
        for i := 0; i < countSlice; i++ {
4✔
156
                subResp := make(map[string]any, len(q.values))
2✔
157
                for _, v := range q.values {
4✔
158
                        var val any
2✔
159
                        if iterated && strings.HasPrefix(v.Key, "$iter") {
4✔
160
                                subItem := iteratorSlice
2✔
161
                                if strings.HasPrefix(v.Key, "$iter.") {
4✔
162
                                        subItem = gocast.Cast[[]any](msg.Item(v.Key[6:], nil))
2✔
163
                                }
2✔
164
                                val = v.Type.CastExt(arrValue(i, subItem), v.Length, v.Format)
2✔
165
                        } else {
2✔
166
                                val = msg.ItemCast(v.Key, v.Type, v.Length, v.Format)
2✔
167
                        }
2✔
168
                        subResp[v.TargetKey] = val
2✔
169
                }
170
                resp = append(resp, subResp)
2✔
171
        }
172
        return resp
2✔
173
}
174

175
///////////////////////////////////////////////////////////////////////////////
176
/// Helpers
177
///////////////////////////////////////////////////////////////////////////////
178

179
// PrepareFields matching
180
func PrepareFields(fls any) (values []Value, fields, inserts []string, err error) {
2✔
181
        switch fs := fls.(type) {
2✔
182
        case []string:
2✔
183
                values, fields, inserts = PrepareFieldsByArray(fs)
2✔
184
        case []any:
2✔
185
                if len(fs) == 1 {
4✔
186
                        switch reflect.TypeOf(fs[0]).Kind() {
2✔
187
                        case reflect.Map:
2✔
188
                                values, fields, inserts, err = MapIntoQueryParams(fs[0])
2✔
189
                        case reflect.Struct, reflect.Ptr:
×
190
                                values, fields, inserts, err = ConvertObjectIntoQueryParams(fs[0])
×
191
                        default:
×
192
                                values, fields, inserts = PrepareFieldsByArray(gocast.Slice[string](fs))
×
193
                        }
194
                } else {
×
195
                        values, fields, inserts = PrepareFieldsByArray(gocast.Slice[string](fs))
×
196
                }
×
197
        case string:
2✔
198
                values, fields, inserts = PrepareFieldsByString(fs)
2✔
199
        default:
2✔
200
                switch reflect.TypeOf(fs).Kind() {
2✔
201
                case reflect.Map:
2✔
202
                        values, fields, inserts, err = MapIntoQueryParams(fs)
2✔
203
                default:
2✔
204
                        values, fields, inserts, err = ConvertObjectIntoQueryParams(fs)
2✔
205
                }
206
        }
207
        if err == errInvalidValue || (err == nil && (len(inserts) < 1 || len(fields) > len(values))) {
2✔
208
                err = errInvalidQueryFieldsValue
×
209
        }
×
210
        return values, fields, inserts, err
2✔
211
}
212

213
// PrepareFieldsByArray matching and returns raw fields for insert
214
// Example: [service=srv:int, name:string]
215
// Result: [srv:int, name:string], [service,name], [?,?]
216
func PrepareFieldsByArray(fields []string) (values []Value, fieldNames, inserts []string) {
2✔
217
        for _, field := range fields {
4✔
218
                if len(field) == 0 {
2✔
219
                        continue
×
220
                }
221
                fieldValues, fieldValue, insertValue := prepareOneFieldByString(field)
2✔
222
                if len(fieldValues) != 0 {
4✔
223
                        values = append(values, fieldValues...)
2✔
224
                }
2✔
225
                if len(fieldValue) != 0 {
4✔
226
                        fieldNames = append(fieldNames, fieldValue)
2✔
227
                }
2✔
228
                if len(insertValue) != 0 {
4✔
229
                        inserts = append(inserts, insertValue)
2✔
230
                }
2✔
231
        }
232
        return values, fieldNames, inserts
2✔
233
}
234

235
// PrepareFieldsByString matching and returns raw fields for insert
236
func PrepareFieldsByString(fls string) (values []Value, fields, inserts []string) {
2✔
237
        return PrepareFieldsByArray(strings.Split(fls, ","))
2✔
238
}
2✔
239

240
func prepareOneFieldByString(field string) (values []Value, fieldValue, insert string) {
2✔
241
        insert = "?"
2✔
242
        switch {
2✔
243
        case strings.ContainsAny(field, "="):
2✔
244
                if field := strings.SplitN(field, "=", 2); field[1][0] == '@' {
4✔
245
                        fieldValue = field[0]
2✔
246
                        insert = field[1][1:]
2✔
247
                        if match := paramsSearch.FindAllStringSubmatch(insert, -1); len(match) > 0 {
4✔
248
                                for _, v := range match {
4✔
249
                                        values = append(values, valueFromArray(v[1], v[1:]))
2✔
250
                                        insert = strings.Replace(insert, v[0], "?", -1)
2✔
251
                                }
2✔
252
                        }
253
                } else {
2✔
254
                        fieldValue = field[0]
2✔
255
                        if !strings.ContainsAny(field[1], ":|") {
4✔
256
                                values = append(values, Value{Key: field[1], TargetKey: field[0]})
2✔
257
                        } else {
4✔
258
                                match := paramParser.FindAllStringSubmatch(field[1], -1)
2✔
259
                                values = append(values, valueFromArray(field[0], match[0][1:]))
2✔
260
                        }
2✔
261
                }
262
        case !strings.ContainsAny(field, ":|"):
2✔
263
                fieldValue = field
2✔
264
                values = append(values, Value{Key: field, TargetKey: field})
2✔
NEW
265
        default:
×
NEW
266
                match := paramParser.FindAllStringSubmatch(field, -1)
×
NEW
267
                fieldValue = match[0][1]
×
NEW
268
                values = append(values, valueFromArray(match[0][1], match[0][1:]))
×
269
        }
270
        return values, fieldValue, insert
2✔
271
}
272

273
func isEmptyFields(fls any) bool {
2✔
274
        switch fs := fls.(type) {
2✔
275
        case []string:
2✔
276
                return len(fs) == 0
2✔
277
        case []any:
2✔
278
                return len(fs) == 0
2✔
279
        case string:
2✔
280
                return strings.TrimSpace(fs) == ""
2✔
281
        case nil:
2✔
282
                return true
2✔
283
        }
284
        return gocast.IsEmpty(fls)
2✔
285
}
286

287
func tMax[T gocast.Numeric](a, b T) T {
2✔
288
        if a > b {
4✔
289
                return a
2✔
290
        }
2✔
291
        return b
2✔
292
}
293

294
// arrValue returns the element of arr at position idx, or nil when idx is
// outside of the slice bounds (negative, or not less than len(arr)).
func arrValue[T any](idx int, arr []T) any {
	if idx < 0 || idx >= len(arr) {
		return nil
	}
	return arr[idx]
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc