• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

docker / libnetwork / #3394

pending completion
#3394

push

web-flow
Merge pull request #1786 from fcrisciani/netlink_leak

17 of 17 new or added lines in 1 file covered. (100.0%)

11729 of 27300 relevant lines covered (42.96%)

32488.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.19
/networkdb/delegate.go
1
package networkdb
2

3
import (
4
        "fmt"
5
        "net"
6
        "strings"
7

8
        "github.com/Sirupsen/logrus"
9
        "github.com/gogo/protobuf/proto"
10
)
11

12
// delegate carries the NetworkDB instance that the delegate methods in this
// file (NotifyMsg, GetBroadcasts, LocalState, MergeRemoteState) act upon.
// NOTE(review): the method set looks like the memberlist Delegate interface;
// confirm against where this type is registered.
type delegate struct {
13
        nDB *NetworkDB
14
}
15

16
func (d *delegate) NodeMeta(limit int) []byte {
22✔
17
        return []byte{}
22✔
18
}
22✔
19

20
func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
394✔
21
        nDB.Lock()
394✔
22
        defer nDB.Unlock()
394✔
23

394✔
24
        for _, nodes := range []map[string]*node{
394✔
25
                nDB.failedNodes,
394✔
26
                nDB.leftNodes,
394✔
27
                nDB.nodes,
394✔
28
        } {
1,445✔
29
                if n, ok := nodes[nEvent.NodeName]; ok {
1,445✔
30
                        if n.ltime >= nEvent.LTime {
670✔
31
                                return nil
276✔
32
                        }
276✔
33
                        return n
118✔
34
                }
35
        }
36
        return nil
×
37
}
38

39
func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
118✔
40
        nDB.Lock()
118✔
41
        defer nDB.Unlock()
118✔
42

118✔
43
        for _, nodes := range []map[string]*node{
118✔
44
                nDB.failedNodes,
118✔
45
                nDB.leftNodes,
118✔
46
                nDB.nodes,
118✔
47
        } {
466✔
48
                if n, ok := nodes[nEvent.NodeName]; ok {
466✔
49
                        if n.ltime >= nEvent.LTime {
118✔
50
                                return nil
×
51
                        }
×
52

53
                        delete(nodes, n.Name)
118✔
54
                        return n
118✔
55
                }
56
        }
57

58
        return nil
×
59
}
60

61
func (nDB *NetworkDB) purgeSameNode(n *node) {
118✔
62
        nDB.Lock()
118✔
63
        defer nDB.Unlock()
118✔
64

118✔
65
        prefix := strings.Split(n.Name, "-")[0]
118✔
66
        for _, nodes := range []map[string]*node{
118✔
67
                nDB.failedNodes,
118✔
68
                nDB.leftNodes,
118✔
69
                nDB.nodes,
118✔
70
        } {
472✔
71
                var nodeNames []string
354✔
72
                for name, node := range nodes {
576✔
73
                        if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
222✔
74
                                nodeNames = append(nodeNames, name)
×
75
                        }
×
76
                }
77

78
                for _, name := range nodeNames {
354✔
79
                        delete(nodes, name)
×
80
                }
×
81
        }
82
}
83

84
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
394✔
85
        // Update our local clock if the received messages has newer
394✔
86
        // time.
394✔
87
        nDB.networkClock.Witness(nEvent.LTime)
394✔
88

394✔
89
        n := nDB.getNode(nEvent)
394✔
90
        if n == nil {
670✔
91
                return false
276✔
92
        }
276✔
93
        // If its a node leave event for a manager and this is the only manager we
94
        // know of we want the reconnect logic to kick in. In a single manager
95
        // cluster manager's gossip can't be bootstrapped unless some other node
96
        // connects to it.
97
        if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
142✔
98
                for _, ip := range nDB.bootStrapIP {
48✔
99
                        if ip.Equal(n.Addr) {
24✔
100
                                n.ltime = nEvent.LTime
×
101
                                return true
×
102
                        }
×
103
                }
104
        }
105

106
        n = nDB.checkAndGetNode(nEvent)
118✔
107

118✔
108
        nDB.purgeSameNode(n)
118✔
109
        n.ltime = nEvent.LTime
118✔
110

118✔
111
        switch nEvent.Type {
118✔
112
        case NodeEventTypeJoin:
85✔
113
                nDB.Lock()
85✔
114
                nDB.nodes[n.Name] = n
85✔
115
                nDB.Unlock()
85✔
116
                logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
85✔
117
                return true
85✔
118
        case NodeEventTypeLeave:
33✔
119
                nDB.Lock()
33✔
120
                nDB.leftNodes[n.Name] = n
33✔
121
                nDB.Unlock()
33✔
122
                logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
33✔
123
                return true
33✔
124
        }
125

126
        return false
×
127
}
128

129
func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
461✔
130
        var flushEntries bool
461✔
131
        // Update our local clock if the received messages has newer
461✔
132
        // time.
461✔
133
        nDB.networkClock.Witness(nEvent.LTime)
461✔
134

461✔
135
        nDB.Lock()
461✔
136
        defer func() {
922✔
137
                nDB.Unlock()
461✔
138
                // When a node leaves a network on the last task removal cleanup the
461✔
139
                // local entries for this network & node combination. When the tasks
461✔
140
                // on a network are removed we could have missed the gossip updates.
461✔
141
                // Not doing this cleanup can leave stale entries because bulksyncs
461✔
142
                // from the node will no longer include this network state.
461✔
143
                //
461✔
144
                // deleteNodeNetworkEntries takes nDB lock.
461✔
145
                if flushEntries {
482✔
146
                        nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
21✔
147
                }
21✔
148
        }()
149

150
        if nEvent.NodeName == nDB.config.NodeName {
648✔
151
                return false
187✔
152
        }
187✔
153

154
        nodeNetworks, ok := nDB.networks[nEvent.NodeName]
274✔
155
        if !ok {
309✔
156
                // We haven't heard about this node at all.  Ignore the leave
35✔
157
                if nEvent.Type == NetworkEventTypeLeave {
35✔
158
                        return false
×
159
                }
×
160

161
                nodeNetworks = make(map[string]*network)
35✔
162
                nDB.networks[nEvent.NodeName] = nodeNetworks
35✔
163
        }
164

165
        if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
495✔
166
                // We have the latest state. Ignore the event
221✔
167
                // since it is stale.
221✔
168
                if n.ltime >= nEvent.LTime {
421✔
169
                        return false
200✔
170
                }
200✔
171

172
                n.ltime = nEvent.LTime
21✔
173
                n.leaving = nEvent.Type == NetworkEventTypeLeave
21✔
174
                if n.leaving {
42✔
175
                        n.reapTime = reapInterval
21✔
176
                        flushEntries = true
21✔
177
                }
21✔
178

179
                nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
21✔
180
                return true
21✔
181
        }
182

183
        if nEvent.Type == NetworkEventTypeLeave {
53✔
184
                return false
×
185
        }
×
186

187
        // This remote network join is being seen the first time.
188
        nodeNetworks[nEvent.NetworkID] = &network{
53✔
189
                id:    nEvent.NetworkID,
53✔
190
                ltime: nEvent.LTime,
53✔
191
        }
53✔
192

53✔
193
        nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
53✔
194
        return true
53✔
195
}
196

197
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
1,345✔
198
        // Update our local clock if the received messages has newer
1,345✔
199
        // time.
1,345✔
200
        nDB.tableClock.Witness(tEvent.LTime)
1,345✔
201

1,345✔
202
        // Ignore the table events for networks that are in the process of going away
1,345✔
203
        nDB.RLock()
1,345✔
204
        networks := nDB.networks[nDB.config.NodeName]
1,345✔
205
        network, ok := networks[tEvent.NetworkID]
1,345✔
206
        nDB.RUnlock()
1,345✔
207
        if !ok || network.leaving {
1,345✔
208
                return true
×
209
        }
×
210

211
        e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
1,345✔
212
        if err != nil && tEvent.Type == TableEventTypeDelete {
1,345✔
213
                // If it is a delete event and we don't have the entry here nothing to do.
×
214
                return false
×
215
        }
×
216

217
        if err == nil {
1,663✔
218
                // We have the latest state. Ignore the event
318✔
219
                // since it is stale.
318✔
220
                if e.ltime >= tEvent.LTime {
604✔
221
                        return false
286✔
222
                }
286✔
223
        }
224

225
        e = &entry{
1,059✔
226
                ltime:    tEvent.LTime,
1,059✔
227
                node:     tEvent.NodeName,
1,059✔
228
                value:    tEvent.Value,
1,059✔
229
                deleting: tEvent.Type == TableEventTypeDelete,
1,059✔
230
        }
1,059✔
231

1,059✔
232
        if e.deleting {
1,085✔
233
                e.reapTime = reapInterval
26✔
234
        }
26✔
235

236
        nDB.Lock()
1,059✔
237
        nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
1,059✔
238
        nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
1,059✔
239
        nDB.Unlock()
1,059✔
240

1,059✔
241
        var op opType
1,059✔
242
        switch tEvent.Type {
1,059✔
243
        case TableEventTypeCreate:
1,027✔
244
                op = opCreate
1,027✔
245
        case TableEventTypeUpdate:
6✔
246
                op = opUpdate
6✔
247
        case TableEventTypeDelete:
26✔
248
                op = opDelete
26✔
249
        }
250

251
        nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
1,059✔
252
        return true
1,059✔
253
}
254

255
func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
122✔
256
        // Decode the parts
122✔
257
        parts, err := decodeCompoundMessage(buf)
122✔
258
        if err != nil {
122✔
259
                logrus.Errorf("Failed to decode compound request: %v", err)
×
260
                return
×
261
        }
×
262

263
        // Handle each message
264
        for _, part := range parts {
1,573✔
265
                nDB.handleMessage(part, isBulkSync)
1,451✔
266
        }
1,451✔
267
}
268

269
func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
1,451✔
270
        var tEvent TableEvent
1,451✔
271
        if err := proto.Unmarshal(buf, &tEvent); err != nil {
1,451✔
272
                logrus.Errorf("Error decoding table event message: %v", err)
×
273
                return
×
274
        }
×
275

276
        // Ignore messages that this node generated.
277
        if tEvent.NodeName == nDB.config.NodeName {
1,557✔
278
                return
106✔
279
        }
106✔
280

281
        // Do not rebroadcast a bulk sync
282
        if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
1,404✔
283
                var err error
59✔
284
                buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
59✔
285
                if err != nil {
59✔
286
                        logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
×
287
                        return
×
288
                }
×
289

290
                nDB.RLock()
59✔
291
                n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
59✔
292
                nDB.RUnlock()
59✔
293

59✔
294
                if !ok {
59✔
295
                        return
×
296
                }
×
297

298
                broadcastQ := n.tableBroadcasts
59✔
299

59✔
300
                if broadcastQ == nil {
59✔
301
                        return
×
302
                }
×
303

304
                broadcastQ.QueueBroadcast(&tableEventMessage{
59✔
305
                        msg:   buf,
59✔
306
                        id:    tEvent.NetworkID,
59✔
307
                        tname: tEvent.TableName,
59✔
308
                        key:   tEvent.Key,
59✔
309
                        node:  nDB.config.NodeName,
59✔
310
                })
59✔
311
        }
312
}
313

314
func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
342✔
315
        var nEvent NodeEvent
342✔
316
        if err := proto.Unmarshal(buf, &nEvent); err != nil {
342✔
317
                logrus.Errorf("Error decoding node event message: %v", err)
×
318
                return
×
319
        }
×
320

321
        if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
408✔
322
                var err error
66✔
323
                buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
66✔
324
                if err != nil {
66✔
325
                        logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
×
326
                        return
×
327
                }
×
328

329
                nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
66✔
330
                        msg: buf,
66✔
331
                })
66✔
332
        }
333
}
334

335
func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
461✔
336
        var nEvent NetworkEvent
461✔
337
        if err := proto.Unmarshal(buf, &nEvent); err != nil {
461✔
338
                logrus.Errorf("Error decoding network event message: %v", err)
×
339
                return
×
340
        }
×
341

342
        if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
535✔
343
                var err error
74✔
344
                buf, err = encodeRawMessage(MessageTypeNetworkEvent, buf)
74✔
345
                if err != nil {
74✔
346
                        logrus.Errorf("Error marshalling gossip message for network event rebroadcast: %v", err)
×
347
                        return
×
348
                }
×
349

350
                nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
74✔
351
                        msg:  buf,
74✔
352
                        id:   nEvent.NetworkID,
74✔
353
                        node: nEvent.NodeName,
74✔
354
                })
74✔
355
        }
356
}
357

358
func (nDB *NetworkDB) handleBulkSync(buf []byte) {
6✔
359
        var bsm BulkSyncMessage
6✔
360
        if err := proto.Unmarshal(buf, &bsm); err != nil {
6✔
361
                logrus.Errorf("Error decoding bulk sync message: %v", err)
×
362
                return
×
363
        }
×
364

365
        if bsm.LTime > 0 {
7✔
366
                nDB.tableClock.Witness(bsm.LTime)
1✔
367
        }
1✔
368

369
        nDB.handleMessage(bsm.Payload, true)
6✔
370

6✔
371
        // Don't respond to a bulk sync which was not unsolicited
6✔
372
        if !bsm.Unsolicited {
9✔
373
                nDB.Lock()
3✔
374
                ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
3✔
375
                if ok {
6✔
376
                        close(ch)
3✔
377
                        delete(nDB.bulkSyncAckTbl, bsm.NodeName)
3✔
378
                }
3✔
379
                nDB.Unlock()
3✔
380

3✔
381
                return
3✔
382
        }
383

384
        var nodeAddr net.IP
3✔
385
        nDB.RLock()
3✔
386
        if node, ok := nDB.nodes[bsm.NodeName]; ok {
6✔
387
                nodeAddr = node.Addr
3✔
388
        }
3✔
389
        nDB.RUnlock()
3✔
390

3✔
391
        if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
3✔
392
                logrus.Errorf("Error in responding to bulk sync from node %s: %v", nodeAddr, err)
×
393
        }
×
394
}
395

396
func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
2,382✔
397
        mType, data, err := decodeMessage(buf)
2,382✔
398
        if err != nil {
2,382✔
399
                logrus.Errorf("Error decoding gossip message to get message type: %v", err)
×
400
                return
×
401
        }
×
402

403
        switch mType {
2,382✔
404
        case MessageTypeNodeEvent:
342✔
405
                nDB.handleNodeMessage(data)
342✔
406
        case MessageTypeNetworkEvent:
461✔
407
                nDB.handleNetworkMessage(data)
461✔
408
        case MessageTypeTableEvent:
1,451✔
409
                nDB.handleTableMessage(data, isBulkSync)
1,451✔
410
        case MessageTypeBulkSync:
6✔
411
                nDB.handleBulkSync(data)
6✔
412
        case MessageTypeCompound:
122✔
413
                nDB.handleCompound(data, isBulkSync)
122✔
414
        default:
×
415
                logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
×
416
        }
417
}
418

419
func (d *delegate) NotifyMsg(buf []byte) {
925✔
420
        if len(buf) == 0 {
925✔
421
                return
×
422
        }
×
423

424
        d.nDB.handleMessage(buf, false)
925✔
425
}
426

427
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
555✔
428
        msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
555✔
429
        msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
555✔
430
        return msgs
555✔
431
}
555✔
432

433
func (d *delegate) LocalState(join bool) []byte {
52✔
434
        if join {
104✔
435
                // Update all the local node/network state to a new time to
52✔
436
                // force update on the node we are trying to rejoin, just in
52✔
437
                // case that node has these in leaving state still. This is
52✔
438
                // facilitate fast convergence after recovering from a gossip
52✔
439
                // failure.
52✔
440
                d.nDB.updateLocalNetworkTime()
52✔
441
        }
52✔
442

443
        d.nDB.RLock()
52✔
444
        defer d.nDB.RUnlock()
52✔
445

52✔
446
        pp := NetworkPushPull{
52✔
447
                LTime:    d.nDB.networkClock.Time(),
52✔
448
                NodeName: d.nDB.config.NodeName,
52✔
449
        }
52✔
450

52✔
451
        for name, nn := range d.nDB.networks {
52✔
452
                for _, n := range nn {
×
453
                        pp.Networks = append(pp.Networks, &NetworkEntry{
×
454
                                LTime:     n.ltime,
×
455
                                NetworkID: n.id,
×
456
                                NodeName:  name,
×
457
                                Leaving:   n.leaving,
×
458
                        })
×
459
                }
×
460
        }
461

462
        buf, err := encodeMessage(MessageTypePushPull, &pp)
52✔
463
        if err != nil {
52✔
464
                logrus.Errorf("Failed to encode local network state: %v", err)
×
465
                return nil
×
466
        }
×
467

468
        return buf
52✔
469
}
470

471
func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
52✔
472
        if len(buf) == 0 {
52✔
473
                logrus.Error("zero byte remote network state received")
×
474
                return
×
475
        }
×
476

477
        var gMsg GossipMessage
52✔
478
        err := proto.Unmarshal(buf, &gMsg)
52✔
479
        if err != nil {
52✔
480
                logrus.Errorf("Error unmarshalling push pull messsage: %v", err)
×
481
                return
×
482
        }
×
483

484
        if gMsg.Type != MessageTypePushPull {
52✔
485
                logrus.Errorf("Invalid message type %v received from remote", buf[0])
×
486
        }
×
487

488
        pp := NetworkPushPull{}
52✔
489
        if err := proto.Unmarshal(gMsg.Data, &pp); err != nil {
52✔
490
                logrus.Errorf("Failed to decode remote network state: %v", err)
×
491
                return
×
492
        }
×
493

494
        nodeEvent := &NodeEvent{
52✔
495
                LTime:    pp.LTime,
52✔
496
                NodeName: pp.NodeName,
52✔
497
                Type:     NodeEventTypeJoin,
52✔
498
        }
52✔
499
        d.nDB.handleNodeEvent(nodeEvent)
52✔
500

52✔
501
        for _, n := range pp.Networks {
52✔
502
                nEvent := &NetworkEvent{
×
503
                        LTime:     n.LTime,
×
504
                        NodeName:  n.NodeName,
×
505
                        NetworkID: n.NetworkID,
×
506
                        Type:      NetworkEventTypeJoin,
×
507
                }
×
508

×
509
                if n.Leaving {
×
510
                        nEvent.Type = NetworkEventTypeLeave
×
511
                }
×
512

513
                d.nDB.handleNetworkEvent(nEvent)
×
514
        }
515

516
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc