• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 23730456890

30 Mar 2026 06:06AM UTC coverage: 71.003% (+20.5%) from 50.458%
23730456890

push

github

web-flow
feat: replace Asset model with native Entity+Edge model (#512)

* feat: replace Asset model with native Entity+Edge model

Replace v1beta2.Asset (rigid proto with typed Any data, separate Owners,
Lineage, Labels fields) with a native Entity+Edge model defined in
raystack/meteor/v1beta1.

Entity has flat properties (structpb.Struct) instead of typed Any wrappers.
All relationships (lineage, ownership) are now first-class Edge objects.
This aligns Meteor with Compass v2's entity-graph model and enables an
open type system where new entity types need no proto changes.

Key changes:
- New proto: raystack/meteor/v1beta1/record.proto (Entity + Edge)
- Record wraps Entity + []Edge instead of *Asset
- All 31 extractors migrated to produce Entity + Edges
- All 3 processors work with Entity.Properties directly
- Compass sink is now a thin Entity->UpsertEntity + Edge->UpsertEdge mapper
- All other sinks (kafka, file, http, stencil, frontier, gcs) updated
- Deleted all 14 typed data schemas (Table, Dashboard, Topic, etc.)
- Fixed structpb.NewStruct compatibility with map[string]string
- Fixed tengo script processor property loss during roundtrip

* chore: remove stale generated files and unused test helpers

- Delete duplicate models/models/raystack/ directory (buf generation artifact)
- Delete test/utils/any.go (BuildAny no longer needed without anypb)
- Delete test/utils/struct.go (BuildStruct no longer needed)

* chore: remove dead TryParseMapToProto and parseMapToProto functions

No production callers remain after the Entity migration.
Test helper replaced with local newStruct() using structpb.NewStruct directly.

* refactor: simplify codebase after Entity model migration

- Delete utils/custom_properties.go — inline AsMap/NewStruct in enrich processor
- Delete knownEntityTypes() whitelist in HTTP extractor — open type system
- Inline getSource() wrapper in stencil sink
- Simplify labels processor to use AsMap/NewStruct pattern (preserves all properties)
- Si... (continued)

952 of 1299 new or added lines in 48 files covered. (73.29%)

14 existing lines in 7 files now uncovered.

5889 of 8294 relevant lines covered (71.0%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/plugins/processors/enrich/processor.go
1
package enrich
2

3
import (
4
        "context"
5
        _ "embed"
6
        "fmt"
7

8
        "github.com/raystack/meteor/models"
9
        "github.com/raystack/meteor/plugins"
10
        "github.com/raystack/meteor/registry"
11
        log "github.com/raystack/salt/observability/logger"
12
        "google.golang.org/protobuf/types/known/structpb"
13
)
14

15
//go:embed README.md
16
var summary string
17

18
type Config struct {
19
        Attributes map[string]interface{} `mapstructure:"attributes" validate:"required"`
20
}
21

22
// Processor work in a list of data
23
type Processor struct {
24
        plugins.BasePlugin
25
        config Config
26
        logger log.Logger
27
}
28

29
var sampleConfig = `
30
# Enrichment configuration
31
# attributes:
32
#   fieldA: valueA
33
#   fieldB: valueB`
34

35
var info = plugins.Info{
36
        Description:  "Append custom fields to records",
37
        SampleConfig: sampleConfig,
38
        Summary:      summary,
39
        Tags:         []string{"processor", "transform"},
40
}
41

42
// New create a new processor
43
func New(logger log.Logger) *Processor {
×
44
        p := &Processor{
×
45
                logger: logger,
×
46
        }
×
47
        p.BasePlugin = plugins.NewBasePlugin(info, &p.config)
×
48

×
49
        return p
×
50
}
×
51

52
// Process processes the data
53
func (p *Processor) Init(ctx context.Context, config plugins.Config) (err error) {
×
54
        if err = p.BasePlugin.Init(ctx, config); err != nil {
×
55
                return err
×
56
        }
×
57

58
        return
×
59
}
60

61
// Process processes the data
62
func (p *Processor) Process(ctx context.Context, src models.Record) (dst models.Record, err error) {
×
NEW
63
        entity := src.Entity()
×
NEW
64
        p.logger.Debug("enriching record", "record", entity.GetUrn())
×
65

×
NEW
66
        props := entity.GetProperties()
×
NEW
67
        var customProps map[string]interface{}
×
NEW
68
        if props != nil {
×
NEW
69
                customProps = props.AsMap()
×
NEW
70
        }
×
NEW
71
        if customProps == nil {
×
NEW
72
                customProps = make(map[string]interface{})
×
NEW
73
        }
×
74

75
        // update custom properties using value from config
76
        for key, value := range p.config.Attributes {
×
77
                stringVal, ok := value.(string)
×
78
                if ok {
×
79
                        customProps[key] = stringVal
×
80
                }
×
81
        }
82

83
        // save custom properties
NEW
84
        newProps, err := structpb.NewStruct(customProps)
×
85
        if err != nil {
×
NEW
86
                return src, fmt.Errorf("set properties: %w", err)
×
87
        }
×
NEW
88
        entity.Properties = newProps
×
89

×
NEW
90
        return models.NewRecord(entity, src.Edges()...), nil
×
91
}
92

93
func init() {
×
94
        if err := registry.Processors.Register("enrich", func() plugins.Processor {
×
95
                return New(plugins.GetLog())
×
96
        }); err != nil {
×
97
                return
×
98
        }
×
99
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc