• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

m-lab / locate / 1541

11 Mar 2025 01:13PM UTC coverage: 96.688% (+0.2%) from 96.493%
1541

release

travis-pro

web-flow
Update rate limiter to have different configurable limits for IP-only and IP+UA (#219)

54 of 63 new or added lines in 2 files covered. (85.71%)

3 existing lines in 2 files now uncovered.

1985 of 2053 relevant lines covered (96.69%)

1.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.45
/handler/handler.go
1
// Package handler provides a client and handlers for responding to locate
2
// requests.
3
package handler
4

5
import (
6
        "bytes"
7
        "context"
8
        "encoding/json"
9
        "errors"
10
        "fmt"
11
        "html/template"
12
        "math/rand"
13
        "net/http"
14
        "net/url"
15
        "path"
16
        "strconv"
17
        "strings"
18
        "time"
19

20
        "github.com/google/uuid"
21
        log "github.com/sirupsen/logrus"
22
        "gopkg.in/square/go-jose.v2/jwt"
23

24
        "github.com/m-lab/go/rtx"
25
        v2 "github.com/m-lab/locate/api/v2"
26
        "github.com/m-lab/locate/clientgeo"
27
        "github.com/m-lab/locate/heartbeat"
28
        "github.com/m-lab/locate/limits"
29
        "github.com/m-lab/locate/metrics"
30
        "github.com/m-lab/locate/siteinfo"
31
        "github.com/m-lab/locate/static"
32
        prom "github.com/prometheus/client_golang/api/prometheus/v1"
33
        "github.com/prometheus/common/model"
34
)
35

36
var (
37
        // errFailedToLookupClient is returned by checkClientLocation when the
        // client locator fails; Nearest also reuses its message when the
        // located latitude/longitude cannot be parsed.
        errFailedToLookupClient = errors.New("Failed to look up client location")
38
        // tooManyRequests is the error message sent to clients rejected by
        // either the per-User-Agent limit or the IP rate limiter in Nearest.
        tooManyRequests         = "Too many periodic requests. Please contact support@measurementlab.net."
39
)
40

41
// Signer defines how access tokens are signed.
42
type Signer interface {
43
        // Sign signs the given claims and returns the resulting token string,
        // which getAccessToken hands out as the access token.
        Sign(cl jwt.Claims) (string, error)
44
}
45

46
// Limiter decides whether a request identified by its client IP address and
// User-Agent should be rate limited. Nearest treats a returned error as "not
// limited" (fail open) and only rejects the request when the returned status
// has IsLimited set.
type Limiter interface {
47
        IsLimited(ip, ua string) (limits.LimitStatus, error)
48
}
49

50
// Client bundles the dependencies used by the HTTP handlers in this package:
// the token signer, the server and client locators, the Prometheus client and
// the request limiters.
51
type Client struct {
52
        // Signer signs the access tokens embedded in target URLs.
        Signer
53
        // project is stored by the constructors; it is not read in the code
        // visible in this file section.
        project string
54
        // LocatorV2 finds the nearest targets and reports tracker status.
        LocatorV2
55
        // ClientLocator resolves the location of the requesting client.
        ClientLocator
56
        PrometheusClient
57
        // targetTmpl renders the host part of every target URL from the
        // "Hostname" and "Ports" values supplied by getURLs.
        targetTmpl  *template.Template
58
        // agentLimits maps a User-Agent string to its request limit; agents
        // without an entry are never limited by limitRequest.
        agentLimits limits.Agents
59
        // ipLimiter is the optional IP / IP+UA rate limiter; Nearest skips the
        // check when it is nil.
        ipLimiter   Limiter
60
}
61

62
// LocatorV2 defines how the Nearest handler requests machines nearest to the
63
// client.
64
type LocatorV2 interface {
65
        // Nearest returns target information for the given service near the
        // client coordinates (lat, lon), using the given options.
        Nearest(service string, lat, lon float64, opts *heartbeat.NearestOptions) (*heartbeat.TargetInfo, error)
66
        // The embedded StatusTracker supplies the Ready and Instances methods
        // used by the Ready and Registrations handlers.
        heartbeat.StatusTracker
67
}
68

69
// ClientLocator defines the interface for looking up the client geo location.
70
type ClientLocator interface {
71
        // Locate returns the location derived from the client request. The
        // handlers read its Latitude, Longitude and Headers fields.
        Locate(req *http.Request) (*clientgeo.Location, error)
72
}
73

74
// PrometheusClient defines the interface to query Prometheus.
74
type PrometheusClient interface {
75
        // Query uses the option, warning and value types of the Prometheus v1
        // API client package.
        // NOTE(review): presumably satisfied directly by that package's API
        // type — confirm against the caller that constructs the Client.
        Query(ctx context.Context, query string, ts time.Time, opts ...prom.Option) (model.Value, prom.Warnings, error)
76
}
78

79
// paramOpts carries the inputs extraParams needs to build the extra query
// parameters appended to each target URL.
type paramOpts struct {
80
        // raw holds the parsed request form values (Nearest passes req.Form).
        raw       url.Values
81
        // version is the value written to the "locate_version" parameter.
        version   string
82
        // ranks maps a machine name to the value written to "metro_rank".
        ranks     map[string]int
83
        // svcParams maps a parameter name to the probability threshold that is
        // compared against rand.Float64() to decide whether it is forwarded.
        svcParams map[string]float64
84
}
85

86
func init() {
1✔
87
        log.SetFormatter(&log.JSONFormatter{})
1✔
88
        log.SetLevel(log.InfoLevel)
1✔
89
}
1✔
90

91
// NewClient creates a new client.
92
func NewClient(project string, private Signer, locatorV2 LocatorV2, client ClientLocator,
93
        prom PrometheusClient, lmts limits.Agents, limiter Limiter) *Client {
1✔
94
        return &Client{
1✔
95
                Signer:           private,
1✔
96
                project:          project,
1✔
97
                LocatorV2:        locatorV2,
1✔
98
                ClientLocator:    client,
1✔
99
                PrometheusClient: prom,
1✔
100
                targetTmpl:       template.Must(template.New("name").Parse("{{.Hostname}}{{.Ports}}")),
1✔
101
                agentLimits:      lmts,
1✔
102
                ipLimiter:        limiter,
1✔
103
        }
1✔
104
}
1✔
105

106
// NewClientDirect creates a new client with a target template using only the target machine.
107
func NewClientDirect(project string, private Signer, locatorV2 LocatorV2, client ClientLocator, prom PrometheusClient) *Client {
1✔
108
        return &Client{
1✔
109
                Signer:           private,
1✔
110
                project:          project,
1✔
111
                LocatorV2:        locatorV2,
1✔
112
                ClientLocator:    client,
1✔
113
                PrometheusClient: prom,
1✔
114
                // Useful for the locatetest package when running a local server.
1✔
115
                targetTmpl: template.Must(template.New("name").Parse("{{.Hostname}}{{.Ports}}")),
1✔
116
        }
1✔
117
}
1✔
118

119
func extraParams(hostname string, index int, p paramOpts) url.Values {
1✔
120
        v := url.Values{}
1✔
121
        // Add client parameters.
1✔
122
        for key := range p.raw {
2✔
123
                if strings.HasPrefix(key, "client_") {
2✔
124
                        // note: we only use the first value.
1✔
125
                        v.Set(key, p.raw.Get(key))
1✔
126
                }
1✔
127

128
                val, ok := p.svcParams[key]
1✔
129
                if ok && rand.Float64() < val {
2✔
130
                        v.Set(key, p.raw.Get(key))
1✔
131
                }
1✔
132
        }
133

134
        // Add Locate Service version.
135
        v.Set("locate_version", p.version)
1✔
136

1✔
137
        // Add metro rank.
1✔
138
        rank, ok := p.ranks[hostname]
1✔
139
        if ok {
2✔
140
                v.Set("metro_rank", strconv.Itoa(rank))
1✔
141
        }
1✔
142

143
        // Add result index.
144
        v.Set("index", strconv.Itoa(index))
1✔
145

1✔
146
        return v
1✔
147
}
148

149
// Nearest uses an implementation of the LocatorV2 interface to look up
150
// nearest servers.
151
func (c *Client) Nearest(rw http.ResponseWriter, req *http.Request) {
1✔
152
        req.ParseForm()
1✔
153
        result := v2.NearestResult{}
1✔
154
        setHeaders(rw)
1✔
155

1✔
156
        if c.limitRequest(time.Now().UTC(), req) {
2✔
157
                result.Error = v2.NewError("client", tooManyRequests, http.StatusTooManyRequests)
1✔
158
                writeResult(rw, result.Error.Status, &result)
1✔
159
                metrics.RequestsTotal.WithLabelValues("nearest", "request limit", http.StatusText(result.Error.Status)).Inc()
1✔
160
                return
1✔
161
        }
1✔
162

163
        // Check rate limit for IP and UA.
164
        if c.ipLimiter != nil {
2✔
165
                // Get the IP address from the request. X-Forwarded-For is guaranteed to
1✔
166
                // be set by AppEngine.
1✔
167
                ip := req.Header.Get("X-Forwarded-For")
1✔
168
                ips := strings.Split(ip, ",")
1✔
169
                if len(ips) > 0 {
2✔
170
                        ip = strings.TrimSpace(ips[0])
1✔
171
                }
1✔
172
                if ip != "" {
2✔
173
                        // An empty UA is technically possible.
1✔
174
                        ua := req.Header.Get("User-Agent")
1✔
175
                        status, err := c.ipLimiter.IsLimited(ip, ua)
1✔
176
                        if err != nil {
2✔
177
                                // Log error but don't block request (fail open).
1✔
178
                                // TODO: Add tests for this path.
1✔
179
                                log.Printf("Rate limiter error: %v", err)
1✔
180
                        } else if status.IsLimited {
3✔
181
                                // Log IP and UA and block the request.
1✔
182
                                result.Error = v2.NewError("client", tooManyRequests, http.StatusTooManyRequests)
1✔
183
                                metrics.RequestsTotal.WithLabelValues("nearest", "rate limit",
1✔
184
                                        http.StatusText(result.Error.Status)).Inc()
1✔
185
                                // If the client provided a client_name, we want to know how many times
1✔
186
                                // that client_name was rate limited. This may be empty, which is fine.
1✔
187
                                clientName := req.Form.Get("client_name")
1✔
188
                                metrics.RateLimitedTotal.WithLabelValues(clientName, status.LimitType).Inc()
1✔
189

1✔
190
                                log.Printf("Rate limit (%s) exceeded for IP: %s, client: %s, UA: %s", ip,
1✔
191
                                        status.LimitType, clientName, ua)
1✔
192
                                writeResult(rw, result.Error.Status, &result)
1✔
193
                                return
1✔
194
                        }
1✔
195
                } else {
1✔
196
                        // This should never happen if Locate is deployed on AppEngine.
1✔
197
                        log.Println("Cannot find IP address for rate limiting.")
1✔
198
                }
1✔
199
        }
200

201
        experiment, service := getExperimentAndService(req.URL.Path)
1✔
202

1✔
203
        // Look up client location.
1✔
204
        loc, err := c.checkClientLocation(rw, req)
1✔
205
        if err != nil {
2✔
206
                status := http.StatusServiceUnavailable
1✔
207
                result.Error = v2.NewError("nearest", "Failed to lookup nearest machines", status)
1✔
208
                writeResult(rw, result.Error.Status, &result)
1✔
209
                metrics.RequestsTotal.WithLabelValues("nearest", "client location",
1✔
210
                        http.StatusText(result.Error.Status)).Inc()
1✔
211
                return
1✔
212
        }
1✔
213

214
        // Parse client location.
215
        lat, errLat := strconv.ParseFloat(loc.Latitude, 64)
1✔
216
        lon, errLon := strconv.ParseFloat(loc.Longitude, 64)
1✔
217
        if errLat != nil || errLon != nil {
2✔
218
                result.Error = v2.NewError("client", errFailedToLookupClient.Error(), http.StatusInternalServerError)
1✔
219
                writeResult(rw, result.Error.Status, &result)
1✔
220
                metrics.RequestsTotal.WithLabelValues("nearest", "parse client location",
1✔
221
                        http.StatusText(result.Error.Status)).Inc()
1✔
222
                return
1✔
223
        }
1✔
224

225
        // Find the nearest targets using the client parameters.
226
        q := req.URL.Query()
1✔
227
        t := q.Get("machine-type")
1✔
228
        country := req.Header.Get("X-AppEngine-Country")
1✔
229
        sites := q["site"]
1✔
230
        org := q.Get("org")
1✔
231
        strict := false
1✔
232
        if qsStrict, err := strconv.ParseBool(q.Get("strict")); err == nil {
1✔
233
                strict = qsStrict
×
UNCOV
234
        }
×
235
        // If strict, override the country from the AppEngine header with the one in
236
        // the querystring.
237
        if strict {
1✔
238
                country = q.Get("country")
×
UNCOV
239
        }
×
240
        opts := &heartbeat.NearestOptions{Type: t, Country: country, Sites: sites, Org: org, Strict: strict}
1✔
241
        targetInfo, err := c.LocatorV2.Nearest(service, lat, lon, opts)
1✔
242
        if err != nil {
2✔
243
                result.Error = v2.NewError("nearest", "Failed to lookup nearest machines", http.StatusInternalServerError)
1✔
244
                writeResult(rw, result.Error.Status, &result)
1✔
245
                metrics.RequestsTotal.WithLabelValues("nearest", "server location",
1✔
246
                        http.StatusText(result.Error.Status)).Inc()
1✔
247
                return
1✔
248
        }
1✔
249

250
        pOpts := paramOpts{
1✔
251
                raw:       req.Form,
1✔
252
                version:   "v2",
1✔
253
                ranks:     targetInfo.Ranks,
1✔
254
                svcParams: static.ServiceParams,
1✔
255
        }
1✔
256
        // Populate target URLs and write out response.
1✔
257
        c.populateURLs(targetInfo.Targets, targetInfo.URLs, experiment, pOpts)
1✔
258
        result.Results = targetInfo.Targets
1✔
259
        writeResult(rw, http.StatusOK, &result)
1✔
260
        metrics.RequestsTotal.WithLabelValues("nearest", "success", http.StatusText(http.StatusOK)).Inc()
1✔
261
}
262

263
// Live is a minimal handler to indicate that the server is operating at all.
264
func (c *Client) Live(rw http.ResponseWriter, req *http.Request) {
1✔
265
        fmt.Fprintf(rw, "ok")
1✔
266
}
1✔
267

268
// Ready reports whether the server is working as expected and ready to serve requests.
269
func (c *Client) Ready(rw http.ResponseWriter, req *http.Request) {
1✔
270
        if c.LocatorV2.Ready() {
2✔
271
                fmt.Fprintf(rw, "ok")
1✔
272
        } else {
2✔
273
                rw.WriteHeader(http.StatusInternalServerError)
1✔
274
                fmt.Fprintf(rw, "not ready")
1✔
275
        }
1✔
276
}
277

278
// Registrations returns information about registered machines. There are 3
279
// supported query parameters:
280
//
281
// * format - defines the format of the returned JSON
282
// * org - limits results to only records for the given organization
283
// * exp - limits results to only records for the given experiment (e.g., ndt)
284
func (c *Client) Registrations(rw http.ResponseWriter, req *http.Request) {
1✔
285
        var err error
1✔
286
        var result interface{}
1✔
287

1✔
288
        q := req.URL.Query()
1✔
289
        format := q.Get("format")
1✔
290

1✔
291
        switch format {
1✔
292
        default:
1✔
293
                result, err = siteinfo.Machines(c.LocatorV2.Instances(), q)
1✔
294
        }
295

296
        if err != nil {
2✔
297
                v2Error := v2.NewError("siteinfo", err.Error(), http.StatusInternalServerError)
1✔
298
                writeResult(rw, http.StatusInternalServerError, v2Error)
1✔
299
                return
1✔
300
        }
1✔
301

302
        writeResult(rw, http.StatusOK, result)
1✔
303
}
304

305
// checkClientLocation looks up the client location and copies the location
306
// headers to the response writer.
307
func (c *Client) checkClientLocation(rw http.ResponseWriter, req *http.Request) (*clientgeo.Location, error) {
1✔
308
        // Lookup the client location using the client request.
1✔
309
        loc, err := c.Locate(req)
1✔
310
        if err != nil {
2✔
311
                return nil, errFailedToLookupClient
1✔
312
        }
1✔
313

314
        // Copy location headers to response writer.
315
        for key := range loc.Headers {
2✔
316
                rw.Header().Set(key, loc.Headers.Get(key))
1✔
317
        }
1✔
318

319
        return loc, nil
1✔
320
}
321

322
// populateURLs populates each set of URLs using the target configuration.
323
func (c *Client) populateURLs(targets []v2.Target, ports static.Ports, exp string, pOpts paramOpts) {
1✔
324
        for i, target := range targets {
2✔
325
                token := c.getAccessToken(target.Machine, exp)
1✔
326
                params := extraParams(target.Machine, i, pOpts)
1✔
327
                targets[i].URLs = c.getURLs(ports, target.Hostname, token, params)
1✔
328
        }
1✔
329
}
330

331
// getAccessToken allocates a new access token using the given machine name as
332
// the intended audience and the subject as the target service.
333
func (c *Client) getAccessToken(machine, subject string) string {
1✔
334
        // Create the token. The same access token is reused for every URL of a
1✔
335
        // target port.
1✔
336
        // A uuid is added to the claims so that each new token is unique.
1✔
337
        cl := jwt.Claims{
1✔
338
                Issuer:   static.IssuerLocate,
1✔
339
                Subject:  subject,
1✔
340
                Audience: jwt.Audience{machine},
1✔
341
                Expiry:   jwt.NewNumericDate(time.Now().Add(time.Minute)),
1✔
342
                ID:       uuid.NewString(),
1✔
343
        }
1✔
344
        token, err := c.Sign(cl)
1✔
345
        // Sign errors can only happen due to a misconfiguration of the key.
1✔
346
        // A good config will remain good.
1✔
347
        rtx.PanicOnError(err, "signing claims has failed")
1✔
348
        return token
1✔
349
}
1✔
350

351
// getURLs creates URLs for the named experiment, running on the named machine
352
// for each given port. Every URL will include an `access_token=` parameter,
353
// authorizing the measurement.
354
func (c *Client) getURLs(ports static.Ports, hostname, token string, extra url.Values) map[string]string {
1✔
355
        urls := map[string]string{}
1✔
356
        // For each port config, prepare the target url with access_token and
1✔
357
        // complete host field.
1✔
358
        for _, target := range ports {
2✔
359
                name := target.String()
1✔
360
                params := url.Values{}
1✔
361
                params.Set("access_token", token)
1✔
362
                for key := range extra {
2✔
363
                        // note: we only use the first value.
1✔
364
                        params.Set(key, extra.Get(key))
1✔
365
                }
1✔
366
                target.RawQuery = params.Encode()
1✔
367

1✔
368
                host := &bytes.Buffer{}
1✔
369
                err := c.targetTmpl.Execute(host, map[string]string{
1✔
370
                        "Hostname": hostname,
1✔
371
                        "Ports":    target.Host, // from URL template, so typically just the ":port".
1✔
372
                })
1✔
373
                rtx.PanicOnError(err, "bad template evaluation")
1✔
374
                target.Host = host.String()
1✔
375
                urls[name] = target.String()
1✔
376
        }
377
        return urls
1✔
378
}
379

380
// limitRequest determines whether a client request should be rate-limited.
381
func (c *Client) limitRequest(now time.Time, req *http.Request) bool {
1✔
382
        agent := req.Header.Get("User-Agent")
1✔
383
        l, ok := c.agentLimits[agent]
1✔
384
        if !ok {
2✔
385
                // No limit defined for user agent.
1✔
386
                return false
1✔
387
        }
1✔
388
        return l.IsLimited(now)
1✔
389
}
390

391
// setHeaders sets the response headers for "nearest" requests: a JSON content
// type, an open CORS policy and a no-store cache policy.
func setHeaders(rw http.ResponseWriter) {
	headers := rw.Header()

	headers.Set("Content-Type", "application/json")
	// Set CORS policy to allow third-party websites to use returned resources.
	headers.Set("Access-Control-Allow-Origin", "*")
	// Prevent caching of result.
	// See also: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control
	headers.Set("Cache-Control", "no-store")
}
1✔
400

401
// writeResult marshals the result and writes the result to the response writer.
402
func writeResult(rw http.ResponseWriter, status int, result interface{}) {
1✔
403
        b, err := json.MarshalIndent(result, "", "  ")
1✔
404
        // Errors are only possible when marshalling incompatible types, like functions.
1✔
405
        rtx.PanicOnError(err, "Failed to format result")
1✔
406
        rw.WriteHeader(status)
1✔
407
        rw.Write(b)
1✔
408
}
1✔
409

410
// getExperimentAndService takes an http request path and extracts the last two
// fields. For correct requests (e.g. "/v2/nearest/ndt/ndt5"), the first result
// is the experiment name (e.g. "ndt") and the second is the service, i.e. the
// experiment joined to the datatype with a slash (e.g. "ndt/ndt5").
func getExperimentAndService(p string) (string, string) {
	experiment := path.Base(path.Dir(p))
	service := experiment + "/" + path.Base(p)
	return experiment, service
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc