• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 24607997127

18 Apr 2026 03:36PM UTC coverage: 73.994% (-1.0%) from 74.959%
24607997127

push

github

ravisuhag
fix: update test fixtures to match enriched extractor output

- elastic: add number_of_shards and number_of_replicas to expected props
- gcs: add requester_pays and versioning_enabled to expected fixture
- redash: add created_at, updated_at, is_archived, is_draft to expected
- grafana: add datasource entities and dashboard props (folder, tags, etc.)
- kafka: add cleanup_policy, min_insync_replicas, replication_factor, retention_ms
- snowflake: add WithSkipForeignKeys option for VCR test compatibility
- bigquery: handle Docker image unavailability gracefully in TestMain
- file sink: update test output files for derived_from edge type

6 of 10 new or added lines in 1 file covered. (60.0%)

439 existing lines in 17 files now uncovered.

6655 of 8994 relevant lines covered (73.99%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.48
/plugins/extractors/http/execute_script.go
1
package http
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "reflect"
9
        "strings"
10
        "sync"
11

12
        "github.com/d5/tengo/v2"
13
        "github.com/go-playground/validator/v10"
14
        "github.com/mcuadros/go-defaults"
15
        "github.com/raystack/meteor/models"
16
        "github.com/raystack/meteor/plugins"
17
        "github.com/raystack/meteor/plugins/internal/tengoutil"
18
        "github.com/raystack/meteor/plugins/internal/tengoutil/structmap"
19
)
20

21
func (e *Extractor) executeScript(ctx context.Context, res any, scriptCfg Script, emit plugins.Emit) error {
1✔
22
        s, err := tengoutil.NewSecureScript(
1✔
23
                ([]byte)(scriptCfg.Source), e.scriptGlobals(ctx, res, emit),
1✔
24
        )
1✔
25
        if err != nil {
1✔
26
                return err
×
27
        }
×
28

29
        s.SetMaxAllocs(scriptCfg.MaxAllocs)
1✔
30
        s.SetMaxConstObjects(scriptCfg.MaxConstObjects)
1✔
31

1✔
32
        c, err := s.Compile()
1✔
33
        if err != nil {
1✔
34
                return fmt.Errorf("compile: %w", err)
×
35
        }
×
36

37
        if err := c.RunContext(ctx); err != nil && !errors.Is(err, errUserExit) {
2✔
38
                return fmt.Errorf("run: %w", err)
1✔
39
        }
1✔
40

41
        err = e.convertTengoObjToRequest(c.Get("request").Value())
1✔
42
        if err != nil {
1✔
43
                return err
×
44
        }
×
45

46
        return nil
1✔
47
}
48

49
func (e *Extractor) scriptGlobals(ctx context.Context, res any, emit plugins.Emit) map[string]any {
1✔
50
        req, err := e.convertRequestToTengoObj()
1✔
51
        if err != nil {
1✔
52
                e.logger.Error(err.Error())
×
53
        }
×
54

55
        return map[string]any{
1✔
56
                "recipe_scope": &tengo.String{Value: e.UrnScope},
1✔
57
                "request":      req,
1✔
58
                "response":     res,
1✔
59
                "new_entity": &tengo.UserFunction{
1✔
60
                        Name:  "new_entity",
1✔
61
                        Value: newEntityWrapper(),
1✔
62
                },
1✔
63
                "emit": &tengo.UserFunction{
1✔
64
                        Name:  "emit",
1✔
65
                        Value: emitWrapper(emit),
1✔
66
                },
1✔
67
                "execute_request": &tengo.UserFunction{
1✔
68
                        Name:  "execute_request",
1✔
69
                        Value: executeRequestWrapper(ctx, e.config.Concurrency, e.executeRequest),
1✔
70
                },
1✔
71
                "exit": &tengo.UserFunction{
1✔
72
                        Name: "exit",
1✔
73
                        Value: func(...tengo.Object) (tengo.Object, error) {
2✔
74
                                return nil, errUserExit
1✔
75
                        },
1✔
76
                },
77
        }
78
}
79

80
func (e *Extractor) convertTengoObjToRequest(obj any) error {
1✔
81
        r, err := json.Marshal(obj)
1✔
82
        if err != nil {
1✔
83
                return err
×
84
        }
×
85
        err = json.Unmarshal(r, &e.config.Request)
1✔
86
        if err != nil {
1✔
87
                return err
×
88
        }
×
89
        return nil
1✔
90
}
91
func (e *Extractor) convertRequestToTengoObj() (tengo.Object, error) {
1✔
92
        var res map[string]any
1✔
93
        r, err := json.Marshal(e.config.Request)
1✔
94
        if err != nil {
1✔
95
                return nil, err
×
96
        }
×
97
        err = json.Unmarshal(r, &res)
1✔
98
        if err != nil {
1✔
99
                return nil, err
×
100
        }
×
101
        return tengo.FromInterface(res)
1✔
102
}
103

104
func newEntityWrapper() tengo.CallableFunc {
1✔
105
        return func(args ...tengo.Object) (tengo.Object, error) {
2✔
106
                if len(args) != 1 {
2✔
107
                        return nil, tengo.ErrWrongNumArguments
1✔
108
                }
1✔
109

110
                typ, ok := tengo.ToString(args[0])
1✔
111
                if !ok {
1✔
112
                        return nil, tengo.ErrInvalidArgumentType{
×
113
                                Name:     "typ",
×
114
                                Expected: "string(compatible)",
×
115
                                Found:    args[0].TypeName(),
×
116
                        }
×
117
                }
×
118

119
                return newEntity(typ)
1✔
120
        }
121
}
122

123
func emitWrapper(emit plugins.Emit) tengo.CallableFunc {
1✔
124
        return func(args ...tengo.Object) (tengo.Object, error) {
2✔
125
                if len(args) != 1 {
2✔
126
                        return nil, tengo.ErrWrongNumArguments
1✔
127
                }
1✔
128

129
                m, ok := tengo.ToInterface(args[0]).(map[string]any)
1✔
130
                if !ok {
2✔
131
                        return nil, tengo.ErrInvalidArgumentType{
1✔
132
                                Name:     "entity",
1✔
133
                                Expected: "Map",
1✔
134
                                Found:    args[0].TypeName(),
1✔
135
                        }
1✔
136
                }
1✔
137

138
                // Extract known fields from the map to build an Entity
139
                urn, _ := m["urn"].(string)
1✔
140
                typ, _ := m["type"].(string)
1✔
141
                name, _ := m["name"].(string)
1✔
142
                source, _ := m["source"].(string)
1✔
143

1✔
144
                // Build properties from the map. If a "properties" key exists, merge
1✔
145
                // its contents directly (avoid nesting properties.properties).
1✔
146
                props := make(map[string]any)
1✔
147
                if p, ok := m["properties"].(map[string]any); ok {
2✔
148
                        for k, v := range p {
2✔
149
                                props[k] = v
1✔
150
                        }
1✔
151
                }
152
                for k, v := range m {
2✔
153
                        switch k {
1✔
154
                        case "urn", "type", "name", "source", "description", "properties":
1✔
155
                                // already handled
UNCOV
156
                        default:
×
UNCOV
157
                                props[k] = v
×
158
                        }
159
                }
160

161
                entity := models.NewEntity(urn, typ, name, source, props)
1✔
162

1✔
163
                // Set description if present
1✔
164
                if desc, ok := m["description"].(string); ok && desc != "" {
1✔
UNCOV
165
                        entity.Description = desc
×
UNCOV
166
                }
×
167

168
                emit(models.NewRecord(entity))
1✔
169

1✔
170
                return tengo.UndefinedValue, nil
1✔
171
        }
172
}
173

174
func executeRequestWrapper(ctx context.Context, concurrency int, executeRequest executeRequestFunc) tengo.CallableFunc {
1✔
175
        type job struct {
1✔
176
                i      int
1✔
177
                reqCfg RequestConfig
1✔
178
        }
1✔
179
        requestsChan := func(ctx context.Context, reqs []RequestConfig) <-chan job {
2✔
180
                ch := make(chan job)
1✔
181

1✔
182
                go func() {
2✔
183
                        defer close(ch)
1✔
184

1✔
185
                        for i, r := range reqs {
2✔
186
                                select {
1✔
UNCOV
187
                                case <-ctx.Done():
×
UNCOV
188
                                        return
×
189

190
                                case ch <- job{i, r}:
1✔
191
                                }
192
                        }
193
                }()
194

195
                return ch
1✔
196
        }
197

198
        type result struct {
1✔
199
                resp any
1✔
200
                err  error
1✔
201
        }
1✔
202
        processJobs := func(ctx context.Context, n int, ch <-chan job) []result {
2✔
203
                var wg sync.WaitGroup
1✔
204
                wg.Add(concurrency)
1✔
205

1✔
206
                results := make([]result, n)
1✔
207
                work := func() {
2✔
208
                        defer wg.Done()
1✔
209

1✔
210
                        for {
2✔
211
                                select {
1✔
UNCOV
212
                                case <-ctx.Done():
×
UNCOV
213
                                        return
×
214

215
                                case j, ok := <-ch:
1✔
216
                                        if !ok {
2✔
217
                                                return
1✔
218
                                        }
1✔
219

220
                                        resp, err := executeRequest(ctx, j.reqCfg)
1✔
221
                                        if err != nil {
2✔
222
                                                results[j.i] = result{err: fmt.Errorf("execute request #%d: %w", j.i, err)}
1✔
223
                                                continue
1✔
224
                                        }
225

226
                                        results[j.i] = result{resp: resp}
1✔
227
                                }
228
                        }
229
                }
230

231
                for i := 0; i < concurrency; i++ {
2✔
232
                        go work()
1✔
233
                }
1✔
234

235
                wg.Wait()
1✔
236
                return results
1✔
237
        }
238

239
        validate := validator.New()
1✔
240
        validate.RegisterTagNameFunc(func(fld reflect.StructField) string {
2✔
241
                name := strings.SplitN(fld.Tag.Get("mapstructure"), ",", 2)[0]
1✔
242
                if name == "-" {
1✔
243
                        return ""
×
244
                }
×
245
                return name
1✔
246
        })
247
        return func(args ...tengo.Object) (tengo.Object, error) {
2✔
248
                if len(args) < 1 {
1✔
UNCOV
249
                        return nil, tengo.ErrWrongNumArguments
×
UNCOV
250
                }
×
251

252
                ctx, cancel := context.WithCancel(ctx)
1✔
253
                defer cancel()
1✔
254

1✔
255
                reqs, err := argsToRequestConfigs(args, validate)
1✔
256
                if err != nil {
2✔
257
                        return nil, fmt.Errorf("execute request: %w", err)
1✔
258
                }
1✔
259

260
                results := processJobs(ctx, len(reqs), requestsChan(ctx, reqs))
1✔
261

1✔
262
                var ret tengo.Array
1✔
263
                for i, res := range results {
2✔
264
                        if res.err != nil {
2✔
265
                                ret.Value = append(ret.Value, &tengo.Error{
1✔
266
                                        Value: &tengo.Map{
1✔
267
                                                Value: map[string]tengo.Object{
1✔
268
                                                        "request": args[i],
1✔
269
                                                        "error":   &tengo.String{Value: res.err.Error()},
1✔
270
                                                },
1✔
271
                                        },
1✔
272
                                })
1✔
273
                                continue
1✔
274
                        }
275

276
                        o, err := tengo.FromInterface(res.resp)
1✔
277
                        if err != nil {
1✔
UNCOV
278
                                return nil, fmt.Errorf("execute request: translate response: %s: %w", args[i], err)
×
UNCOV
279
                        }
×
280

281
                        ret.Value = append(ret.Value, o)
1✔
282
                }
283

284
                return &ret, nil
1✔
285
        }
286
}
287

288
func newEntity(typ string) (tengo.Object, error) {
1✔
289
        if typ == "" {
2✔
290
                return nil, fmt.Errorf("new entity: type must not be empty")
1✔
291
        }
1✔
292

293
        return &tengo.Map{
1✔
294
                Value: map[string]tengo.Object{
1✔
295
                        "type":       &tengo.String{Value: typ},
1✔
296
                        "properties": &tengo.Map{Value: map[string]tengo.Object{}},
1✔
297
                },
1✔
298
        }, nil
1✔
299
}
300

301
func argsToRequestConfigs(args []tengo.Object, validate *validator.Validate) ([]RequestConfig, error) {
1✔
302
        reqs := make([]RequestConfig, 0, len(args))
1✔
303
        for _, arg := range args {
2✔
304
                var r RequestConfig
1✔
305
                defaults.SetDefaults(&r)
1✔
306
                if err := structmap.AsStructWithTag("mapstructure", tengo.ToInterface(arg), &r); err != nil {
1✔
UNCOV
307
                        return nil, fmt.Errorf("map arg to request config: %s: %w", arg, err)
×
UNCOV
308
                }
×
309

310
                if err := validate.Struct(r); err != nil {
2✔
311
                        return nil, fmt.Errorf("validate request config: %s, %w", arg, err)
1✔
312
                }
1✔
313

314
                reqs = append(reqs, r)
1✔
315
        }
316
        return reqs, nil
1✔
317
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc