Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / #4443

16 Nov 2015 - 10:28 coverage increased (+9.2%) to 75.714%
#4443

Pull #1670

circleci

73b5a6163b938fcb0b72d2fa48439665?size=18&default=identiconinercia
Forbid traffic to the Weave port from other containers Make sure we have the Docker bridge IP before adding/removing iptables rules
Pull Request #1670: Forbid traffic to the Weave port from containers

6494 of 8577 relevant lines covered (75.71%)

94673.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.47
/mesh/local_peer.go
1
package mesh
2

3
import (
4
        "fmt"
5
        "net"
6
        "sync"
7
        "time"
8
)
9

10
type LocalPeer struct {
11
        sync.RWMutex
12
        *Peer
13
        router     *Router
14
        actionChan chan<- LocalPeerAction
15
}
16

17
type LocalPeerAction func()
18

19
func NewLocalPeer(name PeerName, nickName string, router *Router) *LocalPeer {
5,128×
20
        actionChan := make(chan LocalPeerAction, ChannelSize)
5,128×
21
        peer := &LocalPeer{
5,128×
22
                Peer:       NewPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()),
5,128×
23
                router:     router,
5,128×
24
                actionChan: actionChan,
5,128×
25
        }
5,128×
26
        go peer.actorLoop(actionChan)
5,128×
27
        return peer
5,128×
28
}
5,128×
29

30
func (peer *LocalPeer) Connections() ConnectionSet {
218×
31
        connections := make(ConnectionSet)
218×
32
        peer.RLock()
218×
33
        defer peer.RUnlock()
218×
34
        for _, conn := range peer.connections {
221×
35
                connections[conn] = void
221×
36
        }
221×
37
        return connections
218×
38
}
39

40
func (peer *LocalPeer) ConnectionTo(name PeerName) (Connection, bool) {
599×
41
        peer.RLock()
599×
42
        defer peer.RUnlock()
599×
43
        conn, found := peer.connections[name]
599×
44
        return conn, found // yes, you really can't inline that. FFS.
599×
45
}
599×
46

47
func (peer *LocalPeer) ConnectionsTo(names []PeerName) []Connection {
476×
48
        conns := make([]Connection, 0, len(names))
476×
49
        peer.RLock()
476×
50
        defer peer.RUnlock()
476×
51
        for _, name := range names {
753×
52
                conn, found := peer.connections[name]
753×
53
                // Again, !found could just be due to a race.
753×
54
                if found {
753×
55
                        conns = append(conns, conn)
753×
56
                }
753×
57
        }
58
        return conns
476×
59
}
60

61
func (peer *LocalPeer) CreateConnection(peerAddr string, acceptNewPeer bool) error {
35×
62
        if err := peer.checkConnectionLimit(); err != nil {
!
63
                return err
!
64
        }
!
65
        tcpAddr, err := net.ResolveTCPAddr("tcp4", peerAddr)
35×
66
        if err != nil {
!
67
                return err
!
68
        }
!
69
        tcpConn, err := net.DialTCP("tcp4", nil, tcpAddr)
35×
70
        if err != nil {
10×
71
                return err
10×
72
        }
10×
73
        connRemote := NewRemoteConnection(peer.Peer, nil, tcpConn.RemoteAddr().String(), true, false)
22×
74
        StartLocalConnection(connRemote, tcpConn, peer.router, acceptNewPeer)
22×
75
        return nil
22×
76
}
77

78
// ACTOR client API
79

80
// Sync.
81
func (peer *LocalPeer) AddConnection(conn *LocalConnection) error {
35×
82
        resultChan := make(chan error)
35×
83
        peer.actionChan <- func() {
35×
84
                resultChan <- peer.handleAddConnection(conn)
35×
85
        }
35×
86
        return <-resultChan
35×
87
}
88

89
// Async.
90
func (peer *LocalPeer) ConnectionEstablished(conn *LocalConnection) {
33×
91
        peer.actionChan <- func() {
33×
92
                peer.handleConnectionEstablished(conn)
33×
93
        }
33×
94
}
95

96
// Sync.
97
func (peer *LocalPeer) DeleteConnection(conn *LocalConnection) {
26×
98
        resultChan := make(chan interface{})
26×
99
        peer.actionChan <- func() {
26×
100
                peer.handleDeleteConnection(conn)
26×
101
                resultChan <- nil
26×
102
        }
26×
103
        <-resultChan
26×
104
}
105

106
// ACTOR server
107

108
func (peer *LocalPeer) actorLoop(actionChan <-chan LocalPeerAction) {
5,128×
109
        gossipTimer := time.Tick(GossipInterval)
5,128×
110
        for {
5,232×
111
                select {
5,232×
112
                case action := <-actionChan:
94×
113
                        action()
94×
114
                case <-gossipTimer:
10×
115
                        peer.router.SendAllGossip()
10×
116
                }
117
        }
118
}
119

120
func (peer *LocalPeer) handleAddConnection(conn Connection) error {
44×
121
        if peer.Peer != conn.Local() {
!
122
                log.Fatal("Attempt made to add connection to peer where peer is not the source of connection")
!
123
        }
!
124
        if conn.Remote() == nil {
!
125
                log.Fatal("Attempt made to add connection to peer with unknown remote peer")
!
126
        }
!
127
        toName := conn.Remote().Name
44×
128
        dupErr := fmt.Errorf("Multiple connections to %s added to %s", conn.Remote(), peer.String())
44×
129
        // deliberately non symmetrical
44×
130
        if dupConn, found := peer.connections[toName]; found {
2×
131
                if dupConn == conn {
!
132
                        return nil
!
133
                }
!
134
                switch conn.BreakTie(dupConn) {
2×
UNCOV
135
                case TieBreakWon:
!
UNCOV
136
                        dupConn.Shutdown(dupErr)
!
UNCOV
137
                        peer.handleDeleteConnection(dupConn)
!
138
                case TieBreakLost:
2×
139
                        return dupErr
2×
140
                case TieBreakTied:
!
141
                        // oh good grief. Sod it, just kill both of them.
!
142
                        dupConn.Shutdown(dupErr)
!
143
                        peer.handleDeleteConnection(dupConn)
!
144
                        return dupErr
!
145
                }
146
        }
147
        if err := peer.checkConnectionLimit(); err != nil {
!
148
                return err
!
149
        }
!
150
        _, isConnectedPeer := peer.router.Routes.Unicast(toName)
42×
151
        peer.addConnection(conn)
42×
152
        if isConnectedPeer {
!
153
                conn.Log("connection added")
!
154
        } else {
42×
155
                conn.Log("connection added (new peer)")
42×
156
                peer.router.SendAllGossipDown(conn)
42×
157
        }
42×
158

159
        peer.router.Routes.Recalculate()
42×
160
        peer.broadcastPeerUpdate(conn.Remote())
42×
161

42×
162
        return nil
42×
163
}
164

165
func (peer *LocalPeer) handleConnectionEstablished(conn Connection) {
42×
166
        if peer.Peer != conn.Local() {
!
167
                log.Fatal("Peer informed of active connection where peer is not the source of connection")
!
168
        }
!
169
        if dupConn, found := peer.connections[conn.Remote().Name]; !found || conn != dupConn {
!
170
                conn.Shutdown(fmt.Errorf("Cannot set unknown connection active"))
!
171
                return
!
172
        }
!
173
        peer.connectionEstablished(conn)
42×
174
        conn.Log("connection fully established")
42×
175

42×
176
        peer.router.Routes.Recalculate()
42×
177
        peer.broadcastPeerUpdate()
42×
178
}
179

180
func (peer *LocalPeer) handleDeleteConnection(conn Connection) {
28×
181
        if peer.Peer != conn.Local() {
!
182
                log.Fatal("Attempt made to delete connection from peer where peer is not the source of connection")
!
183
        }
!
184
        if conn.Remote() == nil {
!
185
                log.Fatal("Attempt made to delete connection to peer with unknown remote peer")
!
186
        }
!
187
        toName := conn.Remote().Name
28×
188
        if connFound, found := peer.connections[toName]; !found || connFound != conn {
10×
189
                return
10×
190
        }
10×
191
        peer.deleteConnection(conn)
18×
192
        conn.Log("connection deleted")
18×
193
        // Must do garbage collection first to ensure we don't send out an
18×
194
        // update with unreachable peers (can cause looping)
18×
195
        peer.router.Peers.GarbageCollect()
18×
196
        peer.router.Routes.Recalculate()
18×
197
        peer.broadcastPeerUpdate()
18×
198
}
199

200
// helpers
201

202
func (peer *LocalPeer) broadcastPeerUpdate(peers ...*Peer) {
112×
203
        // Some tests run without a router.  This should be fixed so
112×
204
        // that the relevant part of Router can be easily run in the
112×
205
        // context of a test, but that will involve significant
112×
206
        // reworking of tests.
112×
207
        if peer.router != nil {
102×
208
                peer.router.BroadcastTopologyUpdate(append(peers, peer.Peer))
102×
209
        }
102×
210
}
211

212
func (peer *LocalPeer) checkConnectionLimit() error {
77×
213
        limit := peer.router.ConnLimit
77×
214
        if 0 != limit && peer.connectionCount() >= limit {
!
215
                return fmt.Errorf("Connection limit reached (%v)", limit)
!
216
        }
!
217
        return nil
77×
218
}
219

220
func (peer *LocalPeer) addConnection(conn Connection) {
5,569×
221
        peer.Lock()
5,569×
222
        defer peer.Unlock()
5,569×
223
        peer.connections[conn.Remote().Name] = conn
5,569×
224
        peer.Version++
5,569×
225
}
5,569×
226

227
func (peer *LocalPeer) deleteConnection(conn Connection) {
483×
228
        peer.Lock()
483×
229
        defer peer.Unlock()
483×
230
        delete(peer.connections, conn.Remote().Name)
483×
231
        peer.Version++
483×
232
}
483×
233

234
func (peer *LocalPeer) connectionEstablished(conn Connection) {
5,566×
235
        peer.Lock()
5,566×
236
        defer peer.Unlock()
5,566×
237
        peer.Version++
5,566×
238
}
5,566×
239

240
func (peer *LocalPeer) connectionCount() int {
68×
241
        peer.RLock()
68×
242
        defer peer.RUnlock()
68×
243
        return len(peer.connections)
68×
244
}
68×
245

246
func (peer *LocalPeer) setShortID(shortID PeerShortID) {
4,113×
247
        peer.Lock()
4,113×
248
        defer peer.Unlock()
4,113×
249
        peer.ShortID = shortID
4,113×
250
        peer.Version++
4,113×
251
}
4,113×
252

253
func (peer *LocalPeer) setVersionBeyond(version uint64) bool {
!
254
        peer.Lock()
!
255
        defer peer.Unlock()
!
256
        if version >= peer.Version {
!
257
                peer.Version = version + 1
!
258
                return true
!
259
        }
!
260
        return false
!
261
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc