Coveralls logo
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / #6773

15 Jun 2016 - 14:50 coverage decreased (-0.03%) to 73.692%
#6773

Pull #2366

circleci

94ccfc629c6862b5950de3512742bcae?size=18&default=identiconbboreham
Publish weavedb container from release script
Pull Request #2366: Publish weavedb container from release script

6493 of 8811 relevant lines covered (73.69%)

120432.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.33
/ipam/allocator.go
1
package ipam
2

3
import (
4
        "bytes"
5
        "encoding/gob"
6
        "fmt"
7
        "sort"
8
        "time"
9

10
        "github.com/weaveworks/mesh"
11

12
        "github.com/weaveworks/weave/common"
13
        "github.com/weaveworks/weave/db"
14
        "github.com/weaveworks/weave/ipam/paxos"
15
        "github.com/weaveworks/weave/ipam/ring"
16
        "github.com/weaveworks/weave/ipam/space"
17
        "github.com/weaveworks/weave/ipam/tracker"
18
        "github.com/weaveworks/weave/net/address"
19
)
20

21
// Kinds of message we can unicast to other peers
const (
	msgSpaceRequest = iota
	msgRingUpdate
	msgSpaceRequestDenied

	// tickInterval is how often the actor loop wakes to retry pending work.
	tickInterval         = time.Second * 5
	MinSubnetSize        = 4 // first and last addresses are excluded, so 2 would be too small
	// containerDiedTimeout is the grace period between a container being
	// reported dead and its addresses being reclaimed.
	containerDiedTimeout = time.Second * 30
)
31

32
// operation represents something which Allocator wants to do, but
// which may need to wait until some other message arrives.
type operation interface {
	// Try attempts this operations and returns false if needs to be tried again.
	Try(alloc *Allocator) bool

	// Cancel aborts the operation, unblocking any caller waiting on its result.
	Cancel()

	// Does this operation pertain to the given container id?
	// Used for tidying up pending operations when containers die.
	ForContainer(ident string) bool
}
44

45
// ownedData records the addresses held by a single owner.
// This type is persisted hence all fields exported.
type ownedData struct {
	IsContainer bool           // whether the owner is a container
	Cidrs       []address.CIDR // the addresses owned
}
50

51
// Allocator brings together Ring and space.Set, and does the
// necessary plumbing.  Runs as a single-threaded Actor, so no locks
// are used around data structures.
type Allocator struct {
	actionChan        chan<- func()            // closures to be run on the actor goroutine
	stopChan          chan<- struct{}          // tells the actor goroutine to exit
	ourName           mesh.PeerName
	seed              []mesh.PeerName          // optional user supplied ring seed
	universe          address.CIDR             // superset of all ranges
	ring              *ring.Ring               // information on ranges owned by all peers
	space             space.Space              // more detail on ranges owned by us
	owned             map[string]ownedData     // who owns what addresses, indexed by container-ID
	nicknames         map[mesh.PeerName]string // so we can map nicknames for rmpeer
	pendingAllocates  []operation              // held until we get some free space
	pendingClaims     []operation              // held until we know who owns the space
	pendingPrimes     []operation              // held while our ring is empty
	dead              map[string]time.Time     // containers we heard were dead, and when
	db                db.DB                    // persistence
	gossip            mesh.Gossip              // our link to the outside world for sending messages
	paxos             paxos.Participant        // consensus state; set to nil once we have a ring
	awaitingConsensus bool                     // true while we are trying to reach Paxos consensus
	ticker            *time.Ticker             // drives periodic retries and cleanup
	shuttingDown      bool // to avoid doing any requests while trying to shut down
	isKnownPeer       func(mesh.PeerName) bool
	quorum            func() uint              // returns the Paxos quorum size to use
	now               func() time.Time         // source of time (normally time.Now)
}
78

79
// Config carries everything NewAllocator needs to construct an Allocator.
type Config struct {
	OurName     mesh.PeerName
	OurUID      mesh.PeerUID
	OurNickname string
	Seed        []mesh.PeerName // optional initial set of ring peers
	Universe    address.CIDR    // the full address range the allocator manages
	IsObserver  bool            // if true, use a Paxos observer rather than an elector node
	Quorum      func() uint
	Db          db.DB
	IsKnownPeer func(name mesh.PeerName) bool
	Tracker     tracker.LocalRangeTracker // optional; its HandleUpdate is called on ring ownership changes
}
91

92
// NewAllocator creates and initialises a new Allocator
93
func NewAllocator(config Config) *Allocator {
414×
94
        var participant paxos.Participant
414×
95
        var alloc *Allocator
414×
96
        var onUpdate ring.OnUpdate
414×
97

414×
98
        if config.IsObserver {
416×
99
                participant = paxos.NewObserver()
2×
100
        } else {
414×
101
                participant = paxos.NewNode(config.OurName, config.OurUID, 0)
412×
102
        }
412×
103

104
        if config.Tracker != nil {
414×
105
                onUpdate = func(prev []address.Range, curr []address.Range, local bool) {
!
106
                        if err := config.Tracker.HandleUpdate(prev, curr, local); err != nil {
!
107
                                alloc.errorf("HandleUpdate failed: %s", err)
!
108
                        }
!
109
                }
110
        }
111

112
        alloc = &Allocator{
414×
113
                ourName:     config.OurName,
414×
114
                seed:        config.Seed,
414×
115
                universe:    config.Universe,
414×
116
                ring:        ring.New(config.Universe.Range().Start, config.Universe.Range().End, config.OurName, onUpdate),
414×
117
                owned:       make(map[string]ownedData),
414×
118
                db:          config.Db,
414×
119
                paxos:       participant,
414×
120
                nicknames:   map[mesh.PeerName]string{config.OurName: config.OurNickname},
414×
121
                isKnownPeer: config.IsKnownPeer,
414×
122
                quorum:      config.Quorum,
414×
123
                dead:        make(map[string]time.Time),
414×
124
                now:         time.Now,
414×
125
        }
414×
126
        return alloc
414×
127
}
128

129
func ParseCIDRSubnet(cidrStr string) (cidr address.CIDR, err error) {
113×
130
        cidr, err = address.ParseCIDR(cidrStr)
113×
131
        if err != nil {
113×
132
                return
!
133
        }
!
134
        if !cidr.IsSubnet() {
113×
UNCOV
135
                err = fmt.Errorf("invalid subnet - bits after network prefix are not all zero: %s", cidrStr)
!
UNCOV
136
        }
!
137
        if cidr.Size() < MinSubnetSize {
113×
UNCOV
138
                err = fmt.Errorf("invalid subnet - smaller than minimum size %d: %s", MinSubnetSize, cidrStr)
!
UNCOV
139
        }
!
140
        return
113×
141
}
142

143
// Start runs the allocator goroutine
144
func (alloc *Allocator) Start() {
414×
145
        loadedPersistedData := alloc.loadPersistedData()
414×
146
        switch {
414×
147
        case loadedPersistedData && len(alloc.seed) != 0:
!
148
                alloc.infof("Found persisted IPAM data, ignoring supplied IPAM seed")
!
149
        case loadedPersistedData:
15×
150
                alloc.infof("Initialising with persisted data")
15×
151
        case len(alloc.seed) != 0:
1×
152
                alloc.infof("Initialising with supplied IPAM seed")
1×
153
                alloc.createRing(alloc.seed)
1×
154
        case alloc.paxos.IsElector():
396×
155
                alloc.infof("Initialising via deferred consensus")
396×
156
        default:
2×
157
                alloc.infof("Initialising as observer - awaiting IPAM data from another peer")
2×
158
        }
159
        actionChan := make(chan func(), mesh.ChannelSize)
414×
160
        stopChan := make(chan struct{})
414×
161
        alloc.actionChan = actionChan
414×
162
        alloc.stopChan = stopChan
414×
163
        alloc.ticker = time.NewTicker(tickInterval)
414×
164
        go alloc.actorLoop(actionChan, stopChan)
414×
165
}
166

167
// Stop makes the actor routine exit, for test purposes ONLY because any
// calls after this is processed will hang. Async.
func (alloc *Allocator) Stop() {
	// Non-blocking send: if the actor is not currently receiving
	// (e.g. already stopped), drop the signal rather than hang.
	select {
	case alloc.stopChan <- struct{}{}:
	default:
	}
}
175

176
// Operation life cycle
177

178
// Given an operation, try it, and add it to the pending queue if it didn't succeed
func (alloc *Allocator) doOperation(op operation, ops *[]operation) {
	alloc.actionChan <- func() {
		// Refuse new work during shutdown so callers unblock with a
		// cancellation instead of waiting forever.
		if alloc.shuttingDown {
			op.Cancel()
			return
		}
		if !op.Try(alloc) {
			*ops = append(*ops, op)
		}
	}
}
190

191
// Given an operation, remove it from the pending queue
192
//  Note the op may not be on the queue; it may have
193
//  already succeeded.  If it is on the queue, we call
194
//  cancel on it, allowing callers waiting for the resultChans
195
//  to unblock.
196
func (alloc *Allocator) cancelOp(op operation, ops *[]operation) {
!
197
        for i, op := range *ops {
!
198
                if op == op {
!
199
                        *ops = append((*ops)[:i], (*ops)[i+1:]...)
!
200
                        op.Cancel()
!
201
                        break
!
202
                }
203
        }
204
}
205

206
// Cancel all operations in a queue
207
func (alloc *Allocator) cancelOps(ops *[]operation) {
3×
208
        for _, op := range *ops {
3×
209
                op.Cancel()
!
210
        }
!
211
        *ops = []operation{}
3×
212
}
213

214
// Cancel all operations for a given container id, returns true
215
// if we found any.
216
func (alloc *Allocator) cancelOpsFor(ops *[]operation, ident string) bool {
1,540×
217
        var found bool
1,540×
218
        for i := 0; i < len(*ops); {
1,548×
219
                if op := (*ops)[i]; op.ForContainer(ident) {
10×
220
                        found = true
2×
221
                        op.Cancel()
2×
222
                        *ops = append((*ops)[:i], (*ops)[i+1:]...)
2×
223
                } else {
8×
224
                        i++
6×
225
                }
6×
226
        }
227
        return found
1,540×
228
}
229

230
// Try all operations in a queue
231
func (alloc *Allocator) tryOps(ops *[]operation) {
4,887×
232
        for i := 0; i < len(*ops); {
5,334×
233
                op := (*ops)[i]
447×
234
                if !op.Try(alloc) {
587×
235
                        i++
140×
236
                        continue
140×
237
                }
238
                *ops = append((*ops)[:i], (*ops)[i+1:]...)
307×
239
        }
240
}
241

242
// Try all pending operations
func (alloc *Allocator) tryPendingOps() {
	// Unblock pending primes first
	alloc.tryOps(&alloc.pendingPrimes)
	// Process existing claims before servicing new allocations
	alloc.tryOps(&alloc.pendingClaims)
	alloc.tryOps(&alloc.pendingAllocates)
}
1,629×
250

251
func (alloc *Allocator) spaceRequestDenied(sender mesh.PeerName, r address.Range) {
340×
252
        for i := 0; i < len(alloc.pendingClaims); {
644×
253
                claim := alloc.pendingClaims[i].(*claim)
304×
254
                if r.Contains(claim.cidr.Addr) {
577×
255
                        claim.deniedBy(alloc, sender)
273×
256
                        alloc.pendingClaims = append(alloc.pendingClaims[:i], alloc.pendingClaims[i+1:]...)
273×
257
                        continue
273×
258
                }
259
                i++
31×
260
        }
261
}
262

263
// errorCancelled reports that a pending request was cancelled before
// it could complete.
type errorCancelled struct {
	kind  string // the kind of request, e.g. "allocate"
	ident string // the container identifier the request was for
}

// Error implements the error interface.
func (e *errorCancelled) Error() string {
	return e.kind + " request for " + e.ident + " cancelled"
}
2×
271

272
// Actor client API
273

274
// Prime (Sync) - wait for consensus
275
func (alloc *Allocator) Prime() {
4×
276
        resultChan := make(chan struct{})
4×
277
        op := &prime{resultChan: resultChan}
4×
278
        alloc.doOperation(op, &alloc.pendingPrimes)
4×
279
        <-resultChan
4×
280
}
4×
281

282
// Allocate (Sync) - get new IP address for container with given name in range
283
// if there isn't any space in that range we block indefinitely
284
func (alloc *Allocator) Allocate(ident string, r address.CIDR, isContainer bool, hasBeenCancelled func() bool) (address.Address, error) {
6,517×
285
        resultChan := make(chan allocateResult)
6,517×
286
        op := &allocate{
6,517×
287
                resultChan:       resultChan,
6,517×
288
                ident:            ident,
6,517×
289
                r:                r,
6,517×
290
                isContainer:      isContainer,
6,517×
291
                hasBeenCancelled: hasBeenCancelled,
6,517×
292
        }
6,517×
293
        alloc.doOperation(op, &alloc.pendingAllocates)
6,517×
294
        result := <-resultChan
6,517×
295
        return result.addr, result.err
6,517×
296
}
6,517×
297

298
// Lookup (Sync) - get existing IP addresses for container with given name in range
299
func (alloc *Allocator) Lookup(ident string, r address.Range) ([]address.CIDR, error) {
9×
300
        resultChan := make(chan []address.CIDR)
9×
301
        alloc.actionChan <- func() {
18×
302
                resultChan <- alloc.ownedInRange(ident, r)
9×
303
        }
9×
304
        return <-resultChan, nil
9×
305
}
306

307
// Claim an address that we think we should own (Sync)
308
func (alloc *Allocator) Claim(ident string, cidr address.CIDR, isContainer, noErrorOnUnknown bool, hasBeenCancelled func() bool) error {
555×
309
        resultChan := make(chan error)
555×
310
        op := &claim{
555×
311
                resultChan:       resultChan,
555×
312
                ident:            ident,
555×
313
                cidr:             cidr,
555×
314
                isContainer:      isContainer,
555×
315
                noErrorOnUnknown: noErrorOnUnknown,
555×
316
                hasBeenCancelled: hasBeenCancelled,
555×
317
        }
555×
318
        alloc.doOperation(op, &alloc.pendingClaims)
555×
319
        return <-resultChan
555×
320
}
555×
321

322
// ContainerDied called from the updater interface.  Async.
func (alloc *Allocator) ContainerDied(ident string) {
	alloc.actionChan <- func() {
		if alloc.hasOwned(ident) {
			alloc.debugln("Container", ident, "died; noting to remove later")
			// Addresses are reclaimed later by removeDeadContainers,
			// giving the container a grace period to restart.
			alloc.dead[ident] = alloc.now()
		}
		// Also remove any pending ops
		alloc.cancelOpsFor(&alloc.pendingAllocates, ident)
		alloc.cancelOpsFor(&alloc.pendingClaims, ident)
	}
}
334

335
// ContainerDestroyed called from the updater interface.  Async.
func (alloc *Allocator) ContainerDestroyed(ident string) {
	alloc.actionChan <- func() {
		if alloc.hasOwned(ident) {
			alloc.debugln("Container", ident, "destroyed; removing addresses")
			// delete's error is deliberately ignored: hasOwned was
			// just checked, so there should be addresses to release.
			alloc.delete(ident)
			delete(alloc.dead, ident)
		}
	}
}
345

346
func (alloc *Allocator) removeDeadContainers() {
351×
347
        cutoff := alloc.now().Add(-containerDiedTimeout)
351×
348
        for ident, timeOfDeath := range alloc.dead {
383×
349
                if timeOfDeath.Before(cutoff) {
33×
350
                        if err := alloc.delete(ident); err == nil {
2×
351
                                alloc.debugln("Removed addresses for container", ident)
1×
352
                        }
1×
353
                        delete(alloc.dead, ident)
1×
354
                }
355
        }
356
}
357

358
// ContainerStarted un-marks a container previously noted as dead, so
// its addresses are not reclaimed. Async.
func (alloc *Allocator) ContainerStarted(ident string) {
	alloc.actionChan <- func() {
		delete(alloc.dead, ident) // delete is no-op if key not in map
	}
}
363

364
func (alloc *Allocator) PruneOwned(ids []string) {
90×
365
        idmap := make(map[string]struct{}, len(ids))
90×
366
        for _, id := range ids {
403×
367
                idmap[id] = struct{}{}
313×
368
        }
313×
369
        alloc.actionChan <- func() {
180×
370
                alloc.pruneOwned(idmap)
90×
371
        }
90×
372
}
373

374
// Delete (Sync) - release all IP addresses for container with given name
375
func (alloc *Allocator) Delete(ident string) error {
10×
376
        errChan := make(chan error)
10×
377
        alloc.actionChan <- func() {
20×
378
                errChan <- alloc.delete(ident)
10×
379
        }
10×
380
        return <-errChan
10×
381
}
382

383
func (alloc *Allocator) delete(ident string) error {
57×
384
        cidrs := alloc.removeAllOwned(ident)
57×
385
        if len(cidrs) == 0 {
57×
386
                return fmt.Errorf("Delete: no addresses for %s", ident)
!
387
        }
!
388
        for _, cidr := range cidrs {
117×
389
                alloc.space.Free(cidr.Addr)
60×
390
        }
60×
391
        return nil
57×
392
}
393

394
// Free (Sync) - release single IP address for container
395
func (alloc *Allocator) Free(ident string, addrToFree address.Address) error {
4,016×
396
        errChan := make(chan error)
4,016×
397
        alloc.actionChan <- func() {
8,032×
398
                if alloc.removeOwned(ident, addrToFree) {
8,032×
399
                        alloc.debugln("Freed", addrToFree, "for", ident)
4,016×
400
                        alloc.space.Free(addrToFree)
4,016×
401
                        errChan <- nil
4,016×
402
                        return
4,016×
403
                }
4,016×
404

405
                errChan <- fmt.Errorf("Free: address %s not found for %s", addrToFree, ident)
!
406
        }
407
        return <-errChan
4,016×
408
}
409

410
func (alloc *Allocator) pickPeerFromNicknames(isValid func(mesh.PeerName) bool) mesh.PeerName {
2×
411
        for name := range alloc.nicknames {
4×
412
                if name != alloc.ourName && isValid(name) {
2×
413
                        return name
!
414
                }
!
415
        }
416
        return mesh.UnknownPeerName
2×
417
}
418

419
func (alloc *Allocator) pickPeerForTransfer() mesh.PeerName {
1×
420
        // first try alive peers that actively participate in IPAM (i.e. have entries)
1×
421
        if heir := alloc.ring.PickPeerForTransfer(alloc.isKnownPeer); heir != mesh.UnknownPeerName {
1×
422
                return heir
!
423
        }
!
424
        // next try alive peers that have IPAM enabled but have no entries
425
        if heir := alloc.pickPeerFromNicknames(alloc.isKnownPeer); heir != mesh.UnknownPeerName {
1×
426
                return heir
!
427
        }
!
428
        // next try disappeared peers that still have entries
429
        t := func(mesh.PeerName) bool { return true }
1×
430
        if heir := alloc.ring.PickPeerForTransfer(t); heir != mesh.UnknownPeerName {
1×
431
                return heir
!
432
        }
!
433
        // finally, disappeared peers that passively participated in IPAM
434
        return alloc.pickPeerFromNicknames(t)
1×
435
}
436

437
// Shutdown (Sync)
438
func (alloc *Allocator) Shutdown() {
1×
439
        alloc.infof("Shutdown")
1×
440
        doneChan := make(chan struct{})
1×
441
        alloc.actionChan <- func() {
2×
442
                alloc.shuttingDown = true
1×
443
                alloc.cancelOps(&alloc.pendingClaims)
1×
444
                alloc.cancelOps(&alloc.pendingAllocates)
1×
445
                alloc.cancelOps(&alloc.pendingPrimes)
1×
446
                heir := alloc.pickPeerForTransfer()
1×
447
                alloc.ring.Transfer(alloc.ourName, heir)
1×
448
                alloc.space.Clear()
1×
449
                if heir != mesh.UnknownPeerName {
1×
450
                        alloc.persistRing()
!
451
                        alloc.gossip.GossipBroadcast(alloc.Gossip())
!
452
                }
!
453
                doneChan <- struct{}{}
1×
454
        }
455
        <-doneChan
1×
456
}
457

458
// AdminTakeoverRanges (Sync) - take over the ranges owned by a given
459
// peer, and return how much space was transferred in the process.
460
// Only done on administrator command.
461
func (alloc *Allocator) AdminTakeoverRanges(peerNameOrNickname string) address.Count {
3×
462
        resultChan := make(chan address.Count)
3×
463
        alloc.actionChan <- func() {
6×
464
                peername, err := alloc.lookupPeername(peerNameOrNickname)
3×
465
                if err != nil {
3×
466
                        alloc.warnf("attempt to take over range from unknown peer '%s'", peerNameOrNickname)
!
467
                        resultChan <- address.Count(0)
!
468
                        return
!
469
                }
!
470

471
                alloc.debugln("AdminTakeoverRanges:", peername)
3×
472
                if peername == alloc.ourName {
3×
473
                        alloc.warnf("attempt to take over range from ourself")
!
474
                        resultChan <- address.Count(0)
!
475
                        return
!
476
                }
!
477

478
                newRanges := alloc.ring.Transfer(peername, alloc.ourName)
3×
479

3×
480
                if len(newRanges) == 0 {
3×
481
                        resultChan <- address.Count(0)
!
482
                        return
!
483
                }
!
484

485
                before := alloc.space.NumFreeAddresses()
3×
486
                alloc.ringUpdated()
3×
487
                after := alloc.space.NumFreeAddresses()
3×
488

3×
489
                alloc.gossip.GossipBroadcast(alloc.Gossip())
3×
490

3×
491
                resultChan <- after - before
3×
492
        }
493
        return <-resultChan
3×
494
}
495

496
// Lookup a PeerName by nickname or stringified PeerName.  We can't
497
// call into the router for this because we are interested in peers
498
// that have gone away but are still in the ring, which is why we
499
// maintain our own nicknames map.
500
func (alloc *Allocator) lookupPeername(name string) (mesh.PeerName, error) {
3×
501
        for peername, nickname := range alloc.nicknames {
11×
502
                if nickname == name {
8×
503
                        return peername, nil
!
504
                }
!
505
        }
506

507
        return mesh.PeerNameFromString(name)
3×
508
}
509

510
// Restrict the peers in "nicknames" to those in the ring plus peers known to the router
511
func (alloc *Allocator) pruneNicknames() {
1,047×
512
        ringPeers := alloc.ring.PeerNames()
1,047×
513
        for name := range alloc.nicknames {
5,345×
514
                if _, ok := ringPeers[name]; !ok && !alloc.isKnownPeer(name) {
4,300×
515
                        delete(alloc.nicknames, name)
2×
516
                }
2×
517
        }
518
}
519

520
func (alloc *Allocator) annotatePeernames(names []mesh.PeerName) []string {
2×
521
        var res []string
2×
522
        for _, name := range names {
4×
523
                if nickname, found := alloc.nicknames[name]; found {
4×
524
                        res = append(res, fmt.Sprint(name, "(", nickname, ")"))
2×
525
                } else {
2×
526
                        res = append(res, name.String())
!
527
                }
!
528
        }
529
        return res
2×
530
}
531

532
// PeerGone removes nicknames of peers which are no longer mentioned
533
// in the ring. Async.
534
//
535
// NB: the function is invoked by the gossip library routines and should be
536
//     registered manually.
537
func (alloc *Allocator) PeerGone(peerName mesh.PeerName) {
44×
538
        alloc.debugf("PeerGone: peer %s", peerName)
44×
539

44×
540
        alloc.actionChan <- func() {
88×
541
                ringPeers := alloc.ring.PeerNames()
44×
542
                if _, ok := ringPeers[peerName]; !ok {
61×
543
                        delete(alloc.nicknames, peerName)
17×
544
                }
17×
545
        }
546
}
547

548
// decodeRange gob-decodes an address.Range from msg.
func decodeRange(msg []byte) (r address.Range, err error) {
	decoder := gob.NewDecoder(bytes.NewReader(msg))
	return r, decoder.Decode(&r)
}
936×
552

553
// OnGossipUnicast (Sync)
554
func (alloc *Allocator) OnGossipUnicast(sender mesh.PeerName, msg []byte) error {
1,611×
555
        alloc.debugln("OnGossipUnicast from", sender, ": ", len(msg), "bytes")
1,611×
556
        resultChan := make(chan error)
1,611×
557
        alloc.actionChan <- func() {
3,222×
558
                switch msg[0] {
1,611×
559
                case msgSpaceRequest:
596×
560
                        alloc.debugln("Peer", sender, "asked me for space")
596×
561
                        r, err := decodeRange(msg[1:])
596×
562
                        // If we don't have a ring, just ignore a request for space.
596×
563
                        // They'll probably ask again later.
596×
564
                        if err == nil && !alloc.ring.Empty() {
1,191×
565
                                alloc.donateSpace(r, sender)
595×
566
                        }
595×
567
                        resultChan <- err
596×
568
                case msgSpaceRequestDenied:
340×
569
                        r, err := decodeRange(msg[1:])
340×
570
                        if err == nil {
680×
571
                                alloc.spaceRequestDenied(sender, r)
340×
572
                        }
340×
573
                        resultChan <- err
340×
574
                case msgRingUpdate:
675×
575
                        resultChan <- alloc.update(sender, msg[1:])
675×
576
                }
577
        }
578
        return <-resultChan
1,611×
579
}
580

581
// OnGossipBroadcast (Sync)
582
func (alloc *Allocator) OnGossipBroadcast(sender mesh.PeerName, msg []byte) (mesh.GossipData, error) {
3,193×
583
        alloc.debugln("OnGossipBroadcast from", sender, ":", len(msg), "bytes")
3,193×
584
        resultChan := make(chan error)
3,193×
585
        alloc.actionChan <- func() {
6,386×
586
                resultChan <- alloc.update(sender, msg)
3,193×
587
        }
3,193×
588
        return alloc.Gossip(), <-resultChan
3,193×
589
}
590

591
// gossipState is the unit of state exchanged with other peers.
type gossipState struct {
	// We send a timestamp along with the information to be
	// gossipped for backwards-compatibility (previously to detect skewed clocks)
	Now       int64
	Nicknames map[mesh.PeerName]string // peer name -> human-readable nickname

	// Exactly one of the following is set when encoding: Paxos while
	// our ring is empty, Ring once a ring exists.
	Paxos paxos.GossipState
	Ring  *ring.Ring
}
600

601
func (alloc *Allocator) encode() []byte {
4,116×
602
        data := gossipState{
4,116×
603
                Now:       alloc.now().Unix(),
4,116×
604
                Nicknames: alloc.nicknames,
4,116×
605
        }
4,116×
606

4,116×
607
        // We're only interested in Paxos until we have a Ring.
4,116×
608
        // Non-electing participants (e.g. observers) return
4,116×
609
        // a nil gossip state in order to provoke a unicast ring
4,116×
610
        // update from electing peers who have reached consensus.
4,116×
611
        if alloc.ring.Empty() {
6,232×
612
                data.Paxos = alloc.paxos.GossipState()
2,116×
613
        } else {
4,116×
614
                data.Ring = alloc.ring
2,000×
615
        }
2,000×
616
        buf := new(bytes.Buffer)
4,116×
617
        enc := gob.NewEncoder(buf)
4,116×
618
        if err := enc.Encode(data); err != nil {
4,116×
619
                panic(err)
!
620
        }
621
        return buf.Bytes()
4,116×
622
}
623

624
// Encode (Sync)
625
func (alloc *Allocator) Encode() []byte {
3,417×
626
        resultChan := make(chan []byte)
3,417×
627
        alloc.actionChan <- func() {
6,834×
628
                resultChan <- alloc.encode()
3,417×
629
        }
3,417×
630
        return <-resultChan
3,417×
631
}
632

633
// OnGossip (Sync)
634
func (alloc *Allocator) OnGossip(msg []byte) (mesh.GossipData, error) {
221×
635
        alloc.debugln("Allocator.OnGossip:", len(msg), "bytes")
221×
636
        resultChan := make(chan error)
221×
637
        alloc.actionChan <- func() {
442×
638
                resultChan <- alloc.update(mesh.UnknownPeerName, msg)
221×
639
        }
221×
640
        return nil, <-resultChan // for now, we never propagate updates. TBD
221×
641
}
642

643
// GossipData implementation is trivial - we always gossip the latest
// data we have at time of sending
type ipamGossipData struct {
	alloc *Allocator
}

// Merge is a no-op: the freshest state is fetched at send time anyway.
func (d *ipamGossipData) Merge(other mesh.GossipData) mesh.GossipData {
	return d // no-op
}

// Encode returns the allocator's current state as a single message.
func (d *ipamGossipData) Encode() [][]byte {
	return [][]byte{d.alloc.Encode()}
}
3,409×
656

657
// Gossip returns a GossipData implementation, which in this case always
// returns the latest ring state (and does nothing on merge)
func (alloc *Allocator) Gossip() mesh.GossipData {
	return &ipamGossipData{alloc}
}
4,501×
662

663
// SetInterfaces gives the allocator two interfaces for talking to the outside world
func (alloc *Allocator) SetInterfaces(gossip mesh.Gossip) {
	alloc.gossip = gossip
}
414×
667

668
// ACTOR server
669

670
func (alloc *Allocator) actorLoop(actionChan <-chan func(), stopChan <-chan struct{}) {
414×
671
        defer alloc.ticker.Stop()
414×
672
        for {
23,551×
673
                select {
23,137×
674
                case action := <-actionChan:
22,373×
675
                        action()
22,373×
676
                case <-stopChan:
290×
677
                        return
290×
678
                case <-alloc.ticker.C:
350×
679
                        // Retry things in case messages got lost between here and recipients
350×
680
                        if alloc.awaitingConsensus {
357×
681
                                alloc.propose()
7×
682
                        } else {
350×
683
                                alloc.tryPendingOps()
343×
684
                        }
343×
685
                        alloc.removeDeadContainers()
350×
686
                }
687

688
                alloc.assertInvariants()
22,723×
689
                alloc.reportFreeSpace()
22,723×
690
        }
691
}
692

693
// Helper functions
694

695
// Ensure we are making progress towards an established ring
696
func (alloc *Allocator) establishRing() {
5,822×
697
        if !alloc.ring.Empty() || alloc.awaitingConsensus {
11,402×
698
                return
5,580×
699
        }
5,580×
700

701
        alloc.awaitingConsensus = true
242×
702
        alloc.paxos.SetQuorum(alloc.quorum())
242×
703
        alloc.propose()
242×
704
        if ok, cons := alloc.paxos.Consensus(); ok {
362×
705
                // If the quorum was 1, then proposing immediately
120×
706
                // leads to consensus
120×
707
                alloc.createRing(cons.Value)
120×
708
        }
120×
709
}
710

711
// createRing claims the universe for the given peer set (deduplicated
// and sorted first) and broadcasts the new ring.
func (alloc *Allocator) createRing(peers []mesh.PeerName) {
	alloc.debugln("Paxos consensus:", peers)
	alloc.ring.ClaimForPeers(normalizeConsensus(peers))
	alloc.ringUpdated()
	alloc.gossip.GossipBroadcast(alloc.Gossip())
}
234×
717

718
// ringUpdated must be called whenever the ring changes: it persists
// the ring, refreshes our free-space bookkeeping, and retries work
// that was blocked waiting for ring state.
func (alloc *Allocator) ringUpdated() {
	// When we have a ring, we don't need paxos any more
	if alloc.awaitingConsensus {
		alloc.awaitingConsensus = false
		alloc.paxos = nil
	}

	alloc.persistRing()
	alloc.space.UpdateRanges(alloc.ring.OwnedRanges())
	alloc.tryPendingOps()
}
729

730
// peerNames implements sort.Interface so a slice of peer names can be
// sorted with sort.Sort.
type peerNames []mesh.PeerName

func (a peerNames) Len() int           { return len(a) }
func (a peerNames) Less(i, j int) bool { return a[i] < a[j] }
func (a peerNames) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
128×
736

737
// When we get a consensus from Paxos, the peer names are not in a
738
// defined order and may contain duplicates.  This function sorts them
739
// and de-dupes.
740
func normalizeConsensus(consensus []mesh.PeerName) []mesh.PeerName {
240×
741
        if len(consensus) == 0 {
240×
742
                return nil
!
743
        }
!
744

745
        peers := make(peerNames, len(consensus))
240×
746
        copy(peers, consensus)
240×
747
        sort.Sort(peers)
240×
748

240×
749
        dst := 0
240×
750
        for src := 1; src < len(peers); src++ {
472×
751
                if peers[dst] != peers[src] {
462×
752
                        dst++
230×
753
                        peers[dst] = peers[src]
230×
754
                }
230×
755
        }
756

757
        return peers[:dst+1]
240×
758
}
759

760
func (alloc *Allocator) propose() {
249×
761
        alloc.debugf("Paxos proposing")
249×
762
        alloc.paxos.Propose()
249×
763
        alloc.gossip.GossipBroadcast(alloc.Gossip())
249×
764
}
249×
765

766
func encodeRange(r address.Range) []byte {
937×
767
        buf := new(bytes.Buffer)
937×
768
        enc := gob.NewEncoder(buf)
937×
769
        if err := enc.Encode(r); err != nil {
937×
770
                panic(err)
!
771
        }
772
        return buf.Bytes()
937×
773
}
774

775
func (alloc *Allocator) sendSpaceRequest(dest mesh.PeerName, r address.Range) error {
597×
776
        msg := append([]byte{msgSpaceRequest}, encodeRange(r)...)
597×
777
        return alloc.gossip.GossipUnicast(dest, msg)
597×
778
}
597×
779

780
func (alloc *Allocator) sendSpaceRequestDenied(dest mesh.PeerName, r address.Range) error {
340×
781
        msg := append([]byte{msgSpaceRequestDenied}, encodeRange(r)...)
340×
782
        return alloc.gossip.GossipUnicast(dest, msg)
340×
783
}
340×
784

785
func (alloc *Allocator) sendRingUpdate(dest mesh.PeerName) {
699×
786
        msg := append([]byte{msgRingUpdate}, alloc.encode()...)
699×
787
        alloc.gossip.GossipUnicast(dest, msg)
699×
788
}
699×
789

790
// update processes an incoming IPAM gossip payload from sender.  sender
// is mesh.UnknownPeerName when the message's origin is not known (e.g.
// state received via broadcast relay).  After merging nicknames it
// reacts to exactly one of, in priority order: a ring in the payload,
// our own ring being non-empty, a Paxos message, or a bare ring request.
// Returns a non-nil error only for undecodable or incompatible state.
func (alloc *Allocator) update(sender mesh.PeerName, msg []byte) error {
	reader := bytes.NewReader(msg)
	decoder := gob.NewDecoder(reader)
	var data gossipState

	if err := decoder.Decode(&data); err != nil {
		return err
	}

	// Merge nicknames
	for peer, nickname := range data.Nicknames {
		alloc.nicknames[peer] = nickname
	}

	switch {
	// If someone sent us a ring, merge it into ours. Note this will move us
	// out of the awaiting-consensus state if we didn't have a ring already.
	case data.Ring != nil:
		updated, err := alloc.ring.Merge(*data.Ring)
		switch err {
		case nil:
			if updated {
				alloc.pruneNicknames()
				alloc.ringUpdated()
			}
		case ring.ErrDifferentSeeds:
			// Two independently-seeded rings can never be reconciled.
			return fmt.Errorf("IP allocation was seeded by different peers (received: %v, ours: %v)",
				alloc.annotatePeernames(data.Ring.Seeds), alloc.annotatePeernames(alloc.ring.Seeds))
		case ring.ErrDifferentRange:
			return fmt.Errorf("Incompatible IP allocation ranges (received: %s, ours: %s)",
				data.Ring.Range().AsCIDRString(), alloc.ring.Range().AsCIDRString())
		default:
			return err
		}

	// If we reach this point we know the sender is either an elector
	// broadcasting a paxos proposal to form a ring or a non-elector
	// broadcasting a ring request. If we have a ring already we can just send
	// it back regardless.
	case !alloc.ring.Empty():
		if sender != mesh.UnknownPeerName {
			alloc.sendRingUpdate(sender)
		}

	// Otherwise, we need to react according to whether or not we received a
	// paxos proposal.
	case data.Paxos != nil:
		// Process the proposal (this is a no-op if we're an observer)
		if alloc.paxos.Update(data.Paxos) {
			if alloc.paxos.Think() {
				// If something important changed, broadcast
				alloc.gossip.GossipBroadcast(alloc.Gossip())
			}

			if ok, cons := alloc.paxos.Consensus(); ok {
				alloc.createRing(cons.Value)
			}
		}

	// No paxos proposal present, so sender is a non-elector. We don't have a
	// ring to send, so attempt to establish one on their behalf. NB we only do
	// this:
	//
	// * On an explicit broadcast request triggered by a remote allocation attempt
	//   (if we did so on periodic gossip we would force consensus unnecessarily)
	// * If we are an elector (to avoid a broadcast storm of ring request messages)
	default:
		if alloc.paxos.IsElector() && sender != mesh.UnknownPeerName {
			alloc.establishRing()
		}
	}

	return nil
}
864

865
// donateSpace tries to carve a chunk of our free space within range r
// and grant it to peer 'to' on the ring.  Whether or not a donation
// happens, a ring update is unicast back to the requester.
func (alloc *Allocator) donateSpace(r address.Range, to mesh.PeerName) {
	// No matter what we do, we'll send a unicast gossip
	// of our ring back to the chap who asked for space.
	// This serves to both tell him of any space we might
	// have given him, or tell him where he might find some
	// more.
	defer alloc.sendRingUpdate(to)

	chunk, ok := alloc.space.Donate(r)
	if !ok {
		// Donate only fails when there is nothing free in r; assert that.
		free := alloc.space.NumFreeAddressesInRange(r)
		common.Assert(free == 0)
		alloc.debugln("No space to give to peer", to)
		// separate message maintains backwards-compatibility:
		// down-level peers will ignore this and still get the ring update.
		alloc.sendSpaceRequestDenied(to, r)
		return
	}
	alloc.debugln("Giving range", chunk, "to", to)
	alloc.ring.GrantRangeToHost(chunk.Start, chunk.End, to)
	alloc.persistRing()
}
887

888
func (alloc *Allocator) assertInvariants() {
22,723×
889
        // We need to ensure all ranges the ring thinks we own have
22,723×
890
        // a corresponding space in the space set, and vice versa
22,723×
891
        checkSpace := space.New()
22,723×
892
        checkSpace.AddRanges(alloc.ring.OwnedRanges())
22,723×
893
        ranges := checkSpace.OwnedRanges()
22,723×
894
        spaces := alloc.space.OwnedRanges()
22,723×
895

22,723×
896
        common.Assert(len(ranges) == len(spaces))
22,723×
897

22,723×
898
        for i := 0; i < len(ranges); i++ {
248,761×
899
                r := ranges[i]
226,038×
900
                s := spaces[i]
226,038×
901
                common.Assert(s.Start == r.Start && s.End == r.End)
226,038×
902
        }
226,038×
903
}
904

905
func (alloc *Allocator) reportFreeSpace() {
22,723×
906
        ranges := alloc.ring.OwnedRanges()
22,723×
907
        if len(ranges) == 0 {
29,360×
908
                return
6,637×
909
        }
6,637×
910

911
        freespace := make(map[address.Address]address.Count)
16,086×
912
        for _, r := range ranges {
279,855×
913
                freespace[r.Start] = alloc.space.NumFreeAddressesInRange(r)
263,769×
914
        }
263,769×
915
        if alloc.ring.ReportFree(freespace) {
25,279×
916
                alloc.persistRing()
9,193×
917
        }
9,193×
918
}
919

920
// Persistent data
//
// Keys under which IPAM state is stored in the database.
const (
	ringIdent  = "ring"           // the ring itself
	nameIdent  = "peername"       // our own peer name, used to validate restored data
	ownedIdent = "ownedAddresses" // the per-ident owned-address map
)
926

927
func (alloc *Allocator) persistRing() {
11,131×
928
        // It would be better if these two Save operations happened in the same transaction
11,131×
929
        if err := alloc.db.Save(nameIdent, alloc.ourName); err != nil {
11,131×
930
                alloc.fatalf("Error persisting ring data: %s", err)
!
931
                return
!
932
        }
!
933
        if err := alloc.db.Save(ringIdent, alloc.ring); err != nil {
11,131×
934
                alloc.fatalf("Error persisting ring data: %s", err)
!
935
        }
!
936
}
937

938
// Returns true if persisted data is to be used, otherwise false.
// Persisted state is restored only when all of: a peer name and ring
// were found, the peer name matches ours, and the ring covers the same
// range as our universe.  Otherwise the on-disk state is overwritten
// with our current state and false is returned.  Load failures are fatal.
func (alloc *Allocator) loadPersistedData() bool {
	var checkPeerName mesh.PeerName
	nameFound, err := alloc.db.Load(nameIdent, &checkPeerName)
	if err != nil {
		alloc.fatalf("Error loading persisted peer name: %s", err)
	}
	var persistedRing *ring.Ring
	ringFound, err := alloc.db.Load(ringIdent, &persistedRing)
	if err != nil {
		alloc.fatalf("Error loading persisted IPAM data: %s", err)
	}
	var persistedOwned map[string]ownedData
	ownedFound, err := alloc.db.Load(ownedIdent, &persistedOwned)
	if err != nil {
		alloc.fatalf("Error loading persisted address data: %s", err)
	}

	// Discard the on-disk state, logging why, and persist our current
	// (fresh) state in its place.
	overwritePersisted := func(fmt string, args ...interface{}) {
		alloc.infof(fmt, args...)
		alloc.persistRing()
		alloc.persistOwned()
	}

	if !nameFound || !ringFound {
		overwritePersisted("No valid persisted data")
		return false
	}

	// Persisted data belonging to a different peer name is stale.
	if checkPeerName != alloc.ourName {
		overwritePersisted("Deleting persisted data for peername %s", checkPeerName)
		return false
	}

	// A ring for a different IP range cannot be reused.
	if persistedRing.Range() != alloc.universe.Range() {
		overwritePersisted("Deleting persisted data for IPAM range %s; our range is %s", persistedRing.Range(), alloc.universe)
		return false
	}

	alloc.ring.Restore(persistedRing)
	alloc.space.UpdateRanges(alloc.ring.OwnedRanges())

	if ownedFound {
		alloc.owned = persistedOwned
		// Re-claim every restored address so the space set agrees with
		// the ownership records.
		for _, d := range alloc.owned {
			for _, cidr := range d.Cidrs {
				alloc.space.Claim(cidr.Addr)
			}
		}
	}
	return true
}
990

991
func (alloc *Allocator) persistOwned() {
9,588×
992
        if err := alloc.db.Save(ownedIdent, alloc.owned); err != nil {
9,588×
993
                alloc.fatalf("Error persisting address data: %s", err)
!
994
        }
!
995
}
996

997
// Owned addresses
998

999
func (alloc *Allocator) hasOwned(ident string) bool {
1,440×
1000
        _, b := alloc.owned[ident]
1,440×
1001
        return b
1,440×
1002
}
1,440×
1003

1004
// NB: addr must not be owned by ident already
1005
func (alloc *Allocator) addOwned(ident string, cidr address.CIDR, isContainer bool) {
5,114×
1006
        d := alloc.owned[ident]
5,114×
1007
        d.IsContainer = isContainer
5,114×
1008
        d.Cidrs = append(d.Cidrs, cidr)
5,114×
1009
        alloc.owned[ident] = d
5,114×
1010
        alloc.persistOwned()
5,114×
1011
}
5,114×
1012

1013
func (alloc *Allocator) removeAllOwned(ident string) []address.CIDR {
57×
1014
        a := alloc.owned[ident]
57×
1015
        delete(alloc.owned, ident)
57×
1016
        alloc.persistOwned()
57×
1017
        return a.Cidrs
57×
1018
}
57×
1019

1020
// removeOwned removes a single address from ident's ownership record,
// persisting the change.  Returns true if the address was found.
func (alloc *Allocator) removeOwned(ident string, addrToFree address.Address) bool {
	d := alloc.owned[ident]
	for i, ownedCidr := range d.Cidrs {
		if ownedCidr.Addr == addrToFree {
			if len(d.Cidrs) == 1 {
				// Last address for this ident: drop the whole entry.
				delete(alloc.owned, ident)
			} else {
				// Splice the matching CIDR out and store the updated
				// slice header back into the map.
				d.Cidrs = append(d.Cidrs[:i], d.Cidrs[i+1:]...)
				alloc.owned[ident] = d
			}
			alloc.persistOwned()
			return true
		}
	}
	return false
}
1036

1037
func (alloc *Allocator) ownedInRange(ident string, r address.Range) []address.CIDR {
6,810×
1038
        var c []address.CIDR
6,810×
1039
        for _, cidr := range alloc.owned[ident].Cidrs {
8,439×
1040
                if r.Contains(cidr.Addr) {
3,237×
1041
                        c = append(c, cidr)
1,608×
1042
                }
1,608×
1043
        }
1044
        return c
6,810×
1045
}
1046

1047
func (alloc *Allocator) findOwner(addr address.Address) string {
187×
1048
        for ident, d := range alloc.owned {
20,539×
1049
                for _, candidate := range d.Cidrs {
40,704×
1050
                        if candidate.Addr == addr {
20,426×
1051
                                return ident
74×
1052
                        }
74×
1053
                }
1054
        }
1055
        return ""
113×
1056
}
1057

1058
// For each ID in the 'owned' map, remove the entry if it isn't in the map
1059
func (alloc *Allocator) pruneOwned(ids map[string]struct{}) {
90×
1060
        changed := false
90×
1061
        for ident, d := range alloc.owned {
104×
1062
                if !d.IsContainer {
18×
1063
                        continue
4×
1064
                }
1065
                if _, found := ids[ident]; !found {
12×
1066
                        for _, cidr := range d.Cidrs {
4×
1067
                                alloc.space.Free(cidr.Addr)
2×
1068
                        }
2×
1069
                        alloc.debugf("Deleting old entry %s: %v", ident, d.Cidrs)
2×
1070
                        delete(alloc.owned, ident)
2×
1071
                        changed = true
2×
1072
                }
1073
        }
1074
        if changed {
92×
1075
                alloc.persistOwned()
2×
1076
        }
2×
1077
}
1078

1079
// Logging
//
// All helpers prefix messages with "[allocator <peername>]" so log lines
// can be attributed to this allocator instance.  fatalf routes through
// common.Log.Fatalf and therefore terminates the process.

func (alloc *Allocator) fatalf(fmt string, args ...interface{}) {
	alloc.logf(common.Log.Fatalf, fmt, args...)
}
func (alloc *Allocator) warnf(fmt string, args ...interface{}) {
	alloc.logf(common.Log.Warnf, fmt, args...)
}
func (alloc *Allocator) errorf(fmt string, args ...interface{}) {
	common.Log.Errorf("[allocator %s] "+fmt, append([]interface{}{alloc.ourName}, args...)...)
}
func (alloc *Allocator) infof(fmt string, args ...interface{}) {
	alloc.logf(common.Log.Infof, fmt, args...)
}
func (alloc *Allocator) debugf(fmt string, args ...interface{}) {
	alloc.logf(common.Log.Debugf, fmt, args...)
}

// logf applies the allocator prefix and forwards to the given log function.
func (alloc *Allocator) logf(f func(string, ...interface{}), fmt string, args ...interface{}) {
	f("[allocator %s] "+fmt, append([]interface{}{alloc.ourName}, args...)...)
}

// debugln is the Println-style variant; note its prefix ends with ":".
func (alloc *Allocator) debugln(args ...interface{}) {
	common.Log.Debugln(append([]interface{}{fmt.Sprintf("[allocator %s]:", alloc.ourName)}, args...)...)
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc