• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

goto / meteor / 8447455724

27 Mar 2024 06:12AM UTC coverage: 83.099% (-0.2%) from 83.305%
8447455724

push

github

web-flow
chore: fix prestodb tests (#56)

6864 of 8260 relevant lines covered (83.1%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/plugins/sinks/http/http.go
1
package http
2

3
import (
4
        "bytes"
5
        "context"
6
        _ "embed"
7
        "encoding/json"
8
        "fmt"
9
        "html/template"
10
        "io"
11
        "net/http"
12
        "strings"
13

14
        "github.com/MakeNowJust/heredoc"
15
        "github.com/goto/meteor/metrics/otelhttpclient"
16
        "github.com/goto/meteor/models"
17
        v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
18
        "github.com/goto/meteor/plugins"
19
        "github.com/goto/meteor/registry"
20
        "github.com/goto/salt/log"
21
)
22

23
//go:embed README.md
24
var summary string
25

26
type Config struct {
27
        URL         string            `mapstructure:"url" validate:"required"`
28
        Headers     map[string]string `mapstructure:"headers"`
29
        Method      string            `mapstructure:"method" validate:"required"`
30
        SuccessCode int               `mapstructure:"success_code" default:"200"`
31
        Script      *struct {
32
                Engine string `mapstructure:"engine" validate:"required,oneof=tengo"`
33
                Source string `mapstructure:"source" validate:"required"`
34
        } `mapstructure:"script"`
35
}
36

37
var info = plugins.Info{
38
        Description: "Send metadata to http service",
39
        Summary:     summary,
40
        Tags:        []string{"http", "sink"},
41
        SampleConfig: heredoc.Doc(`
42
        # The url (hostname and route) of the http service
43
        url: https://compass.requestcatcher.com/{{ .Type }}/{{ .Urn }}
44
        method: "PUT"
45
        # Additional HTTP headers, multiple headers value are separated by a comma
46
        headers:
47
          X-Other-Header: value1, value2
48
        script:
49
          engine: tengo
50
          source: |
51
                payload := {
52
                        details: {
53
                                some_key: asset.urn,
54
                                another_key: asset.name
55
                        }
56
                }
57
                sink(payload)
58
        `),
59
}
60

61
type httpClient interface {
62
        Do(*http.Request) (*http.Response, error)
63
}
64

65
type Sink struct {
66
        plugins.BasePlugin
67
        client httpClient
68
        config Config
69
        logger log.Logger
70
}
71

72
func New(c httpClient, logger log.Logger) plugins.Syncer {
1✔
73
        if cl, ok := c.(*http.Client); ok {
2✔
74
                cl.Transport = otelhttpclient.NewHTTPTransport(cl.Transport)
1✔
75
        }
1✔
76

77
        s := &Sink{
1✔
78
                logger: logger,
1✔
79
                client: c,
1✔
80
        }
1✔
81
        s.BasePlugin = plugins.NewBasePlugin(info, &s.config)
1✔
82

1✔
83
        return s
1✔
84
}
85

86
func (s *Sink) Init(ctx context.Context, config plugins.Config) error {
1✔
87
        return s.BasePlugin.Init(ctx, config)
1✔
88
}
1✔
89

90
func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {
1✔
91
        for _, record := range batch {
2✔
92
                metadata := record.Data()
1✔
93
                s.logger.Info("sinking record to http", "record", metadata.Urn)
1✔
94

1✔
95
                if err := s.send(ctx, metadata); err != nil {
2✔
96
                        return fmt.Errorf("send data: %w", err)
1✔
97
                }
1✔
98

99
                s.logger.Info("successfully sinked record to http", "record", metadata.Urn)
1✔
100
        }
101

102
        return nil
1✔
103
}
104

105
func (*Sink) Close() error { return nil }
1✔
106

107
func (s *Sink) send(ctx context.Context, asset *v1beta2.Asset) error {
1✔
108
        t := template.Must(template.New("url").Parse(s.config.URL))
1✔
109
        var buf bytes.Buffer
1✔
110
        if err := t.Execute(&buf, asset); err != nil {
1✔
111
                return fmt.Errorf("build http url: %w", err)
×
112
        }
×
113
        url := buf.String()
1✔
114

1✔
115
        if s.config.Script != nil {
2✔
116
                return s.executeScript(ctx, url, asset)
1✔
117
        }
1✔
118

119
        payload, err := json.Marshal(asset)
1✔
120
        if err != nil {
1✔
121
                return fmt.Errorf("build http payload: %w", err)
×
122
        }
×
123

124
        // send request
125
        return s.makeRequest(ctx, url, payload)
1✔
126
}
127

128
func (s *Sink) makeRequest(ctx context.Context, url string, payload []byte) error {
1✔
129
        req, err := http.NewRequestWithContext(ctx, s.config.Method, url, bytes.NewBuffer(payload))
1✔
130
        if err != nil {
1✔
131
                return err
×
132
        }
×
133

134
        for hdrKey, hdrVal := range s.config.Headers {
2✔
135
                hdrVals := strings.Split(hdrVal, ",")
1✔
136
                for _, val := range hdrVals {
2✔
137
                        req.Header.Add(hdrKey, val)
1✔
138
                }
1✔
139
        }
140

141
        res, err := s.client.Do(req)
1✔
142
        if err != nil {
2✔
143
                return err
1✔
144
        }
1✔
145
        defer plugins.DrainBody(res)
1✔
146

1✔
147
        if res.StatusCode == s.config.SuccessCode {
2✔
148
                return nil
1✔
149
        }
1✔
150

151
        var bodyBytes []byte
1✔
152
        bodyBytes, err = io.ReadAll(res.Body)
1✔
153
        if err != nil {
1✔
154
                return err
×
155
        }
×
156
        err = fmt.Errorf("http returns %d: %v", res.StatusCode, string(bodyBytes))
1✔
157

1✔
158
        switch code := res.StatusCode; {
1✔
159
        case code >= 500:
1✔
160
                return plugins.NewRetryError(err)
1✔
161
        default:
1✔
162
                return err
1✔
163
        }
164
}
165

166
func init() {
1✔
167
        if err := registry.Sinks.Register("http", func() plugins.Syncer {
1✔
168
                return New(&http.Client{}, plugins.GetLog())
×
169
        }); err != nil {
×
170
                panic(err)
×
171
        }
172
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc