• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

m-lab / locate / 1583

12 Jun 2025 03:42PM UTC coverage: 96.229% (-0.1%) from 96.362%
1583

push

travis-pro

web-flow
Factor filtering hosts into its own function for the registrations endpoint (#226)

* Considers only NDT registrations for siteinfo GeoJSON format

We don't want a separate GeoJSON feature for every experiment on every machine.
Instead, we only want a single feature for each machine. NDT runs on every
machine, so this should accurately represent the entire fleet. This is overly
NDT-centric, unfortunately, but is a reasonable start, I think.

* Factors filtering hosts into a separate function

Both the Hosts() and Geo() function use the filtering. I also renamed
Machines() to Hosts(), since this is more accurate as the returned values
experiment hostnames, not machine names.

* Sets appropriate headers for the registrations endpoint

Namely, we want a permissive CORS policy to allow any origin to make use of the
data provided by this endpoint.

* Refactors the filterHosts() function

The previous logic was not scalable and was difficult to read.

* Adds a machine property to the GeoJSON for registrations geo format

The machine format will be slightly different than the standart M-Lab machine
naming. Standard formatting is like <machine>-<site>, but this is going to be
<site>-<machine>, since that is the order that BYOS now uses.

38 of 39 new or added lines in 2 files covered. (97.44%)

3 existing lines in 1 file now uncovered.

2016 of 2095 relevant lines covered (96.23%)

1.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.06
/handler/handler.go
1
// Package handler provides a client and handlers for responding to locate
2
// requests.
3
package handler
4

5
import (
6
        "bytes"
7
        "context"
8
        "encoding/json"
9
        "errors"
10
        "fmt"
11
        "html/template"
12
        "math/rand"
13
        "net/http"
14
        "net/url"
15
        "path"
16
        "strconv"
17
        "strings"
18
        "time"
19

20
        "github.com/google/uuid"
21
        log "github.com/sirupsen/logrus"
22
        "gopkg.in/square/go-jose.v2/jwt"
23

24
        "github.com/m-lab/go/rtx"
25
        v2 "github.com/m-lab/locate/api/v2"
26
        "github.com/m-lab/locate/clientgeo"
27
        "github.com/m-lab/locate/heartbeat"
28
        "github.com/m-lab/locate/limits"
29
        "github.com/m-lab/locate/metrics"
30
        "github.com/m-lab/locate/siteinfo"
31
        "github.com/m-lab/locate/static"
32
        prom "github.com/prometheus/client_golang/api/prometheus/v1"
33
        "github.com/prometheus/common/model"
34
)
35

36
var (
37
        errFailedToLookupClient = errors.New("Failed to look up client location")
38
        tooManyRequests         = "Too many periodic requests. Please contact support@measurementlab.net."
39
)
40

41
// Signer defines how access tokens are signed.
42
type Signer interface {
43
        Sign(cl jwt.Claims) (string, error)
44
}
45

46
type Limiter interface {
47
        IsLimited(ip, ua string) (limits.LimitStatus, error)
48
}
49

50
// Client contains state needed for xyz.
51
type Client struct {
52
        Signer
53
        project string
54
        LocatorV2
55
        ClientLocator
56
        PrometheusClient
57
        targetTmpl       *template.Template
58
        agentLimits      limits.Agents
59
        ipLimiter        Limiter
60
        earlyExitClients map[string]bool
61
}
62

63
// LocatorV2 defines how the Nearest handler requests machines nearest to the
64
// client.
65
type LocatorV2 interface {
66
        Nearest(service string, lat, lon float64, opts *heartbeat.NearestOptions) (*heartbeat.TargetInfo, error)
67
        heartbeat.StatusTracker
68
}
69

70
// ClientLocator defines the interfeace for looking up the client geo location.
71
type ClientLocator interface {
72
        Locate(req *http.Request) (*clientgeo.Location, error)
73
}
74

75
// PrometheusClient defines the interface to query Prometheus.
76
type PrometheusClient interface {
77
        Query(ctx context.Context, query string, ts time.Time, opts ...prom.Option) (model.Value, prom.Warnings, error)
78
}
79

80
type paramOpts struct {
81
        raw       url.Values
82
        version   string
83
        ranks     map[string]int
84
        svcParams map[string]float64
85
}
86

87
func init() {
1✔
88
        log.SetFormatter(&log.JSONFormatter{})
1✔
89
        log.SetLevel(log.InfoLevel)
1✔
90
}
1✔
91

92
// NewClient creates a new client.
93
func NewClient(project string, private Signer, locatorV2 LocatorV2, client ClientLocator,
94
        prom PrometheusClient, lmts limits.Agents, limiter Limiter, earlyExitClients []string) *Client {
1✔
95
        // Convert slice to map for O(1) lookups
1✔
96
        earlyExitMap := make(map[string]bool)
1✔
97
        for _, client := range earlyExitClients {
1✔
98
                earlyExitMap[client] = true
×
99
        }
×
100
        return &Client{
1✔
101
                Signer:           private,
1✔
102
                project:          project,
1✔
103
                LocatorV2:        locatorV2,
1✔
104
                ClientLocator:    client,
1✔
105
                PrometheusClient: prom,
1✔
106
                targetTmpl:       template.Must(template.New("name").Parse("{{.Hostname}}{{.Ports}}")),
1✔
107
                agentLimits:      lmts,
1✔
108
                ipLimiter:        limiter,
1✔
109
                earlyExitClients: earlyExitMap,
1✔
110
        }
1✔
111
}
112

113
// NewClientDirect creates a new client with a target template using only the target machine.
114
func NewClientDirect(project string, private Signer, locatorV2 LocatorV2, client ClientLocator, prom PrometheusClient) *Client {
1✔
115
        return &Client{
1✔
116
                Signer:           private,
1✔
117
                project:          project,
1✔
118
                LocatorV2:        locatorV2,
1✔
119
                ClientLocator:    client,
1✔
120
                PrometheusClient: prom,
1✔
121
                // Useful for the locatetest package when running a local server.
1✔
122
                targetTmpl: template.Must(template.New("name").Parse("{{.Hostname}}{{.Ports}}")),
1✔
123
        }
1✔
124
}
1✔
125

126
func (c *Client) extraParams(hostname string, index int, p paramOpts) url.Values {
1✔
127
        v := url.Values{}
1✔
128

1✔
129
        // Add client parameters.
1✔
130
        for key := range p.raw {
2✔
131
                if strings.HasPrefix(key, "client_") {
2✔
132
                        // note: we only use the first value.
1✔
133
                        v.Set(key, p.raw.Get(key))
1✔
134
                }
1✔
135

136
                val, ok := p.svcParams[key]
1✔
137
                if ok && rand.Float64() < val {
2✔
138
                        v.Set(key, p.raw.Get(key))
1✔
139
                }
1✔
140
        }
141

142
        // Add early_exit parameter for specified clients
143
        clientName := p.raw.Get("client_name")
1✔
144
        if clientName != "" && c.earlyExitClients[clientName] {
2✔
145
                v.Set(static.EarlyExitParameter, static.EarlyExitDefaultValue)
1✔
146
        }
1✔
147

148
        // Add Locate Service version.
149
        v.Set("locate_version", p.version)
1✔
150

1✔
151
        // Add metro rank.
1✔
152
        rank, ok := p.ranks[hostname]
1✔
153
        if ok {
2✔
154
                v.Set("metro_rank", strconv.Itoa(rank))
1✔
155
        }
1✔
156

157
        // Add result index.
158
        v.Set("index", strconv.Itoa(index))
1✔
159

1✔
160
        return v
1✔
161
}
162

163
// Nearest uses an implementation of the LocatorV2 interface to look up
164
// nearest servers.
165
func (c *Client) Nearest(rw http.ResponseWriter, req *http.Request) {
1✔
166
        req.ParseForm()
1✔
167
        result := v2.NearestResult{}
1✔
168
        setHeaders(rw)
1✔
169

1✔
170
        if c.limitRequest(time.Now().UTC(), req) {
2✔
171
                result.Error = v2.NewError("client", tooManyRequests, http.StatusTooManyRequests)
1✔
172
                writeResult(rw, result.Error.Status, &result)
1✔
173
                metrics.RequestsTotal.WithLabelValues("nearest", "request limit", http.StatusText(result.Error.Status)).Inc()
1✔
174
                return
1✔
175
        }
1✔
176

177
        // Check rate limit for IP and UA.
178
        if c.ipLimiter != nil {
2✔
179
                // Get the IP address from the request. X-Forwarded-For is guaranteed to
1✔
180
                // be set by AppEngine.
1✔
181
                ip := req.Header.Get("X-Forwarded-For")
1✔
182
                ips := strings.Split(ip, ",")
1✔
183
                if len(ips) > 0 {
2✔
184
                        ip = strings.TrimSpace(ips[0])
1✔
185
                }
1✔
186
                if ip != "" {
2✔
187
                        // An empty UA is technically possible.
1✔
188
                        ua := req.Header.Get("User-Agent")
1✔
189
                        status, err := c.ipLimiter.IsLimited(ip, ua)
1✔
190
                        if err != nil {
2✔
191
                                // Log error but don't block request (fail open).
1✔
192
                                // TODO: Add tests for this path.
1✔
193
                                log.Printf("Rate limiter error: %v", err)
1✔
194
                        } else if status.IsLimited {
3✔
195
                                // Log IP and UA and block the request.
1✔
196
                                result.Error = v2.NewError("client", tooManyRequests, http.StatusTooManyRequests)
1✔
197
                                metrics.RequestsTotal.WithLabelValues("nearest", "rate limit",
1✔
198
                                        http.StatusText(result.Error.Status)).Inc()
1✔
199
                                // If the client provided a client_name, we want to know how many times
1✔
200
                                // that client_name was rate limited. This may be empty, which is fine.
1✔
201
                                clientName := req.Form.Get("client_name")
1✔
202
                                metrics.RateLimitedTotal.WithLabelValues(clientName, status.LimitType).Inc()
1✔
203

1✔
204
                                log.Printf("Rate limit (%s) exceeded for IP: %s, client: %s, UA: %s", ip,
1✔
205
                                        status.LimitType, clientName, ua)
1✔
206
                                writeResult(rw, result.Error.Status, &result)
1✔
207
                                return
1✔
208
                        }
1✔
209
                } else {
1✔
210
                        // This should never happen if Locate is deployed on AppEngine.
1✔
211
                        log.Println("Cannot find IP address for rate limiting.")
1✔
212
                }
1✔
213
        }
214

215
        experiment, service := getExperimentAndService(req.URL.Path)
1✔
216

1✔
217
        // Look up client location.
1✔
218
        loc, err := c.checkClientLocation(rw, req)
1✔
219
        if err != nil {
2✔
220
                status := http.StatusServiceUnavailable
1✔
221
                result.Error = v2.NewError("nearest", "Failed to lookup nearest machines", status)
1✔
222
                writeResult(rw, result.Error.Status, &result)
1✔
223
                metrics.RequestsTotal.WithLabelValues("nearest", "client location",
1✔
224
                        http.StatusText(result.Error.Status)).Inc()
1✔
225
                return
1✔
226
        }
1✔
227

228
        // Parse client location.
229
        lat, errLat := strconv.ParseFloat(loc.Latitude, 64)
1✔
230
        lon, errLon := strconv.ParseFloat(loc.Longitude, 64)
1✔
231
        if errLat != nil || errLon != nil {
2✔
232
                result.Error = v2.NewError("client", errFailedToLookupClient.Error(), http.StatusInternalServerError)
1✔
233
                writeResult(rw, result.Error.Status, &result)
1✔
234
                metrics.RequestsTotal.WithLabelValues("nearest", "parse client location",
1✔
235
                        http.StatusText(result.Error.Status)).Inc()
1✔
236
                return
1✔
237
        }
1✔
238

239
        // Find the nearest targets using the client parameters.
240
        q := req.URL.Query()
1✔
241
        t := q.Get("machine-type")
1✔
242
        country := req.Header.Get("X-AppEngine-Country")
1✔
243
        sites := q["site"]
1✔
244
        org := q.Get("org")
1✔
245
        strict := false
1✔
246
        if qsStrict, err := strconv.ParseBool(q.Get("strict")); err == nil {
1✔
247
                strict = qsStrict
×
248
        }
×
249
        // If strict, override the country from the AppEngine header with the one in
250
        // the querystring.
251
        if strict {
1✔
252
                country = q.Get("country")
×
253
        }
×
254
        opts := &heartbeat.NearestOptions{Type: t, Country: country, Sites: sites, Org: org, Strict: strict}
1✔
255
        targetInfo, err := c.LocatorV2.Nearest(service, lat, lon, opts)
1✔
256
        if err != nil {
2✔
257
                result.Error = v2.NewError("nearest", "Failed to lookup nearest machines", http.StatusInternalServerError)
1✔
258
                writeResult(rw, result.Error.Status, &result)
1✔
259
                metrics.RequestsTotal.WithLabelValues("nearest", "server location",
1✔
260
                        http.StatusText(result.Error.Status)).Inc()
1✔
261
                return
1✔
262
        }
1✔
263

264
        pOpts := paramOpts{
1✔
265
                raw:       req.Form,
1✔
266
                version:   "v2",
1✔
267
                ranks:     targetInfo.Ranks,
1✔
268
                svcParams: static.ServiceParams,
1✔
269
        }
1✔
270
        // Populate target URLs and write out response.
1✔
271
        c.populateURLs(targetInfo.Targets, targetInfo.URLs, experiment, pOpts)
1✔
272
        result.Results = targetInfo.Targets
1✔
273
        writeResult(rw, http.StatusOK, &result)
1✔
274
        metrics.RequestsTotal.WithLabelValues("nearest", "success", http.StatusText(http.StatusOK)).Inc()
1✔
275
}
276

277
// Live is a minimal handler to indicate that the server is operating at all.
278
func (c *Client) Live(rw http.ResponseWriter, req *http.Request) {
1✔
279
        fmt.Fprintf(rw, "ok")
1✔
280
}
1✔
281

282
// Ready reports whether the server is working as expected and ready to serve requests.
283
func (c *Client) Ready(rw http.ResponseWriter, req *http.Request) {
1✔
284
        if c.LocatorV2.Ready() {
2✔
285
                fmt.Fprintf(rw, "ok")
1✔
286
        } else {
2✔
287
                rw.WriteHeader(http.StatusInternalServerError)
1✔
288
                fmt.Fprintf(rw, "not ready")
1✔
289
        }
1✔
290
}
291

292
// Registrations returns information about registered machines. There are 3
293
// supported query parameters:
294
//
295
// * format - defines the format of the returned JSON
296
// * org - limits results to only records for the given organization
297
// * exp - limits results to only records for the given experiment (e.g., ndt)
298
//
299
// The "org" and "exp" query parameters are currently only supported by the
300
// default or "machines" format.
301
func (c *Client) Registrations(rw http.ResponseWriter, req *http.Request) {
1✔
302
        var err error
1✔
303
        var result interface{}
1✔
304

1✔
305
        setHeaders(rw)
1✔
306

1✔
307
        q := req.URL.Query()
1✔
308
        format := q.Get("format")
1✔
309

1✔
310
        switch format {
1✔
311
        case "geo":
×
NEW
312
                result, err = siteinfo.Geo(c.LocatorV2.Instances(), q)
×
313
        default:
1✔
314
                result, err = siteinfo.Hosts(c.LocatorV2.Instances(), q)
1✔
315
        }
316

317
        if err != nil {
2✔
318
                v2Error := v2.NewError("siteinfo", err.Error(), http.StatusInternalServerError)
1✔
319
                writeResult(rw, http.StatusInternalServerError, v2Error)
1✔
320
                return
1✔
321
        }
1✔
322

323
        writeResult(rw, http.StatusOK, result)
1✔
324
}
325

326
// checkClientLocation looks up the client location and copies the location
327
// headers to the response writer.
328
func (c *Client) checkClientLocation(rw http.ResponseWriter, req *http.Request) (*clientgeo.Location, error) {
1✔
329
        // Lookup the client location using the client request.
1✔
330
        loc, err := c.Locate(req)
1✔
331
        if err != nil {
2✔
332
                return nil, errFailedToLookupClient
1✔
333
        }
1✔
334

335
        // Copy location headers to response writer.
336
        for key := range loc.Headers {
2✔
337
                rw.Header().Set(key, loc.Headers.Get(key))
1✔
338
        }
1✔
339

340
        return loc, nil
1✔
341
}
342

343
// populateURLs populates each set of URLs using the target configuration.
344
func (c *Client) populateURLs(targets []v2.Target, ports static.Ports, exp string, pOpts paramOpts) {
1✔
345
        for i, target := range targets {
2✔
346
                token := c.getAccessToken(target.Machine, exp)
1✔
347
                params := c.extraParams(target.Machine, i, pOpts)
1✔
348
                targets[i].URLs = c.getURLs(ports, target.Hostname, token, params)
1✔
349
        }
1✔
350
}
351

352
// getAccessToken allocates a new access token using the given machine name as
353
// the intended audience and the subject as the target service.
354
func (c *Client) getAccessToken(machine, subject string) string {
1✔
355
        // Create the token. The same access token is reused for every URL of a
1✔
356
        // target port.
1✔
357
        // A uuid is added to the claims so that each new token is unique.
1✔
358
        cl := jwt.Claims{
1✔
359
                Issuer:   static.IssuerLocate,
1✔
360
                Subject:  subject,
1✔
361
                Audience: jwt.Audience{machine},
1✔
362
                Expiry:   jwt.NewNumericDate(time.Now().Add(time.Minute)),
1✔
363
                ID:       uuid.NewString(),
1✔
364
        }
1✔
365
        token, err := c.Sign(cl)
1✔
366
        // Sign errors can only happen due to a misconfiguration of the key.
1✔
367
        // A good config will remain good.
1✔
368
        rtx.PanicOnError(err, "signing claims has failed")
1✔
369
        return token
1✔
370
}
1✔
371

372
// getURLs creates URLs for the named experiment, running on the named machine
373
// for each given port. Every URL will include an `access_token=` parameter,
374
// authorizing the measurement.
375
func (c *Client) getURLs(ports static.Ports, hostname, token string, extra url.Values) map[string]string {
1✔
376
        urls := map[string]string{}
1✔
377
        // For each port config, prepare the target url with access_token and
1✔
378
        // complete host field.
1✔
379
        for _, target := range ports {
2✔
380
                name := target.String()
1✔
381
                params := url.Values{}
1✔
382
                params.Set("access_token", token)
1✔
383
                for key := range extra {
2✔
384
                        // note: we only use the first value.
1✔
385
                        params.Set(key, extra.Get(key))
1✔
386
                }
1✔
387
                target.RawQuery = params.Encode()
1✔
388

1✔
389
                host := &bytes.Buffer{}
1✔
390
                err := c.targetTmpl.Execute(host, map[string]string{
1✔
391
                        "Hostname": hostname,
1✔
392
                        "Ports":    target.Host, // from URL template, so typically just the ":port".
1✔
393
                })
1✔
394
                rtx.PanicOnError(err, "bad template evaluation")
1✔
395
                target.Host = host.String()
1✔
396
                urls[name] = target.String()
1✔
397
        }
398
        return urls
1✔
399
}
400

401
// limitRequest determines whether a client request should be rate-limited.
402
func (c *Client) limitRequest(now time.Time, req *http.Request) bool {
1✔
403
        agent := req.Header.Get("User-Agent")
1✔
404
        l, ok := c.agentLimits[agent]
1✔
405
        if !ok {
2✔
406
                // No limit defined for user agent.
1✔
407
                return false
1✔
408
        }
1✔
409
        return l.IsLimited(now)
1✔
410
}
411

412
// setHeaders sets the response headers for "nearest" requests.
413
func setHeaders(rw http.ResponseWriter) {
1✔
414
        // Set CORS policy to allow third-party websites to use returned resources.
1✔
415
        rw.Header().Set("Content-Type", "application/json")
1✔
416
        rw.Header().Set("Access-Control-Allow-Origin", "*")
1✔
417
        // Prevent caching of result.
1✔
418
        // See also: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control
1✔
419
        rw.Header().Set("Cache-Control", "no-store")
1✔
420
}
1✔
421

422
// writeResult marshals the result and writes the result to the response writer.
423
func writeResult(rw http.ResponseWriter, status int, result interface{}) {
1✔
424
        b, err := json.MarshalIndent(result, "", "  ")
1✔
425
        // Errors are only possible when marshalling incompatible types, like functions.
1✔
426
        rtx.PanicOnError(err, "Failed to format result")
1✔
427
        rw.WriteHeader(status)
1✔
428
        rw.Write(b)
1✔
429
}
1✔
430

431
// getExperimentAndService takes an http request path and extracts the last two
432
// fields. For correct requests (e.g. "/v2/nearest/ndt/ndt5"), this will be the
433
// experiment name (e.g. "ndt") and the datatype (e.g. "ndt5").
434
func getExperimentAndService(p string) (string, string) {
1✔
435
        datatype := path.Base(p)
1✔
436
        experiment := path.Base(path.Dir(p))
1✔
437
        return experiment, experiment + "/" + datatype
1✔
438
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc