• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

m-lab / traceroute-caller / 640

16 Oct 2024 09:13PM UTC coverage: 94.825% (-1.2%) from 95.983%
640

push

travis-pro

web-flow
Add json output format (#168)

* Support output format as json
* Add -output.format flag to caller
* Add -input.format flag to trex
* Use travis-pro for coverage target
* Build generate-schemas in Dockerfile
* Rename HopAnnotation2 type and schema
* Make file extension configurable
* Set extension based on output format
* Generalize ptr and waitprobe options for traceroutes
* Update flags for ptr and waitprobe

62 of 65 new or added lines in 7 files covered. (95.38%)

19 existing lines in 1 file now uncovered.

733 of 773 relevant lines covered (94.83%)

1.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.78
/internal/triggertrace/triggertrace.go
1
// Package triggertrace triggers a traceroute operation to a destination
2
// after the destination closes its connection with our host (local IP).
3
// Once a traceroute is obtained, the IP addresses of the hops in that
4
// traceroute are annotated and archived.
5
package triggertrace
6

7
import (
8
        "context"
9
        "errors"
10
        "fmt"
11
        "log"
12
        "net"
13
        "os"
14
        "strings"
15
        "sync"
16
        "time"
17

18
        "github.com/m-lab/go/anonymize"
19
        "github.com/m-lab/tcp-info/inetdiag"
20
        "github.com/m-lab/traceroute-caller/hopannotation"
21
        "github.com/m-lab/traceroute-caller/internal/ipcache"
22
        "github.com/m-lab/traceroute-caller/parser"
23
        "github.com/m-lab/uuid-annotator/annotator"
24
)
25

26
// Indirections that exist to aid black-box testing.

// netInterfaceAddrs is the function used to list the system's interface
// addresses; it defaults to net.InterfaceAddrs.
var netInterfaceAddrs = net.InterfaceAddrs

// metadataDir is the directory holding the machine's metadata files.
var metadataDir = "/metadata"
31

32
// Destination identifies the remote host that a traceroute is run to.
type Destination struct {
	// RemoteIP is the textual IP address of the remote end of a connection.
	RemoteIP string
}
36

37
// FetchTracer obtains the raw traceroute output for a destination.
//
// Implementations are expected to serve the traceroute from a recent cache
// entry when one exists, so that the same destination is not traced
// multiple times within a short period.
type FetchTracer interface {
	// FetchTrace returns the raw traceroute data for remoteIP; uuid
	// identifies the connection that triggered the request.
	FetchTrace(remoteIP string, uuid string) ([]byte, error)
}
44

45
// ParseTracer is the interface for parsing raw traceroutes obtained
46
// from the traceroute tool.
47
type ParseTracer interface {
48
        ParseRawData(rawData []byte) (parser.ParsedData, error)
49
        Format() string
50
}
51

52
// AnnotateAndArchiver is the interface for annotating IP addresses and
53
// archiving them.
54
type AnnotateAndArchiver interface {
55
        Annotate(context.Context, []string, time.Time) (map[string]*annotator.ClientAnnotations, []error)
56
        WriteAnnotations(map[string]*annotator.ClientAnnotations, time.Time) []error
57
}
58

59
// TracerWriter provides the interface for issuing traces and writing results.
60
type TracerWriter interface {
61
        ipcache.Tracer
62
        WriteFile(uuid string, t time.Time, b []byte) error
63
}
64

65
// Handler implements the tcp-info/eventsocket.Handler's interface.
66
type Handler struct {
67
        Destinations     map[string]Destination // key is UUID
68
        DestinationsLock sync.Mutex
69
        LocalIPs         []net.IP
70
        IPCache          FetchTracer
71
        Parser           ParseTracer
72
        HopAnnotator     AnnotateAndArchiver
73
        Tracetool        TracerWriter
74
        Anonymizer       anonymize.IPAnonymizer
75
        done             chan struct{} // For testing.
76
}
77

78
// NewHandler returns a new instance of Handler.
79
func NewHandler(ctx context.Context, tracetool TracerWriter, ipcCfg ipcache.Config, newParser parser.TracerouteParser, haCfg hopannotation.Config) (*Handler, error) {
1✔
80
        ipCache, err := ipcache.New(ctx, tracetool, ipcCfg)
1✔
81
        if err != nil {
1✔
UNCOV
82
                return nil, err
×
UNCOV
83
        }
×
84
        myIPs, err := localIPs(metadataDir)
1✔
85
        if err != nil {
2✔
86
                return nil, err
1✔
87
        }
1✔
88
        hopCache, err := hopannotation.New(ctx, haCfg)
1✔
89
        if err != nil {
1✔
UNCOV
90
                return nil, err
×
UNCOV
91
        }
×
92
        return &Handler{
1✔
93
                Destinations: make(map[string]Destination),
1✔
94
                LocalIPs:     myIPs,
1✔
95
                IPCache:      ipCache,
1✔
96
                Parser:       newParser,
1✔
97
                HopAnnotator: hopCache,
1✔
98
                Tracetool:    tracetool,
1✔
99
                Anonymizer:   anonymize.New(anonymize.IPAnonymizationFlag),
1✔
100
        }, nil
1✔
101
}
102

103
// Open is called when a network connection is opened.
104
// Note that this function doesn't use timestamp.
105
func (h *Handler) Open(ctx context.Context, timestamp time.Time, uuid string, sockID *inetdiag.SockID) {
1✔
106
        if sockID == nil {
2✔
107
                // TODO(SaiedKazemi): Add a metric here.
1✔
108
                log.Printf("error: tcp-info passed a nil sockID\n")
1✔
109
                return
1✔
110
        }
1✔
111
        if uuid == "" {
2✔
112
                // TODO(SaiedKazemi): Add a metric here.
1✔
113
                log.Printf("error: tcp-info passed an empty uuid for sockID %+v\n", *sockID)
1✔
114
                return
1✔
115
        }
1✔
116

117
        // TODO(SaiedKazemi): Determine whether the lock can be moved
118
        //     to right before accessing the map.
119
        h.DestinationsLock.Lock()
1✔
120
        defer h.DestinationsLock.Unlock()
1✔
121
        destination, err := h.findDestination(sockID)
1✔
122
        if err != nil {
2✔
123
                log.Printf("context %p: failed to find destination from SockID %+v: %v\n", ctx, *sockID, err)
1✔
124
                return
1✔
125
        }
1✔
126
        h.Destinations[uuid] = destination
1✔
127
}
128

129
// Close is called when a network connection is closed.
130
// Note that this function doesn't use timestamp.
131
func (h *Handler) Close(ctx context.Context, timestamp time.Time, uuid string) {
1✔
132
        h.DestinationsLock.Lock()
1✔
133
        destination, ok := h.Destinations[uuid]
1✔
134
        if !ok {
2✔
135
                h.DestinationsLock.Unlock()
1✔
136
                log.Printf("context %p: failed to find destination for UUID %q", ctx, uuid)
1✔
137
                return
1✔
138
        }
1✔
139
        delete(h.Destinations, uuid)
1✔
140
        h.DestinationsLock.Unlock()
1✔
141
        // This goroutine will live for a few minutes and terminate
1✔
142
        // after all hop annotations are archived.
1✔
143
        go h.traceAnnotateAndArchive(ctx, uuid, destination)
1✔
144
}
145

146
// traceAnnotateAndArchive runs a traceroute, annotates the hops
147
// in the traceroute output, and archives the annotations.
148
func (h *Handler) traceAnnotateAndArchive(ctx context.Context, uuid string, dest Destination) {
1✔
149
        defer func() {
2✔
150
                if h.done != nil {
2✔
151
                        close(h.done)
1✔
152
                }
1✔
153
        }()
154
        rawData, err := h.IPCache.FetchTrace(dest.RemoteIP, uuid)
1✔
155
        if err != nil {
2✔
156
                log.Printf("context %p: failed to run a traceroute to %q (error: %v)\n", ctx, dest, err)
1✔
157
                return
1✔
158
        }
1✔
159
        parsedData, err := h.Parser.ParseRawData(rawData)
1✔
160
        if err != nil {
2✔
161
                log.Printf("context %p: failed to parse traceroute output (error: %v)\n", ctx, err)
1✔
162
                return
1✔
163
        }
1✔
164

165
        // Anonymize the parsed data in place.
166
        parsedData.Anonymize(h.Anonymizer)
1✔
167
        // Remarshal anonymized data for writing.
1✔
168
        rawOut, err := parsedData.Marshal(h.Parser.Format())
1✔
169
        if err != nil {
1✔
NEW
170
                log.Printf("context %p: failed to marshal traceroute output (error: %v)\n", ctx, err)
×
NEW
171
                return
×
NEW
172
        }
×
173

174
        traceStartTime := parsedData.StartTime()
1✔
175
        err = h.Tracetool.WriteFile(uuid, traceStartTime, rawOut)
1✔
176
        if err != nil {
2✔
177
                log.Printf("context %p: failed to write trace file for uuid: %s: (error: %v)\n", ctx, uuid, err)
1✔
178
        }
1✔
179

180
        hops := parsedData.ExtractHops()
1✔
181
        if len(hops) == 0 {
2✔
182
                log.Printf("context %p: failed to extract hops from traceroute %+v\n", ctx, string(rawData))
1✔
183
                return
1✔
184
        }
1✔
185
        annotations, allErrs := h.HopAnnotator.Annotate(ctx, hops, traceStartTime)
1✔
186
        if allErrs != nil {
2✔
187
                log.Printf("context %p: failed to annotate some or all hops (errors: %+v)\n", ctx, allErrs)
1✔
188
        }
1✔
189
        if len(annotations) > 0 {
2✔
190
                allErrs := h.HopAnnotator.WriteAnnotations(annotations, traceStartTime)
1✔
191
                if allErrs != nil {
1✔
UNCOV
192
                        log.Printf("context %p: failed to write some or all annotations due to the following error(s):\n", ctx)
×
UNCOV
193
                        for _, err := range allErrs {
×
UNCOV
194
                                log.Printf("error: %v\n", err)
×
UNCOV
195
                        }
×
196
                }
197
        }
198
}
199

200
// findDestination iterates through the local IPs to find which one of
201
// the source and destination IPs specified in the given socket is indeed
202
// the destination IP.
203
func (h *Handler) findDestination(sockid *inetdiag.SockID) (Destination, error) {
1✔
204
        srcIP := net.ParseIP(sockid.SrcIP)
1✔
205
        if srcIP == nil {
2✔
206
                return Destination{}, fmt.Errorf("failed to parse source IP %q", sockid.SrcIP)
1✔
207
        }
1✔
208
        dstIP := net.ParseIP(sockid.DstIP)
1✔
209
        if dstIP == nil {
2✔
210
                return Destination{}, fmt.Errorf("failed to parse destination IP %q", sockid.DstIP)
1✔
211
        }
1✔
212
        srcLocal := false
1✔
213
        dstLocal := false
1✔
214
        for _, localIP := range h.LocalIPs {
2✔
215
                srcLocal = srcLocal || localIP.Equal(srcIP)
1✔
216
                dstLocal = dstLocal || localIP.Equal(dstIP)
1✔
217
        }
1✔
218
        if srcLocal && !dstLocal {
2✔
219
                return Destination{
1✔
220
                        RemoteIP: sockid.DstIP,
1✔
221
                }, nil
1✔
222
        }
1✔
223
        if !srcLocal && dstLocal {
2✔
224
                return Destination{
1✔
225
                        RemoteIP: sockid.SrcIP,
1✔
226
                }, nil
1✔
227
        }
1✔
228
        return Destination{}, fmt.Errorf("failed to find a local/remote IP pair in %+v", sockid)
1✔
229
}
230

231
// localIPs returns the list of system's unicast interface addresses.
232
func localIPs(metadataDir string) ([]net.IP, error) {
1✔
233
        localIPs := make([]net.IP, 0)
1✔
234
        addrs, err := netInterfaceAddrs()
1✔
235
        if err != nil {
2✔
236
                return localIPs, err
1✔
237
        }
1✔
238

239
        for _, addr := range addrs {
2✔
240
                var ip net.IP
1✔
241
                switch a := addr.(type) {
1✔
242
                case *net.IPNet:
1✔
243
                        ip = a.IP
1✔
244
                case *net.IPAddr:
1✔
245
                        ip = a.IP
1✔
UNCOV
246
                default:
×
UNCOV
247
                        return localIPs, fmt.Errorf("unknown address type %q", addr.String())
×
248
                }
249
                if ip != nil {
2✔
250
                        localIPs = append(localIPs, ip)
1✔
251
                }
1✔
252
        }
253

254
        localIPs, err = loadbalancerIPs(localIPs, metadataDir)
1✔
255
        if err != nil {
1✔
UNCOV
256
                return localIPs, err
×
UNCOV
257
        }
×
258

259
        return localIPs, nil
1✔
260
}
261

262
// loadbalancerIPs returns localIPs extended with the public IP addresses, if
// any, of a load balancer that may sit in front of the machine. Not all
// machines sit behind a load balancer, so this function may return the same
// []net.IP that was passed to it. This function is necessary because
// traceroute-caller needs to recognize the load balancer IPs as "local";
// otherwise it fails to identify a proper destination for such connections
// and produces no traceroute data for them.
//
// The decision is driven by files in metadataDir:
//   - "loadbalanced": when the file does not exist, or contains "false",
//     localIPs is returned unmodified.
//   - "external-ip" and "external-ipv6": each holds one address, either a
//     plain IP or an IP in CIDR notation. Both are appended to localIPs.
//
// Leading and trailing whitespace (for example a trailing newline) in these
// files is ignored. A file that cannot be read or an address that cannot be
// parsed yields an error, returned together with the addresses gathered up to
// that point.
func loadbalancerIPs(localIPs []net.IP, metadataDir string) ([]net.IP, error) {
	// While every machine _should_ have a /metadata/loadbalanced file, for now
	// consider its non-existence to mean that the machine is not load balanced.
	// Reading directly, rather than calling os.Stat first, avoids checking the
	// file twice and the race between the check and the read.
	lb, err := os.ReadFile(metadataDir + "/loadbalanced")
	if errors.Is(err, os.ErrNotExist) {
		return localIPs, nil
	}
	if err != nil {
		return localIPs, fmt.Errorf("unable to read file %s/loadbalanced: %w", metadataDir, err)
	}

	// If the machine isn't load balanced, then just return localIPs unmodified.
	if strings.TrimSpace(string(lb)) == "false" {
		return localIPs, nil
	}

	for _, f := range []string{"external-ip", "external-ipv6"} {
		ipBytes, err := os.ReadFile(metadataDir + "/" + f)
		if err != nil {
			return localIPs, fmt.Errorf("unable to read file %s/%s: %w", metadataDir, f, err)
		}
		ipString := strings.TrimSpace(string(ipBytes))

		var ip net.IP
		// GCE metadata for key "forwarded-ipv6s" is returned in CIDR format.
		if strings.Contains(ipString, "/") {
			// The ParseCIDR error is deliberately dropped: on failure ip
			// stays nil, which is reported just below.
			ip, _, _ = net.ParseCIDR(ipString)
		} else {
			ip = net.ParseIP(ipString)
		}
		if ip == nil {
			return localIPs, fmt.Errorf("failed to parse IP: %s", ipString)
		}
		localIPs = append(localIPs, ip)
		log.Printf("added load balancer IP %s to localIPs\n", ip.String())
	}

	return localIPs, nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc