• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 23730456890

30 Mar 2026 06:06AM UTC coverage: 71.003% (+20.5%) from 50.458%
23730456890

push

github

web-flow
feat: replace Asset model with native Entity+Edge model (#512)

* feat: replace Asset model with native Entity+Edge model

Replace v1beta2.Asset (rigid proto with typed Any data, separate Owners,
Lineage, Labels fields) with a native Entity+Edge model defined in
raystack/meteor/v1beta1.

Entity has flat properties (structpb.Struct) instead of typed Any wrappers.
All relationships (lineage, ownership) are now first-class Edge objects.
This aligns Meteor with Compass v2's entity-graph model and enables an
open type system where new entity types need no proto changes.

Key changes:
- New proto: raystack/meteor/v1beta1/record.proto (Entity + Edge)
- Record wraps Entity + []Edge instead of *Asset
- All 31 extractors migrated to produce Entity + Edges
- All 3 processors work with Entity.Properties directly
- Compass sink is now a thin Entity->UpsertEntity + Edge->UpsertEdge mapper
- All other sinks (kafka, file, http, stencil, frontier, gcs) updated
- Deleted all 14 typed data schemas (Table, Dashboard, Topic, etc.)
- Fixed structpb.NewStruct compatibility with map[string]string
- Fixed tengo script processor property loss during roundtrip

* chore: remove stale generated files and unused test helpers

- Delete duplicate models/models/raystack/ directory (buf generation artifact)
- Delete test/utils/any.go (BuildAny no longer needed without anypb)
- Delete test/utils/struct.go (BuildStruct no longer needed)

* chore: remove dead TryParseMapToProto and parseMapToProto functions

No production callers remain after the Entity migration.
Test helper replaced with local newStruct() using structpb.NewStruct directly.

* refactor: simplify codebase after Entity model migration

- Delete utils/custom_properties.go — inline AsMap/NewStruct in enrich processor
- Delete knownEntityTypes() whitelist in HTTP extractor — open type system
- Inline getSource() wrapper in stencil sink
- Simplify labels processor to use AsMap/NewStruct pattern (preserves all properties)
- Si... (continued)

952 of 1299 new or added lines in 48 files covered. (73.29%)

14 existing lines in 7 files now uncovered.

5889 of 8294 relevant lines covered (71.0%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.57
/plugins/processors/script/tengo_script.go
1
package script
2

3
import (
4
        "context"
5
        _ "embed" // used to print the embedded assets
6
        "fmt"
7

8
        "github.com/MakeNowJust/heredoc"
9
        "github.com/d5/tengo/v2"
10
        "github.com/raystack/meteor/models"
11
        meteorv1beta1 "github.com/raystack/meteor/models/raystack/meteor/v1beta1"
12
        "github.com/raystack/meteor/plugins"
13
        "github.com/raystack/meteor/plugins/internal/tengoutil"
14
        "github.com/raystack/meteor/plugins/internal/tengoutil/structmap"
15
        "github.com/raystack/meteor/registry"
16
        log "github.com/raystack/salt/observability/logger"
17
)
18

19
func init() {
1✔
20
        if err := registry.Processors.Register("script", func() plugins.Processor {
1✔
21
                return New(plugins.GetLog())
×
22
        }); err != nil {
×
23
                return
×
24
        }
×
25
}
26

27
// summary receives the contents of README.md through the go:embed directive
// below; it is used as the Summary field of info.
//
//go:embed README.md
28
var summary string
29

30
// Config holds the settings of the script processor. Both fields are marked
// as required by their validate tags.
type Config struct {
31
        // Engine names the script engine; the validate tag accepts only "tengo".
        Engine string `mapstructure:"engine" validate:"required,oneof=tengo"`
32
        // Script is the source text of the script that Init compiles.
        Script string `mapstructure:"script" validate:"required"`
33
}
34

35
// Processor is the "script" processor plugin. Process runs the compiled Tengo
36
// script against the entity of the record it is given.
37
type Processor struct {
38
        plugins.BasePlugin
39
        // config is the plugin configuration; New passes its address to
        // plugins.NewBasePlugin.
        config Config
40
        // logger is the logger supplied to New.
        logger log.Logger
41

42
        // compiled is the script compiled by Init. Process clones it for each
        // call instead of running it directly.
        compiled *tengo.Compiled
43
}
44

45
// sampleConfig is the example configuration for this processor; it is exposed
// through the SampleConfig field of info.
var sampleConfig = heredoc.Doc(`
46
        engine: tengo
47
        script: |
48
          asset.name = asset.name + " (modified)"
49
`)
50

51
// info is the plugin metadata (description, sample config, summary and tags)
// that New hands to plugins.NewBasePlugin.
var info = plugins.Info{
52
        Description:  "Transform the asset with a Tengo script",
53
        SampleConfig: sampleConfig,
54
        Summary:      summary,
55
        Tags:         []string{"processor", "transform", "script"},
56
}
57

58
// New create a new processor
59
func New(logger log.Logger) *Processor {
1✔
60
        p := &Processor{
1✔
61
                logger: logger,
1✔
62
        }
1✔
63
        p.BasePlugin = plugins.NewBasePlugin(info, &p.config)
1✔
64

1✔
65
        return p
1✔
66
}
1✔
67

68
func (p *Processor) Init(ctx context.Context, config plugins.Config) error {
1✔
69
        if err := p.BasePlugin.Init(ctx, config); err != nil {
2✔
70
                return fmt.Errorf("script processor init: %w", err)
1✔
71
        }
1✔
72

73
        s, err := tengoutil.NewSecureScript(([]byte)(p.config.Script), map[string]interface{}{
1✔
74
                "asset": map[string]interface{}{},
1✔
75
        })
1✔
76
        if err != nil {
1✔
77
                return fmt.Errorf("script processor init: %w", err)
×
78
        }
×
79

80
        compiled, err := s.Compile()
1✔
81
        if err != nil {
2✔
82
                return fmt.Errorf("script processor init: compile script: %w", err)
1✔
83
        }
1✔
84

85
        p.compiled = compiled
1✔
86

1✔
87
        return nil
1✔
88
}
89

90
// Process processes the data
91
func (p *Processor) Process(ctx context.Context, src models.Record) (models.Record, error) {
1✔
92
        m, err := structmap.AsMap(src.Entity())
1✔
93
        if err != nil {
1✔
UNCOV
94
                return models.Record{}, fmt.Errorf("script processor: %w", err)
×
UNCOV
95
        }
×
96

97
        assetMap, ok := m.(map[string]interface{})
1✔
98
        if !ok {
1✔
NEW
99
                return models.Record{}, fmt.Errorf("script processor: expected map[string]interface{}, got %T", m)
×
NEW
100
        }
×
101

102
        c := p.compiled.Clone()
1✔
103
        if err := c.Set("asset", assetMap); err != nil {
1✔
104
                return models.Record{}, fmt.Errorf("script processor: set asset into vm: %w", err)
×
105
        }
×
106

107
        if err := c.RunContext(ctx); err != nil {
2✔
108
                return models.Record{}, fmt.Errorf("script processor: run script: %w", err)
1✔
109
        }
1✔
110

111
        // Merge the result back into the original map.
112
        // Tengo returns only modified fields from an ImmutableMap, so we merge
113
        // the script output on top of the original to preserve unmodified fields.
114
        resultMap := c.Get("asset").Map()
1✔
115
        for k, v := range resultMap {
2✔
116
                assetMap[k] = v
1✔
117
        }
1✔
118

119
        var transformed *meteorv1beta1.Entity
1✔
120
        if err := structmap.AsStruct(assetMap, &transformed); err != nil {
2✔
121
                return models.Record{}, fmt.Errorf("script processor: overwrite asset: %w", err)
1✔
122
        }
1✔
123

124
        return models.NewRecord(transformed, src.Edges()...), nil
1✔
125
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc