Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / #4371

9 Nov 2015 - 15:51 coverage increased (+0.1%) to 75.78%
#4371

Pull #1648

circleci

2f1e5f233f2b7283a9bf3277e75bf30a?size=18&default=identiconrade
refactor: move NetworkRouterStatus into its own file
Pull Request #1648: separate the control and data planes

134 of 198 new or added lines in 11 files covered. (67.68%)

56 existing lines in 6 files now uncovered.

6483 of 8555 relevant lines covered (75.78%)

192419.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.92
/router/peers.go
1
package router
2

3
import (
4
        "bytes"
5
        "encoding/gob"
6
        "fmt"
7
        "io"
8
        "math/rand"
9
        "sync"
10
)
11

12
// void is the zero-size value stored against every key of set-like
// maps such as PeerNameSet.
var void = struct{}{}
13

14
type Peers struct {
15
        sync.RWMutex
16
        ourself   *LocalPeer
17
        byName    map[PeerName]*Peer
18
        byShortID map[PeerShortID]ShortIDPeers
19
        onGC      []func(*Peer)
20

21
        // Called when the mapping from short ids to peers changes
22
        onInvalidateShortIDs []func()
23
}
24

25
type ShortIDPeers struct {
26
        // If we know about a single peer with the short id, this is
27
        // that peer.  If there is a collision, this is the peer with
28
        // the lowest Name.
29
        peer *Peer
30

31
        // In case of a collision, this holds the other peers.
32
        others []*Peer
33
}
34

35
type UnknownPeerError struct {
36
        Name PeerName
37
}
38

39
func (upe UnknownPeerError) Error() string {
3×
40
        return fmt.Sprint("Reference to unknown peer ", upe.Name)
3×
41
}
3×
42

43
type NameCollisionError struct {
44
        Name PeerName
45
}
46

UNCOV
47
// Error implements the error interface, naming the contested peer name.
func (e NameCollisionError) Error() string {
	msg := fmt.Sprint("Multiple peers found with same name: ", e.Name)
	return msg
}
!
50

51
// PeerNameSet is a set of peer names, represented as a map whose
// values carry no information.
type PeerNameSet map[PeerName]struct{}
52

53
// ConnectionSummary describes one connection of a peer in the form in
// which it is gob-encoded into, and decoded from, topology updates.
// Field order matters to code that builds values positionally.
type ConnectionSummary struct {
	// Binary form of the remote peer's name (see PeerNameFromBin).
	NameByte []byte
	// TCP address of the remote end, as reported by the connection.
	RemoteTCPAddr string
	// Outbound and Established flags, as reported by the connection.
	Outbound    bool
	Established bool
}
59

60
// Pending notifications due to changes to Peers that need to be sent
61
// out once the Peers is unlocked.
62
type PeersPendingNotifications struct {
63
        // Peers that have been GCed
64
        removed []*Peer
65

66
        // The mapping from shorts ids to peers changed
67
        invalidateShortIDs bool
68

69
        // The local short ID needs reassigning due to a collision
70
        reassignLocalShortID bool
71
}
72

73
func NewPeers(ourself *LocalPeer) *Peers {
10,201×
74
        peers := &Peers{
10,201×
75
                ourself:   ourself,
10,201×
76
                byName:    make(map[PeerName]*Peer),
10,201×
77
                byShortID: make(map[PeerShortID]ShortIDPeers),
10,201×
78
        }
10,201×
79
        peers.FetchWithDefault(ourself.Peer)
10,201×
80
        return peers
10,201×
81
}
10,201×
82

83
func (peers *Peers) OnGC(callback func(*Peer)) {
169×
84
        peers.Lock()
169×
85
        defer peers.Unlock()
169×
86

169×
87
        // Although the array underlying peers.onGC might be accessed
169×
88
        // without holding the lock in unlockAndNotify, we don't
169×
89
        // support removing callbacks, so a simple append here is
169×
90
        // safe.
169×
91
        peers.onGC = append(peers.onGC, callback)
169×
92
}
169×
93

94
func (peers *Peers) OnInvalidateShortIDs(callback func()) {
49×
95
        peers.Lock()
49×
96
        defer peers.Unlock()
49×
97

49×
98
        // Safe, as in OnGC
49×
99
        peers.onInvalidateShortIDs = append(peers.onInvalidateShortIDs, callback)
49×
100
}
49×
101

102
func (peers *Peers) unlockAndNotify(pending *PeersPendingNotifications) {
24,424×
103
        broadcastLocalPeer := pending.reassignLocalShortID && peers.reassignLocalShortID(pending)
24,424×
104
        onGC := peers.onGC
24,424×
105
        onInvalidateShortIDs := peers.onInvalidateShortIDs
24,424×
106
        peers.Unlock()
24,424×
107

24,424×
108
        if pending.removed != nil {
950×
109
                for _, callback := range onGC {
58×
110
                        for _, peer := range pending.removed {
64×
111
                                callback(peer)
64×
112
                        }
64×
113
                }
114
        }
115

116
        if pending.invalidateShortIDs {
427×
UNCOV
117
                for _, callback := range onInvalidateShortIDs {
!
UNCOV
118
                        callback()
!
UNCOV
119
                }
!
120
        }
121

122
        if broadcastLocalPeer {
17×
123
                peers.ourself.broadcastPeerUpdate()
17×
124
        }
17×
125
}
126

127
func (peers *Peers) addByShortID(peer *Peer, pending *PeersPendingNotifications) {
47,479×
UNCOV
128
        if !peer.HasShortID {
!
UNCOV
129
                return
!
UNCOV
130
        }
!
131

132
        entry, ok := peers.byShortID[peer.ShortID]
47,479×
133
        if !ok {
44,039×
134
                entry = ShortIDPeers{peer: peer}
44,039×
135
        } else if entry.peer == nil {
412×
136
                // This short ID is free, but was used in the past.
412×
137
                // Because we are reusing it, it's an invalidation
412×
138
                // event.
412×
139
                entry.peer = peer
412×
140
                pending.invalidateShortIDs = true
412×
141
        } else if peer.Name < entry.peer.Name {
1,401×
142
                // Short ID collision, this peer becomes the principal
1,401×
143
                // peer for the short ID, bumping the previous one
1,401×
144
                // into others.
1,401×
145

1,401×
146
                if entry.peer == peers.ourself.Peer {
21×
147
                        // The bumped peer is peers.ourself, so we
21×
148
                        // need to look for a new short id
21×
149
                        pending.reassignLocalShortID = true
21×
150
                }
21×
151

152
                entry.others = append(entry.others, entry.peer)
1,401×
153
                entry.peer = peer
1,401×
154
                pending.invalidateShortIDs = true
1,401×
155
        } else {
1,627×
156
                // Short ID collision, this peer is secondary
1,627×
157
                entry.others = append(entry.others, peer)
1,627×
158
        }
1,627×
159

160
        peers.byShortID[peer.ShortID] = entry
47,479×
161
}
162

163
func (peers *Peers) deleteByShortID(peer *Peer, pending *PeersPendingNotifications) {
17,401×
UNCOV
164
        if !peer.HasShortID {
!
UNCOV
165
                return
!
UNCOV
166
        }
!
167

168
        entry := peers.byShortID[peer.ShortID]
17,401×
169
        var otherIndex int
17,401×
170

17,401×
171
        if peer != entry.peer {
1,628×
172
                // peer is secondary, find its index in others
1,628×
173
                otherIndex = -1
1,628×
174

1,628×
175
                for i, other := range entry.others {
1,998×
176
                        if peer == other {
1,628×
177
                                otherIndex = i
1,628×
178
                                break
1,628×
179
                        }
180
                }
181

UNCOV
182
                if otherIndex < 0 {
!
UNCOV
183
                        return
!
UNCOV
184
                }
!
185
        } else if len(entry.others) != 0 {
1,400×
186
                // need to find the peer with the lowest name to
1,400×
187
                // become the new principal one
1,400×
188
                otherIndex = 0
1,400×
189
                minName := entry.others[0].Name
1,400×
190

1,400×
191
                for i := 1; i < len(entry.others); i++ {
351×
192
                        otherName := entry.others[i].Name
351×
193
                        if otherName < minName {
222×
194
                                minName = otherName
222×
195
                                otherIndex = i
222×
196
                        }
222×
197
                }
198

199
                entry.peer = entry.others[otherIndex]
1,400×
200
                pending.invalidateShortIDs = true
1,400×
201
        } else {
14,373×
202
                // This is the last peer with the short id.  We clear
14,373×
203
                // the entry, don't delete it, in order to detect when
14,373×
204
                // it gets re-used.
14,373×
205
                peers.byShortID[peer.ShortID] = ShortIDPeers{}
14,373×
206
                return
14,373×
207
        }
14,373×
208

209
        entry.others[otherIndex] = entry.others[len(entry.others)-1]
3,028×
210
        entry.others = entry.others[:len(entry.others)-1]
3,028×
211
        peers.byShortID[peer.ShortID] = entry
3,028×
212
}
213

214
func (peers *Peers) reassignLocalShortID(pending *PeersPendingNotifications) bool {
27×
215
        newShortID, ok := peers.chooseShortID()
27×
216
        if ok {
23×
217
                peers.setLocalShortID(newShortID, pending)
23×
218
                return true
23×
219
        }
23×
220

221
        // Otherwise we'll try again later on in garbageColleect
222
        return false
4×
223
}
224

225
func (peers *Peers) setLocalShortID(newShortID PeerShortID, pending *PeersPendingNotifications) {
8,223×
226
        peers.deleteByShortID(peers.ourself.Peer, pending)
8,223×
227
        peers.ourself.setShortID(newShortID)
8,223×
228
        peers.addByShortID(peers.ourself.Peer, pending)
8,223×
229
}
8,223×
230

231
// Choose an available short id at random
232
func (peers *Peers) chooseShortID() (PeerShortID, bool) {
27×
233
        rng := rand.New(rand.NewSource(int64(randUint64())))
27×
234

27×
235
        // First, just try picking some short ids at random, and
27×
236
        // seeing if they are available:
27×
237
        for i := 0; i < 10; i++ {
189×
238
                shortID := PeerShortID(rng.Intn(1 << PeerShortIDBits))
189×
239
                if peers.byShortID[shortID].peer == nil {
10×
240
                        return shortID, true
10×
241
                }
10×
242
        }
243

244
        // Looks like most short ids are used.  So count the number of
245
        // unused ones, and pick one at random.
246
        available := int(1 << PeerShortIDBits)
17×
247
        for _, entry := range peers.byShortID {
69,018×
248
                if entry.peer != nil {
68,996×
249
                        available--
68,996×
250
                }
68,996×
251
        }
252

253
        if available == 0 {
4×
254
                // All short ids are used.
4×
255
                return 0, false
4×
256
        }
4×
257

258
        n := rng.Intn(available)
13×
259
        var i PeerShortID
13×
260
        for {
46,047×
261
                if peers.byShortID[i].peer == nil {
249×
262
                        if n == 0 {
13×
263
                                return PeerShortID(i), true
13×
264
                        }
13×
265

266
                        n--
236×
267
                }
268

269
                i++
46,034×
270
        }
271
}
272

273
func (peers *Peers) FetchWithDefault(peer *Peer) *Peer {
21,358×
274
        peers.Lock()
21,358×
275
        var pending PeersPendingNotifications
21,358×
276
        defer peers.unlockAndNotify(&pending)
21,358×
277

21,358×
278
        if existingPeer, found := peers.byName[peer.Name]; found {
40×
279
                existingPeer.localRefCount++
40×
280
                return existingPeer
40×
281
        }
40×
282

283
        peers.byName[peer.Name] = peer
21,318×
284
        peers.addByShortID(peer, &pending)
21,318×
285
        peer.localRefCount++
21,318×
286
        return peer
21,318×
287
}
288

289
func (peers *Peers) Fetch(name PeerName) *Peer {
1,592×
290
        peers.RLock()
1,592×
291
        defer peers.RUnlock()
1,592×
292
        return peers.byName[name]
1,592×
293
}
1,592×
294

295
func (peers *Peers) FetchAndAddRef(name PeerName) *Peer {
2×
296
        peers.Lock()
2×
297
        defer peers.Unlock()
2×
298
        peer := peers.byName[name]
2×
299
        if peer != nil {
2×
300
                peer.localRefCount++
2×
301
        }
2×
302
        return peer
2×
303
}
304

305
func (peers *Peers) FetchByShortID(shortID PeerShortID) *Peer {
1,548×
306
        peers.RLock()
1,548×
307
        defer peers.RUnlock()
1,548×
308
        return peers.byShortID[shortID].peer
1,548×
309
}
1,548×
310

311
func (peers *Peers) Dereference(peer *Peer) {
964×
312
        peers.Lock()
964×
313
        defer peers.Unlock()
964×
314
        peer.localRefCount--
964×
315
}
964×
316

317
func (peers *Peers) ForEach(fun func(*Peer)) {
223×
318
        peers.RLock()
223×
319
        defer peers.RUnlock()
223×
320
        for _, peer := range peers.byName {
358×
321
                fun(peer)
358×
322
        }
358×
323
}
324

325
// Merge an incoming update with our own topology.
326
//
327
// We add peers hitherto unknown to us, and update peers for which the
328
// update contains a more recent version than known to us. The return
329
// value is a) a representation of the received update, and b) an
330
// "improved" update containing just these new/updated elements.
331
func (peers *Peers) ApplyUpdate(update []byte) (PeerNameSet, PeerNameSet, error) {
2,108×
332
        peers.Lock()
2,108×
333
        var pending PeersPendingNotifications
2,108×
334
        defer peers.unlockAndNotify(&pending)
2,108×
335

2,108×
336
        newPeers, decodedUpdate, decodedConns, err := peers.decodeUpdate(update)
2,108×
337
        if err != nil {
3×
338
                return nil, nil, err
3×
339
        }
3×
340

341
        // By this point, we know the update doesn't refer to any peers we
342
        // have no knowledge of. We can now apply the update. Start by
343
        // adding in any new peers.
344
        for name, newPeer := range newPeers {
1,538×
345
                peers.byName[name] = newPeer
1,538×
346
                peers.addByShortID(newPeer, &pending)
1,538×
347
        }
1,538×
348

349
        // Now apply the updates
350
        newUpdate := peers.applyUpdate(decodedUpdate, decodedConns, &pending)
2,105×
351
        peers.garbageCollect(&pending)
2,105×
352
        for _, peerRemoved := range pending.removed {
8×
353
                delete(newUpdate, peerRemoved.Name)
8×
354
        }
8×
355

356
        updateNames := make(PeerNameSet)
2,105×
357
        for _, peer := range decodedUpdate {
3,795×
358
                updateNames[peer.Name] = void
3,795×
359
        }
3,795×
360

361
        return updateNames, newUpdate, nil
2,105×
362
}
363

364
func (peers *Peers) Names() PeerNameSet {
2,763×
365
        peers.RLock()
2,763×
366
        defer peers.RUnlock()
2,763×
367

2,763×
368
        names := make(PeerNameSet)
2,763×
369
        for name := range peers.byName {
5,399×
370
                names[name] = void
5,399×
371
        }
5,399×
372
        return names
2,763×
373
}
374

375
func (peers *Peers) EncodePeers(names PeerNameSet) []byte {
2,088×
376
        buf := new(bytes.Buffer)
2,088×
377
        enc := gob.NewEncoder(buf)
2,088×
378
        peers.RLock()
2,088×
379
        defer peers.RUnlock()
2,088×
380
        for name := range names {
3,772×
381
                if peer, found := peers.byName[name]; found {
3,772×
382
                        if peer == peers.ourself.Peer {
2,067×
383
                                peers.ourself.Encode(enc)
2,067×
384
                        } else {
1,705×
385
                                peer.Encode(enc)
1,705×
386
                        }
1,705×
387
                }
388
        }
389
        return buf.Bytes()
2,088×
390
}
391

392
func (peers *Peers) GarbageCollect() {
958×
393
        peers.Lock()
958×
394
        var pending PeersPendingNotifications
958×
395
        defer peers.unlockAndNotify(&pending)
958×
396

958×
397
        peers.garbageCollect(&pending)
958×
398
}
958×
399

400
func (peers *Peers) garbageCollect(pending *PeersPendingNotifications) {
3,063×
401
        peers.ourself.RLock()
3,063×
402
        _, reached := peers.ourself.Routes(nil, false)
3,063×
403
        peers.ourself.RUnlock()
3,063×
404

3,063×
405
        for name, peer := range peers.byName {
16,262×
406
                if _, found := reached[peer.Name]; !found && peer.localRefCount == 0 {
952×
407
                        delete(peers.byName, name)
952×
408
                        peers.deleteByShortID(peer, pending)
952×
409
                        pending.removed = append(pending.removed, peer)
952×
410
                }
952×
411
        }
412

413
        if peers.byShortID[peers.ourself.ShortID].peer != peers.ourself.Peer {
4×
414
                // The local peer doesn't own its short id.  Garbage
4×
415
                // collection might have freed some up, so try to
4×
416
                // reassign.
4×
417
                pending.reassignLocalShortID = true
4×
418
        }
4×
419
}
420

421
func (peers *Peers) decodeUpdate(update []byte) (newPeers map[PeerName]*Peer, decodedUpdate []*Peer, decodedConns [][]ConnectionSummary, err error) {
2,108×
422
        newPeers = make(map[PeerName]*Peer)
2,108×
423
        decodedUpdate = []*Peer{}
2,108×
424
        decodedConns = [][]ConnectionSummary{}
2,108×
425

2,108×
426
        decoder := gob.NewDecoder(bytes.NewReader(update))
2,108×
427

2,108×
428
        for {
5,908×
429
                peerSummary, connSummaries, decErr := decodePeer(decoder)
5,908×
430
                if decErr == io.EOF {
2,108×
431
                        break
2,108×
UNCOV
432
                } else if decErr != nil {
!
UNCOV
433
                        err = decErr
!
UNCOV
434
                        return
!
UNCOV
435
                }
!
436
                newPeer := NewPeerFromSummary(peerSummary)
3,800×
437
                decodedUpdate = append(decodedUpdate, newPeer)
3,800×
438
                decodedConns = append(decodedConns, connSummaries)
3,800×
439
                existingPeer, found := peers.byName[newPeer.Name]
3,800×
440
                if !found {
1,538×
441
                        newPeers[newPeer.Name] = newPeer
1,538×
UNCOV
442
                } else if existingPeer.UID != newPeer.UID {
!
UNCOV
443
                        err = NameCollisionError{Name: newPeer.Name}
!
UNCOV
444
                        return
!
UNCOV
445
                }
!
446
        }
447

448
        for _, connSummaries := range decodedConns {
3,800×
449
                for _, connSummary := range connSummaries {
1,949×
450
                        remoteName := PeerNameFromBin(connSummary.NameByte)
1,949×
451
                        if _, found := newPeers[remoteName]; found {
1,536×
452
                                continue
1,536×
453
                        }
454
                        if _, found := peers.byName[remoteName]; found {
410×
455
                                continue
410×
456
                        }
457
                        // Update refers to a peer which we have no knowledge
458
                        // of. Thus we can't apply the update. Abort.
459
                        err = UnknownPeerError{remoteName}
3×
460
                        return
3×
461
                }
462
        }
463
        return
2,105×
464
}
465

466
func (peers *Peers) applyUpdate(decodedUpdate []*Peer, decodedConns [][]ConnectionSummary, pending *PeersPendingNotifications) PeerNameSet {
2,105×
467
        newUpdate := make(PeerNameSet)
2,105×
468
        for idx, newPeer := range decodedUpdate {
3,795×
469
                connSummaries := decodedConns[idx]
3,795×
470
                name := newPeer.Name
3,795×
471
                // guaranteed to find peer in the peers.byName
3,795×
472
                peer := peers.byName[name]
3,795×
473
                if peer != newPeer &&
3,795×
474
                        (peer == peers.ourself.Peer || peer.Version >= newPeer.Version) {
238×
475
                        // Nobody but us updates us. And if we know more about a
238×
476
                        // peer than what's in the the update, we ignore the
238×
477
                        // latter.
238×
478
                        continue
238×
479
                }
480
                // If we're here, either it was a new peer, or the update has
481
                // more info about the peer than we do. Either case, we need
482
                // to set version and conns and include the updated peer in
483
                // the outgoing update.
484

485
                // Can peer have been updated by anyone else in the mean time?
486
                // No - we know that peer is not ourself, so the only prospect
487
                // for an update would be someone else calling
488
                // router.Peers.ApplyUpdate. But ApplyUpdate takes the Lock on
489
                // the router.Peers, so there can be no race here.
490
                peer.Version = newPeer.Version
3,557×
491
                peer.connections = makeConnsMap(peer, connSummaries, peers.byName)
3,557×
492

3,557×
493
                if newPeer.ShortID != peer.ShortID {
8×
494
                        peers.deleteByShortID(peer, pending)
8×
495
                        peer.ShortID = newPeer.ShortID
8×
496
                        peers.addByShortID(peer, pending)
8×
497
                }
8×
498

499
                newUpdate[name] = void
3,557×
500
        }
501

502
        return newUpdate
2,105×
503
}
504

505
func (peer *Peer) Encode(enc *gob.Encoder) {
3,772×
506
        checkFatal(enc.Encode(peer.PeerSummary))
3,772×
507

3,772×
508
        connSummaries := []ConnectionSummary{}
3,772×
509
        for _, conn := range peer.connections {
1,907×
510
                connSummaries = append(connSummaries, ConnectionSummary{
1,907×
511
                        conn.Remote().NameByte,
1,907×
512
                        conn.RemoteTCPAddr(),
1,907×
513
                        conn.Outbound(),
1,907×
514
                        conn.Established(),
1,907×
515
                })
1,907×
516
        }
1,907×
517

518
        checkFatal(enc.Encode(connSummaries))
3,772×
519
}
520

521
func (peer *LocalPeer) Encode(enc *gob.Encoder) {
2,067×
522
        peer.RLock()
2,067×
523
        defer peer.RUnlock()
2,067×
524
        peer.Peer.Encode(enc)
2,067×
525
}
2,067×
526

527
func decodePeer(dec *gob.Decoder) (peerSummary PeerSummary, connSummaries []ConnectionSummary, err error) {
5,908×
528
        if err = dec.Decode(&peerSummary); err != nil {
2,108×
529
                return
2,108×
530
        }
2,108×
UNCOV
531
        if err = dec.Decode(&connSummaries); err != nil {
!
UNCOV
532
                return
!
UNCOV
533
        }
!
534
        return
3,800×
535
}
536

537
func makeConnsMap(peer *Peer, connSummaries []ConnectionSummary, byName map[PeerName]*Peer) map[PeerName]Connection {
3,557×
538
        conns := make(map[PeerName]Connection)
3,557×
539
        for _, connSummary := range connSummaries {
1,708×
540
                name := PeerNameFromBin(connSummary.NameByte)
1,708×
541
                remotePeer := byName[name]
1,708×
542
                conn := NewRemoteConnection(peer, remotePeer, connSummary.RemoteTCPAddr, connSummary.Outbound, connSummary.Established)
1,708×
543
                conns[name] = conn
1,708×
544
        }
1,708×
545
        return conns
3,557×
546
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc