• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 11607751930

31 Oct 2024 07:23AM UTC coverage: 82.993% (-0.1%) from 83.127%
11607751930

Pull #493

github

ravisuhag
feat: support templating for http sinks
Pull Request #493: feat: support templating for http sinks

45 of 67 new or added lines in 2 files covered. (67.16%)

1 existing line in 1 file now uncovered.

6827 of 8226 relevant lines covered (82.99%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.33
/plugins/sinks/http/http.go
1
package http
2

3
import (
4
        "bytes"
5
        "context"
6
        _ "embed"
7
        "encoding/json"
8
        "fmt"
9
        "io"
10
        "net/http"
11
        "strings"
12
        "text/template"
13

14
        "github.com/MakeNowJust/heredoc"
15
        "github.com/raystack/meteor/metrics/otelhttpclient"
16
        "github.com/raystack/meteor/models"
17
        v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
18
        "github.com/raystack/meteor/plugins"
19
        "github.com/raystack/meteor/registry"
20
        "github.com/raystack/salt/log"
21
)
22

23
//go:embed README.md
24
var summary string
25

26
type Config struct {
27
        URL         string            `mapstructure:"url" validate:"required"`
28
        Headers     map[string]string `mapstructure:"headers"`
29
        Method      string            `mapstructure:"method" validate:"required"`
30
        SuccessCode int               `mapstructure:"success_code" default:"200"`
31
        Script      *struct {
32
                Engine string `mapstructure:"engine" validate:"required,oneof=tengo"`
33
                Source string `mapstructure:"source" validate:"required"`
34
        } `mapstructure:"script"`
35
}
36

37
var info = plugins.Info{
38
        Description: "Send metadata to http service",
39
        Summary:     summary,
40
        Tags:        []string{"http", "sink"},
41
        SampleConfig: heredoc.Doc(`
42
        # The url (hostname and route) of the http service
43
        url: https://compass.requestcatcher.com/{{ .Type }}/{{ .Urn }}
44
        method: "PUT"
45
        # Additional HTTP headers, multiple headers value are separated by a comma
46
        headers:
47
          X-Other-Header: value1, value2
48
          script:
49
          engine: tengo
50
          source: |
51
                payload := {
52
                        details: {
53
                                some_key: asset.urn,
54
                                another_key: asset.name
55
                        }
56
                }
57
                sink(payload)
58
        `),
59
}
60

61
type httpClient interface {
62
        Do(*http.Request) (*http.Response, error)
63
}
64

65
type Sink struct {
66
        plugins.BasePlugin
67
        client httpClient
68
        config Config
69
        logger log.Logger
70
}
71

72
func New(c httpClient, logger log.Logger) plugins.Syncer {
1✔
73
        if cl, ok := c.(*http.Client); ok {
2✔
74
                cl.Transport = otelhttpclient.NewHTTPTransport(cl.Transport)
1✔
75
        }
1✔
76

77
        s := &Sink{
1✔
78
                logger: logger,
1✔
79
                client: c,
1✔
80
        }
1✔
81
        s.BasePlugin = plugins.NewBasePlugin(info, &s.config)
1✔
82

1✔
83
        return s
1✔
84
}
85

86
func (s *Sink) Init(ctx context.Context, config plugins.Config) error {
1✔
87
        return s.BasePlugin.Init(ctx, config)
1✔
88
}
1✔
89

90
func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {
1✔
91
        for _, record := range batch {
2✔
92
                metadata := record.Data()
1✔
93
                s.logger.Info("sinking record to http", "record", metadata.Urn)
1✔
94
                if err := s.send(ctx, metadata); err != nil {
2✔
95
                        return fmt.Errorf("send data: %w", err)
1✔
96
                }
1✔
97

98
                s.logger.Info("successfully sinked record to http", "record", metadata.Urn)
1✔
99
        }
100

101
        return nil
1✔
102
}
103

104
func (*Sink) Close() error { return nil }
1✔
105

106
func (s *Sink) send(ctx context.Context, asset *v1beta2.Asset) error {
1✔
107
        t := template.Must(template.New("url").Parse(s.config.URL))
1✔
108
        var buf bytes.Buffer
1✔
109
        if err := t.Execute(&buf, asset); err != nil {
1✔
NEW
110
                return fmt.Errorf("build http url: %w", err)
×
NEW
111
        }
×
112
        url := buf.String()
1✔
113
        if s.config.Script != nil {
2✔
114
                return s.executeScript(ctx, url, asset)
1✔
115
        }
1✔
116
        payload, err := json.Marshal(asset)
1✔
117
        if err != nil {
1✔
NEW
118
                return fmt.Errorf("build http payload: %w", err)
×
NEW
119
        }
×
120
        // send request
121
        return s.makeRequest(ctx, url, payload)
1✔
122
}
123
func (s *Sink) makeRequest(ctx context.Context, url string, payload []byte) error {
1✔
124
        req, err := http.NewRequestWithContext(ctx, s.config.Method, url, bytes.NewBuffer(payload))
1✔
125
        if err != nil {
1✔
126
                return err
×
127
        }
×
128

129
        for hdrKey, hdrVal := range s.config.Headers {
2✔
130
                hdrVals := strings.Split(hdrVal, ",")
1✔
131
                for _, val := range hdrVals {
2✔
132
                        req.Header.Add(hdrKey, val)
1✔
133
                }
1✔
134
        }
135

136
        res, err := s.client.Do(req)
1✔
137
        if err != nil {
2✔
138
                return err
1✔
139
        }
1✔
140
        defer plugins.DrainBody(res)
1✔
141

1✔
142
        if res.StatusCode == s.config.SuccessCode {
2✔
143
                return nil
1✔
144
        }
1✔
145

146
        var bodyBytes []byte
1✔
147
        bodyBytes, err = io.ReadAll(res.Body)
1✔
148
        if err != nil {
1✔
149
                return err
×
150
        }
×
151
        err = fmt.Errorf("http returns %d: %v", res.StatusCode, string(bodyBytes))
1✔
152

1✔
153
        switch code := res.StatusCode; {
1✔
154
        case code >= 500:
1✔
155
                return plugins.NewRetryError(err)
1✔
156
        default:
1✔
157
                return err
1✔
158
        }
159
}
160

161
func init() {
1✔
162
        if err := registry.Sinks.Register("http", func() plugins.Syncer {
1✔
163
                return New(&http.Client{}, plugins.GetLog())
×
164
        }); err != nil {
×
165
                panic(err)
×
166
        }
167
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc