Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / #5287

29 Jan 2016 - 16:32 coverage decreased (-0.08%) to 75.358%
#5287

Pull #1879

circleci

94ccfc629c6862b5950de3512742bcae?size=18&default=identiconbboreham
Wait for plugin status via unix socket
Pull Request #1879: Improve visibility of startup problems

6682 of 8867 relevant lines covered (75.36%)

92936.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/mesh/local_peer.go
1
package mesh
2

3
import (
4
        "fmt"
5
        "net"
6
        "sync"
7
        "time"
8
)
9

10
type LocalPeer struct {
11
        sync.RWMutex
12
        *Peer
13
        router     *Router
14
        actionChan chan<- LocalPeerAction
15
}
16

17
type LocalPeerAction func()
18

19
func NewLocalPeer(name PeerName, nickName string, router *Router) *LocalPeer {
5,127×
20
        actionChan := make(chan LocalPeerAction, ChannelSize)
5,127×
21
        peer := &LocalPeer{
5,127×
22
                Peer:       NewPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()),
5,127×
23
                router:     router,
5,127×
24
                actionChan: actionChan,
5,127×
25
        }
5,127×
26
        go peer.actorLoop(actionChan)
5,127×
27
        return peer
5,127×
28
}
5,127×
29

30
func (peer *LocalPeer) Connections() ConnectionSet {
234×
31
        connections := make(ConnectionSet)
234×
32
        peer.RLock()
234×
33
        defer peer.RUnlock()
234×
34
        for _, conn := range peer.connections {
246×
35
                connections[conn] = void
246×
36
        }
246×
37
        return connections
234×
38
}
39

40
func (peer *LocalPeer) ConnectionTo(name PeerName) (Connection, bool) {
580×
41
        peer.RLock()
580×
42
        defer peer.RUnlock()
580×
43
        conn, found := peer.connections[name]
580×
44
        return conn, found // yes, you really can't inline that. FFS.
580×
45
}
580×
46

47
func (peer *LocalPeer) ConnectionsTo(names []PeerName) []Connection {
546×
48
        conns := make([]Connection, 0, len(names))
546×
49
        peer.RLock()
546×
50
        defer peer.RUnlock()
546×
51
        for _, name := range names {
822×
52
                conn, found := peer.connections[name]
822×
53
                // Again, !found could just be due to a race.
822×
54
                if found {
822×
55
                        conns = append(conns, conn)
822×
56
                }
822×
57
        }
58
        return conns
546×
59
}
60

61
func (peer *LocalPeer) CreateConnection(peerAddr string, acceptNewPeer bool) error {
37×
62
        if err := peer.checkConnectionLimit(); err != nil {
!
63
                return err
!
64
        }
!
65
        tcpAddr, err := net.ResolveTCPAddr("tcp4", peerAddr)
37×
66
        if err != nil {
!
67
                return err
!
68
        }
!
69
        tcpConn, err := net.DialTCP("tcp4", nil, tcpAddr)
37×
70
        if err != nil {
10×
71
                return err
10×
72
        }
10×
73
        connRemote := NewRemoteConnection(peer.Peer, nil, tcpConn.RemoteAddr().String(), true, false)
24×
74
        StartLocalConnection(connRemote, tcpConn, peer.router, acceptNewPeer)
24×
75
        return nil
24×
76
}
77

78
// ACTOR client API
79

80
// Sync.
81
func (peer *LocalPeer) AddConnection(conn *LocalConnection) error {
39×
82
        resultChan := make(chan error)
39×
83
        peer.actionChan <- func() {
39×
84
                resultChan <- peer.handleAddConnection(conn)
39×
85
        }
39×
86
        return <-resultChan
39×
87
}
88

89
// Async.
90
func (peer *LocalPeer) ConnectionEstablished(conn *LocalConnection) {
37×
91
        peer.actionChan <- func() {
37×
92
                peer.handleConnectionEstablished(conn)
37×
93
        }
37×
94
}
95

96
// Sync.
97
func (peer *LocalPeer) DeleteConnection(conn *LocalConnection) {
28×
98
        resultChan := make(chan interface{})
28×
99
        peer.actionChan <- func() {
28×
100
                peer.handleDeleteConnection(conn)
28×
101
                resultChan <- nil
28×
102
        }
28×
103
        <-resultChan
28×
104
}
105

106
// ACTOR server
107

108
func (peer *LocalPeer) actorLoop(actionChan <-chan LocalPeerAction) {
5,127×
109
        gossipTimer := time.Tick(GossipInterval)
5,127×
110
        for {
5,240×
111
                select {
5,240×
112
                case action := <-actionChan:
104×
113
                        action()
104×
114
                case <-gossipTimer:
9×
115
                        peer.router.SendAllGossip()
9×
116
                }
117
        }
118
}
119

120
func (peer *LocalPeer) handleAddConnection(conn Connection) error {
49×
121
        if peer.Peer != conn.Local() {
!
122
                log.Fatal("Attempt made to add connection to peer where peer is not the source of connection")
!
123
        }
!
124
        if conn.Remote() == nil {
!
125
                log.Fatal("Attempt made to add connection to peer with unknown remote peer")
!
126
        }
!
127
        toName := conn.Remote().Name
49×
128
        dupErr := fmt.Errorf("Multiple connections to %s added to %s", conn.Remote(), peer.String())
49×
129
        // deliberately non symmetrical
49×
130
        if dupConn, found := peer.connections[toName]; found {
2×
131
                if dupConn == conn {
!
132
                        return nil
!
133
                }
!
134
                switch conn.BreakTie(dupConn) {
2×
135
                case TieBreakWon:
2×
136
                        dupConn.Shutdown(dupErr)
2×
137
                        peer.handleDeleteConnection(dupConn)
2×
UNCOV
138
                case TieBreakLost:
!
UNCOV
139
                        return dupErr
!
140
                case TieBreakTied:
!
141
                        // oh good grief. Sod it, just kill both of them.
!
142
                        dupConn.Shutdown(dupErr)
!
143
                        peer.handleDeleteConnection(dupConn)
!
144
                        return dupErr
!
145
                }
146
        }
147
        if err := peer.checkConnectionLimit(); err != nil {
!
148
                return err
!
149
        }
!
150
        _, isConnectedPeer := peer.router.Routes.Unicast(toName)
49×
151
        peer.addConnection(conn)
49×
152
        if isConnectedPeer {
3×
153
                conn.Log("connection added")
3×
154
        } else {
46×
155
                conn.Log("connection added (new peer)")
46×
156
                peer.router.SendAllGossipDown(conn)
46×
157
        }
46×
158

159
        peer.router.Routes.Recalculate()
49×
160
        peer.broadcastPeerUpdate(conn.Remote())
49×
161

49×
162
        return nil
49×
163
}
164

165
func (peer *LocalPeer) handleConnectionEstablished(conn Connection) {
47×
166
        if peer.Peer != conn.Local() {
!
167
                log.Fatal("Peer informed of active connection where peer is not the source of connection")
!
168
        }
!
169
        if dupConn, found := peer.connections[conn.Remote().Name]; !found || conn != dupConn {
!
170
                conn.Shutdown(fmt.Errorf("Cannot set unknown connection active"))
!
171
                return
!
172
        }
!
173
        peer.connectionEstablished(conn)
47×
174
        conn.Log("connection fully established")
47×
175

47×
176
        peer.router.Routes.Recalculate()
47×
177
        peer.broadcastPeerUpdate()
47×
178
}
179

180
func (peer *LocalPeer) handleDeleteConnection(conn Connection) {
32×
181
        if peer.Peer != conn.Local() {
!
182
                log.Fatal("Attempt made to delete connection from peer where peer is not the source of connection")
!
183
        }
!
184
        if conn.Remote() == nil {
!
185
                log.Fatal("Attempt made to delete connection to peer with unknown remote peer")
!
186
        }
!
187
        toName := conn.Remote().Name
32×
188
        if connFound, found := peer.connections[toName]; !found || connFound != conn {
10×
189
                return
10×
190
        }
10×
191
        peer.deleteConnection(conn)
22×
192
        conn.Log("connection deleted")
22×
193
        // Must do garbage collection first to ensure we don't send out an
22×
194
        // update with unreachable peers (can cause looping)
22×
195
        peer.router.Peers.GarbageCollect()
22×
196
        peer.router.Routes.Recalculate()
22×
197
        peer.broadcastPeerUpdate()
22×
198
}
199

200
// helpers
201

202
func (peer *LocalPeer) broadcastPeerUpdate(peers ...*Peer) {
129×
203
        // Some tests run without a router.  This should be fixed so
129×
204
        // that the relevant part of Router can be easily run in the
129×
205
        // context of a test, but that will involve significant
129×
206
        // reworking of tests.
129×
207
        if peer.router != nil {
118×
208
                peer.router.BroadcastTopologyUpdate(append(peers, peer.Peer))
118×
209
        }
118×
210
}
211

212
func (peer *LocalPeer) checkConnectionLimit() error {
86×
213
        limit := peer.router.ConnLimit
86×
214
        if 0 != limit && peer.connectionCount() >= limit {
!
215
                return fmt.Errorf("Connection limit reached (%v)", limit)
!
216
        }
!
217
        return nil
86×
218
}
219

220
func (peer *LocalPeer) addConnection(conn Connection) {
5,576×
221
        peer.Lock()
5,576×
222
        defer peer.Unlock()
5,576×
223
        peer.connections[conn.Remote().Name] = conn
5,576×
224
        peer.Version++
5,576×
225
}
5,576×
226

227
func (peer *LocalPeer) deleteConnection(conn Connection) {
487×
228
        peer.Lock()
487×
229
        defer peer.Unlock()
487×
230
        delete(peer.connections, conn.Remote().Name)
487×
231
        peer.Version++
487×
232
}
487×
233

234
func (peer *LocalPeer) connectionEstablished(conn Connection) {
5,571×
235
        peer.Lock()
5,571×
236
        defer peer.Unlock()
5,571×
237
        peer.Version++
5,571×
238
}
5,571×
239

240
func (peer *LocalPeer) connectionCount() int {
76×
241
        peer.RLock()
76×
242
        defer peer.RUnlock()
76×
243
        return len(peer.connections)
76×
244
}
76×
245

246
func (peer *LocalPeer) setShortID(shortID PeerShortID) {
4,114×
247
        peer.Lock()
4,114×
248
        defer peer.Unlock()
4,114×
249
        peer.ShortID = shortID
4,114×
250
        peer.Version++
4,114×
251
}
4,114×
252

253
func (peer *LocalPeer) setVersionBeyond(version uint64) bool {
!
254
        peer.Lock()
!
255
        defer peer.Unlock()
!
256
        if version >= peer.Version {
!
257
                peer.Version = version + 1
!
258
                return true
!
259
        }
!
260
        return false
!
261
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc