Coveralls logo
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / #4371

9 Nov 2015 - 15:51 coverage increased (+0.1%) to 75.78%
#4371

Pull #1648

circleci

rade
refactor: move NetworkRouterStatus into its own file
Pull Request #1648: separate the control and data planes

134 of 198 new or added lines in 11 files covered. (67.68%)

56 existing lines in 6 files now uncovered.

6483 of 8555 relevant lines covered (75.78%)

192419.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.58
/router/overlay_switch.go
1
package router
2

3
import (
4
        "fmt"
5
        "strings"
6
        "sync"
7
)
8

9
// OverlaySwitch selects which overlay to use, from a set of
10
// subsidiary overlays.  First, it passes a list of supported overlays
11
// in the connection features, and uses that to determine which
12
// overlays are in common.  Then it tries those common overlays, and
13
// uses the best one that seems to be working.
14

15
type OverlaySwitch struct {
16
        overlays      map[string]NetworkOverlay
17
        overlayNames  []string
18
        compatOverlay NetworkOverlay
19
}
20

21
func NewOverlaySwitch() *OverlaySwitch {
49×
22
        return &OverlaySwitch{overlays: make(map[string]NetworkOverlay)}
49×
23
}
49×
24

25
func (osw *OverlaySwitch) Add(name string, overlay NetworkOverlay) {
94×
26
        // check for repeated names
94×
27
        if _, present := osw.overlays[name]; present {
!
28
                log.Fatal("OverlaySwitch: repeated overlay name")
!
29
        }
!
30

31
        osw.overlays[name] = overlay
94×
32
        osw.overlayNames = append(osw.overlayNames, name)
94×
33
}
34

35
func (osw *OverlaySwitch) SetCompatOverlay(overlay NetworkOverlay) {
49×
36
        osw.compatOverlay = overlay
49×
37
}
49×
38

39
func (osw *OverlaySwitch) AddFeaturesTo(features map[string]string) {
43×
40
        features["Overlays"] = strings.Join(osw.overlayNames, " ")
43×
41
}
43×
42

43
func (osw *OverlaySwitch) Diagnostics() interface{} {
66×
44
        diagnostics := make(map[string]interface{})
66×
45
        for name, overlay := range osw.overlays {
128×
46
                diagnostics[name] = overlay.Diagnostics()
128×
47
        }
128×
48
        return diagnostics
66×
49
}
50

51
func (osw *OverlaySwitch) InvalidateRoutes() {
93×
52
        for _, overlay := range osw.overlays {
176×
53
                overlay.InvalidateRoutes()
176×
54
        }
176×
55
}
56

57
// InvalidateShortIDs forwards the call to every registered overlay.
func (osw *OverlaySwitch) InvalidateShortIDs() {
	for key := range osw.overlays {
		osw.overlays[key].InvalidateShortIDs()
	}
}
62

63
func (osw *OverlaySwitch) StartConsumingPackets(localPeer *Peer, peers *Peers, consumer OverlayConsumer) error {
49×
64
        for _, overlay := range osw.overlays {
94×
65
                if err := overlay.StartConsumingPackets(localPeer, peers, consumer); err != nil {
!
66
                        return err
!
67
                }
!
68
        }
69
        return nil
49×
70
}
71

72
type namedOverlay struct {
73
        NetworkOverlay
74
        name string
75
}
76

77
// Find the common set of overlays supported by both sides, with the
78
// ordering being the same on both sides too.
79
func (osw *OverlaySwitch) commonOverlays(params OverlayConnectionParams) ([]namedOverlay, error) {
35×
80
        var peerOverlays []string
35×
81
        if overlaysFeature, present := params.Features["Overlays"]; present {
35×
82
                peerOverlays = strings.Split(overlaysFeature, " ")
35×
83
        }
35×
84

85
        common := make(map[string]NetworkOverlay)
35×
86
        for _, name := range peerOverlays {
66×
87
                if overlay := osw.overlays[name]; overlay != nil {
66×
88
                        common[name] = overlay
66×
89
                }
66×
90
        }
91

92
        if len(common) == 0 {
!
93
                return nil, fmt.Errorf("no overlays in common with peer")
!
94
        }
!
95

96
        // we order them according to the connecting node
97
        ordering := osw.overlayNames
35×
98
        if params.RemoteAddr == nil {
!
99
                // we are the connectee
!
100
                ordering = peerOverlays
!
101
        }
!
102

103
        res := make([]namedOverlay, 0, len(common))
35×
104
        for _, name := range ordering {
66×
105
                overlay := common[name]
66×
106
                if overlay != nil {
66×
107
                        res = append(res, namedOverlay{overlay, name})
66×
108
                }
66×
109
        }
110

111
        // we use bytes to represent forwarder indices in control
112
        // messages, so just in case:
113
        if len(res) > 256 {
!
114
                res = res[:256]
!
115
        }
!
116

117
        return res, nil
35×
118
}
119

120
type overlaySwitchForwarder struct {
121
        remotePeer *Peer
122

123
        lock sync.Mutex
124

125
        // the index of the forwarder to send on
126
        best int
127

128
        // the subsidiary forwarders
129
        forwarders []subForwarder
130

131
        // closed to tell the main goroutine to stop
132
        stopChan chan<- struct{}
133

134
        alreadyEstablished bool
135
        establishedChan    chan struct{}
136
        errorChan          chan error
137
}
138

139
// A subsidiary forwarder
140
type subForwarder struct {
141
        fwd         OverlayForwarder
142
        overlayName string
143

144
        // Has the forwarder signalled that it is established?
145
        established bool
146

147
        // closed to tell the forwarder monitor goroutine to stop
148
        stopChan chan<- struct{}
149
}
150

151
// subForwarderEvent is what a forwarder monitor goroutine reports to
// the main goroutine about one subsidiary forwarder.
type subForwarderEvent struct {
	// the index of the forwarder the event is about
	index int

	// true when the forwarder has become established
	established bool

	// non-nil when the forwarder has reported an error
	err error
}
162

163
func (osw *OverlaySwitch) PrepareConnection(params OverlayConnectionParams) (OverlayConnection, error) {
35×
164
        if _, present := params.Features["Overlays"]; !present && osw.compatOverlay != nil {
!
NEW
165
                return osw.compatOverlay.PrepareConnection(params)
!
166
        }
!
167

168
        overlays, err := osw.commonOverlays(params)
35×
169
        if err != nil {
!
170
                return nil, err
!
171
        }
!
172

173
        // channel to carry events from the subforwarder monitors to
174
        // the main goroutine
175
        eventsChan := make(chan subForwarderEvent)
35×
176

35×
177
        // channel to stop the main goroutine
35×
178
        stopChan := make(chan struct{})
35×
179

35×
180
        fwd := &overlaySwitchForwarder{
35×
181
                remotePeer: params.RemotePeer,
35×
182

35×
183
                best:       -1,
35×
184
                forwarders: make([]subForwarder, len(overlays)),
35×
185
                stopChan:   stopChan,
35×
186

35×
187
                establishedChan: make(chan struct{}),
35×
188
                errorChan:       make(chan error, 1),
35×
189
        }
35×
190

35×
191
        origSendControlMessage := params.SendControlMessage
35×
192
        for i, overlay := range overlays {
66×
193
                // Prefix control messages to indicate the relevant forwarder
66×
194
                index := i
66×
195
                params.SendControlMessage = func(tag byte, msg []byte) error {
132×
196
                        xmsg := make([]byte, len(msg)+2)
132×
197
                        xmsg[0] = byte(index)
132×
198
                        xmsg[1] = tag
132×
199
                        copy(xmsg[2:], msg)
132×
200
                        return origSendControlMessage(ProtocolOverlayControlMsg, xmsg)
132×
201
                }
132×
202

203
                subConn, err := overlay.PrepareConnection(params)
66×
204
                if err != nil {
!
205
                        fwd.stopFrom(0)
!
206
                        return nil, err
!
207
                }
!
208
                subFwd := subConn.(OverlayForwarder)
66×
209

66×
210
                subStopChan := make(chan struct{})
66×
211
                go monitorForwarder(i, eventsChan, subStopChan, subFwd)
66×
212
                fwd.forwarders[i] = subForwarder{
66×
213
                        fwd:         subFwd,
66×
214
                        overlayName: overlay.name,
66×
215
                        stopChan:    subStopChan,
66×
216
                }
66×
217
        }
218

219
        fwd.chooseBest()
35×
220
        go fwd.run(eventsChan, stopChan)
35×
221
        return fwd, nil
35×
222
}
223

224
func monitorForwarder(index int, eventsChan chan<- subForwarderEvent, stopChan <-chan struct{}, fwd OverlayForwarder) {
66×
225
        establishedChan := fwd.EstablishedChannel()
66×
226
loop:
66×
227
        for {
128×
228
                e := subForwarderEvent{index: index}
128×
229

128×
230
                select {
128×
231
                case <-establishedChan:
62×
232
                        e.established = true
62×
233
                        establishedChan = nil
62×
234

235
                case err := <-fwd.ErrorChannel():
!
236
                        e.err = err
!
237

238
                case <-stopChan:
34×
239
                        break loop
34×
240
                }
241

242
                select {
62×
243
                case eventsChan <- e:
62×
244
                case <-stopChan:
!
245
                        break loop
!
246
                }
247

248
                if e.err != nil {
!
249
                        break loop
!
250
                }
251
        }
252

253
        fwd.Stop()
34×
254
}
255

256
func (fwd *overlaySwitchForwarder) run(eventsChan <-chan subForwarderEvent, stopChan <-chan struct{}) {
35×
257
loop:
35×
258
        for {
97×
259
                select {
97×
260
                case <-stopChan:
!
261
                        break loop
!
262

263
                case e := <-eventsChan:
62×
264
                        switch {
62×
265
                        case e.established:
62×
266
                                fwd.established(e.index)
62×
267
                        case e.err != nil:
!
268
                                fwd.error(e.index, e.err)
!
269
                        }
270
                }
271
        }
272

273
        fwd.lock.Lock()
!
274
        defer fwd.lock.Unlock()
!
275
        fwd.stopFrom(0)
!
276
}
277

278
func (fwd *overlaySwitchForwarder) established(index int) {
62×
279
        fwd.lock.Lock()
62×
280
        defer fwd.lock.Unlock()
62×
281

62×
282
        fwd.forwarders[index].established = true
62×
283

62×
284
        if !fwd.alreadyEstablished {
33×
285
                fwd.alreadyEstablished = true
33×
286
                close(fwd.establishedChan)
33×
287
        }
33×
288

289
        fwd.chooseBest()
62×
290
}
291

292
func (fwd *overlaySwitchForwarder) logPrefix() string {
57×
293
        return fmt.Sprintf("overlay_switch ->[%s] ", fwd.remotePeer)
57×
294
}
57×
295

296
// error handles an error from the subsidiary forwarder at index: the
// error is logged, the forwarder is dropped from consideration, and
// the best forwarder is re-chosen.
func (fwd *overlaySwitchForwarder) error(index int, err error) {
	fwd.lock.Lock()
	defer fwd.lock.Unlock()

	failed := &fwd.forwarders[index]
	log.Info(fwd.logPrefix(), failed.overlayName, " ", err)
	failed.fwd = nil

	fwd.chooseBest()
}
!
304

305
func (fwd *overlaySwitchForwarder) stopFrom(index int) {
18×
306
        for index < len(fwd.forwarders) {
34×
307
                subFwd := &fwd.forwarders[index]
34×
308
                if subFwd.fwd != nil {
34×
309
                        subFwd.fwd = nil
34×
310
                        close(subFwd.stopChan)
34×
311
                }
34×
312
                index++
34×
313
        }
314
}
315

316
func (fwd *overlaySwitchForwarder) chooseBest() {
97×
317
        // the most preferred established forwarder is the best
97×
318
        // otherwise, the most preferred working forwarder is the best
97×
319
        bestEstablished := -1
97×
320
        bestWorking := -1
97×
321

97×
322
        for i := range fwd.forwarders {
186×
323
                subFwd := &fwd.forwarders[i]
186×
324
                if subFwd.fwd == nil {
!
325
                        continue
!
326
                }
327

328
                if bestWorking < 0 {
97×
329
                        bestWorking = i
97×
330
                }
97×
331

332
                if bestEstablished < 0 && subFwd.established {
62×
333
                        bestEstablished = i
62×
334
                }
62×
335
        }
336

337
        best := bestEstablished
97×
338
        if best < 0 {
35×
339
                if bestWorking < 0 {
!
340
                        select {
!
341
                        case fwd.errorChan <- fmt.Errorf("no working forwarders to %s", fwd.remotePeer):
!
342
                        default:
!
343
                        }
344

345
                        return
!
346
                }
347

348
                best = bestWorking
35×
349
        }
350

351
        if fwd.best != best {
57×
352
                fwd.best = best
57×
353
                log.Info(fwd.logPrefix(), "using ", fwd.forwarders[best].overlayName)
57×
354
        }
57×
355
}
356

357
func (fwd *overlaySwitchForwarder) Confirm() {
35×
358
        var forwarders []OverlayForwarder
35×
359

35×
360
        fwd.lock.Lock()
35×
361
        for _, subFwd := range fwd.forwarders {
66×
362
                if subFwd.fwd != nil {
66×
363
                        forwarders = append(forwarders, subFwd.fwd)
66×
364
                }
66×
365
        }
366
        fwd.lock.Unlock()
35×
367

35×
368
        for _, subFwd := range forwarders {
66×
369
                subFwd.Confirm()
66×
370
        }
66×
371
}
372

373
func (fwd *overlaySwitchForwarder) Forward(pk ForwardPacketKey) FlowOp {
530×
374
        fwd.lock.Lock()
530×
375

530×
376
        if fwd.best >= 0 {
530×
377
                for i := fwd.best; i < len(fwd.forwarders); i++ {
530×
378
                        best := fwd.forwarders[i].fwd
530×
379
                        if best != nil {
530×
380
                                fwd.lock.Unlock()
530×
381
                                if op := best.Forward(pk); op != nil {
530×
382
                                        return op
530×
383
                                }
530×
384
                                fwd.lock.Lock()
!
385
                        }
386
                }
387
        }
388

389
        fwd.lock.Unlock()
!
390

!
391
        return DiscardingFlowOp{}
!
392
}
393

394
func (fwd *overlaySwitchForwarder) EstablishedChannel() <-chan struct{} {
35×
395
        return fwd.establishedChan
35×
396
}
35×
397

398
func (fwd *overlaySwitchForwarder) ErrorChannel() <-chan error {
35×
399
        return fwd.errorChan
35×
400
}
35×
401

402
func (fwd *overlaySwitchForwarder) Stop() {
18×
403
        fwd.lock.Lock()
18×
404
        defer fwd.lock.Unlock()
18×
405
        fwd.stopFrom(0)
18×
406
}
18×
407

408
func (fwd *overlaySwitchForwarder) ControlMessage(tag byte, msg []byte) {
133×
409
        fwd.lock.Lock()
133×
410
        subFwd := fwd.forwarders[msg[0]].fwd
133×
411
        fwd.lock.Unlock()
133×
412
        if subFwd != nil {
133×
413
                subFwd.ControlMessage(msg[1], msg[2:])
133×
414
        }
133×
415
}
416

417
func (fwd *overlaySwitchForwarder) DisplayName() string {
16×
418
        var best OverlayForwarder
16×
419

16×
420
        fwd.lock.Lock()
16×
421
        if fwd.best >= 0 {
16×
422
                best = fwd.forwarders[fwd.best].fwd
16×
423
        }
16×
424
        fwd.lock.Unlock()
16×
425

16×
426
        if best != nil {
16×
427
                return best.DisplayName()
16×
428
        }
16×
429

430
        return "none"
!
431
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc