capillariesio / capillaries / 14208558196

02 Apr 2025 12:10AM UTC coverage: 92.133% (+0.06%) from 92.078%

Triggered by a push via GitHub (web-flow): "2025 03 update (#83)"

Go 1.24
Node 23.10
Svelte 5.25.3

119 of 159 new or added lines in 18 files covered (74.84%).
3 existing lines in 3 files are now uncovered.
5352 of 5809 relevant lines covered (92.13%).
1.1 hits per line.

Source file: /pkg/sc/script_node_def.go (file coverage: 83.58%)
package sc

import (
        "encoding/json"
        "errors"
        "fmt"
        "go/ast"
        "math"
        "regexp"
        "strings"

        "github.com/capillariesio/capillaries/pkg/eval"
)

const (
        HandlerExeTypeGeneric  string = "capi_daemon"
        HandlerExeTypeToolbelt string = "capi_toolbelt"
        HandlerExeTypeWebapi   string = "capi_webapi"
)

const MaxAcceptedBatchesByTableReader int = 1000000
const DefaultRowsetSize int = 1000
const MaxRowsetSize int = 100000

type AggFinderVisitor struct {
        Error error
}

func (v *AggFinderVisitor) Visit(node ast.Node) ast.Visitor {
        switch callExp := node.(type) {
        case *ast.CallExpr:
                switch callIdentExp := callExp.Fun.(type) {
                case *ast.Ident:
                        if eval.StringToAggFunc(callIdentExp.Name) != eval.AggUnknown {
                                v.Error = fmt.Errorf("found aggregate function %s()", callIdentExp.Name)
                                return nil
                        }
                        return v
                default:
                        return v
                }
        default:
                return v
        }
}
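
A note on usage: the visitor above is driven with ast.Walk over an already parsed expression, exactly as evalCreatorAndLookupExpressionsAndCheckType does further down in this file. A minimal standalone sketch of that pattern; it is not part of script_node_def.go and assumes the sc package is importable at the path shown and that eval.StringToAggFunc recognizes a name such as "sum":

package main

import (
        "fmt"
        "go/ast"
        "go/parser"

        "github.com/capillariesio/capillaries/pkg/sc"
)

func main() {
        // Parse a target-field expression into a Go AST, the same representation
        // the script validation code walks.
        expr, err := parser.ParseExpr("sum(r.amount)") // "sum" assumed to be a recognized aggregate name
        if err != nil {
                panic(err)
        }
        v := sc.AggFinderVisitor{}
        ast.Walk(&v, expr)
        // v.Error is non-nil if any recognized aggregate call was found.
        fmt.Println(v.Error)
}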

type NodeType string

const (
        NodeTypeNone                NodeType = "none"
        NodeTypeFileTable           NodeType = "file_table"
        NodeTypeTableTable          NodeType = "table_table"
        NodeTypeTableLookupTable    NodeType = "table_lookup_table"
        NodeTypeTableFile           NodeType = "table_file"
        NodeTypeTableCustomTfmTable NodeType = "table_custom_tfm_table"
        NodeTypeDistinctTable       NodeType = "distinct_table"
)

func ValidateNodeType(nodeType NodeType) error {
        if nodeType == NodeTypeFileTable ||
                nodeType == NodeTypeTableTable ||
                nodeType == NodeTypeTableLookupTable ||
                nodeType == NodeTypeTableFile ||
                nodeType == NodeTypeDistinctTable ||
                nodeType == NodeTypeTableCustomTfmTable {
                return nil
        }
        return fmt.Errorf("invalid node type %s", nodeType)
}

const ReaderAlias string = "r"
const CreatorAlias string = "w"
const LookupAlias string = "l"
const CustomProcessorAlias string = "p"

type NodeRerunPolicy string

const (
        NodeRerun NodeRerunPolicy = "rerun" // Default
        NodeFail  NodeRerunPolicy = "fail"
)

func ValidateRerunPolicy(rerunPolicy NodeRerunPolicy) error {
        if rerunPolicy == NodeRerun ||
                rerunPolicy == NodeFail {
                return nil
        }
        return fmt.Errorf("invalid node rerun policy %s", rerunPolicy)
}

type NodeStartPolicy string

const (
        NodeStartManual NodeStartPolicy = "manual"
        NodeStartAuto   NodeStartPolicy = "auto" // Default
)

func ValidateStartPolicy(startPolicy NodeStartPolicy) error {
        if startPolicy == NodeStartManual ||
                startPolicy == NodeStartAuto {
                return nil
        }
        return fmt.Errorf("invalid node start policy %s", startPolicy)
}

type ScriptNodeDef struct {
        Name                string          // Get it from the key
        Type                NodeType        `json:"type" yaml:"type"`
        Desc                string          `json:"desc" yaml:"desc"`
        StartPolicy         NodeStartPolicy `json:"start_policy" yaml:"start_policy"`
        RerunPolicy         NodeRerunPolicy `json:"rerun_policy,omitempty" yaml:"rerun_policy,omitempty"`
        CustomProcessorType string          `json:"custom_proc_type,omitempty" yaml:"custom_proc_type,omitempty"`
        HandlerExeType      string          `json:"handler_exe_type,omitempty" yaml:"handler_exe_type,omitempty"`

        RawReader   json.RawMessage `json:"r" yaml:"r"` // This depends on tfm type
        TableReader TableReaderDef
        FileReader  FileReaderDef

        Lookup LookupDef `json:"l" yaml:"l"`

        RawProcessorDef json.RawMessage    `json:"p" yaml:"p"` // This depends on tfm type
        CustomProcessor CustomProcessorDef // Also should implement CustomProcessorRunner

        RawWriter            json.RawMessage `json:"w" yaml:"w"` // This depends on tfm type
        DependencyPolicyName string          `json:"dependency_policy" yaml:"dependency_policy"`
        TableCreator         TableCreatorDef
        TableUpdater         TableUpdaterDef
        FileCreator          FileCreatorDef
        DepPolDef            *DependencyPolicyDef
}
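
ScriptNodeDef is the wire format of a single script node: typed scalar fields plus raw JSON sections ("r", "l", "p", "w") that are interpreted later according to the node type. A minimal, hypothetical sketch of loading one node and running the exported validators; the keys follow the json tags above, but the contents of "r" and "w" are placeholder shapes, not taken from this file:

package main

import (
        "encoding/json"
        "fmt"

        "github.com/capillariesio/capillaries/pkg/sc"
)

func main() {
        // "r" and "w" stay as raw JSON (json.RawMessage) until the node's own
        // deserialization logic interprets them based on the node type.
        raw := []byte(`{
                "type": "file_table",
                "desc": "read a source file into a table",
                "start_policy": "auto",
                "rerun_policy": "rerun",
                "r": {},
                "w": {}
        }`)

        var node sc.ScriptNodeDef
        if err := json.Unmarshal(raw, &node); err != nil {
                panic(err)
        }
        fmt.Println(sc.ValidateNodeType(node.Type))           // <nil>
        fmt.Println(sc.ValidateRerunPolicy(node.RerunPolicy)) // <nil>
}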

func (node *ScriptNodeDef) HasTableReader() bool {
        return node.Type == NodeTypeTableTable ||
                node.Type == NodeTypeTableLookupTable ||
                node.Type == NodeTypeTableFile ||
                node.Type == NodeTypeDistinctTable ||
                node.Type == NodeTypeTableCustomTfmTable
}
func (node *ScriptNodeDef) HasFileReader() bool {
        return node.Type == NodeTypeFileTable
}

func (node *ScriptNodeDef) HasLookup() bool {
        return node.Type == NodeTypeTableLookupTable
}

func (node *ScriptNodeDef) HasCustomProcessor() bool {
        return node.Type == NodeTypeTableCustomTfmTable
}

func (node *ScriptNodeDef) HasTableCreator() bool {
        return node.Type == NodeTypeFileTable ||
                node.Type == NodeTypeTableTable ||
                node.Type == NodeTypeDistinctTable ||
                node.Type == NodeTypeTableLookupTable ||
                node.Type == NodeTypeTableCustomTfmTable
}
func (node *ScriptNodeDef) HasFileCreator() bool {
        return node.Type == NodeTypeTableFile
}
func (node *ScriptNodeDef) GetTargetName() string {
        if node.HasTableCreator() {
                return node.TableCreator.Name
        } else if node.HasFileCreator() {
                return CreatorAlias
        }
        return "dev_error_unknown_target_name"
}

func (node *ScriptNodeDef) initReader() error {
        if node.HasTableReader() {
                if err := json.Unmarshal(node.RawReader, &node.TableReader); err != nil {
                        return fmt.Errorf("cannot unmarshal table reader: [%s]", err.Error())
                }
                foundErrors := make([]string, 0)
                if len(node.TableReader.TableName) == 0 {
                        foundErrors = append(foundErrors, "table reader cannot reference empty table name")
                }
                if node.TableReader.ExpectedBatchesTotal == 0 {
                        node.TableReader.ExpectedBatchesTotal = 1
                } else if node.TableReader.ExpectedBatchesTotal < 0 || node.TableReader.ExpectedBatchesTotal > MaxAcceptedBatchesByTableReader {
                        foundErrors = append(foundErrors, fmt.Sprintf("table reader can accept between 1 and %d batches, %d specified", MaxAcceptedBatchesByTableReader, node.TableReader.ExpectedBatchesTotal))
                }
                if node.TableReader.RowsetSize < 0 || MaxRowsetSize < node.TableReader.RowsetSize {
                        foundErrors = append(foundErrors, fmt.Sprintf("invalid rowset size %d, table reader can accept between 0 (defaults to %d) and %d", node.TableReader.RowsetSize, DefaultRowsetSize, MaxRowsetSize))
                }
                if node.TableReader.RowsetSize == 0 {
                        node.TableReader.RowsetSize = DefaultRowsetSize
                }
                if len(foundErrors) > 0 {
                        return fmt.Errorf("%s", strings.Join(foundErrors, "; "))
                }
        } else if node.HasFileReader() {
                if err := node.FileReader.Deserialize(node.RawReader); err != nil {
                        return fmt.Errorf("cannot deserialize file reader [%s]: [%s]", string(node.RawReader), err.Error())
                }
        }

        return nil
}

func (node *ScriptNodeDef) initCreator() error {
        if node.HasTableCreator() {
                if err := node.TableCreator.Deserialize(node.RawWriter); err != nil {
                        return fmt.Errorf("cannot deserialize table creator [%s]: [%s]", strings.ReplaceAll(string(node.RawWriter), "\n", " "), err.Error())
                }
        } else if node.HasFileCreator() {
                if err := node.FileCreator.Deserialize(node.RawWriter); err != nil {
                        return fmt.Errorf("cannot deserialize file creator [%s]: [%s]", strings.ReplaceAll(string(node.RawWriter), "\n", " "), err.Error())
                }
        }
        return nil
}

func (node *ScriptNodeDef) initCustomProcessor(customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, scriptType ScriptType, caPath string, privateKeys map[string]string) error {
        if node.HasCustomProcessor() {
                if customProcessorDefFactory == nil {
                        return errors.New("undefined custom processor factory")
                }
                if customProcessorsSettings == nil {
                        return errors.New("missing custom processor settings section")
                }
                var ok bool
                node.CustomProcessor, ok = customProcessorDefFactory.Create(node.CustomProcessorType)
                if !ok {
                        return fmt.Errorf("cannot deserialize unknown custom processor %s", node.CustomProcessorType)
                }
                customProcSettings, ok := customProcessorsSettings[node.CustomProcessorType]
                if !ok {
                        return fmt.Errorf("cannot find custom processing settings for [%s] in the environment config file", node.CustomProcessorType)
                }
                if err := node.CustomProcessor.Deserialize(node.RawProcessorDef, customProcSettings, scriptType, caPath, privateKeys); err != nil {
                        re := regexp.MustCompile("[ \r\n]+")
                        return fmt.Errorf("cannot deserialize custom processor [%s]: [%s]", re.ReplaceAllString(string(node.RawProcessorDef), ""), err.Error())
                }
        }
        return nil
}

func (node *ScriptNodeDef) Deserialize(customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, scriptType ScriptType, caPath string, privateKeys map[string]string) error {
        foundErrors := make([]string, 0)

        if err := ValidateNodeType(node.Type); err != nil {
                return err
        }

        // Defaults

        if len(node.HandlerExeType) == 0 {
                node.HandlerExeType = HandlerExeTypeGeneric
        }

        if len(node.RerunPolicy) == 0 {
                node.RerunPolicy = NodeRerun
        } else if err := ValidateRerunPolicy(node.RerunPolicy); err != nil {
                return err
        }

        if len(node.StartPolicy) == 0 {
                node.StartPolicy = NodeStartAuto
        } else if err := ValidateStartPolicy(node.StartPolicy); err != nil {
                return err
        }

        // Reader
        if err := node.initReader(); err != nil {
                foundErrors = append(foundErrors, err.Error())
        }

        // Creator
        if err := node.initCreator(); err != nil {
                foundErrors = append(foundErrors, err.Error())
        }

        // Custom processor
        if err := node.initCustomProcessor(customProcessorDefFactory, customProcessorsSettings, scriptType, caPath, privateKeys); err != nil {
                foundErrors = append(foundErrors, err.Error())
        }

        // Distinct table
        if node.Type == NodeTypeDistinctTable {
                if node.RerunPolicy != NodeFail {
                        foundErrors = append(foundErrors, "distinct_table node must have fail policy, no reruns possible")
                }
                if _, _, err := node.TableCreator.GetSingleUniqueIndexDef(); err != nil {
                        foundErrors = append(foundErrors, err.Error())
                }
        }

        if len(foundErrors) > 0 {
                return fmt.Errorf("%s", strings.Join(foundErrors, "; "))
        }

        return nil
}

func (node *ScriptNodeDef) evalCreatorAndLookupExpressionsAndCheckType() error {
        foundErrors := make([]string, 0, 2)

        if node.HasLookup() && node.Lookup.UsesFilter() {
                if err := evalExpressionWithFieldRefsAndCheckType(node.Lookup.Filter, node.Lookup.UsedInFilterFields, FieldTypeBool); err != nil {
                        foundErrors = append(foundErrors, fmt.Sprintf("cannot evaluate lookup filter expression [%s]: [%s]", node.Lookup.RawFilter, err.Error()))
                }
        }

        if node.HasTableCreator() {
                // Having
                if err := evalExpressionWithFieldRefsAndCheckType(node.TableCreator.Having, node.TableCreator.UsedInHavingFields, FieldTypeBool); err != nil {
                        foundErrors = append(foundErrors, fmt.Sprintf("cannot evaluate table creator 'having' expression [%s]: [%s]", node.TableCreator.RawHaving, err.Error()))
                }

                // Target table fields
                for tgtFieldName, tgtFieldDef := range node.TableCreator.Fields {

                        // TODO: find a way to check field usage:
                        // - lookup fields must be used only within enclosing agg calls (sum etc), otherwise last one wins
                        // - src table fields are allowed within enclosing agg calls, and there is even a biz case for it (multiply src field by the number of lookup rows)

                        // If no grouping is used, no agg calls allowed
                        if node.HasLookup() && !node.Lookup.IsGroup || !node.HasLookup() {
                                v := AggFinderVisitor{}
                                ast.Walk(&v, tgtFieldDef.ParsedExpression)
                                if v.Error != nil {
                                        foundErrors = append(foundErrors, fmt.Sprintf("cannot use agg functions in [%s], lookup group flag is not set or no lookups used: [%s]", tgtFieldDef.RawExpression, v.Error.Error()))
                                }
                        }

                        // Just eval with test values, agg functions will go through preserving the type no problem
                        if err := evalExpressionWithFieldRefsAndCheckType(tgtFieldDef.ParsedExpression, node.TableCreator.UsedInTargetExpressionsFields, tgtFieldDef.Type); err != nil {
                                foundErrors = append(foundErrors, fmt.Sprintf("cannot evaluate table creator target field %s expression [%s]: [%s]", tgtFieldName, tgtFieldDef.RawExpression, err.Error()))
                        }
                }
        }

        if node.HasFileCreator() {
                // Having
                if err := evalExpressionWithFieldRefsAndCheckType(node.FileCreator.Having, node.FileCreator.UsedInHavingFields, FieldTypeBool); err != nil {
                        foundErrors = append(foundErrors, fmt.Sprintf("cannot evaluate file creator 'having' expression [%s]: [%s]", node.FileCreator.RawHaving, err.Error()))
                }

                // Target table fields (yes, they are not just strings, we check the type)
                for i := 0; i < len(node.FileCreator.Columns); i++ {
                        colDef := &node.FileCreator.Columns[i]
                        if err := evalExpressionWithFieldRefsAndCheckType(colDef.ParsedExpression, node.FileCreator.UsedInTargetExpressionsFields, colDef.Type); err != nil {
                                foundErrors = append(foundErrors, fmt.Sprintf("cannot evaluate file creator target field %s expression [%s]: [%s]", colDef.Name, colDef.RawExpression, err.Error()))
                        }
                }
        }

        // NOTE: do not even try to eval expressions from the custom processor here,
        // they may contain custom stuff and are pretty much guaranteed to fail

        if len(foundErrors) > 0 {
                return fmt.Errorf("%s", strings.Join(foundErrors, "; "))
        }

        return nil
}

func (node *ScriptNodeDef) getSourceFieldRefs() (*FieldRefs, error) {
        if node.HasFileReader() {
                return node.FileReader.getFieldRefs(), nil
        } else if node.HasTableReader() {
                return node.TableReader.TableCreator.GetFieldRefsWithAlias(ReaderAlias), nil
        }

        return nil, fmt.Errorf("dev error, node of type %s has no file or table reader", node.Type)
}

func (node *ScriptNodeDef) GetUniqueIndexesFieldRefs() *FieldRefs {
        if !node.HasTableCreator() {
                return &FieldRefs{}
        }
        fieldTypeMap := map[string]TableFieldType{}
        for _, idxDef := range node.TableCreator.Indexes {
                if idxDef.Uniqueness == IdxUnique {
                        for _, idxComponentDef := range idxDef.Components {
                                fieldTypeMap[idxComponentDef.FieldName] = idxComponentDef.FieldType
                        }
                }
        }
        fieldRefs := make(FieldRefs, len(fieldTypeMap))
        fieldRefIdx := 0
        for fieldName, fieldType := range fieldTypeMap {
                fieldRefs[fieldRefIdx] = FieldRef{
                        FieldName: fieldName,
                        FieldType: fieldType,
                        TableName: node.TableCreator.Name}
                fieldRefIdx++
        }

        return &fieldRefs
}

func (node *ScriptNodeDef) GetTokenIntervalsByNumberOfBatches() ([][]int64, error) {
        if node.HasTableReader() || node.HasFileCreator() && node.TableReader.ExpectedBatchesTotal > 1 {
                if node.TableReader.ExpectedBatchesTotal == 1 {
                        return [][]int64{{int64(math.MinInt64), int64(math.MaxInt64)}}, nil
                }

                tokenIntervalPerBatch := int64(math.MaxInt64/node.TableReader.ExpectedBatchesTotal) - int64(math.MinInt64/node.TableReader.ExpectedBatchesTotal)

                intervals := make([][]int64, node.TableReader.ExpectedBatchesTotal)
                left := int64(math.MinInt64)
                for i := 0; i < len(intervals); i++ {
                        var right int64
                        if i == len(intervals)-1 {
                                right = math.MaxInt64
                        } else {
                                right = left + tokenIntervalPerBatch - 1
                        }
                        intervals[i] = []int64{left, right}
                        left = right + 1
                }
                return intervals, nil
                // } else if node.HasFileCreator() && node.TableReader.ExpectedBatchesTotal == 1 {
                //         // One output file - one batch, dummy intervals
                //         intervals := make([][]int64, 1)
                //         intervals[0] = []int64{int64(0), 0}
                //         return intervals, nil
        } else if node.HasFileReader() {
                // One input file - one batch
                intervals := make([][]int64, len(node.FileReader.SrcFileUrls))
                for i := 0; i < len(node.FileReader.SrcFileUrls); i++ {
                        intervals[i] = []int64{int64(i), int64(i)}
                }
                return intervals, nil
        }

        return nil, fmt.Errorf("cannot find implementation for intervals for node %s", node.Name)
}
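
GetTokenIntervalsByNumberOfBatches above splits the full int64 token space into ExpectedBatchesTotal contiguous [left, right] intervals, pinning the last right edge to math.MaxInt64 so the whole range is tiled without gaps. A standalone sketch of calling it (assuming a table_table node with TableReader.ExpectedBatchesTotal set is sufficient here, as the code above suggests; not part of script_node_def.go):

package main

import (
        "fmt"

        "github.com/capillariesio/capillaries/pkg/sc"
)

func main() {
        node := sc.ScriptNodeDef{
                Type:        sc.NodeTypeTableTable,
                TableReader: sc.TableReaderDef{ExpectedBatchesTotal: 4},
        }
        intervals, err := node.GetTokenIntervalsByNumberOfBatches()
        if err != nil {
                panic(err)
        }
        // Expect 4 [left, right] pairs covering [math.MinInt64, math.MaxInt64].
        for _, iv := range intervals {
                fmt.Println(iv[0], iv[1])
        }
}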

func (node *ScriptNodeDef) isNodeUsesIdx(idxName string) bool {
        if node.HasLookup() && node.Lookup.IndexName == idxName {
                return true
        }

        distinctIdxCandidate, ok := node.TableCreator.Indexes[idxName]
        if ok {
                if node.Type == NodeTypeDistinctTable && distinctIdxCandidate.Uniqueness == IdxUnique {
                        return true
                }
        }

        return false
}