Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / 10517

2 Aug 2018 - 10:34 coverage increased (+0.2%) to 71.094%
10517

Pull #3270

circleci

Bryan Boreham
Specify https for security
Pull Request #3270: Convert CI build to CircleCI 2.0

8522 of 11987 relevant lines covered (71.09%)

85434.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.43
/router/overlay_switch.go
1
package router
2

3
import (
4
        "fmt"
5
        "strings"
6
        "sync"
7

8
        "github.com/weaveworks/mesh"
9
)
10

11
// OverlaySwitch selects which overlay to use, from a set of
12
// subsidiary overlays.  First, it passes a list of supported overlays
13
// in the connection features, and uses that to determine which
14
// overlays are in common.  Then it tries those common overlays, and
15
// uses the best one that seems to be working.
16

17
type OverlaySwitch struct {
18
        overlays      map[string]NetworkOverlay
19
        overlayNames  []string
20
        compatOverlay NetworkOverlay
21
}
22

23
func NewOverlaySwitch() *OverlaySwitch {
109×
24
        return &OverlaySwitch{overlays: make(map[string]NetworkOverlay)}
109×
25
}
109×
26

27
func (osw *OverlaySwitch) Add(name string, overlay NetworkOverlay) {
213×
28
        // check for repeated names
213×
29
        if _, present := osw.overlays[name]; present {
213×
30
                log.Fatal("OverlaySwitch: repeated overlay name")
!
31
        }
!
32

33
        osw.overlays[name] = overlay
213×
34
        osw.overlayNames = append(osw.overlayNames, name)
213×
35
}
36

37
func (osw *OverlaySwitch) SetCompatOverlay(overlay NetworkOverlay) {
109×
38
        osw.compatOverlay = overlay
109×
39
}
109×
40

41
func (osw *OverlaySwitch) AddFeaturesTo(features map[string]string) {
116×
42
        features["Overlays"] = strings.Join(osw.overlayNames, " ")
116×
43
}
116×
44

45
func (osw *OverlaySwitch) Diagnostics() interface{} {
285×
46
        diagnostics := make(map[string]interface{})
285×
47
        for name, overlay := range osw.overlays {
842×
48
                diagnostics[name] = overlay.Diagnostics()
557×
49
        }
557×
50
        return diagnostics
285×
51
}
52

53
func (osw *OverlaySwitch) Stop() {
109×
54
        for _, overlay := range osw.overlays {
322×
55
                overlay.Stop()
213×
56
        }
213×
57
}
58

59
func (osw *OverlaySwitch) InvalidateRoutes() {
514×
60
        for _, overlay := range osw.overlays {
1,501×
61
                overlay.InvalidateRoutes()
987×
62
        }
987×
63
}
64

65
func (osw *OverlaySwitch) InvalidateShortIDs() {
18×
66
        for _, overlay := range osw.overlays {
54×
67
                overlay.InvalidateShortIDs()
36×
68
        }
36×
69
}
70

71
func (osw *OverlaySwitch) StartConsumingPackets(localPeer *mesh.Peer, peers *mesh.Peers, consumer OverlayConsumer) error {
109×
72
        for _, overlay := range osw.overlays {
322×
73
                if err := overlay.StartConsumingPackets(localPeer, peers, consumer); err != nil {
213×
74
                        return err
!
75
                }
!
76
        }
77
        return nil
109×
78
}
79

80
type namedOverlay struct {
81
        NetworkOverlay
82
        name string
83
}
84

85
// Find the common set of overlays supported by both sides, with the
86
// ordering being the same on both sides too.
87
func (osw *OverlaySwitch) commonOverlays(params mesh.OverlayConnectionParams) ([]namedOverlay, error) {
106×
88
        var peerOverlays []string
106×
89
        if overlaysFeature, present := params.Features["Overlays"]; present {
212×
90
                peerOverlays = strings.Split(overlaysFeature, " ")
106×
91
        }
106×
92

93
        common := make(map[string]NetworkOverlay)
106×
94
        for _, name := range peerOverlays {
312×
95
                if overlay := osw.overlays[name]; overlay != nil {
409×
96
                        common[name] = overlay
203×
97
                }
203×
98
        }
99

100
        if len(common) == 0 {
106×
101
                return nil, fmt.Errorf("no overlays in common with peer")
!
102
        }
!
103

104
        // we order them according to the connecting node
105
        ordering := osw.overlayNames
106×
106
        if params.RemoteAddr == nil {
106×
107
                // we are the connectee
!
108
                ordering = peerOverlays
!
109
        }
!
110

111
        res := make([]namedOverlay, 0, len(common))
106×
112
        for _, name := range ordering {
309×
113
                overlay := common[name]
203×
114
                if overlay != nil {
406×
115
                        res = append(res, namedOverlay{overlay, name})
203×
116
                }
203×
117
        }
118

119
        // we use bytes to represent forwarder indices in control
120
        // messages, so just in case:
121
        if len(res) > 256 {
106×
122
                res = res[:256]
!
123
        }
!
124

125
        return res, nil
106×
126
}
127

128
type overlaySwitchForwarder struct {
129
        remotePeer *mesh.Peer
130

131
        lock sync.Mutex
132

133
        // the index of the forwarder to send on
134
        best int
135

136
        // the subsidiary forwarders
137
        forwarders []subForwarder
138

139
        // closed to tell the main goroutine to stop
140
        stopChan chan<- struct{}
141

142
        alreadyEstablished bool
143
        establishedChan    chan struct{}
144
        errorChan          chan error
145
}
146

147
// A subsidiary forwarder
148
type subForwarder struct {
149
        fwd         OverlayForwarder
150
        overlayName string
151

152
        // Has the forwarder signalled that it is established?
153
        established bool
154

155
        // closed to tell the forwarder monitor goroutine to stop
156
        stopChan chan<- struct{}
157
}
158

159
// An event from a subsidiary forwarder
160
type subForwarderEvent struct {
161
        // the index of the forwarder
162
        index int
163

164
        // is this an "established" event?
165
        established bool
166

167
        // is this an error event?
168
        err error
169
}
170

171
func (osw *OverlaySwitch) PrepareConnection(params mesh.OverlayConnectionParams) (mesh.OverlayConnection, error) {
106×
172
        if _, present := params.Features["Overlays"]; !present && osw.compatOverlay != nil {
106×
173
                return osw.compatOverlay.PrepareConnection(params)
!
174
        }
!
175

176
        overlays, err := osw.commonOverlays(params)
106×
177
        if err != nil {
106×
178
                return nil, err
!
179
        }
!
180

181
        // channel to carry events from the subforwarder monitors to
182
        // the main goroutine
183
        eventsChan := make(chan subForwarderEvent)
106×
184

106×
185
        // channel to stop the main goroutine
106×
186
        stopChan := make(chan struct{})
106×
187

106×
188
        fwd := &overlaySwitchForwarder{
106×
189
                remotePeer: params.RemotePeer,
106×
190

106×
191
                best:       -1,
106×
192
                forwarders: make([]subForwarder, len(overlays)),
106×
193
                stopChan:   stopChan,
106×
194

106×
195
                establishedChan: make(chan struct{}),
106×
196
                errorChan:       make(chan error, 1),
106×
197
        }
106×
198

106×
199
        origSendControlMessage := params.SendControlMessage
106×
200
        for i, overlay := range overlays {
309×
201
                // Prefix control messages to indicate the relevant forwarder
203×
202
                index := i
203×
203
                params.SendControlMessage = func(tag byte, msg []byte) error {
561×
204
                        xmsg := make([]byte, len(msg)+2)
358×
205
                        xmsg[0] = byte(index)
358×
206
                        xmsg[1] = tag
358×
207
                        copy(xmsg[2:], msg)
358×
208
                        return origSendControlMessage(mesh.ProtocolOverlayControlMsg, xmsg)
358×
209
                }
358×
210

211
                subConn, err := overlay.PrepareConnection(params)
203×
212
                if err != nil {
203×
213
                        log.Infof("Unable to use %s for connection to %s(%s): %s",
!
214
                                overlay.name,
!
215
                                params.RemotePeer.Name,
!
216
                                params.RemotePeer.NickName,
!
217
                                err)
!
218
                        // failed to start subforwarder - record overlay name and continue
!
219
                        fwd.forwarders[i] = subForwarder{
!
220
                                overlayName: overlay.name,
!
221
                        }
!
222
                        continue
!
223
                }
224
                subFwd := subConn.(OverlayForwarder)
203×
225

203×
226
                subStopChan := make(chan struct{})
203×
227
                go monitorForwarder(i, eventsChan, subStopChan, subFwd)
203×
228
                fwd.forwarders[i] = subForwarder{
203×
229
                        fwd:         subFwd,
203×
230
                        overlayName: overlay.name,
203×
231
                        stopChan:    subStopChan,
203×
232
                }
203×
233
        }
234

235
        fwd.chooseBest()
106×
236
        go fwd.run(eventsChan, stopChan)
106×
237
        return fwd, nil
106×
238
}
239

240
func monitorForwarder(index int, eventsChan chan<- subForwarderEvent, stopChan <-chan struct{}, fwd OverlayForwarder) {
203×
241
        establishedChan := fwd.EstablishedChannel()
203×
242
loop:
203×
243
        for {
568×
244
                e := subForwarderEvent{index: index}
365×
245

365×
246
                select {
365×
247
                case <-establishedChan:
162×
248
                        e.established = true
162×
249
                        establishedChan = nil
162×
250

251
                case err := <-fwd.ErrorChannel():
4×
252
                        e.err = err
4×
253

254
                case <-stopChan:
131×
255
                        break loop
131×
256
                }
257

258
                select {
166×
259
                case eventsChan <- e:
166×
UNCOV
260
                case <-stopChan:
!
UNCOV
261
                        break loop
!
262
                }
263

264
                if e.err != nil {
170×
265
                        break loop
4×
266
                }
267
        }
268

269
        fwd.Stop()
135×
270
}
271

272
func (fwd *overlaySwitchForwarder) run(eventsChan <-chan subForwarderEvent, stopChan <-chan struct{}) {
106×
273
loop:
106×
274
        for {
378×
275
                select {
272×
276
                case <-stopChan:
!
277
                        break loop
!
278

279
                case e := <-eventsChan:
166×
280
                        switch {
166×
281
                        case e.established:
162×
282
                                fwd.established(e.index)
162×
283
                        case e.err != nil:
4×
284
                                fwd.error(e.index, e.err)
4×
285
                        }
286
                }
287
        }
288

289
        fwd.lock.Lock()
!
290
        defer fwd.lock.Unlock()
!
291
        fwd.stopFrom(0)
!
292
}
293

294
func (fwd *overlaySwitchForwarder) established(index int) {
162×
295
        fwd.lock.Lock()
162×
296
        defer fwd.lock.Unlock()
162×
297

162×
298
        fwd.forwarders[index].established = true
162×
299

162×
300
        if !fwd.alreadyEstablished {
248×
301
                fwd.alreadyEstablished = true
86×
302
                close(fwd.establishedChan)
86×
303
        }
86×
304

305
        fwd.chooseBest()
162×
306
}
307

308
func (fwd *overlaySwitchForwarder) logPrefix() string {
160×
309
        return fmt.Sprintf("overlay_switch ->[%s] ", fwd.remotePeer)
160×
310
}
160×
311

312
func (fwd *overlaySwitchForwarder) error(index int, err error) {
4×
313
        fwd.lock.Lock()
4×
314
        defer fwd.lock.Unlock()
4×
315

4×
316
        log.Info(fwd.logPrefix(), fwd.forwarders[index].overlayName, " ", err)
4×
317
        fwd.forwarders[index].fwd = nil
4×
318
        fwd.chooseBest()
4×
319
}
4×
320

321
func (fwd *overlaySwitchForwarder) stopFrom(index int) {
71×
322
        for index < len(fwd.forwarders) {
206×
323
                subFwd := &fwd.forwarders[index]
135×
324
                if subFwd.fwd != nil {
266×
325
                        subFwd.fwd = nil
131×
326
                        close(subFwd.stopChan)
131×
327
                }
131×
328
                index++
135×
329
        }
330
}
331

332
func (fwd *overlaySwitchForwarder) chooseBest() {
272×
333
        // the most preferred established forwarder is the best
272×
334
        // otherwise, the most preferred working forwarder is the best
272×
335
        bestEstablished := -1
272×
336
        bestWorking := -1
272×
337

272×
338
        for i := range fwd.forwarders {
799×
339
                subFwd := &fwd.forwarders[i]
527×
340
                if subFwd.fwd == nil {
532×
341
                        continue
5×
342
                }
343

344
                if bestWorking < 0 {
793×
345
                        bestWorking = i
271×
346
                }
271×
347

348
                if bestEstablished < 0 && subFwd.established {
684×
349
                        bestEstablished = i
162×
350
                }
162×
351
        }
352

353
        best := bestEstablished
272×
354
        if best < 0 {
382×
355
                if bestWorking < 0 {
111×
356
                        select {
1×
357
                        case fwd.errorChan <- fmt.Errorf("no working forwarders to %s", fwd.remotePeer):
1×
358
                        default:
!
359
                        }
360

361
                        return
1×
362
                }
363

364
                best = bestWorking
109×
365
        }
366

367
        if fwd.best != best {
427×
368
                fwd.best = best
156×
369
                log.Info(fwd.logPrefix(), "using ", fwd.forwarders[best].overlayName)
156×
370
        }
156×
371
}
372

373
func (fwd *overlaySwitchForwarder) Confirm() {
104×
374
        var forwarders []OverlayForwarder
104×
375

104×
376
        fwd.lock.Lock()
104×
377
        for _, subFwd := range fwd.forwarders {
303×
378
                if subFwd.fwd != nil {
398×
379
                        forwarders = append(forwarders, subFwd.fwd)
199×
380
                }
199×
381
        }
382
        fwd.lock.Unlock()
104×
383

104×
384
        for _, subFwd := range forwarders {
303×
385
                subFwd.Confirm()
199×
386
        }
199×
387
}
388

389
func (fwd *overlaySwitchForwarder) Forward(pk ForwardPacketKey) FlowOp {
695×
390
        fwd.lock.Lock()
695×
391

695×
392
        if fwd.best >= 0 {
1,390×
393
                for i := fwd.best; i < len(fwd.forwarders); i++ {
1,390×
394
                        best := fwd.forwarders[i].fwd
695×
395
                        if best != nil {
1,390×
396
                                fwd.lock.Unlock()
695×
397
                                if op := best.Forward(pk); op != nil {
1,390×
398
                                        return op
695×
399
                                }
695×
400
                                fwd.lock.Lock()
!
401
                        }
402
                }
403
        }
404

405
        fwd.lock.Unlock()
!
406

!
407
        return DiscardingFlowOp{}
!
408
}
409

410
func (fwd *overlaySwitchForwarder) EstablishedChannel() <-chan struct{} {
104×
411
        return fwd.establishedChan
104×
412
}
104×
413

414
func (fwd *overlaySwitchForwarder) ErrorChannel() <-chan error {
104×
415
        return fwd.errorChan
104×
416
}
104×
417

418
func (fwd *overlaySwitchForwarder) Stop() {
71×
419
        fwd.lock.Lock()
71×
420
        defer fwd.lock.Unlock()
71×
421
        fwd.stopFrom(0)
71×
422
}
71×
423

424
func (fwd *overlaySwitchForwarder) ControlMessage(tag byte, msg []byte) {
349×
425
        fwd.lock.Lock()
349×
426
        subFwd := fwd.forwarders[msg[0]].fwd
349×
427
        fwd.lock.Unlock()
349×
428
        if subFwd != nil {
698×
429
                subFwd.ControlMessage(msg[1], msg[2:])
349×
430
        }
349×
431
}
432

433
func (fwd *overlaySwitchForwarder) Attrs() map[string]interface{} {
102×
434
        var best OverlayForwarder
102×
435

102×
436
        fwd.lock.Lock()
102×
437
        if fwd.best >= 0 {
204×
438
                best = fwd.forwarders[fwd.best].fwd
102×
439
        }
102×
440
        fwd.lock.Unlock()
102×
441

102×
442
        if best != nil {
204×
443
                return best.Attrs()
102×
444
        }
102×
445

446
        return nil
!
447
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc