• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 11609122130

31 Oct 2024 09:12AM UTC coverage: 82.764% (-0.1%) from 82.877%
11609122130

push

github

ravisuhag
feat: add before_script before making http request

22 of 36 new or added lines in 2 files covered. (61.11%)

1 existing line in 1 file now uncovered.

6804 of 8221 relevant lines covered (82.76%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.27
/plugins/extractors/http/execute_script.go
1
package http
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "reflect"
9
        "strings"
10
        "sync"
11

12
        "github.com/d5/tengo/v2"
13
        "github.com/go-playground/validator/v10"
14
        "github.com/mcuadros/go-defaults"
15
        "github.com/raystack/meteor/models"
16
        v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
17
        "github.com/raystack/meteor/plugins"
18
        "github.com/raystack/meteor/plugins/internal/tengoutil"
19
        "github.com/raystack/meteor/plugins/internal/tengoutil/structmap"
20
        "google.golang.org/protobuf/proto"
21
)
22

23
func (e *Extractor) executeScript(ctx context.Context, res interface{}, scriptCfg Script, emit plugins.Emit) error {
1✔
24
        s, err := tengoutil.NewSecureScript(
1✔
25
                ([]byte)(scriptCfg.Source), e.scriptGlobals(ctx, res, emit),
1✔
26
        )
1✔
27
        if err != nil {
1✔
28
                return err
×
29
        }
×
30

31
        s.SetMaxAllocs(scriptCfg.MaxAllocs)
1✔
32
        s.SetMaxConstObjects(scriptCfg.MaxConstObjects)
1✔
33

1✔
34
        c, err := s.Compile()
1✔
35
        if err != nil {
1✔
36
                return fmt.Errorf("compile: %w", err)
×
37
        }
×
38

39
        if err := c.RunContext(ctx); err != nil && !errors.Is(err, errUserExit) {
2✔
40
                return fmt.Errorf("run: %w", err)
1✔
41
        }
1✔
42

43
        err = e.convertTengoObjToRequest(c.Get("request").Value())
1✔
44
        if err != nil {
1✔
NEW
45
                return err
×
NEW
46
        }
×
47

48
        return nil
1✔
49
}
50

51
func (e *Extractor) scriptGlobals(ctx context.Context, res interface{}, emit plugins.Emit) map[string]interface{} {
1✔
52
        req, err := e.convertRequestToTengoObj()
1✔
53
        if err != nil {
1✔
NEW
54
                e.logger.Error(err.Error())
×
NEW
55
        }
×
56

57
        return map[string]interface{}{
1✔
58
                "recipe_scope": &tengo.String{Value: e.UrnScope},
1✔
59
                "request":      req,
1✔
60
                "response":     res,
1✔
61
                "new_asset": &tengo.UserFunction{
1✔
62
                        Name:  "new_asset",
1✔
63
                        Value: newAssetWrapper(),
1✔
64
                },
1✔
65
                "emit": &tengo.UserFunction{
1✔
66
                        Name:  "emit",
1✔
67
                        Value: emitWrapper(emit),
1✔
68
                },
1✔
69
                "execute_request": &tengo.UserFunction{
1✔
70
                        Name:  "execute_request",
1✔
71
                        Value: executeRequestWrapper(ctx, e.config.Concurrency, e.executeRequest),
1✔
72
                },
1✔
73
                "exit": &tengo.UserFunction{
1✔
74
                        Name: "exit",
1✔
75
                        Value: func(...tengo.Object) (tengo.Object, error) {
2✔
76
                                return nil, errUserExit
1✔
77
                        },
1✔
78
                },
79
        }
80
}
81

82
func (e *Extractor) convertTengoObjToRequest(obj interface{}) error {
1✔
83
        r, err := json.Marshal(obj)
1✔
84
        if err != nil {
1✔
NEW
85
                return err
×
NEW
86
        }
×
87
        err = json.Unmarshal(r, &e.config.Request)
1✔
88
        if err != nil {
1✔
NEW
89
                return err
×
NEW
90
        }
×
91
        return nil
1✔
92
}
93
func (e *Extractor) convertRequestToTengoObj() (tengo.Object, error) {
1✔
94
        var res map[string]interface{}
1✔
95
        r, err := json.Marshal(e.config.Request)
1✔
96
        if err != nil {
1✔
NEW
97
                return nil, err
×
NEW
98
        }
×
99
        err = json.Unmarshal(r, &res)
1✔
100
        if err != nil {
1✔
NEW
101
                return nil, err
×
NEW
102
        }
×
103
        return tengo.FromInterface(res)
1✔
104
}
105

106
func newAssetWrapper() tengo.CallableFunc {
1✔
107
        typeURLs := knownTypeURLs()
1✔
108
        return func(args ...tengo.Object) (tengo.Object, error) {
2✔
109
                if len(args) != 1 {
2✔
110
                        return nil, tengo.ErrWrongNumArguments
1✔
111
                }
1✔
112

113
                typ, ok := tengo.ToString(args[0])
1✔
114
                if !ok {
1✔
115
                        return nil, tengo.ErrInvalidArgumentType{
×
116
                                Name:     "typ",
×
117
                                Expected: "string(compatible)",
×
118
                                Found:    args[0].TypeName(),
×
119
                        }
×
120
                }
×
121

122
                return newAsset(typeURLs, typ)
1✔
123
        }
124
}
125

126
func emitWrapper(emit plugins.Emit) tengo.CallableFunc {
1✔
127
        return func(args ...tengo.Object) (tengo.Object, error) {
2✔
128
                if len(args) != 1 {
2✔
129
                        return nil, tengo.ErrWrongNumArguments
1✔
130
                }
1✔
131

132
                m, ok := tengo.ToInterface(args[0]).(map[string]interface{})
1✔
133
                if !ok {
2✔
134
                        return nil, tengo.ErrInvalidArgumentType{
1✔
135
                                Name:     "asset",
1✔
136
                                Expected: "Map",
1✔
137
                                Found:    args[0].TypeName(),
1✔
138
                        }
1✔
139
                }
1✔
140

141
                var ast v1beta2.Asset
1✔
142
                if err := structmap.AsStruct(m, &ast); err != nil {
1✔
143
                        return nil, fmt.Errorf("emit asset: %w", err)
×
144
                }
×
145

146
                emit(models.NewRecord(&ast))
1✔
147

1✔
148
                return tengo.UndefinedValue, nil
1✔
149
        }
150
}
151

152
func executeRequestWrapper(ctx context.Context, concurrency int, executeRequest executeRequestFunc) tengo.CallableFunc {
1✔
153
        type job struct {
1✔
154
                i      int
1✔
155
                reqCfg RequestConfig
1✔
156
        }
1✔
157
        requestsChan := func(ctx context.Context, reqs []RequestConfig) <-chan job {
2✔
158
                ch := make(chan job)
1✔
159

1✔
160
                go func() {
2✔
161
                        defer close(ch)
1✔
162

1✔
163
                        for i, r := range reqs {
2✔
164
                                select {
1✔
165
                                case <-ctx.Done():
×
166
                                        return
×
167

168
                                case ch <- job{i, r}:
1✔
169
                                }
170
                        }
171
                }()
172

173
                return ch
1✔
174
        }
175

176
        type result struct {
1✔
177
                resp interface{}
1✔
178
                err  error
1✔
179
        }
1✔
180
        processJobs := func(ctx context.Context, n int, ch <-chan job) []result {
2✔
181
                var wg sync.WaitGroup
1✔
182
                wg.Add(concurrency)
1✔
183

1✔
184
                results := make([]result, n)
1✔
185
                work := func() {
2✔
186
                        defer wg.Done()
1✔
187

1✔
188
                        for {
2✔
189
                                select {
1✔
190
                                case <-ctx.Done():
×
191
                                        return
×
192

193
                                case j, ok := <-ch:
1✔
194
                                        if !ok {
2✔
195
                                                return
1✔
196
                                        }
1✔
197

198
                                        resp, err := executeRequest(ctx, j.reqCfg)
1✔
199
                                        if err != nil {
2✔
200
                                                results[j.i] = result{err: fmt.Errorf("execute request #%d: %w", j.i, err)}
1✔
201
                                                continue
1✔
202
                                        }
203

204
                                        results[j.i] = result{resp: resp}
1✔
205
                                }
206
                        }
207
                }
208

209
                for i := 0; i < concurrency; i++ {
2✔
210
                        go work()
1✔
211
                }
1✔
212

213
                wg.Wait()
1✔
214
                return results
1✔
215
        }
216

217
        validate := validator.New()
1✔
218
        validate.RegisterTagNameFunc(func(fld reflect.StructField) string {
2✔
219
                name := strings.SplitN(fld.Tag.Get("mapstructure"), ",", 2)[0]
1✔
220
                if name == "-" {
1✔
221
                        return ""
×
222
                }
×
223
                return name
1✔
224
        })
225
        return func(args ...tengo.Object) (tengo.Object, error) {
2✔
226
                if len(args) < 1 {
1✔
227
                        return nil, tengo.ErrWrongNumArguments
×
228
                }
×
229

230
                ctx, cancel := context.WithCancel(ctx)
1✔
231
                defer cancel()
1✔
232

1✔
233
                reqs, err := argsToRequestConfigs(args, validate)
1✔
234
                if err != nil {
2✔
235
                        return nil, fmt.Errorf("execute request: %w", err)
1✔
236
                }
1✔
237

238
                results := processJobs(ctx, len(reqs), requestsChan(ctx, reqs))
1✔
239

1✔
240
                var ret tengo.Array
1✔
241
                for i, res := range results {
2✔
242
                        if res.err != nil {
2✔
243
                                ret.Value = append(ret.Value, &tengo.Error{
1✔
244
                                        Value: &tengo.Map{
1✔
245
                                                Value: map[string]tengo.Object{
1✔
246
                                                        "request": args[i],
1✔
247
                                                        "error":   &tengo.String{Value: res.err.Error()},
1✔
248
                                                },
1✔
249
                                        },
1✔
250
                                })
1✔
251
                                continue
1✔
252
                        }
253

254
                        o, err := tengo.FromInterface(res.resp)
1✔
255
                        if err != nil {
1✔
256
                                return nil, fmt.Errorf("execute request: translate response: %s: %w", args[i], err)
×
257
                        }
×
258

259
                        ret.Value = append(ret.Value, o)
1✔
260
                }
261

262
                return &ret, nil
1✔
263
        }
264
}
265

266
func newAsset(typeURLs map[string]string, typ string) (tengo.Object, error) {
1✔
267
        u, ok := typeURLs[typ]
1✔
268
        if !ok {
2✔
269
                return nil, fmt.Errorf("new asset: unexpected type: %s", typ)
1✔
270
        }
1✔
271

272
        return &tengo.Map{
1✔
273
                Value: map[string]tengo.Object{
1✔
274
                        "type": &tengo.String{Value: typ},
1✔
275
                        "data": &tengo.Map{
1✔
276
                                Value: map[string]tengo.Object{
1✔
277
                                        "@type": &tengo.String{Value: u},
1✔
278
                                },
1✔
279
                        },
1✔
280
                },
1✔
281
        }, nil
1✔
282
}
283

284
func argsToRequestConfigs(args []tengo.Object, validate *validator.Validate) ([]RequestConfig, error) {
1✔
285
        reqs := make([]RequestConfig, 0, len(args))
1✔
286
        for _, arg := range args {
2✔
287
                var r RequestConfig
1✔
288
                defaults.SetDefaults(&r)
1✔
289
                if err := structmap.AsStructWithTag("mapstructure", tengo.ToInterface(arg), &r); err != nil {
1✔
290
                        return nil, fmt.Errorf("map arg to request config: %s: %w", arg, err)
×
291
                }
×
292

293
                if err := validate.Struct(r); err != nil {
2✔
294
                        return nil, fmt.Errorf("validate request config: %s, %w", arg, err)
1✔
295
                }
1✔
296

297
                reqs = append(reqs, r)
1✔
298
        }
299
        return reqs, nil
1✔
300
}
301

302
func knownTypeURLs() map[string]string {
1✔
303
        typeURLs := make(map[string]string, 12)
1✔
304
        for _, typ := range []string{
1✔
305
                "bucket", "dashboard", "experiment", "feature_table", "group",
1✔
306
                "job", "metric", "model", "application", "table", "topic", "user",
1✔
307
        } {
2✔
308
                typeURLs[typ] = typeURL(typ)
1✔
309
        }
1✔
310
        return typeURLs
1✔
311
}
312

313
func typeURL(typ string) string {
1✔
314
        const prefix = "type.googleapis.com/"
1✔
315

1✔
316
        var msg proto.Message
1✔
317
        switch typ {
1✔
318
        case "bucket":
1✔
319
                msg = &v1beta2.Bucket{}
1✔
320
        case "dashboard":
1✔
321
                msg = &v1beta2.Dashboard{}
1✔
322
        case "experiment":
1✔
323
                msg = &v1beta2.Experiment{}
1✔
324
        case "feature_table":
1✔
325
                msg = &v1beta2.FeatureTable{}
1✔
326
        case "group":
1✔
327
                msg = &v1beta2.Group{}
1✔
328
        case "job":
1✔
329
                msg = &v1beta2.Job{}
1✔
330
        case "metric":
1✔
331
                msg = &v1beta2.Metric{}
1✔
332
        case "model":
1✔
333
                msg = &v1beta2.Model{}
1✔
334
        case "application":
1✔
335
                msg = &v1beta2.Application{}
1✔
336
        case "table":
1✔
337
                msg = &v1beta2.Table{}
1✔
338
        case "topic":
1✔
339
                msg = &v1beta2.Topic{}
1✔
340
        case "user":
1✔
341
                msg = &v1beta2.User{}
1✔
342
        default:
×
343
                panic(fmt.Errorf("unexpected type name: %s", typ))
×
344
        }
345

346
        return prefix + (string)(msg.ProtoReflect().Descriptor().FullName())
1✔
347
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc