Coveralls logo
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / #6773

15 Jun 2016 - 14:50 coverage decreased (-0.03%) to 73.692%
#6773

Pull #2366

circleci

94ccfc629c6862b5950de3512742bcae?size=18&default=identicon bboreham
Publish weavedb container from release script
Pull Request #2366: Publish weavedb container from release script

6493 of 8811 relevant lines covered (73.69%)

120432.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.06
/router/overlay_switch.go
1
package router
2

3
import (
4
        "fmt"
5
        "strings"
6
        "sync"
7

8
        "github.com/weaveworks/mesh"
9
)
10

11
// OverlaySwitch selects which overlay to use, from a set of
12
// subsidiary overlays.  First, it passes a list of supported overlays
13
// in the connection features, and uses that to determine which
14
// overlays are in common.  Then it tries those common overlays, and
15
// uses the best one that seems to be working.
16

17
type OverlaySwitch struct {
18
        overlays      map[string]NetworkOverlay
19
        overlayNames  []string
20
        compatOverlay NetworkOverlay
21
}
22

23
func NewOverlaySwitch() *OverlaySwitch {
93×
24
        return &OverlaySwitch{overlays: make(map[string]NetworkOverlay)}
93×
25
}
93×
26

27
func (osw *OverlaySwitch) Add(name string, overlay NetworkOverlay) {
184×
28
        // check for repeated names
184×
29
        if _, present := osw.overlays[name]; present {
184×
30
                log.Fatal("OverlaySwitch: repeated overlay name")
!
31
        }
!
32

33
        osw.overlays[name] = overlay
184×
34
        osw.overlayNames = append(osw.overlayNames, name)
184×
35
}
36

37
func (osw *OverlaySwitch) SetCompatOverlay(overlay NetworkOverlay) {
93×
38
        osw.compatOverlay = overlay
93×
39
}
93×
40

41
func (osw *OverlaySwitch) AddFeaturesTo(features map[string]string) {
96×
42
        features["Overlays"] = strings.Join(osw.overlayNames, " ")
96×
43
}
96×
44

45
func (osw *OverlaySwitch) Diagnostics() interface{} {
174×
46
        diagnostics := make(map[string]interface{})
174×
47
        for name, overlay := range osw.overlays {
518×
48
                diagnostics[name] = overlay.Diagnostics()
344×
49
        }
344×
50
        return diagnostics
174×
51
}
52

53
func (osw *OverlaySwitch) InvalidateRoutes() {
408×
54
        for _, overlay := range osw.overlays {
1,215×
55
                overlay.InvalidateRoutes()
807×
56
        }
807×
57
}
58

59
func (osw *OverlaySwitch) InvalidateShortIDs() {
1×
60
        for _, overlay := range osw.overlays {
3×
61
                overlay.InvalidateShortIDs()
2×
62
        }
2×
63
}
64

65
func (osw *OverlaySwitch) StartConsumingPackets(localPeer *mesh.Peer, peers *mesh.Peers, consumer OverlayConsumer) error {
93×
66
        for _, overlay := range osw.overlays {
277×
67
                if err := overlay.StartConsumingPackets(localPeer, peers, consumer); err != nil {
184×
68
                        return err
!
69
                }
!
70
        }
71
        return nil
93×
72
}
73

74
type namedOverlay struct {
75
        NetworkOverlay
76
        name string
77
}
78

79
// Find the common set of overlays supported by both sides, with the
80
// ordering being the same on both sides too.
81
func (osw *OverlaySwitch) commonOverlays(params mesh.OverlayConnectionParams) ([]namedOverlay, error) {
86×
82
        var peerOverlays []string
86×
83
        if overlaysFeature, present := params.Features["Overlays"]; present {
172×
84
                peerOverlays = strings.Split(overlaysFeature, " ")
86×
85
        }
86×
86

87
        common := make(map[string]NetworkOverlay)
86×
88
        for _, name := range peerOverlays {
256×
89
                if overlay := osw.overlays[name]; overlay != nil {
340×
90
                        common[name] = overlay
170×
91
                }
170×
92
        }
93

94
        if len(common) == 0 {
86×
95
                return nil, fmt.Errorf("no overlays in common with peer")
!
96
        }
!
97

98
        // we order them according to the connecting node
99
        ordering := osw.overlayNames
86×
100
        if params.RemoteAddr == nil {
86×
101
                // we are the connectee
!
102
                ordering = peerOverlays
!
103
        }
!
104

105
        res := make([]namedOverlay, 0, len(common))
86×
106
        for _, name := range ordering {
256×
107
                overlay := common[name]
170×
108
                if overlay != nil {
340×
109
                        res = append(res, namedOverlay{overlay, name})
170×
110
                }
170×
111
        }
112

113
        // we use bytes to represent forwarder indices in control
114
        // messages, so just in case:
115
        if len(res) > 256 {
86×
116
                res = res[:256]
!
117
        }
!
118

119
        return res, nil
86×
120
}
121

122
type overlaySwitchForwarder struct {
123
        remotePeer *mesh.Peer
124

125
        lock sync.Mutex
126

127
        // the index of the forwarder to send on
128
        best int
129

130
        // the subsidiary forwarders
131
        forwarders []subForwarder
132

133
        // closed to tell the main goroutine to stop
134
        stopChan chan<- struct{}
135

136
        alreadyEstablished bool
137
        establishedChan    chan struct{}
138
        errorChan          chan error
139
}
140

141
// A subsidiary forwarder
142
type subForwarder struct {
143
        fwd         OverlayForwarder
144
        overlayName string
145

146
        // Has the forwarder signalled that it is established?
147
        established bool
148

149
        // closed to tell the forwarder monitor goroutine to stop
150
        stopChan chan<- struct{}
151
}
152

153
// subForwarderEvent is what a monitor goroutine sends to the run
// goroutine when its subsidiary forwarder changes state.
type subForwarderEvent struct {
	// index of the forwarder the event is about
	index int

	// true if the forwarder has signalled that it is established
	established bool

	// non-nil if the forwarder has reported an error
	err error
}
164

165
func (osw *OverlaySwitch) PrepareConnection(params mesh.OverlayConnectionParams) (mesh.OverlayConnection, error) {
86×
166
        if _, present := params.Features["Overlays"]; !present && osw.compatOverlay != nil {
86×
167
                return osw.compatOverlay.PrepareConnection(params)
!
168
        }
!
169

170
        overlays, err := osw.commonOverlays(params)
86×
171
        if err != nil {
86×
172
                return nil, err
!
173
        }
!
174

175
        // channel to carry events from the subforwarder monitors to
176
        // the main goroutine
177
        eventsChan := make(chan subForwarderEvent)
86×
178

86×
179
        // channel to stop the main goroutine
86×
180
        stopChan := make(chan struct{})
86×
181

86×
182
        fwd := &overlaySwitchForwarder{
86×
183
                remotePeer: params.RemotePeer,
86×
184

86×
185
                best:       -1,
86×
186
                forwarders: make([]subForwarder, len(overlays)),
86×
187
                stopChan:   stopChan,
86×
188

86×
189
                establishedChan: make(chan struct{}),
86×
190
                errorChan:       make(chan error, 1),
86×
191
        }
86×
192

86×
193
        origSendControlMessage := params.SendControlMessage
86×
194
        for i, overlay := range overlays {
256×
195
                // Prefix control messages to indicate the relevant forwarder
170×
196
                index := i
170×
197
                params.SendControlMessage = func(tag byte, msg []byte) error {
477×
198
                        xmsg := make([]byte, len(msg)+2)
307×
199
                        xmsg[0] = byte(index)
307×
200
                        xmsg[1] = tag
307×
201
                        copy(xmsg[2:], msg)
307×
202
                        return origSendControlMessage(mesh.ProtocolOverlayControlMsg, xmsg)
307×
203
                }
307×
204

205
                subConn, err := overlay.PrepareConnection(params)
170×
206
                if err != nil {
172×
207
                        log.Infof("Unable to use %s for connection to %s(%s): %s",
2×
208
                                overlay.name,
2×
209
                                params.RemotePeer.Name,
2×
210
                                params.RemotePeer.NickName,
2×
211
                                err)
2×
212
                        // failed to start subforwarder - record overlay name and continue
2×
213
                        fwd.forwarders[i] = subForwarder{
2×
214
                                overlayName: overlay.name,
2×
215
                        }
2×
216
                        continue
2×
217
                }
218
                subFwd := subConn.(OverlayForwarder)
168×
219

168×
220
                subStopChan := make(chan struct{})
168×
221
                go monitorForwarder(i, eventsChan, subStopChan, subFwd)
168×
222
                fwd.forwarders[i] = subForwarder{
168×
223
                        fwd:         subFwd,
168×
224
                        overlayName: overlay.name,
168×
225
                        stopChan:    subStopChan,
168×
226
                }
168×
227
        }
228

229
        fwd.chooseBest()
86×
230
        go fwd.run(eventsChan, stopChan)
86×
231
        return fwd, nil
86×
232
}
233

234
func monitorForwarder(index int, eventsChan chan<- subForwarderEvent, stopChan <-chan struct{}, fwd OverlayForwarder) {
168×
235
        establishedChan := fwd.EstablishedChannel()
168×
236
loop:
168×
237
        for {
484×
238
                e := subForwarderEvent{index: index}
316×
239

316×
240
                select {
316×
241
                case <-establishedChan:
148×
242
                        e.established = true
148×
243
                        establishedChan = nil
148×
244

UNCOV
245
                case err := <-fwd.ErrorChannel():
!
UNCOV
246
                        e.err = err
!
247

248
                case <-stopChan:
96×
249
                        break loop
96×
250
                }
251

252
                select {
148×
253
                case eventsChan <- e:
148×
254
                case <-stopChan:
!
255
                        break loop
!
256
                }
257

258
                if e.err != nil {
148×
UNCOV
259
                        break loop
!
260
                }
261
        }
262

263
        fwd.Stop()
96×
264
}
265

266
func (fwd *overlaySwitchForwarder) run(eventsChan <-chan subForwarderEvent, stopChan <-chan struct{}) {
86×
267
loop:
86×
268
        for {
320×
269
                select {
234×
270
                case <-stopChan:
!
271
                        break loop
!
272

273
                case e := <-eventsChan:
148×
274
                        switch {
148×
275
                        case e.established:
148×
276
                                fwd.established(e.index)
148×
UNCOV
277
                        case e.err != nil:
!
UNCOV
278
                                fwd.error(e.index, e.err)
!
279
                        }
280
                }
281
        }
282

283
        fwd.lock.Lock()
!
284
        defer fwd.lock.Unlock()
!
285
        fwd.stopFrom(0)
!
286
}
287

288
func (fwd *overlaySwitchForwarder) established(index int) {
148×
289
        fwd.lock.Lock()
148×
290
        defer fwd.lock.Unlock()
148×
291

148×
292
        fwd.forwarders[index].established = true
148×
293

148×
294
        if !fwd.alreadyEstablished {
224×
295
                fwd.alreadyEstablished = true
76×
296
                close(fwd.establishedChan)
76×
297
        }
76×
298

299
        fwd.chooseBest()
148×
300
}
301

302
func (fwd *overlaySwitchForwarder) logPrefix() string {
152×
303
        return fmt.Sprintf("overlay_switch ->[%s] ", fwd.remotePeer)
152×
304
}
152×
305

UNCOV
306
// error handles an error reported by the forwarder at index: the error
// is logged, the forwarder is dropped from consideration, and the best
// forwarder is chosen again.
func (fwd *overlaySwitchForwarder) error(index int, err error) {
	fwd.lock.Lock()
	defer fwd.lock.Unlock()

	failed := &fwd.forwarders[index]
	log.Info(fwd.logPrefix(), failed.overlayName, " ", err)

	// a nil fwd marks the slot as unusable
	failed.fwd = nil
	fwd.chooseBest()
}
!
314

315
func (fwd *overlaySwitchForwarder) stopFrom(index int) {
49×
316
        for index < len(fwd.forwarders) {
146×
317
                subFwd := &fwd.forwarders[index]
97×
318
                if subFwd.fwd != nil {
193×
319
                        subFwd.fwd = nil
96×
320
                        close(subFwd.stopChan)
96×
321
                }
96×
322
                index++
97×
323
        }
324
}
325

326
func (fwd *overlaySwitchForwarder) chooseBest() {
234×
327
        // the most preferred established forwarder is the best
234×
328
        // otherwise, the most preferred working forwarder is the best
234×
329
        bestEstablished := -1
234×
330
        bestWorking := -1
234×
331

234×
332
        for i := range fwd.forwarders {
698×
333
                subFwd := &fwd.forwarders[i]
464×
334
                if subFwd.fwd == nil {
468×
335
                        continue
4×
336
                }
337

338
                if bestWorking < 0 {
694×
339
                        bestWorking = i
234×
340
                }
234×
341

342
                if bestEstablished < 0 && subFwd.established {
608×
343
                        bestEstablished = i
148×
344
                }
148×
345
        }
346

347
        best := bestEstablished
234×
348
        if best < 0 {
320×
349
                if bestWorking < 0 {
86×
350
                        select {
!
351
                        case fwd.errorChan <- fmt.Errorf("no working forwarders to %s", fwd.remotePeer):
!
352
                        default:
!
353
                        }
354

355
                        return
!
356
                }
357

358
                best = bestWorking
86×
359
        }
360

361
        if fwd.best != best {
386×
362
                fwd.best = best
152×
363
                log.Info(fwd.logPrefix(), "using ", fwd.forwarders[best].overlayName)
152×
364
        }
152×
365
}
366

367
func (fwd *overlaySwitchForwarder) Confirm() {
81×
368
        var forwarders []OverlayForwarder
81×
369

81×
370
        fwd.lock.Lock()
81×
371
        for _, subFwd := range fwd.forwarders {
241×
372
                if subFwd.fwd != nil {
318×
373
                        forwarders = append(forwarders, subFwd.fwd)
158×
374
                }
158×
375
        }
376
        fwd.lock.Unlock()
81×
377

81×
378
        for _, subFwd := range forwarders {
239×
379
                subFwd.Confirm()
158×
380
        }
158×
381
}
382

383
func (fwd *overlaySwitchForwarder) Forward(pk ForwardPacketKey) FlowOp {
644×
384
        fwd.lock.Lock()
644×
385

644×
386
        if fwd.best >= 0 {
1,288×
387
                for i := fwd.best; i < len(fwd.forwarders); i++ {
1,288×
388
                        best := fwd.forwarders[i].fwd
644×
389
                        if best != nil {
1,288×
390
                                fwd.lock.Unlock()
644×
391
                                if op := best.Forward(pk); op != nil {
1,288×
392
                                        return op
644×
393
                                }
644×
394
                                fwd.lock.Lock()
!
395
                        }
396
                }
397
        }
398

399
        fwd.lock.Unlock()
!
400

!
401
        return DiscardingFlowOp{}
!
402
}
403

404
func (fwd *overlaySwitchForwarder) EstablishedChannel() <-chan struct{} {
81×
405
        return fwd.establishedChan
81×
406
}
81×
407

408
func (fwd *overlaySwitchForwarder) ErrorChannel() <-chan error {
81×
409
        return fwd.errorChan
81×
410
}
81×
411

412
func (fwd *overlaySwitchForwarder) Stop() {
49×
413
        fwd.lock.Lock()
49×
414
        defer fwd.lock.Unlock()
49×
415
        fwd.stopFrom(0)
49×
416
}
49×
417

418
func (fwd *overlaySwitchForwarder) ControlMessage(tag byte, msg []byte) {
305×
419
        fwd.lock.Lock()
305×
420
        subFwd := fwd.forwarders[msg[0]].fwd
305×
421
        fwd.lock.Unlock()
305×
422
        if subFwd != nil {
610×
423
                subFwd.ControlMessage(msg[1], msg[2:])
305×
424
        }
305×
425
}
426

427
func (fwd *overlaySwitchForwarder) DisplayName() string {
66×
428
        var best OverlayForwarder
66×
429

66×
430
        fwd.lock.Lock()
66×
431
        if fwd.best >= 0 {
132×
432
                best = fwd.forwarders[fwd.best].fwd
66×
433
        }
66×
434
        fwd.lock.Unlock()
66×
435

66×
436
        if best != nil {
132×
437
                return best.DisplayName()
66×
438
        }
66×
439

440
        return "none"
!
441
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc