• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

docker / libnetwork / #3369

pending completion
#3369

push

web-flow
Merge pull request #1727 from sanimej/cphard

11527 of 26691 relevant lines covered (43.19%)

33227.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.39
/networkdb/cluster.go
1
package networkdb
2

3
import (
4
        "bytes"
5
        "crypto/rand"
6
        "encoding/hex"
7
        "fmt"
8
        "log"
9
        "math/big"
10
        rnd "math/rand"
11
        "net"
12
        "strings"
13
        "time"
14

15
        "github.com/Sirupsen/logrus"
16
        "github.com/hashicorp/memberlist"
17
)
18

19
const (
20
        reapInterval     = 30 * time.Minute
21
        reapPeriod       = 5 * time.Second
22
        retryInterval    = 1 * time.Second
23
        nodeReapInterval = 24 * time.Hour
24
        nodeReapPeriod   = 2 * time.Hour
25
)
26

27
type logWriter struct{}
28

29
func (l *logWriter) Write(p []byte) (int, error) {
85✔
30
        str := string(p)
85✔
31
        str = strings.TrimSuffix(str, "\n")
85✔
32

85✔
33
        switch {
85✔
34
        case strings.HasPrefix(str, "[WARN] "):
22✔
35
                str = strings.TrimPrefix(str, "[WARN] ")
22✔
36
                logrus.Warn(str)
22✔
37
        case strings.HasPrefix(str, "[DEBUG] "):
59✔
38
                str = strings.TrimPrefix(str, "[DEBUG] ")
59✔
39
                logrus.Debug(str)
59✔
40
        case strings.HasPrefix(str, "[INFO] "):
2✔
41
                str = strings.TrimPrefix(str, "[INFO] ")
2✔
42
                logrus.Info(str)
2✔
43
        case strings.HasPrefix(str, "[ERR] "):
2✔
44
                str = strings.TrimPrefix(str, "[ERR] ")
2✔
45
                logrus.Warn(str)
2✔
46
        }
47

48
        return len(p), nil
85✔
49
}
50

51
// SetKey adds a new key to the key ring
52
func (nDB *NetworkDB) SetKey(key []byte) {
×
53
        logrus.Debugf("Adding key %s", hex.EncodeToString(key)[0:5])
×
54
        nDB.Lock()
×
55
        defer nDB.Unlock()
×
56
        for _, dbKey := range nDB.config.Keys {
×
57
                if bytes.Equal(key, dbKey) {
×
58
                        return
×
59
                }
×
60
        }
61
        nDB.config.Keys = append(nDB.config.Keys, key)
×
62
        if nDB.keyring != nil {
×
63
                nDB.keyring.AddKey(key)
×
64
        }
×
65
}
66

67
// SetPrimaryKey sets the given key as the primary key. This should have
68
// been added apriori through SetKey
69
func (nDB *NetworkDB) SetPrimaryKey(key []byte) {
×
70
        logrus.Debugf("Primary Key %s", hex.EncodeToString(key)[0:5])
×
71
        nDB.RLock()
×
72
        defer nDB.RUnlock()
×
73
        for _, dbKey := range nDB.config.Keys {
×
74
                if bytes.Equal(key, dbKey) {
×
75
                        if nDB.keyring != nil {
×
76
                                nDB.keyring.UseKey(dbKey)
×
77
                        }
×
78
                        break
×
79
                }
80
        }
81
}
82

83
// RemoveKey removes a key from the key ring. The key being removed
84
// can't be the primary key
85
func (nDB *NetworkDB) RemoveKey(key []byte) {
×
86
        logrus.Debugf("Remove Key %s", hex.EncodeToString(key)[0:5])
×
87
        nDB.Lock()
×
88
        defer nDB.Unlock()
×
89
        for i, dbKey := range nDB.config.Keys {
×
90
                if bytes.Equal(key, dbKey) {
×
91
                        nDB.config.Keys = append(nDB.config.Keys[:i], nDB.config.Keys[i+1:]...)
×
92
                        if nDB.keyring != nil {
×
93
                                nDB.keyring.RemoveKey(dbKey)
×
94
                        }
×
95
                        break
×
96
                }
97
        }
98
}
99

100
func (nDB *NetworkDB) clusterInit() error {
22✔
101
        config := memberlist.DefaultLANConfig()
22✔
102
        config.Name = nDB.config.NodeName
22✔
103
        config.BindAddr = nDB.config.BindAddr
22✔
104
        config.AdvertiseAddr = nDB.config.AdvertiseAddr
22✔
105

22✔
106
        if nDB.config.BindPort != 0 {
44✔
107
                config.BindPort = nDB.config.BindPort
22✔
108
        }
22✔
109

110
        config.ProtocolVersion = memberlist.ProtocolVersionMax
22✔
111
        config.Delegate = &delegate{nDB: nDB}
22✔
112
        config.Events = &eventDelegate{nDB: nDB}
22✔
113
        // custom logger that does not add time or date, so they are not
22✔
114
        // duplicated by logrus
22✔
115
        config.Logger = log.New(&logWriter{}, "", 0)
22✔
116

22✔
117
        var err error
22✔
118
        if len(nDB.config.Keys) > 0 {
22✔
119
                for i, key := range nDB.config.Keys {
×
120
                        logrus.Debugf("Encryption key %d: %s", i+1, hex.EncodeToString(key)[0:5])
×
121
                }
×
122
                nDB.keyring, err = memberlist.NewKeyring(nDB.config.Keys, nDB.config.Keys[0])
×
123
                if err != nil {
×
124
                        return err
×
125
                }
×
126
                config.Keyring = nDB.keyring
×
127
        }
128

129
        nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{
22✔
130
                NumNodes: func() int {
151✔
131
                        nDB.RLock()
129✔
132
                        num := len(nDB.nodes)
129✔
133
                        nDB.RUnlock()
129✔
134
                        return num
129✔
135
                },
129✔
136
                RetransmitMult: config.RetransmitMult,
137
        }
138

139
        nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{
22✔
140
                NumNodes: func() int {
329✔
141
                        nDB.RLock()
307✔
142
                        num := len(nDB.nodes)
307✔
143
                        nDB.RUnlock()
307✔
144
                        return num
307✔
145
                },
307✔
146
                RetransmitMult: config.RetransmitMult,
147
        }
148

149
        mlist, err := memberlist.Create(config)
22✔
150
        if err != nil {
22✔
151
                return fmt.Errorf("failed to create memberlist: %v", err)
×
152
        }
×
153

154
        nDB.stopCh = make(chan struct{})
22✔
155
        nDB.memberlist = mlist
22✔
156

22✔
157
        for _, trigger := range []struct {
22✔
158
                interval time.Duration
22✔
159
                fn       func()
22✔
160
        }{
22✔
161
                {reapPeriod, nDB.reapState},
22✔
162
                {config.GossipInterval, nDB.gossip},
22✔
163
                {config.PushPullInterval, nDB.bulkSyncTables},
22✔
164
                {retryInterval, nDB.reconnectNode},
22✔
165
                {nodeReapPeriod, nDB.reapDeadNode},
22✔
166
        } {
132✔
167
                t := time.NewTicker(trigger.interval)
110✔
168
                go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
110✔
169
                nDB.tickers = append(nDB.tickers, t)
110✔
170
        }
110✔
171

172
        return nil
22✔
173
}
174

175
func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
×
176
        t := time.NewTicker(retryInterval)
×
177
        defer t.Stop()
×
178

×
179
        for {
×
180
                select {
×
181
                case <-t.C:
×
182
                        if _, err := nDB.memberlist.Join(members); err != nil {
×
183
                                logrus.Errorf("Failed to join memberlist %s on retry: %v", members, err)
×
184
                                continue
×
185
                        }
186
                        if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
×
187
                                logrus.Errorf("failed to send node join on retry: %v", err)
×
188
                                continue
×
189
                        }
190
                        return
×
191
                case <-stop:
×
192
                        return
×
193
                }
194
        }
195

196
}
197

198
func (nDB *NetworkDB) clusterJoin(members []string) error {
13✔
199
        mlist := nDB.memberlist
13✔
200

13✔
201
        if _, err := mlist.Join(members); err != nil {
13✔
202
                // Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
×
203
                go nDB.retryJoin(members, nDB.stopCh)
×
204

×
205
                return fmt.Errorf("could not join node to memberlist: %v", err)
×
206
        }
×
207

208
        if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
13✔
209
                return fmt.Errorf("failed to send node join: %v", err)
×
210
        }
×
211

212
        return nil
13✔
213
}
214

215
func (nDB *NetworkDB) clusterLeave() error {
22✔
216
        mlist := nDB.memberlist
22✔
217

22✔
218
        if err := nDB.sendNodeEvent(NodeEventTypeLeave); err != nil {
22✔
219
                logrus.Errorf("failed to send node leave: %v", err)
×
220
        }
×
221

222
        if err := mlist.Leave(time.Second); err != nil {
22✔
223
                return err
×
224
        }
×
225

226
        close(nDB.stopCh)
22✔
227

22✔
228
        for _, t := range nDB.tickers {
132✔
229
                t.Stop()
110✔
230
        }
110✔
231

232
        return mlist.Shutdown()
22✔
233
}
234

235
func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
110✔
236
        // Use a random stagger to avoid syncronizing
110✔
237
        randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
110✔
238
        select {
110✔
239
        case <-time.After(randStagger):
61✔
240
        case <-stop:
49✔
241
                return
49✔
242
        }
243
        for {
506✔
244
                select {
445✔
245
                case <-C:
384✔
246
                        f()
384✔
247
                case <-stop:
61✔
248
                        return
61✔
249
                }
250
        }
251
}
252

253
func (nDB *NetworkDB) reapDeadNode() {
×
254
        nDB.Lock()
×
255
        defer nDB.Unlock()
×
256
        for id, n := range nDB.failedNodes {
×
257
                if n.reapTime > 0 {
×
258
                        n.reapTime -= nodeReapPeriod
×
259
                        continue
×
260
                }
261
                logrus.Debugf("Removing failed node %v from gossip cluster", n.Name)
×
262
                delete(nDB.failedNodes, id)
×
263
        }
264
}
265

266
func (nDB *NetworkDB) reconnectNode() {
55✔
267
        nDB.RLock()
55✔
268
        if len(nDB.failedNodes) == 0 {
110✔
269
                nDB.RUnlock()
55✔
270
                return
55✔
271
        }
55✔
272

273
        nodes := make([]*node, 0, len(nDB.failedNodes))
×
274
        for _, n := range nDB.failedNodes {
×
275
                nodes = append(nodes, n)
×
276
        }
×
277
        nDB.RUnlock()
×
278

×
279
        node := nodes[randomOffset(len(nodes))]
×
280
        addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
×
281

×
282
        if _, err := nDB.memberlist.Join([]string{addr.String()}); err != nil {
×
283
                return
×
284
        }
×
285

286
        if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
×
287
                logrus.Errorf("failed to send node join during reconnect: %v", err)
×
288
                return
×
289
        }
×
290

291
        // Update all the local table state to a new time to
292
        // force update on the node we are trying to rejoin, just in
293
        // case that node has these in deleting state still. This is
294
        // facilitate fast convergence after recovering from a gossip
295
        // failure.
296
        nDB.updateLocalTableTime()
×
297

×
298
        logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
×
299
        nDB.bulkSync([]string{node.Name}, true)
×
300
}
301

302
// For timing the entry deletion in the repaer APIs that doesn't use monotonic clock
303
// source (time.Now, Sub etc.) should be avoided. Hence we use reapTime in every
304
// entry which is set initially to reapInterval and decremented by reapPeriod every time
305
// the reaper runs. NOTE nDB.reapTableEntries updates the reapTime with a readlock. This
306
// is safe as long as no other concurrent path touches the reapTime field.
307
func (nDB *NetworkDB) reapState() {
×
308
        nDB.reapNetworks()
×
309
        nDB.reapTableEntries()
×
310
}
×
311

312
func (nDB *NetworkDB) reapNetworks() {
×
313
        nDB.Lock()
×
314
        for name, nn := range nDB.networks {
×
315
                for id, n := range nn {
×
316
                        if n.leaving {
×
317
                                if n.reapTime <= 0 {
×
318
                                        delete(nn, id)
×
319
                                        nDB.deleteNetworkNode(id, name)
×
320
                                        continue
×
321
                                }
322
                                n.reapTime -= reapPeriod
×
323
                        }
324
                }
325
        }
326
        nDB.Unlock()
×
327
}
328

329
func (nDB *NetworkDB) reapTableEntries() {
×
330
        var paths []string
×
331

×
332
        nDB.RLock()
×
333
        nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
×
334
                entry, ok := v.(*entry)
×
335
                if !ok {
×
336
                        return false
×
337
                }
×
338

339
                if !entry.deleting {
×
340
                        return false
×
341
                }
×
342
                if entry.reapTime > 0 {
×
343
                        entry.reapTime -= reapPeriod
×
344
                        return false
×
345
                }
×
346
                paths = append(paths, path)
×
347
                return false
×
348
        })
349
        nDB.RUnlock()
×
350

×
351
        nDB.Lock()
×
352
        for _, path := range paths {
×
353
                params := strings.Split(path[1:], "/")
×
354
                tname := params[0]
×
355
                nid := params[1]
×
356
                key := params[2]
×
357

×
358
                if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
×
359
                        logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
×
360
                }
×
361

362
                if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
×
363
                        logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
×
364
                }
×
365
        }
366
        nDB.Unlock()
×
367
}
368

369
func (nDB *NetworkDB) gossip() {
329✔
370
        networkNodes := make(map[string][]string)
329✔
371
        nDB.RLock()
329✔
372
        thisNodeNetworks := nDB.networks[nDB.config.NodeName]
329✔
373
        for nid := range thisNodeNetworks {
617✔
374
                networkNodes[nid] = nDB.networkNodes[nid]
288✔
375

288✔
376
        }
288✔
377
        nDB.RUnlock()
329✔
378

329✔
379
        for nid, nodes := range networkNodes {
617✔
380
                mNodes := nDB.mRandomNodes(3, nodes)
288✔
381
                bytesAvail := udpSendBuf - compoundHeaderOverhead
288✔
382

288✔
383
                nDB.RLock()
288✔
384
                network, ok := thisNodeNetworks[nid]
288✔
385
                nDB.RUnlock()
288✔
386
                if !ok || network == nil {
288✔
387
                        // It is normal for the network to be removed
×
388
                        // between the time we collect the network
×
389
                        // attachments of this node and processing
×
390
                        // them here.
×
391
                        continue
×
392
                }
393

394
                broadcastQ := network.tableBroadcasts
288✔
395

288✔
396
                if broadcastQ == nil {
288✔
397
                        logrus.Errorf("Invalid broadcastQ encountered while gossiping for network %s", nid)
×
398
                        continue
×
399
                }
400

401
                msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
288✔
402
                if len(msgs) == 0 {
496✔
403
                        continue
208✔
404
                }
405

406
                // Create a compound message
407
                compound := makeCompoundMessage(msgs)
80✔
408

80✔
409
                for _, node := range mNodes {
195✔
410
                        nDB.RLock()
115✔
411
                        mnode := nDB.nodes[node]
115✔
412
                        nDB.RUnlock()
115✔
413

115✔
414
                        if mnode == nil {
134✔
415
                                break
19✔
416
                        }
417

418
                        // Send the compound message
419
                        if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
96✔
420
                                logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
×
421
                        }
×
422
                }
423
        }
424
}
425

426
func (nDB *NetworkDB) bulkSyncTables() {
×
427
        var networks []string
×
428
        nDB.RLock()
×
429
        for nid, network := range nDB.networks[nDB.config.NodeName] {
×
430
                if network.leaving {
×
431
                        continue
×
432
                }
433
                networks = append(networks, nid)
×
434
        }
435
        nDB.RUnlock()
×
436

×
437
        for {
×
438
                if len(networks) == 0 {
×
439
                        break
×
440
                }
441

442
                nid := networks[0]
×
443
                networks = networks[1:]
×
444

×
445
                nDB.RLock()
×
446
                nodes := nDB.networkNodes[nid]
×
447
                nDB.RUnlock()
×
448

×
449
                // No peer nodes on this network. Move on.
×
450
                if len(nodes) == 0 {
×
451
                        continue
×
452
                }
453

454
                completed, err := nDB.bulkSync(nodes, false)
×
455
                if err != nil {
×
456
                        logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
×
457
                        continue
×
458
                }
459

460
                // Remove all the networks for which we have
461
                // successfully completed bulk sync in this iteration.
462
                updatedNetworks := make([]string, 0, len(networks))
×
463
                for _, nid := range networks {
×
464
                        var found bool
×
465
                        for _, completedNid := range completed {
×
466
                                if nid == completedNid {
×
467
                                        found = true
×
468
                                        break
×
469
                                }
470
                        }
471

472
                        if !found {
×
473
                                updatedNetworks = append(updatedNetworks, nid)
×
474
                        }
×
475
                }
476

477
                networks = updatedNetworks
×
478
        }
479
}
480

481
func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
36✔
482
        if !all {
36✔
483
                // If not all, then just pick one.
×
484
                nodes = nDB.mRandomNodes(1, nodes)
×
485
        }
×
486

487
        if len(nodes) == 0 {
36✔
488
                return nil, nil
×
489
        }
×
490

491
        logrus.Debugf("%s: Initiating bulk sync with nodes %v", nDB.config.NodeName, nodes)
36✔
492
        var err error
36✔
493
        var networks []string
36✔
494
        for _, node := range nodes {
75✔
495
                if node == nDB.config.NodeName {
75✔
496
                        continue
36✔
497
                }
498

499
                networks = nDB.findCommonNetworks(node)
3✔
500
                err = nDB.bulkSyncNode(networks, node, true)
3✔
501
                if err != nil {
3✔
502
                        err = fmt.Errorf("bulk sync failed on node %s: %v", node, err)
×
503
                }
×
504
        }
505

506
        if err != nil {
36✔
507
                return nil, err
×
508
        }
×
509

510
        return networks, nil
36✔
511
}
512

513
// Bulk sync all the table entries belonging to a set of networks to a
514
// single peer node. It can be unsolicited or can be in response to an
515
// unsolicited bulk sync
516
func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
6✔
517
        var msgs [][]byte
6✔
518

6✔
519
        var unsolMsg string
6✔
520
        if unsolicited {
9✔
521
                unsolMsg = "unsolicited"
3✔
522
        }
3✔
523

524
        logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node)
6✔
525

6✔
526
        nDB.RLock()
6✔
527
        mnode := nDB.nodes[node]
6✔
528
        if mnode == nil {
6✔
529
                nDB.RUnlock()
×
530
                return nil
×
531
        }
×
532

533
        for _, nid := range networks {
12✔
534
                nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
1,006✔
535
                        entry, ok := v.(*entry)
1,000✔
536
                        if !ok {
1,000✔
537
                                return false
×
538
                        }
×
539

540
                        eType := TableEventTypeCreate
1,000✔
541
                        if entry.deleting {
1,000✔
542
                                eType = TableEventTypeDelete
×
543
                        }
×
544

545
                        params := strings.Split(path[1:], "/")
1,000✔
546
                        tEvent := TableEvent{
1,000✔
547
                                Type:      eType,
1,000✔
548
                                LTime:     entry.ltime,
1,000✔
549
                                NodeName:  entry.node,
1,000✔
550
                                NetworkID: nid,
1,000✔
551
                                TableName: params[1],
1,000✔
552
                                Key:       params[2],
1,000✔
553
                                Value:     entry.value,
1,000✔
554
                        }
1,000✔
555

1,000✔
556
                        msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)
1,000✔
557
                        if err != nil {
1,000✔
558
                                logrus.Errorf("Encode failure during bulk sync: %#v", tEvent)
×
559
                                return false
×
560
                        }
×
561

562
                        msgs = append(msgs, msg)
1,000✔
563
                        return false
1,000✔
564
                })
565
        }
566
        nDB.RUnlock()
6✔
567

6✔
568
        // Create a compound message
6✔
569
        compound := makeCompoundMessage(msgs)
6✔
570

6✔
571
        bsm := BulkSyncMessage{
6✔
572
                LTime:       nDB.tableClock.Time(),
6✔
573
                Unsolicited: unsolicited,
6✔
574
                NodeName:    nDB.config.NodeName,
6✔
575
                Networks:    networks,
6✔
576
                Payload:     compound,
6✔
577
        }
6✔
578

6✔
579
        buf, err := encodeMessage(MessageTypeBulkSync, &bsm)
6✔
580
        if err != nil {
6✔
581
                return fmt.Errorf("failed to encode bulk sync message: %v", err)
×
582
        }
×
583

584
        nDB.Lock()
6✔
585
        ch := make(chan struct{})
6✔
586
        nDB.bulkSyncAckTbl[node] = ch
6✔
587
        nDB.Unlock()
6✔
588

6✔
589
        err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
6✔
590
        if err != nil {
6✔
591
                nDB.Lock()
×
592
                delete(nDB.bulkSyncAckTbl, node)
×
593
                nDB.Unlock()
×
594

×
595
                return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err)
×
596
        }
×
597

598
        // Wait on a response only if it is unsolicited.
599
        if unsolicited {
9✔
600
                startTime := time.Now()
3✔
601
                t := time.NewTimer(30 * time.Second)
3✔
602
                select {
3✔
603
                case <-t.C:
×
604
                        logrus.Errorf("Bulk sync to node %s timed out", node)
×
605
                case <-ch:
3✔
606
                        logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
3✔
607
                }
608
                t.Stop()
3✔
609
        }
610

611
        return nil
6✔
612
}
613

614
// Returns a random offset between 0 and n
615
func randomOffset(n int) int {
1,210✔
616
        if n == 0 {
1,210✔
617
                return 0
×
618
        }
×
619

620
        val, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
1,210✔
621
        if err != nil {
1,210✔
622
                logrus.Errorf("Failed to get a random offset: %v", err)
×
623
                return 0
×
624
        }
×
625

626
        return int(val.Int64())
1,210✔
627
}
628

629
// mRandomNodes is used to select up to m random nodes. It is possible
630
// that less than m nodes are returned.
631
func (nDB *NetworkDB) mRandomNodes(m int, nodes []string) []string {
288✔
632
        n := len(nodes)
288✔
633
        mNodes := make([]string, 0, m)
288✔
634
OUTER:
288✔
635
        // Probe up to 3*n times, with large n this is not necessary
288✔
636
        // since k << n, but with small n we want search to be
288✔
637
        // exhaustive
288✔
638
        for i := 0; i < 3*n && len(mNodes) < m; i++ {
1,498✔
639
                // Get random node
1,210✔
640
                idx := randomOffset(n)
1,210✔
641
                node := nodes[idx]
1,210✔
642

1,210✔
643
                if node == nDB.config.NodeName {
2,001✔
644
                        continue
791✔
645
                }
646

647
                // Check if we have this node already
648
                for j := 0; j < len(mNodes); j++ {
793✔
649
                        if node == mNodes[j] {
612✔
650
                                continue OUTER
238✔
651
                        }
652
                }
653

654
                // Append the node
655
                mNodes = append(mNodes, node)
181✔
656
        }
657

658
        return mNodes
288✔
659
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc