Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / #4371

9 Nov 2015 - 15:51 coverage increased (+0.1%) to 75.78%
#4371

Pull #1648

circleci

2f1e5f233f2b7283a9bf3277e75bf30a?size=18&default=identiconrade
refactor: move NetworkRouterStatus into its own file
Pull Request #1648: separate the control and data planes

134 of 198 new or added lines in 11 files covered. (67.68%)

56 existing lines in 6 files now uncovered.

6483 of 8555 relevant lines covered (75.78%)

192419.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.4
/router/connection.go
1
package router
2

3
import (
4
        "fmt"
5
        "net"
6
        "strconv"
7
        "sync"
8
        "time"
9
)
10

11
type Connection interface {
12
        Local() *Peer
13
        Remote() *Peer
14
        RemoteTCPAddr() string
15
        Outbound() bool
16
        Established() bool
17
        BreakTie(Connection) ConnectionTieBreak
18
        Shutdown(error)
19
        Log(args ...interface{})
20
}
21

22
// ConnectionTieBreak is the outcome of Connection.BreakTie.
type ConnectionTieBreak int

// Possible results of a tie break between two duplicate connections.
const (
	// TieBreakWon means the receiver of BreakTie wins.
	TieBreakWon ConnectionTieBreak = iota
	// TieBreakLost means the receiver of BreakTie loses.
	TieBreakLost
	// TieBreakTied means neither connection wins.
	TieBreakTied
)
29

30
// ErrConnectToSelf is returned by registerRemote when the remote peer of a
// new connection turns out to be the local peer.
var ErrConnectToSelf = fmt.Errorf("Cannot connect to ourself")
31

32
type RemoteConnection struct {
33
        local         *Peer
34
        remote        *Peer
35
        remoteTCPAddr string
36
        outbound      bool
37
        established   bool
38
}
39

40
type LocalConnection struct {
41
        sync.RWMutex
42
        RemoteConnection
43
        TCPConn      *net.TCPConn
44
        version      byte
45
        tcpSender    TCPSender
46
        SessionKey   *[32]byte
47
        heartbeatTCP *time.Ticker
48
        Router       *Router
49
        uid          uint64
50
        actionChan   chan<- ConnectionAction
51
        finished     <-chan struct{} // closed to signal that actorLoop has finished
52
        OverlayConn  OverlayConnection
53
}
54

55
// ConnectionAction is a unit of work executed by a LocalConnection's actor
// loop; a non-nil result terminates the loop.
type ConnectionAction func() error
56

57
func NewRemoteConnection(from, to *Peer, tcpAddr string, outbound bool, established bool) *RemoteConnection {
1,751×
58
        return &RemoteConnection{
1,751×
59
                local:         from,
1,751×
60
                remote:        to,
1,751×
61
                remoteTCPAddr: tcpAddr,
1,751×
62
                outbound:      outbound,
1,751×
63
                established:   established,
1,751×
64
        }
1,751×
65
}
1,751×
66

67
func (conn *RemoteConnection) Local() *Peer                           { return conn.local }
136×
68
func (conn *RemoteConnection) Remote() *Peer                          { return conn.remote }
32,387×
69
func (conn *RemoteConnection) RemoteTCPAddr() string                  { return conn.remoteTCPAddr }
2,135×
70
func (conn *RemoteConnection) Outbound() bool                         { return conn.outbound }
2,160×
71
func (conn *RemoteConnection) Established() bool                      { return conn.established }
3,226×
72
func (conn *RemoteConnection) BreakTie(Connection) ConnectionTieBreak { return TieBreakTied }
!
73
func (conn *RemoteConnection) Shutdown(error)                         {}
!
74

75
func (conn *RemoteConnection) Log(args ...interface{}) {
161×
76
        log.Println(append(append([]interface{}{}, fmt.Sprintf("->[%s|%s]:", conn.remoteTCPAddr, conn.remote)), args...)...)
161×
77
}
161×
78

79
func (conn *RemoteConnection) ErrorLog(args ...interface{}) {
26×
80
        log.Errorln(append(append([]interface{}{}, fmt.Sprintf("->[%s|%s]:", conn.remoteTCPAddr, conn.remote)), args...)...)
26×
81
}
26×
82

83
// Does not return anything. If the connection is successful, it will
84
// end up in the local peer's connections map.
85
func StartLocalConnection(connRemote *RemoteConnection, tcpConn *net.TCPConn, router *Router, acceptNewPeer bool) {
43×
86
        if connRemote.local != router.Ourself.Peer {
!
87
                log.Fatal("Attempt to create local connection from a peer which is not ourself")
!
88
        }
!
89
        // NB, we're taking a copy of connRemote here.
90
        actionChan := make(chan ConnectionAction, ChannelSize)
43×
91
        finished := make(chan struct{})
43×
92
        conn := &LocalConnection{
43×
93
                RemoteConnection: *connRemote,
43×
94
                Router:           router,
43×
95
                TCPConn:          tcpConn,
43×
96
                uid:              randUint64(),
43×
97
                actionChan:       actionChan,
43×
98
                finished:         finished}
43×
99
        go conn.run(actionChan, finished, acceptNewPeer)
43×
100
}
101

102
func (conn *LocalConnection) BreakTie(dupConn Connection) ConnectionTieBreak {
2×
103
        dupConnLocal := dupConn.(*LocalConnection)
2×
104
        // conn.uid is used as the tie breaker here, in the knowledge that
2×
105
        // both sides will make the same decision.
2×
106
        if conn.uid < dupConnLocal.uid {
2×
107
                return TieBreakWon
2×
UNCOV
108
        } else if dupConnLocal.uid < conn.uid {
!
UNCOV
109
                return TieBreakLost
!
110
        } else {
!
111
                return TieBreakTied
!
112
        }
!
113
}
114

115
func (conn *LocalConnection) Established() bool {
896×
116
        return conn.established
896×
117
}
896×
118

119
// Send directly, not via the Actor.  If it goes via the Actor we can
120
// get a deadlock where LocalConnection is blocked talking to
121
// LocalPeer and LocalPeer is blocked trying send a ProtocolMsg via
122
// LocalConnection, and the channels are full in both directions so
123
// nothing can proceed.
124
func (conn *LocalConnection) SendProtocolMsg(m ProtocolMsg) {
445×
125
        if err := conn.sendProtocolMsg(m); err != nil {
3×
126
                conn.Shutdown(err)
3×
127
        }
3×
128
}
129

130
// ACTOR methods
131

132
// NB: The conn.* fields are only written by the connection actor
133
// process, which is the caller of the ConnectionAction funs. Hence we
134
// do not need locks for reading, and only need write locks for fields
135
// read by other processes.
136

137
// Async
138
func (conn *LocalConnection) Shutdown(err error) {
23×
139
        // err should always be a real error, even if only io.EOF
23×
140
        if err == nil {
!
141
                panic("nil error")
!
142
        }
!
143

144
        // Run on its own goroutine in case the channel is backed up
145
        go func() { conn.sendAction(func() error { return err }) }()
18×
146
}
147

148
// Send an actor request to the actorLoop, but don't block if
149
// actorLoop has exited - see http://blog.golang.org/pipelines for
150
// pattern
151
func (conn *LocalConnection) sendAction(action ConnectionAction) {
23×
152
        select {
23×
153
        case conn.actionChan <- action:
21×
154
        case <-conn.finished:
2×
155
        }
156
}
157

158
// ACTOR server
159

160
func (conn *LocalConnection) run(actionChan <-chan ConnectionAction, finished chan<- struct{}, acceptNewPeer bool) {
43×
161
        var err error // important to use this var and not create another one with 'err :='
43×
162
        defer func() { conn.shutdown(err) }()
26×
163
        defer close(finished)
43×
164

43×
165
        conn.TCPConn.SetLinger(0)
43×
166
        intro, err := ProtocolIntroParams{
43×
167
                MinVersion: conn.Router.ProtocolMinVersion,
43×
168
                MaxVersion: ProtocolMaxVersion,
43×
169
                Features:   conn.makeFeatures(),
43×
170
                Conn:       conn.TCPConn,
43×
171
                Password:   conn.Router.Password,
43×
172
                Outbound:   conn.outbound,
43×
173
        }.DoIntro()
43×
174
        if err != nil {
!
175
                return
!
176
        }
!
177

178
        conn.SessionKey = intro.SessionKey
43×
179
        conn.tcpSender = intro.Sender
43×
180
        conn.version = intro.Version
43×
181

43×
182
        remote, err := conn.parseFeatures(intro.Features)
43×
183
        if err != nil {
!
184
                return
!
185
        }
!
186

187
        if err = conn.registerRemote(remote, acceptNewPeer); err != nil {
8×
188
                return
8×
189
        }
8×
190

191
        conn.Log("connection ready; using protocol version", conn.version)
35×
192

35×
193
        params := OverlayConnectionParams{
35×
194
                RemotePeer:         conn.remote,
35×
195
                LocalAddr:          conn.TCPConn.LocalAddr().(*net.TCPAddr),
35×
196
                RemoteAddr:         conn.TCPConn.RemoteAddr().(*net.TCPAddr),
35×
197
                Outbound:           conn.outbound,
35×
198
                ConnUID:            conn.uid,
35×
199
                SessionKey:         conn.SessionKey,
35×
200
                SendControlMessage: conn.sendOverlayControlMessage,
35×
201
                Features:           intro.Features,
35×
202
        }
35×
NEW
203
        if conn.OverlayConn, err = conn.Router.Overlay.PrepareConnection(params); err != nil {
!
204
                return
!
205
        }
!
206

207
        // As soon as we do AddConnection, the new connection becomes
208
        // visible to the packet routing logic.  So AddConnection must
209
        // come after PrepareConnection
UNCOV
210
        if err = conn.Router.Ourself.AddConnection(conn); err != nil {
!
UNCOV
211
                return
!
UNCOV
212
        }
!
213
        conn.Router.ConnectionMaker.ConnectionCreated(conn)
35×
214

35×
215
        // OverlayConnection confirmation comes after AddConnection,
35×
216
        // because only after that completes do we know the connection is
35×
217
        // valid: in particular that it is not a duplicate connection to
35×
218
        // the same peer. Overlay communication on a duplicate connection
35×
219
        // can cause problems such as tripping up overlay crypto at the
35×
220
        // other end due to data being decoded by the other connection. It
35×
221
        // is also generally wasteful to engage in any interaction with
35×
222
        // the remote on a connection that turns out to be invalid.
35×
223
        conn.OverlayConn.Confirm()
35×
224

35×
225
        // receiveTCP must follow also AddConnection. In the absence
35×
226
        // of any indirect connectivity to the remote peer, the first
35×
227
        // we hear about it (and any peers reachable from it) is
35×
228
        // through topology gossip it sends us on the connection. We
35×
229
        // must ensure that the connection has been added to Ourself
35×
230
        // prior to processing any such gossip, otherwise we risk
35×
231
        // immediately gc'ing part of that newly received portion of
35×
232
        // the topology (though not the remote peer itself, since that
35×
233
        // will have a positive ref count), leaving behind dangling
35×
234
        // references to peers. Hence we must invoke AddConnection,
35×
235
        // which is *synchronous*, first.
35×
236
        conn.heartbeatTCP = time.NewTicker(TCPHeartbeat)
35×
237
        go conn.receiveTCP(intro.Receiver)
35×
238

35×
239
        // AddConnection must precede actorLoop. More precisely, it
35×
240
        // must precede shutdown, since that invokes DeleteConnection
35×
241
        // and is invoked on termination of this entire
35×
242
        // function. Essentially this boils down to a prohibition on
35×
243
        // running AddConnection in a separate goroutine, at least not
35×
244
        // without some synchronisation. Which in turn requires the
35×
245
        // launching of the receiveTCP goroutine to precede actorLoop.
35×
246
        err = conn.actorLoop(actionChan)
35×
247
}
248

249
func (conn *LocalConnection) makeFeatures() map[string]string {
43×
250
        features := map[string]string{
43×
251
                "PeerNameFlavour": PeerNameFlavour,
43×
252
                "Name":            conn.local.Name.String(),
43×
253
                "NickName":        conn.local.NickName,
43×
254
                "ShortID":         fmt.Sprint(conn.local.ShortID),
43×
255
                "UID":             fmt.Sprint(conn.local.UID),
43×
256
                "ConnID":          fmt.Sprint(conn.uid),
43×
257
        }
43×
258
        conn.Router.Overlay.AddFeaturesTo(features)
43×
259
        return features
43×
260
}
43×
261

262
// features is a set of named string values exchanged during the protocol
// intro.
type features map[string]string

// MustHave checks that every key in keys is present in f. It returns an
// error naming the first missing key, or nil if all are present. A key
// mapped to the empty string counts as present.
func (f features) MustHave(keys []string) error {
	for _, key := range keys {
		if _, ok := f[key]; !ok {
			return fmt.Errorf("Field %s is missing", key)
		}
	}
	return nil
}

// Get returns the value stored under key, or the empty string if the key
// is absent.
func (f features) Get(key string) string {
	return f[key]
}
215×
276

277
func (conn *LocalConnection) parseFeatures(features features) (*Peer, error) {
43×
278
        if err := features.MustHave([]string{"PeerNameFlavour", "Name", "NickName", "UID", "ConnID"}); err != nil {
!
279
                return nil, err
!
280
        }
!
281

282
        remotePeerNameFlavour := features.Get("PeerNameFlavour")
43×
283
        if remotePeerNameFlavour != PeerNameFlavour {
!
284
                return nil, fmt.Errorf("Peer name flavour mismatch (ours: '%s', theirs: '%s')", PeerNameFlavour, remotePeerNameFlavour)
!
285
        }
!
286

287
        name, err := PeerNameFromString(features.Get("Name"))
43×
288
        if err != nil {
!
289
                return nil, err
!
290
        }
!
291

292
        nickName := features.Get("NickName")
43×
293

43×
294
        var shortID uint64
43×
295
        var hasShortID bool
43×
296
        if shortIDStr, present := features["ShortID"]; present {
43×
297
                hasShortID = true
43×
298
                shortID, err = strconv.ParseUint(shortIDStr, 10,
43×
299
                        PeerShortIDBits)
43×
300
                if err != nil {
!
301
                        return nil, err
!
302
                }
!
303
        }
304

305
        uid, err := ParsePeerUID(features.Get("UID"))
43×
306
        if err != nil {
!
307
                return nil, err
!
308
        }
!
309

310
        remoteConnID, err := strconv.ParseUint(features.Get("ConnID"), 10, 64)
43×
311
        if err != nil {
!
312
                return nil, err
!
313
        }
!
314

315
        conn.uid ^= remoteConnID
43×
316
        peer := NewPeer(name, nickName, uid, 0, PeerShortID(shortID))
43×
317
        peer.HasShortID = hasShortID
43×
318
        return peer, nil
43×
319
}
320

321
func (conn *LocalConnection) registerRemote(remote *Peer, acceptNewPeer bool) error {
43×
322
        if acceptNewPeer {
41×
323
                conn.remote = conn.Router.Peers.FetchWithDefault(remote)
41×
324
        } else {
2×
325
                conn.remote = conn.Router.Peers.FetchAndAddRef(remote.Name)
2×
326
                if conn.remote == nil {
!
327
                        return fmt.Errorf("Found unknown remote name: %s at %s", remote.Name, conn.remoteTCPAddr)
!
328
                }
!
329
        }
330

UNCOV
331
        if conn.remote.UID != remote.UID {
!
UNCOV
332
                return fmt.Errorf("Connection appears to be with different version of a peer we already know of")
!
UNCOV
333
        }
!
334

335
        if conn.remote == conn.local {
8×
336
                return ErrConnectToSelf
8×
337
        }
8×
338

339
        return nil
35×
340
}
341

342
func (conn *LocalConnection) actorLoop(actionChan <-chan ConnectionAction) (err error) {
35×
343
        fwdErrorChan := conn.OverlayConn.ErrorChannel()
35×
344
        fwdEstablishedChan := conn.OverlayConn.EstablishedChannel()
35×
345

35×
346
        for err == nil {
82×
347
                select {
82×
348
                case action := <-actionChan:
18×
349
                        err = action()
18×
350

351
                case <-conn.heartbeatTCP.C:
14×
352
                        err = conn.sendSimpleProtocolMsg(ProtocolHeartbeat)
14×
353

354
                case <-fwdEstablishedChan:
33×
355
                        conn.established = true
33×
356
                        fwdEstablishedChan = nil
33×
357
                        conn.Router.Ourself.ConnectionEstablished(conn)
33×
358

UNCOV
359
                case err = <-fwdErrorChan:
!
360
                }
361
        }
362
        return
18×
363
}
364

365
func (conn *LocalConnection) shutdown(err error) {
26×
UNCOV
366
        if conn.remote == nil {
!
UNCOV
367
                log.Errorf("->[%s] connection shutting down due to error during handshake: %v", conn.remoteTCPAddr, err)
!
368
        } else {
26×
369
                conn.ErrorLog("connection shutting down due to error:", err)
26×
370
        }
26×
371

372
        if conn.TCPConn != nil {
26×
373
                checkWarn(conn.TCPConn.Close())
26×
374
        }
26×
375

376
        if conn.remote != nil {
26×
377
                conn.Router.Peers.Dereference(conn.remote)
26×
378
                conn.Router.Ourself.DeleteConnection(conn)
26×
379
        }
26×
380

381
        if conn.heartbeatTCP != nil {
18×
382
                conn.heartbeatTCP.Stop()
18×
383
        }
18×
384

385
        if conn.OverlayConn != nil {
18×
386
                conn.OverlayConn.Stop()
18×
387
        }
18×
388

389
        conn.Router.ConnectionMaker.ConnectionTerminated(conn, err)
26×
390
}
391

392
func (conn *LocalConnection) sendOverlayControlMessage(tag byte, msg []byte) error {
132×
393
        return conn.sendProtocolMsg(ProtocolMsg{ProtocolTag(tag), msg})
132×
394
}
132×
395

396
// Helpers
397

398
func (conn *LocalConnection) sendSimpleProtocolMsg(tag ProtocolTag) error {
14×
399
        return conn.sendProtocolMsg(ProtocolMsg{tag: tag})
14×
400
}
14×
401

402
func (conn *LocalConnection) sendProtocolMsg(m ProtocolMsg) error {
591×
403
        return conn.tcpSender.Send(append([]byte{byte(m.tag)}, m.msg...))
591×
404
}
591×
405

406
func (conn *LocalConnection) receiveTCP(receiver TCPReceiver) {
35×
407
        var err error
35×
408
        for {
617×
409
                conn.extendReadDeadline()
617×
410

617×
411
                var msg []byte
617×
412
                if msg, err = receiver.Receive(); err != nil {
18×
413
                        break
18×
414
                }
415
                if len(msg) < 1 {
!
416
                        conn.Log("ignoring blank msg")
!
UNCOV
417
                        continue
!
418
                }
UNCOV
419
                if err = conn.handleProtocolMsg(ProtocolTag(msg[0]), msg[1:]); err != nil {
!
UNCOV
420
                        break
!
421
                }
422
        }
423
        conn.Shutdown(err)
18×
424
}
425

426
func (conn *LocalConnection) handleProtocolMsg(tag ProtocolTag, payload []byte) error {
582×
427
        switch tag {
582×
428
        case ProtocolHeartbeat:
14×
429
        case ProtocolConnectionEstablished, ProtocolFragmentationReceived, ProtocolPMTUVerified, ProtocolOverlayControlMsg:
133×
430
                conn.OverlayConn.ControlMessage(byte(tag), payload)
133×
431
        case ProtocolGossipUnicast, ProtocolGossipBroadcast, ProtocolGossip:
435×
432
                return conn.Router.handleGossip(tag, payload)
435×
UNCOV
433
        default:
!
UNCOV
434
                conn.Log("ignoring unknown protocol tag:", tag)
!
435
        }
436
        return nil
147×
437
}
438

439
func (conn *LocalConnection) extendReadDeadline() {
617×
440
        conn.TCPConn.SetReadDeadline(time.Now().Add(TCPHeartbeat * 2))
617×
441
}
617×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc