• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

capillariesio / capillaries / 7270609459

20 Dec 2023 03:25AM UTC coverage: 91.494% (+0.1%) from 91.382%
7270609459

push

github

web-flow
Support  parenthesis in Go expressions (#61)

41 of 51 new or added lines in 4 files covered. (80.39%)

1 existing line in 1 file now uncovered.

3571 of 3903 relevant lines covered (91.49%)

1.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.96
/pkg/sc/script_node_def.go
1
package sc
2

3
import (
4
        "encoding/json"
5
        "fmt"
6
        "go/ast"
7
        "math"
8
        "regexp"
9
        "strings"
10

11
        "github.com/capillariesio/capillaries/pkg/eval"
12
)
13

14
const (
	// Handler executable types: which Capillaries binary is expected to run
	// a node's handler (see ScriptNodeDef.HandlerExeType).
	HandlerExeTypeGeneric  string = "capi_daemon"
	HandlerExeTypeToolbelt string = "capi_toolbelt"
	HandlerExeTypeWebapi   string = "capi_webapi"
)

// MaxAcceptedBatchesByTableReader caps expected_batches_total for a table reader.
const MaxAcceptedBatchesByTableReader int = 1000000

// DefaultRowsetSize is used when the table reader rowset size is left at 0.
const DefaultRowsetSize int = 1000

// MaxRowsetSize caps the table reader rowset size.
const MaxRowsetSize int = 100000
23

24
// AggFinderVisitor is an ast.Visitor that detects calls to aggregate
// functions (as recognized by eval.StringToAggFunc) in a parsed expression.
type AggFinderVisitor struct {
	Error error // set to a non-nil error when an aggregate call is found
}
27

28
func (v *AggFinderVisitor) Visit(node ast.Node) ast.Visitor {
1✔
29
        switch callExp := node.(type) {
1✔
30
        case *ast.CallExpr:
1✔
31
                switch callIdentExp := callExp.Fun.(type) {
1✔
32
                case *ast.Ident:
1✔
33
                        if eval.StringToAggFunc(callIdentExp.Name) != eval.AggUnknown {
2✔
34
                                v.Error = fmt.Errorf("found aggregate function %s()", callIdentExp.Name)
1✔
35
                                return nil
1✔
36
                        } else {
1✔
37
                                return v
×
38
                        }
×
39
                default:
×
40
                        return v
×
41
                }
42
        default:
1✔
43
                return v
1✔
44
        }
45
}
46

47
// NodeType identifies the kind of transformation a script node performs,
// named after its input -> output pattern (e.g. "file_table" reads files
// and writes a table).
type NodeType string

const (
	NodeTypeNone                NodeType = "none"
	NodeTypeFileTable           NodeType = "file_table"
	NodeTypeTableTable          NodeType = "table_table"
	NodeTypeTableLookupTable    NodeType = "table_lookup_table"
	NodeTypeTableFile           NodeType = "table_file"
	NodeTypeTableCustomTfmTable NodeType = "table_custom_tfm_table"
)

// ValidateNodeType returns nil for a recognized, usable node type and an
// error otherwise. NodeTypeNone is deliberately rejected.
func ValidateNodeType(nodeType NodeType) error {
	switch nodeType {
	case NodeTypeFileTable, NodeTypeTableTable, NodeTypeTableLookupTable, NodeTypeTableFile, NodeTypeTableCustomTfmTable:
		return nil
	default:
		return fmt.Errorf("invalid node type %s", nodeType)
	}
}
68

69
// Aliases used in script expressions to qualify field references by origin.
const ReaderAlias string = "r"          // fields coming from the node's reader
const CreatorAlias string = "w"         // fields produced by the writer/creator
const LookupAlias string = "l"          // fields coming from the lookup table
const CustomProcessorAlias string = "p" // fields produced by a custom processor
73

74
// NodeRerunPolicy controls what happens when a node is executed again.
type NodeRerunPolicy string

const (
	NodeRerun NodeRerunPolicy = "rerun" // Default
	NodeFail  NodeRerunPolicy = "fail"
)

// ValidateRerunPolicy returns nil for a recognized rerun policy and an
// error otherwise.
func ValidateRerunPolicy(rerunPolicy NodeRerunPolicy) error {
	switch rerunPolicy {
	case NodeRerun, NodeFail:
		return nil
	default:
		return fmt.Errorf("invalid node rerun policy %s", rerunPolicy)
	}
}
88

89
// NodeStartPolicy controls whether a node starts automatically when its
// dependencies are satisfied or requires a manual trigger.
type NodeStartPolicy string

const (
	NodeStartManual NodeStartPolicy = "manual"
	NodeStartAuto   NodeStartPolicy = "auto" // Default
)

// ValidateStartPolicy returns nil for a recognized start policy and an
// error otherwise.
func ValidateStartPolicy(startPolicy NodeStartPolicy) error {
	switch startPolicy {
	case NodeStartManual, NodeStartAuto:
		return nil
	default:
		return fmt.Errorf("invalid node start policy %s", startPolicy)
	}
}
103

104
// ScriptNodeDef describes one node of a processing script: where it reads
// from, how it transforms data, and where it writes to. The Raw* fields hold
// deferred JSON sections ("r", "p", "w") whose concrete shape depends on the
// node Type; they are deserialized into the typed fields by Deserialize().
type ScriptNodeDef struct {
	Name                string          // Get it from the key
	Type                NodeType        `json:"type"`
	Desc                string          `json:"desc"`
	StartPolicy         NodeStartPolicy `json:"start_policy"`
	RerunPolicy         NodeRerunPolicy `json:"rerun_policy,omitempty"`
	CustomProcessorType string          `json:"custom_proc_type,omitempty"`
	HandlerExeType      string          `json:"handler_exe_type,omitempty"` // defaults to HandlerExeTypeGeneric

	RawReader   json.RawMessage `json:"r"` // This depends on tfm type
	TableReader TableReaderDef  // populated from RawReader when the node reads a table
	FileReader  FileReaderDef   // populated from RawReader when the node reads files

	Lookup LookupDef `json:"l"`

	RawProcessorDef json.RawMessage    `json:"p"` // This depends on tfm type
	CustomProcessor CustomProcessorDef // Also should implement CustomProcessorRunner

	RawWriter            json.RawMessage `json:"w"` // This depends on tfm type
	DependencyPolicyName string          `json:"dependency_policy"`
	TableCreator         TableCreatorDef
	TableUpdater         TableUpdaterDef
	FileCreator          FileCreatorDef
	DepPolDef            *DependencyPolicyDef
}
129

130
func (node *ScriptNodeDef) HasTableReader() bool {
1✔
131
        return node.Type == NodeTypeTableTable ||
1✔
132
                node.Type == NodeTypeTableLookupTable ||
1✔
133
                node.Type == NodeTypeTableFile ||
1✔
134
                node.Type == NodeTypeTableCustomTfmTable
1✔
135
}
1✔
136
// HasFileReader reports whether this node reads its input from source files.
func (node *ScriptNodeDef) HasFileReader() bool {
	return node.Type == NodeTypeFileTable
}
1✔
139

140
// HasLookup reports whether this node joins its reader rows against a lookup table.
func (node *ScriptNodeDef) HasLookup() bool {
	return node.Type == NodeTypeTableLookupTable
}
1✔
143

144
// HasCustomProcessor reports whether this node transforms rows with a custom processor.
func (node *ScriptNodeDef) HasCustomProcessor() bool {
	return node.Type == NodeTypeTableCustomTfmTable
}
}
1✔
147

148
func (node *ScriptNodeDef) HasTableCreator() bool {
1✔
149
        return node.Type == NodeTypeFileTable ||
1✔
150
                node.Type == NodeTypeTableTable ||
1✔
151
                node.Type == NodeTypeTableLookupTable ||
1✔
152
                node.Type == NodeTypeTableCustomTfmTable
1✔
153
}
1✔
154
// HasFileCreator reports whether this node writes its output to files.
func (node *ScriptNodeDef) HasFileCreator() bool {
	return node.Type == NodeTypeTableFile
}
1✔
157
func (node *ScriptNodeDef) GetTargetName() string {
1✔
158
        if node.HasTableCreator() {
2✔
159
                return node.TableCreator.Name
1✔
160
        } else if node.HasFileCreator() {
3✔
161
                return CreatorAlias
1✔
162
        }
1✔
163
        return "dev_error_uknown_target_name"
×
164
}
165

166
func (node *ScriptNodeDef) initReader() error {
1✔
167
        if node.HasTableReader() {
2✔
168
                if err := json.Unmarshal(node.RawReader, &node.TableReader); err != nil {
1✔
169
                        return fmt.Errorf("cannot unmarshal table reader: [%s]", err.Error())
×
170
                }
×
171
                errors := make([]string, 0)
1✔
172
                if len(node.TableReader.TableName) == 0 {
1✔
173
                        errors = append(errors, "table reader cannot reference empty table name")
×
174
                }
×
175
                if node.TableReader.ExpectedBatchesTotal == 0 {
2✔
176
                        node.TableReader.ExpectedBatchesTotal = 1
1✔
177
                } else if node.TableReader.ExpectedBatchesTotal < 0 || node.TableReader.ExpectedBatchesTotal > MaxAcceptedBatchesByTableReader {
2✔
178
                        errors = append(errors, fmt.Sprintf("table reader can accept between 1 and %d batches, %d specified", MaxAcceptedBatchesByTableReader, node.TableReader.ExpectedBatchesTotal))
×
179
                }
×
180
                if node.TableReader.RowsetSize < 0 || MaxRowsetSize < node.TableReader.RowsetSize {
1✔
181
                        errors = append(errors, fmt.Sprintf("invalid rowset size %d, table reader can accept between 0 (defaults to %d) and %d", node.TableReader.RowsetSize, DefaultRowsetSize, MaxRowsetSize))
×
182
                }
×
183
                if node.TableReader.RowsetSize == 0 {
2✔
184
                        node.TableReader.RowsetSize = DefaultRowsetSize
1✔
185
                }
1✔
186
                if len(errors) > 0 {
1✔
187
                        return fmt.Errorf(strings.Join(errors, "; "))
×
188
                }
×
189
        } else if node.HasFileReader() {
2✔
190
                if err := node.FileReader.Deserialize(node.RawReader); err != nil {
1✔
191
                        return fmt.Errorf("cannot deserialize file reader [%s]: [%s]", string(node.RawReader), err.Error())
×
192
                }
×
193
        }
194

195
        return nil
1✔
196
}
197

198
func (node *ScriptNodeDef) initCreator() error {
1✔
199
        if node.HasTableCreator() {
2✔
200
                if err := node.TableCreator.Deserialize(node.RawWriter); err != nil {
2✔
201
                        return fmt.Errorf("cannot deserialize table creator [%s]: [%s]", strings.ReplaceAll(string(node.RawWriter), "\n", " "), err.Error())
1✔
202
                }
1✔
203
        } else if node.HasFileCreator() {
2✔
204
                if err := node.FileCreator.Deserialize(node.RawWriter); err != nil {
2✔
205
                        return fmt.Errorf("cannot deserialize file creator [%s]: [%s]", strings.ReplaceAll(string(node.RawWriter), "\n", " "), err.Error())
1✔
206
                }
1✔
207
        }
208
        return nil
1✔
209
}
210

211
func (node *ScriptNodeDef) initCustomProcessor(customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, caPath string, privateKeys map[string]string) error {
1✔
212
        if node.HasCustomProcessor() {
2✔
213
                if customProcessorDefFactory == nil {
1✔
214
                        return fmt.Errorf("undefined custom processor factory")
×
215
                }
×
216
                if customProcessorsSettings == nil {
1✔
217
                        return fmt.Errorf("missing custom processor settings section")
×
218
                }
×
219
                var ok bool
1✔
220
                node.CustomProcessor, ok = customProcessorDefFactory.Create(node.CustomProcessorType)
1✔
221
                if !ok {
1✔
222
                        return fmt.Errorf("cannot deserialize unknown custom processor %s", node.CustomProcessorType)
×
223
                }
×
224
                customProcSettings, ok := customProcessorsSettings[node.CustomProcessorType]
1✔
225
                if !ok {
1✔
226
                        return fmt.Errorf("cannot find custom processing settings for [%s] in the environment config file", node.CustomProcessorType)
×
227
                }
×
228
                if err := node.CustomProcessor.Deserialize(node.RawProcessorDef, customProcSettings, caPath, privateKeys); err != nil {
1✔
229
                        re := regexp.MustCompile("[ \r\n]+")
×
230
                        return fmt.Errorf("cannot deserialize custom processor [%s]: [%s]", re.ReplaceAllString(string(node.RawProcessorDef), ""), err.Error())
×
231
                }
×
232
        }
233
        return nil
1✔
234
}
235

236
func (node *ScriptNodeDef) Deserialize(customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, caPath string, privateKeys map[string]string) error {
1✔
237
        errors := make([]string, 0)
1✔
238

1✔
239
        if err := ValidateNodeType(node.Type); err != nil {
1✔
240
                return err
×
241
        }
×
242

243
        // Defaults
244

245
        if len(node.HandlerExeType) == 0 {
2✔
246
                node.HandlerExeType = HandlerExeTypeGeneric
1✔
247
        }
1✔
248

249
        if len(node.RerunPolicy) == 0 {
2✔
250
                node.RerunPolicy = NodeRerun
1✔
251
        } else if err := ValidateRerunPolicy(node.RerunPolicy); err != nil {
3✔
252
                return err
1✔
253
        }
1✔
254

255
        if len(node.StartPolicy) == 0 {
2✔
256
                node.StartPolicy = NodeStartAuto
1✔
257
        } else if err := ValidateStartPolicy(node.StartPolicy); err != nil {
2✔
258
                return err
×
259
        }
×
260

261
        // Reader
262
        if err := node.initReader(); err != nil {
1✔
263
                errors = append(errors, err.Error())
×
264
        }
×
265

266
        // Creator
267
        if err := node.initCreator(); err != nil {
2✔
268
                errors = append(errors, err.Error())
1✔
269
        }
1✔
270

271
        // Custom processor
272
        if err := node.initCustomProcessor(customProcessorDefFactory, customProcessorsSettings, caPath, privateKeys); err != nil {
1✔
273
                errors = append(errors, err.Error())
×
274
        }
×
275

276
        if len(errors) > 0 {
2✔
277
                return fmt.Errorf(strings.Join(errors, "; "))
1✔
278
        }
1✔
279

280
        return nil
1✔
281
}
282

283
func (node *ScriptNodeDef) evalCreatorAndLookupExpressionsAndCheckType() error {
1✔
284
        errors := make([]string, 0, 2)
1✔
285

1✔
286
        if node.HasLookup() && node.Lookup.UsesFilter() {
2✔
287
                if err := evalExpressionWithFieldRefsAndCheckType(node.Lookup.Filter, node.Lookup.UsedInFilterFields, FieldTypeBool); err != nil {
2✔
288
                        errors = append(errors, fmt.Sprintf("cannot evaluate lookup filter expression [%s]: [%s]", node.Lookup.RawFilter, err.Error()))
1✔
289
                }
1✔
290
        }
291

292
        if node.HasTableCreator() {
2✔
293
                // Having
1✔
294
                if err := evalExpressionWithFieldRefsAndCheckType(node.TableCreator.Having, node.TableCreator.UsedInHavingFields, FieldTypeBool); err != nil {
2✔
295
                        errors = append(errors, fmt.Sprintf("cannot evaluate table creator 'having' expression [%s]: [%s]", node.TableCreator.RawHaving, err.Error()))
1✔
296
                }
1✔
297

298
                // Target table fields
299
                for tgtFieldName, tgtFieldDef := range node.TableCreator.Fields {
2✔
300

1✔
301
                        // TODO: find a way to check field usage:
1✔
302
                        // - lookup fields must be used only within enclosing agg calls (sum etc), otherwise last one wins
1✔
303
                        // - src table fields are allowed within enclosing agg calls, and there is even a biz case for it (multiply src field by the number of lookup rows)
1✔
304

1✔
305
                        // If no grouping is used, no agg calls allowed
1✔
306
                        if node.HasLookup() && !node.Lookup.IsGroup || !node.HasLookup() {
2✔
307
                                v := AggFinderVisitor{}
1✔
308
                                ast.Walk(&v, tgtFieldDef.ParsedExpression)
1✔
309
                                if v.Error != nil {
2✔
310
                                        errors = append(errors, fmt.Sprintf("cannot use agg functions in [%s], lookup group flag is not set or no lookups used: [%s]", tgtFieldDef.RawExpression, v.Error.Error()))
1✔
311
                                }
1✔
312
                        }
313

314
                        // Just eval with test values, agg functions will go through preserving the type no problem
315
                        if err := evalExpressionWithFieldRefsAndCheckType(tgtFieldDef.ParsedExpression, node.TableCreator.UsedInTargetExpressionsFields, tgtFieldDef.Type); err != nil {
1✔
316
                                errors = append(errors, fmt.Sprintf("cannot evaluate table creator target field %s expression [%s]: [%s]", tgtFieldName, tgtFieldDef.RawExpression, err.Error()))
×
317
                        }
×
318
                }
319
        }
320

321
        if node.HasFileCreator() {
2✔
322
                // Having
1✔
323
                if err := evalExpressionWithFieldRefsAndCheckType(node.FileCreator.Having, node.FileCreator.UsedInHavingFields, FieldTypeBool); err != nil {
2✔
324
                        errors = append(errors, fmt.Sprintf("cannot evaluate file creator 'having' expression [%s]: [%s]", node.FileCreator.RawHaving, err.Error()))
1✔
325
                }
1✔
326

327
                // Target table fields (yes, they are not just strings, we check the type)
328
                for i := 0; i < len(node.FileCreator.Columns); i++ {
2✔
329
                        colDef := &node.FileCreator.Columns[i]
1✔
330
                        if err := evalExpressionWithFieldRefsAndCheckType(colDef.ParsedExpression, node.FileCreator.UsedInTargetExpressionsFields, colDef.Type); err != nil {
1✔
NEW
331
                                errors = append(errors, fmt.Sprintf("cannot evaluate file creator target field %s expression [%s]: [%s]", colDef.Name, colDef.RawExpression, err.Error()))
×
332
                        }
×
333
                }
334
        }
335

336
        // NOTE: do not even try to eval expressions from the custom processor here,
337
        // they may contain custom stuff and are pretty much guaranteed to fail
338

339
        if len(errors) > 0 {
2✔
340
                return fmt.Errorf(strings.Join(errors, "; "))
1✔
341
        }
1✔
342

343
        return nil
1✔
344
}
345

346
func (node *ScriptNodeDef) getSourceFieldRefs() (*FieldRefs, error) {
1✔
347
        if node.HasFileReader() {
2✔
348
                return node.FileReader.getFieldRefs(), nil
1✔
349
        } else if node.HasTableReader() {
3✔
350
                return node.TableReader.TableCreator.GetFieldRefsWithAlias(ReaderAlias), nil
1✔
351
        }
1✔
352

353
        return nil, fmt.Errorf("dev error, node of type %s has no file or table reader", node.Type)
×
354
}
355

356
func (node *ScriptNodeDef) GetUniqueIndexesFieldRefs() *FieldRefs {
1✔
357
        if !node.HasTableCreator() {
1✔
358
                return &FieldRefs{}
×
359
        }
×
360
        fieldTypeMap := map[string]TableFieldType{}
1✔
361
        for _, idxDef := range node.TableCreator.Indexes {
2✔
362
                if idxDef.Uniqueness == IdxUnique {
2✔
363
                        for _, idxComponentDef := range idxDef.Components {
2✔
364
                                fieldTypeMap[idxComponentDef.FieldName] = idxComponentDef.FieldType
1✔
365
                        }
1✔
366
                }
367
        }
368
        fieldRefs := make(FieldRefs, len(fieldTypeMap))
1✔
369
        fieldRefIdx := 0
1✔
370
        for fieldName, fieldType := range fieldTypeMap {
2✔
371
                fieldRefs[fieldRefIdx] = FieldRef{
1✔
372
                        FieldName: fieldName,
1✔
373
                        FieldType: fieldType,
1✔
374
                        TableName: node.TableCreator.Name}
1✔
375
                fieldRefIdx++
1✔
376
        }
1✔
377

378
        return &fieldRefs
1✔
379
}
380

381
// GetTokenIntervalsByNumberOfBatches partitions work into batches. For table
// readers it splits the full signed 64-bit token space into
// ExpectedBatchesTotal contiguous [left,right] intervals; for file readers it
// produces one [i,i] interval per source file URL.
func (node *ScriptNodeDef) GetTokenIntervalsByNumberOfBatches() ([][]int64, error) {
	// NOTE(review): && binds tighter than ||, so this condition reads as
	// HasTableReader() || (HasFileCreator() && ExpectedBatchesTotal > 1).
	// Since NodeTypeTableFile is included in HasTableReader(), the second
	// clause appears redundant — confirm the intended grouping (the
	// commented-out branch below suggests file-creator batching was reworked).
	if node.HasTableReader() || node.HasFileCreator() && node.TableReader.ExpectedBatchesTotal > 1 {
		if node.TableReader.ExpectedBatchesTotal == 1 {
			// Single batch covers the entire token space.
			return [][]int64{{int64(math.MinInt64), int64(math.MaxInt64)}}, nil
		}

		// Per-batch width, computed as a difference of the divided endpoints
		// so the subtraction cannot overflow int64.
		tokenIntervalPerBatch := int64(math.MaxInt64/node.TableReader.ExpectedBatchesTotal) - int64(math.MinInt64/node.TableReader.ExpectedBatchesTotal)

		intervals := make([][]int64, node.TableReader.ExpectedBatchesTotal)
		left := int64(math.MinInt64)
		for i := 0; i < len(intervals); i++ {
			var right int64
			if i == len(intervals)-1 {
				// Last interval absorbs any rounding remainder so the whole
				// token space is covered exactly.
				right = math.MaxInt64
			} else {
				right = left + tokenIntervalPerBatch - 1
			}
			intervals[i] = []int64{left, right}
			left = right + 1
		}
		return intervals, nil
		// } else if node.HasFileCreator() && node.TableReader.ExpectedBatchesTotal == 1 {
		//         // One output file - one batch, dummy intervals
		//         intervals := make([][]int64, 1)
		//         intervals[0] = []int64{int64(0), 0}
		//         return intervals, nil
	} else if node.HasFileReader() {
		// One input file - one batch
		intervals := make([][]int64, len(node.FileReader.SrcFileUrls))
		for i := 0; i < len(node.FileReader.SrcFileUrls); i++ {
			intervals[i] = []int64{int64(i), int64(i)}
		}
		return intervals, nil
	}

	return nil, fmt.Errorf("cannot find implementation for intervals for node %s", node.Name)
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc