• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

m-lab / locate / 1638

24 Nov 2025 04:33PM UTC coverage: 91.206% (-1.1%) from 92.348%
1638

Pull #230

travis-pro

web-flow
Merge 2e97a3a44 into f5dbf2c0b
Pull Request #230: feat: improve support for local testing

204 of 248 new or added lines in 7 files covered. (82.26%)

6 existing lines in 2 files now uncovered.

2261 of 2479 relevant lines covered (91.21%)

1.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.8
/handler/handler.go
1
// Package handler provides a client and handlers for responding to locate
2
// requests.
3
package handler
4

5
import (
6
        "bytes"
7
        "context"
8
        "encoding/json"
9
        "errors"
10
        "fmt"
11
        "html/template"
12
        "math/rand"
13
        "net"
14
        "net/http"
15
        "net/url"
16
        "path"
17
        "strconv"
18
        "strings"
19
        "time"
20

21
        "github.com/google/uuid"
22
        log "github.com/sirupsen/logrus"
23
        "gopkg.in/square/go-jose.v2/jwt"
24

25
        "github.com/m-lab/go/rtx"
26
        v2 "github.com/m-lab/locate/api/v2"
27
        "github.com/m-lab/locate/clientgeo"
28
        "github.com/m-lab/locate/heartbeat"
29
        "github.com/m-lab/locate/limits"
30
        "github.com/m-lab/locate/metrics"
31
        "github.com/m-lab/locate/siteinfo"
32
        "github.com/m-lab/locate/static"
33
        prom "github.com/prometheus/client_golang/api/prometheus/v1"
34
        "github.com/prometheus/common/model"
35
)
36

37
// Messages shared by the request handlers in this file.
var (
	// errFailedToLookupClient is returned by checkClientLocation when the
	// client locator fails. Its text is also sent to the client when the
	// located latitude/longitude cannot be parsed.
	errFailedToLookupClient = errors.New("Failed to look up client location")

	// tooManyRequests is the error detail sent with 429 responses.
	tooManyRequests = "Too many periodic requests. Please contact support@measurementlab.net."
)
41

42
// Signer defines how access tokens are signed.
43
type Signer interface {
44
        Sign(cl jwt.Claims) (string, error)
45
}
46

47
type Limiter interface {
48
        IsLimited(ip, ua string) (limits.LimitStatus, error)
49
}
50

51
// Client contains state needed for xyz.
52
type Client struct {
53
        Signer
54
        project string
55
        LocatorV2
56
        ClientLocator
57
        PrometheusClient
58
        targetTmpl       *template.Template
59
        agentLimits      limits.Agents
60
        ipLimiter        Limiter
61
        earlyExitClients map[string]bool
62
        jwtVerifier      Verifier
63
}
64

65
// LocatorV2 defines how the Nearest handler requests machines nearest to the
66
// client.
67
type LocatorV2 interface {
68
        Nearest(service string, lat, lon float64, opts *heartbeat.NearestOptions) (*heartbeat.TargetInfo, error)
69
        heartbeat.StatusTracker
70
}
71

72
// ClientLocator defines the interface for looking up the client geolocation.
73
type ClientLocator interface {
74
        Locate(req *http.Request) (*clientgeo.Location, error)
75
}
76

77
// PrometheusClient defines the interface to query Prometheus.
78
type PrometheusClient interface {
79
        Query(ctx context.Context, query string, ts time.Time, opts ...prom.Option) (model.Value, prom.Warnings, error)
80
}
81

82
// paramOpts carries the inputs extraParams uses to build the extra query
// parameters appended to every target URL.
type paramOpts struct {
	// raw holds the request parameters; "client_"-prefixed keys are always
	// forwarded, keys listed in svcParams are forwarded probabilistically.
	raw url.Values
	// version is emitted as the "locate_version" parameter.
	version string
	// ranks maps a machine name to the value emitted as "metro_rank".
	ranks map[string]int
	// svcParams maps a parameter name to the probability threshold compared
	// against rand.Float64() when deciding whether to forward it.
	svcParams map[string]float64
}
88

89
func init() {
1✔
90
        log.SetFormatter(&log.JSONFormatter{})
1✔
91
        log.SetLevel(log.InfoLevel)
1✔
92
}
1✔
93

94
// NewClient creates a new client.
95
func NewClient(project string, private Signer, locatorV2 LocatorV2, client ClientLocator,
96
        prom PrometheusClient, lmts limits.Agents, limiter Limiter, earlyExitClients []string, jwtVerifier Verifier) *Client {
1✔
97
        // Convert slice to map for O(1) lookups
1✔
98
        earlyExitMap := make(map[string]bool)
1✔
99
        for _, client := range earlyExitClients {
1✔
100
                earlyExitMap[client] = true
×
101
        }
×
102
        return &Client{
1✔
103
                Signer:           private,
1✔
104
                project:          project,
1✔
105
                LocatorV2:        locatorV2,
1✔
106
                ClientLocator:    client,
1✔
107
                PrometheusClient: prom,
1✔
108
                targetTmpl:       template.Must(template.New("name").Parse("{{.Hostname}}{{.Ports}}")),
1✔
109
                agentLimits:      lmts,
1✔
110
                ipLimiter:        limiter,
1✔
111
                earlyExitClients: earlyExitMap,
1✔
112
                jwtVerifier:      jwtVerifier,
1✔
113
        }
1✔
114
}
115

116
// NewClientDirect creates a new client with a target template using only the target machine.
117
func NewClientDirect(project string, private Signer, locatorV2 LocatorV2, client ClientLocator, prom PrometheusClient) *Client {
1✔
118
        return &Client{
1✔
119
                Signer:           private,
1✔
120
                project:          project,
1✔
121
                LocatorV2:        locatorV2,
1✔
122
                ClientLocator:    client,
1✔
123
                PrometheusClient: prom,
1✔
124
                // Useful for the locatetest package when running a local server.
1✔
125
                targetTmpl: template.Must(template.New("name").Parse("{{.Hostname}}{{.Ports}}")),
1✔
126
        }
1✔
127
}
1✔
128

129
func (c *Client) extraParams(hostname string, index int, p paramOpts) url.Values {
1✔
130
        v := url.Values{}
1✔
131

1✔
132
        // Add client parameters.
1✔
133
        for key := range p.raw {
2✔
134
                if strings.HasPrefix(key, "client_") {
2✔
135
                        // note: we only use the first value.
1✔
136
                        v.Set(key, p.raw.Get(key))
1✔
137
                }
1✔
138

139
                val, ok := p.svcParams[key]
1✔
140
                if ok && rand.Float64() < val {
2✔
141
                        v.Set(key, p.raw.Get(key))
1✔
142
                }
1✔
143
        }
144

145
        // Add early_exit parameter for specified clients
146
        clientName := p.raw.Get("client_name")
1✔
147
        if clientName != "" && c.earlyExitClients[clientName] {
2✔
148
                v.Set(static.EarlyExitParameter, static.EarlyExitDefaultValue)
1✔
149
        }
1✔
150

151
        // Add Locate Service version.
152
        v.Set("locate_version", p.version)
1✔
153

1✔
154
        // Add metro rank.
1✔
155
        rank, ok := p.ranks[hostname]
1✔
156
        if ok {
2✔
157
                v.Set("metro_rank", strconv.Itoa(rank))
1✔
158
        }
1✔
159

160
        // Add result index.
161
        v.Set("index", strconv.Itoa(index))
1✔
162

1✔
163
        return v
1✔
164
}
165

166
// Nearest uses an implementation of the LocatorV2 interface to look up
167
// nearest servers.
168
func (c *Client) Nearest(rw http.ResponseWriter, req *http.Request) {
1✔
169
        req.ParseForm()
1✔
170
        result := v2.NearestResult{}
1✔
171
        setHeaders(rw)
1✔
172

1✔
173
        if c.limitRequest(time.Now().UTC(), req) {
2✔
174
                result.Error = v2.NewError("client", tooManyRequests, http.StatusTooManyRequests)
1✔
175
                writeResult(rw, result.Error.Status, &result)
1✔
176
                metrics.RequestsTotal.WithLabelValues("nearest", "request limit", http.StatusText(result.Error.Status)).Inc()
1✔
177
                return
1✔
178
        }
1✔
179

180
        // Check rate limit for IP and UA.
181
        if c.ipLimiter != nil {
2✔
182
                ip := getRemoteAddr(req)
1✔
183
                if ip != "" {
2✔
184
                        // An empty UA is technically possible.
1✔
185
                        ua := req.Header.Get("User-Agent")
1✔
186
                        status, err := c.ipLimiter.IsLimited(ip, ua)
1✔
187
                        if err != nil {
2✔
188
                                // Log error but don't block request (fail open).
1✔
189
                                // TODO: Add tests for this path.
1✔
190
                                log.Printf("Rate limiter error: %v", err)
1✔
191
                        } else if status.IsLimited {
3✔
192
                                // Log IP and UA and block the request.
1✔
193
                                result.Error = v2.NewError("client", tooManyRequests, http.StatusTooManyRequests)
1✔
194
                                metrics.RequestsTotal.WithLabelValues("nearest", "rate limit",
1✔
195
                                        http.StatusText(result.Error.Status)).Inc()
1✔
196
                                // If the client provided a client_name, we want to know how many times
1✔
197
                                // that client_name was rate limited. This may be empty, which is fine.
1✔
198
                                clientName := req.Form.Get("client_name")
1✔
199
                                metrics.RateLimitedTotal.WithLabelValues(clientName, status.LimitType).Inc()
1✔
200

1✔
201
                                log.Printf("Rate limit (%s) exceeded for IP: %s, client: %s, UA: %s", ip,
1✔
202
                                        status.LimitType, clientName, ua)
1✔
203
                                writeResult(rw, result.Error.Status, &result)
1✔
204
                                return
1✔
205
                        }
1✔
UNCOV
206
                } else {
×
UNCOV
207
                        // This should never happen if Locate is deployed on AppEngine.
×
UNCOV
208
                        log.Println("Cannot find IP address for rate limiting.")
×
UNCOV
209
                }
×
210
        }
211

212
        experiment, service := getExperimentAndService(req.URL.Path)
1✔
213

1✔
214
        // Look up client location.
1✔
215
        loc, err := c.checkClientLocation(rw, req)
1✔
216
        if err != nil {
2✔
217
                status := http.StatusServiceUnavailable
1✔
218
                result.Error = v2.NewError("nearest", "Failed to lookup nearest machines", status)
1✔
219
                writeResult(rw, result.Error.Status, &result)
1✔
220
                metrics.RequestsTotal.WithLabelValues("nearest", "client location",
1✔
221
                        http.StatusText(result.Error.Status)).Inc()
1✔
222
                return
1✔
223
        }
1✔
224

225
        // Parse client location.
226
        lat, errLat := strconv.ParseFloat(loc.Latitude, 64)
1✔
227
        lon, errLon := strconv.ParseFloat(loc.Longitude, 64)
1✔
228
        if errLat != nil || errLon != nil {
2✔
229
                result.Error = v2.NewError("client", errFailedToLookupClient.Error(), http.StatusInternalServerError)
1✔
230
                writeResult(rw, result.Error.Status, &result)
1✔
231
                metrics.RequestsTotal.WithLabelValues("nearest", "parse client location",
1✔
232
                        http.StatusText(result.Error.Status)).Inc()
1✔
233
                return
1✔
234
        }
1✔
235

236
        // Find the nearest targets using the client parameters.
237
        q := req.URL.Query()
1✔
238
        t := q.Get("machine-type")
1✔
239
        country := req.Header.Get("X-AppEngine-Country")
1✔
240
        sites := q["site"]
1✔
241
        org := q.Get("org")
1✔
242
        strict := false
1✔
243
        if qsStrict, err := strconv.ParseBool(q.Get("strict")); err == nil {
1✔
244
                strict = qsStrict
×
245
        }
×
246
        // If strict, override the country from the AppEngine header with the one in
247
        // the querystring.
248
        if strict {
1✔
249
                country = q.Get("country")
×
250
        }
×
251
        opts := &heartbeat.NearestOptions{Type: t, Country: country, Sites: sites, Org: org, Strict: strict}
1✔
252
        targetInfo, err := c.LocatorV2.Nearest(service, lat, lon, opts)
1✔
253
        if err != nil {
2✔
254
                result.Error = v2.NewError("nearest", "Failed to lookup nearest machines", http.StatusInternalServerError)
1✔
255
                writeResult(rw, result.Error.Status, &result)
1✔
256
                metrics.RequestsTotal.WithLabelValues("nearest", "server location",
1✔
257
                        http.StatusText(result.Error.Status)).Inc()
1✔
258
                return
1✔
259
        }
1✔
260

261
        pOpts := paramOpts{
1✔
262
                raw:       req.Form,
1✔
263
                version:   "v2",
1✔
264
                ranks:     targetInfo.Ranks,
1✔
265
                svcParams: static.ServiceParams,
1✔
266
        }
1✔
267
        // Populate target URLs and write out response.
1✔
268
        c.populateURLs(targetInfo.Targets, targetInfo.URLs, experiment, pOpts)
1✔
269
        result.Results = targetInfo.Targets
1✔
270
        writeResult(rw, http.StatusOK, &result)
1✔
271
        metrics.RequestsTotal.WithLabelValues("nearest", "success", http.StatusText(http.StatusOK)).Inc()
1✔
272
}
273

274
// Live is a minimal handler to indicate that the server is operating at all.
275
func (c *Client) Live(rw http.ResponseWriter, req *http.Request) {
1✔
276
        fmt.Fprintf(rw, "ok")
1✔
277
}
1✔
278

279
// Ready reports whether the server is working as expected and ready to serve requests.
280
func (c *Client) Ready(rw http.ResponseWriter, req *http.Request) {
1✔
281
        if c.LocatorV2.Ready() {
2✔
282
                fmt.Fprintf(rw, "ok")
1✔
283
        } else {
2✔
284
                rw.WriteHeader(http.StatusInternalServerError)
1✔
285
                fmt.Fprintf(rw, "not ready")
1✔
286
        }
1✔
287
}
288

289
// Registrations returns information about registered machines. There are 3
290
// supported query parameters:
291
//
292
// * format - defines the format of the returned JSON
293
// * org - limits results to only records for the given organization
294
// * exp - limits results to only records for the given experiment (e.g., ndt)
295
//
296
// The "org" and "exp" query parameters are currently only supported by the
297
// default or "machines" format.
298
func (c *Client) Registrations(rw http.ResponseWriter, req *http.Request) {
1✔
299
        var err error
1✔
300
        var result interface{}
1✔
301

1✔
302
        setHeaders(rw)
1✔
303

1✔
304
        q := req.URL.Query()
1✔
305
        format := q.Get("format")
1✔
306

1✔
307
        switch format {
1✔
308
        case "geo":
×
309
                result, err = siteinfo.Geo(c.LocatorV2.Instances(), q)
×
310
        default:
1✔
311
                result, err = siteinfo.Hosts(c.LocatorV2.Instances(), q)
1✔
312
        }
313

314
        if err != nil {
2✔
315
                v2Error := v2.NewError("siteinfo", err.Error(), http.StatusInternalServerError)
1✔
316
                writeResult(rw, http.StatusInternalServerError, v2Error)
1✔
317
                return
1✔
318
        }
1✔
319

320
        writeResult(rw, http.StatusOK, result)
1✔
321
}
322

323
// checkClientLocation looks up the client location and copies the location
324
// headers to the response writer.
325
func (c *Client) checkClientLocation(rw http.ResponseWriter, req *http.Request) (*clientgeo.Location, error) {
1✔
326
        // Lookup the client location using the client request.
1✔
327
        loc, err := c.Locate(req)
1✔
328
        if err != nil {
2✔
329
                return nil, errFailedToLookupClient
1✔
330
        }
1✔
331

332
        // Copy location headers to response writer.
333
        for key := range loc.Headers {
2✔
334
                rw.Header().Set(key, loc.Headers.Get(key))
1✔
335
        }
1✔
336

337
        return loc, nil
1✔
338
}
339

340
// populateURLs populates each set of URLs using the target configuration.
341
func (c *Client) populateURLs(targets []v2.Target, ports static.Ports, exp string, pOpts paramOpts) {
1✔
342
        for i, target := range targets {
2✔
343
                token := c.getAccessToken(target.Machine, exp)
1✔
344
                params := c.extraParams(target.Machine, i, pOpts)
1✔
345
                targets[i].URLs = c.getURLs(ports, target.Hostname, token, params)
1✔
346
        }
1✔
347
}
348

349
// getAccessToken allocates a new access token using the given machine name as
350
// the intended audience and the subject as the target service.
351
func (c *Client) getAccessToken(machine, subject string) string {
1✔
352
        // Create the token. The same access token is reused for every URL of a
1✔
353
        // target port.
1✔
354
        // A uuid is added to the claims so that each new token is unique.
1✔
355
        cl := jwt.Claims{
1✔
356
                Issuer:   static.IssuerLocate,
1✔
357
                Subject:  subject,
1✔
358
                Audience: jwt.Audience{machine},
1✔
359
                Expiry:   jwt.NewNumericDate(time.Now().Add(time.Minute)),
1✔
360
                ID:       uuid.NewString(),
1✔
361
        }
1✔
362
        token, err := c.Sign(cl)
1✔
363
        // Sign errors can only happen due to a misconfiguration of the key.
1✔
364
        // A good config will remain good.
1✔
365
        rtx.PanicOnError(err, "signing claims has failed")
1✔
366
        return token
1✔
367
}
1✔
368

369
// getURLs creates URLs for the named experiment, running on the named machine
370
// for each given port. Every URL will include an `access_token=` parameter,
371
// authorizing the measurement.
372
func (c *Client) getURLs(ports static.Ports, hostname, token string, extra url.Values) map[string]string {
1✔
373
        urls := map[string]string{}
1✔
374
        // For each port config, prepare the target url with access_token and
1✔
375
        // complete host field.
1✔
376
        for _, target := range ports {
2✔
377
                name := target.String()
1✔
378
                params := url.Values{}
1✔
379
                params.Set("access_token", token)
1✔
380
                for key := range extra {
2✔
381
                        // note: we only use the first value.
1✔
382
                        params.Set(key, extra.Get(key))
1✔
383
                }
1✔
384
                target.RawQuery = params.Encode()
1✔
385

1✔
386
                host := &bytes.Buffer{}
1✔
387
                err := c.targetTmpl.Execute(host, map[string]string{
1✔
388
                        "Hostname": hostname,
1✔
389
                        "Ports":    target.Host, // from URL template, so typically just the ":port".
1✔
390
                })
1✔
391
                rtx.PanicOnError(err, "bad template evaluation")
1✔
392
                target.Host = host.String()
1✔
393
                urls[name] = target.String()
1✔
394
        }
395
        return urls
1✔
396
}
397

398
// limitRequest determines whether a client request should be rate-limited.
399
func (c *Client) limitRequest(now time.Time, req *http.Request) bool {
1✔
400
        agent := req.Header.Get("User-Agent")
1✔
401
        l, ok := c.agentLimits[agent]
1✔
402
        if !ok {
2✔
403
                // No limit defined for user agent.
1✔
404
                return false
1✔
405
        }
1✔
406
        return l.IsLimited(now)
1✔
407
}
408

409
// setHeaders sets the response headers shared by the JSON handlers: the JSON
// content type, an open CORS policy, and a no-store cache policy.
func setHeaders(rw http.ResponseWriter) {
	hdr := rw.Header()
	hdr.Set("Content-Type", "application/json")
	// Allow third-party websites to use returned resources.
	hdr.Set("Access-Control-Allow-Origin", "*")
	// Prevent caching of result.
	// See also: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control
	hdr.Set("Cache-Control", "no-store")
}
1✔
418

419
// writeResult marshals the result and writes the result to the response writer.
420
func writeResult(rw http.ResponseWriter, status int, result interface{}) {
1✔
421
        b, err := json.MarshalIndent(result, "", "  ")
1✔
422
        // Errors are only possible when marshalling incompatible types, like functions.
1✔
423
        rtx.PanicOnError(err, "Failed to format result")
1✔
424
        rw.WriteHeader(status)
1✔
425
        rw.Write(b)
1✔
426
}
1✔
427

428
// getExperimentAndService splits the last two elements off an HTTP request
// path. For a well-formed path such as "/v2/nearest/ndt/ndt5" it returns the
// experiment name ("ndt") and the service, i.e. the experiment joined to the
// datatype with a slash ("ndt/ndt5").
func getExperimentAndService(p string) (string, string) {
	parent, datatype := path.Dir(p), path.Base(p)
	experiment := path.Base(parent)
	service := experiment + "/" + datatype
	return experiment, service
}
1✔
436

437
// getRemoteAddr extracts the remote address from the request.
//
// When an X-Forwarded-For header is present, the second-to-last entry is
// used: the GCP load balancer appends the actual client IP and the load
// balancer IP to any existing header, so on Google App Engine that entry is
// the real client address. A header with a single entry is used as is.
//
// If the header is absent, or the selected entry is empty, the host part of
// req.RemoteAddr is returned instead (local testing, deployments outside of
// GAE). When RemoteAddr is not in host:port form it is returned whole, with
// surrounding whitespace trimmed.
func getRemoteAddr(req *http.Request) string {
	if xff := req.Header.Get("X-Forwarded-For"); xff != "" {
		// GCP load balancer appends: <original-header>, <client-ip>, <lb-ip>
		ips := strings.Split(xff, ",")
		idx := 0
		if len(ips) >= 2 {
			idx = len(ips) - 2
		}
		// An empty entry is never a usable address; fall through to
		// RemoteAddr rather than returning "".
		if ip := strings.TrimSpace(ips[idx]); ip != "" {
			return ip
		}
	}

	// Fall back to RemoteAddr for local testing or deployments outside of GAE.
	host, _, err := net.SplitHostPort(req.RemoteAddr)
	if err != nil {
		// As a last resort, use the whole RemoteAddr.
		return strings.TrimSpace(req.RemoteAddr)
	}
	return host
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc