• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 23730456890

30 Mar 2026 06:06AM UTC coverage: 71.003% (+20.5%) from 50.458%
23730456890

push

github

web-flow
feat: replace Asset model with native Entity+Edge model (#512)

* feat: replace Asset model with native Entity+Edge model

Replace v1beta2.Asset (rigid proto with typed Any data, separate Owners,
Lineage, Labels fields) with a native Entity+Edge model defined in
raystack/meteor/v1beta1.

Entity has flat properties (structpb.Struct) instead of typed Any wrappers.
All relationships (lineage, ownership) are now first-class Edge objects.
This aligns Meteor with Compass v2's entity-graph model and enables an
open type system where new entity types need no proto changes.

Key changes:
- New proto: raystack/meteor/v1beta1/record.proto (Entity + Edge)
- Record wraps Entity + []Edge instead of *Asset
- All 31 extractors migrated to produce Entity + Edges
- All 3 processors work with Entity.Properties directly
- Compass sink is now a thin Entity->UpsertEntity + Edge->UpsertEdge mapper
- All other sinks (kafka, file, http, stencil, frontier, gcs) updated
- Deleted all 14 typed data schemas (Table, Dashboard, Topic, etc.)
- Fixed structpb.NewStruct compatibility with map[string]string
- Fixed tengo script processor property loss during roundtrip

* chore: remove stale generated files and unused test helpers

- Delete duplicate models/models/raystack/ directory (buf generation artifact)
- Delete test/utils/any.go (BuildAny no longer needed without anypb)
- Delete test/utils/struct.go (BuildStruct no longer needed)

* chore: remove dead TryParseMapToProto and parseMapToProto functions

No production callers remain after the Entity migration.
Test helper replaced with local newStruct() using structpb.NewStruct directly.

* refactor: simplify codebase after Entity model migration

- Delete utils/custom_properties.go — inline AsMap/NewStruct in enrich processor
- Delete knownEntityTypes() whitelist in HTTP extractor — open type system
- Inline getSource() wrapper in stencil sink
- Simplify labels processor to use AsMap/NewStruct pattern (preserves all properties)
- Si... (continued)

952 of 1299 new or added lines in 48 files covered. (73.29%)

14 existing lines in 7 files now uncovered.

5889 of 8294 relevant lines covered (71.0%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.14
/plugins/extractors/http/execute_script.go
1
package http
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "reflect"
9
        "strings"
10
        "sync"
11

12
        "github.com/d5/tengo/v2"
13
        "github.com/go-playground/validator/v10"
14
        "github.com/mcuadros/go-defaults"
15
        "github.com/raystack/meteor/models"
16
        "github.com/raystack/meteor/plugins"
17
        "github.com/raystack/meteor/plugins/internal/tengoutil"
18
        "github.com/raystack/meteor/plugins/internal/tengoutil/structmap"
19
)
20

21
func (e *Extractor) executeScript(ctx context.Context, res interface{}, scriptCfg Script, emit plugins.Emit) error {
1✔
22
        s, err := tengoutil.NewSecureScript(
1✔
23
                ([]byte)(scriptCfg.Source), e.scriptGlobals(ctx, res, emit),
1✔
24
        )
1✔
25
        if err != nil {
1✔
26
                return err
×
27
        }
×
28

29
        s.SetMaxAllocs(scriptCfg.MaxAllocs)
1✔
30
        s.SetMaxConstObjects(scriptCfg.MaxConstObjects)
1✔
31

1✔
32
        c, err := s.Compile()
1✔
33
        if err != nil {
1✔
34
                return fmt.Errorf("compile: %w", err)
×
35
        }
×
36

37
        if err := c.RunContext(ctx); err != nil && !errors.Is(err, errUserExit) {
2✔
38
                return fmt.Errorf("run: %w", err)
1✔
39
        }
1✔
40

41
        err = e.convertTengoObjToRequest(c.Get("request").Value())
1✔
42
        if err != nil {
1✔
43
                return err
×
44
        }
×
45

46
        return nil
1✔
47
}
48

49
func (e *Extractor) scriptGlobals(ctx context.Context, res interface{}, emit plugins.Emit) map[string]interface{} {
1✔
50
        req, err := e.convertRequestToTengoObj()
1✔
51
        if err != nil {
1✔
52
                e.logger.Error(err.Error())
×
53
        }
×
54

55
        return map[string]interface{}{
1✔
56
                "recipe_scope": &tengo.String{Value: e.UrnScope},
1✔
57
                "request":      req,
1✔
58
                "response":     res,
1✔
59
                "new_asset": &tengo.UserFunction{
1✔
60
                        Name:  "new_asset",
1✔
61
                        Value: newAssetWrapper(),
1✔
62
                },
1✔
63
                "emit": &tengo.UserFunction{
1✔
64
                        Name:  "emit",
1✔
65
                        Value: emitWrapper(emit),
1✔
66
                },
1✔
67
                "execute_request": &tengo.UserFunction{
1✔
68
                        Name:  "execute_request",
1✔
69
                        Value: executeRequestWrapper(ctx, e.config.Concurrency, e.executeRequest),
1✔
70
                },
1✔
71
                "exit": &tengo.UserFunction{
1✔
72
                        Name: "exit",
1✔
73
                        Value: func(...tengo.Object) (tengo.Object, error) {
2✔
74
                                return nil, errUserExit
1✔
75
                        },
1✔
76
                },
77
        }
78
}
79

80
func (e *Extractor) convertTengoObjToRequest(obj interface{}) error {
1✔
81
        r, err := json.Marshal(obj)
1✔
82
        if err != nil {
1✔
83
                return err
×
84
        }
×
85
        err = json.Unmarshal(r, &e.config.Request)
1✔
86
        if err != nil {
1✔
87
                return err
×
88
        }
×
89
        return nil
1✔
90
}
91
func (e *Extractor) convertRequestToTengoObj() (tengo.Object, error) {
1✔
92
        var res map[string]interface{}
1✔
93
        r, err := json.Marshal(e.config.Request)
1✔
94
        if err != nil {
1✔
95
                return nil, err
×
96
        }
×
97
        err = json.Unmarshal(r, &res)
1✔
98
        if err != nil {
1✔
99
                return nil, err
×
100
        }
×
101
        return tengo.FromInterface(res)
1✔
102
}
103

104
func newAssetWrapper() tengo.CallableFunc {
1✔
105
        return func(args ...tengo.Object) (tengo.Object, error) {
2✔
106
                if len(args) != 1 {
2✔
107
                        return nil, tengo.ErrWrongNumArguments
1✔
108
                }
1✔
109

110
                typ, ok := tengo.ToString(args[0])
1✔
111
                if !ok {
1✔
112
                        return nil, tengo.ErrInvalidArgumentType{
×
113
                                Name:     "typ",
×
114
                                Expected: "string(compatible)",
×
115
                                Found:    args[0].TypeName(),
×
116
                        }
×
117
                }
×
118

119
                return newAsset(typ)
1✔
120
        }
121
}
122

123
func emitWrapper(emit plugins.Emit) tengo.CallableFunc {
1✔
124
        return func(args ...tengo.Object) (tengo.Object, error) {
2✔
125
                if len(args) != 1 {
2✔
126
                        return nil, tengo.ErrWrongNumArguments
1✔
127
                }
1✔
128

129
                m, ok := tengo.ToInterface(args[0]).(map[string]interface{})
1✔
130
                if !ok {
2✔
131
                        return nil, tengo.ErrInvalidArgumentType{
1✔
132
                                Name:     "asset",
1✔
133
                                Expected: "Map",
1✔
134
                                Found:    args[0].TypeName(),
1✔
135
                        }
1✔
136
                }
1✔
137

138
                // Extract known fields from the map to build an Entity
139
                urn, _ := m["urn"].(string)
1✔
140
                typ, _ := m["type"].(string)
1✔
141
                name, _ := m["name"].(string)
1✔
142
                source, _ := m["service"].(string)
1✔
143

1✔
144
                // Everything else goes into properties
1✔
145
                props := make(map[string]interface{})
1✔
146
                for k, v := range m {
2✔
147
                        switch k {
1✔
148
                        case "urn", "type", "name", "service":
1✔
149
                                // already handled
150
                        default:
1✔
151
                                props[k] = v
1✔
152
                        }
153
                }
154

155
                entity := models.NewEntity(urn, typ, name, source, props)
1✔
156

1✔
157
                // Set description if present
1✔
158
                if desc, ok := m["description"].(string); ok && desc != "" {
1✔
NEW
159
                        entity.Description = desc
×
UNCOV
160
                }
×
161

162
                emit(models.NewRecord(entity))
1✔
163

1✔
164
                return tengo.UndefinedValue, nil
1✔
165
        }
166
}
167

168
func executeRequestWrapper(ctx context.Context, concurrency int, executeRequest executeRequestFunc) tengo.CallableFunc {
1✔
169
        type job struct {
1✔
170
                i      int
1✔
171
                reqCfg RequestConfig
1✔
172
        }
1✔
173
        requestsChan := func(ctx context.Context, reqs []RequestConfig) <-chan job {
2✔
174
                ch := make(chan job)
1✔
175

1✔
176
                go func() {
2✔
177
                        defer close(ch)
1✔
178

1✔
179
                        for i, r := range reqs {
2✔
180
                                select {
1✔
181
                                case <-ctx.Done():
×
182
                                        return
×
183

184
                                case ch <- job{i, r}:
1✔
185
                                }
186
                        }
187
                }()
188

189
                return ch
1✔
190
        }
191

192
        type result struct {
1✔
193
                resp interface{}
1✔
194
                err  error
1✔
195
        }
1✔
196
        processJobs := func(ctx context.Context, n int, ch <-chan job) []result {
2✔
197
                var wg sync.WaitGroup
1✔
198
                wg.Add(concurrency)
1✔
199

1✔
200
                results := make([]result, n)
1✔
201
                work := func() {
2✔
202
                        defer wg.Done()
1✔
203

1✔
204
                        for {
2✔
205
                                select {
1✔
206
                                case <-ctx.Done():
×
207
                                        return
×
208

209
                                case j, ok := <-ch:
1✔
210
                                        if !ok {
2✔
211
                                                return
1✔
212
                                        }
1✔
213

214
                                        resp, err := executeRequest(ctx, j.reqCfg)
1✔
215
                                        if err != nil {
2✔
216
                                                results[j.i] = result{err: fmt.Errorf("execute request #%d: %w", j.i, err)}
1✔
217
                                                continue
1✔
218
                                        }
219

220
                                        results[j.i] = result{resp: resp}
1✔
221
                                }
222
                        }
223
                }
224

225
                for i := 0; i < concurrency; i++ {
2✔
226
                        go work()
1✔
227
                }
1✔
228

229
                wg.Wait()
1✔
230
                return results
1✔
231
        }
232

233
        validate := validator.New()
1✔
234
        validate.RegisterTagNameFunc(func(fld reflect.StructField) string {
2✔
235
                name := strings.SplitN(fld.Tag.Get("mapstructure"), ",", 2)[0]
1✔
236
                if name == "-" {
1✔
237
                        return ""
×
238
                }
×
239
                return name
1✔
240
        })
241
        return func(args ...tengo.Object) (tengo.Object, error) {
2✔
242
                if len(args) < 1 {
1✔
243
                        return nil, tengo.ErrWrongNumArguments
×
244
                }
×
245

246
                ctx, cancel := context.WithCancel(ctx)
1✔
247
                defer cancel()
1✔
248

1✔
249
                reqs, err := argsToRequestConfigs(args, validate)
1✔
250
                if err != nil {
2✔
251
                        return nil, fmt.Errorf("execute request: %w", err)
1✔
252
                }
1✔
253

254
                results := processJobs(ctx, len(reqs), requestsChan(ctx, reqs))
1✔
255

1✔
256
                var ret tengo.Array
1✔
257
                for i, res := range results {
2✔
258
                        if res.err != nil {
2✔
259
                                ret.Value = append(ret.Value, &tengo.Error{
1✔
260
                                        Value: &tengo.Map{
1✔
261
                                                Value: map[string]tengo.Object{
1✔
262
                                                        "request": args[i],
1✔
263
                                                        "error":   &tengo.String{Value: res.err.Error()},
1✔
264
                                                },
1✔
265
                                        },
1✔
266
                                })
1✔
267
                                continue
1✔
268
                        }
269

270
                        o, err := tengo.FromInterface(res.resp)
1✔
271
                        if err != nil {
1✔
272
                                return nil, fmt.Errorf("execute request: translate response: %s: %w", args[i], err)
×
273
                        }
×
274

275
                        ret.Value = append(ret.Value, o)
1✔
276
                }
277

278
                return &ret, nil
1✔
279
        }
280
}
281

282
func newAsset(typ string) (tengo.Object, error) {
1✔
283
        if typ == "" {
2✔
284
                return nil, fmt.Errorf("new asset: type must not be empty")
1✔
285
        }
1✔
286

287
        return &tengo.Map{
1✔
288
                Value: map[string]tengo.Object{
1✔
289
                        "type":    &tengo.String{Value: typ},
1✔
290
                        "data":    &tengo.Map{Value: map[string]tengo.Object{}},
1✔
291
                        "lineage": &tengo.Map{Value: map[string]tengo.Object{}},
1✔
292
                        "labels":  &tengo.Map{Value: map[string]tengo.Object{}},
1✔
293
                },
1✔
294
        }, nil
1✔
295
}
296

297
func argsToRequestConfigs(args []tengo.Object, validate *validator.Validate) ([]RequestConfig, error) {
1✔
298
        reqs := make([]RequestConfig, 0, len(args))
1✔
299
        for _, arg := range args {
2✔
300
                var r RequestConfig
1✔
301
                defaults.SetDefaults(&r)
1✔
302
                if err := structmap.AsStructWithTag("mapstructure", tengo.ToInterface(arg), &r); err != nil {
1✔
303
                        return nil, fmt.Errorf("map arg to request config: %s: %w", arg, err)
×
304
                }
×
305

306
                if err := validate.Struct(r); err != nil {
2✔
307
                        return nil, fmt.Errorf("validate request config: %s, %w", arg, err)
1✔
308
                }
1✔
309

310
                reqs = append(reqs, r)
1✔
311
        }
312
        return reqs, nil
1✔
313
}
314

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc