Coveralls logo
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

weaveworks / weave / 10517

2 Aug 2018 - 10:34 coverage increased (+0.2%) to 71.094%
10517

Pull #3270

circleci

Bryan Boreham
Specify https for security
Pull Request #3270: Convert CI build to CircleCI 2.0

8522 of 11987 relevant lines covered (71.09%)

85434.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.29
/ipam/allocator.go
1
package ipam
2

3
import (
4
        "bytes"
5
        "encoding/gob"
6
        "fmt"
7
        "sort"
8
        "time"
9

10
        "github.com/weaveworks/mesh"
11

12
        "github.com/weaveworks/weave/common"
13
        "github.com/weaveworks/weave/db"
14
        "github.com/weaveworks/weave/ipam/paxos"
15
        "github.com/weaveworks/weave/ipam/ring"
16
        "github.com/weaveworks/weave/ipam/space"
17
        "github.com/weaveworks/weave/ipam/tracker"
18
        "github.com/weaveworks/weave/net/address"
19
)
20

21
// Kinds of message we can unicast to other peers
const (
	msgSpaceRequest = iota // ask another peer to donate address space
	msgRingUpdate          // push our view of the ring to another peer
	msgSpaceRequestDenied  // refuse a space request (see spaceRequestDenied)

	// How often the actor loop wakes up to retry pending work.
	tickInterval         = time.Second * 5
	MinSubnetSize        = 4 // first and last addresses are excluded, so 2 would be too small
	// How long after a container is reported dead before we reclaim its addresses.
	containerDiedTimeout = time.Second * 30
)
31

32
// operation represents something which Allocator wants to do, but
// which may need to wait until some other message arrives.
type operation interface {
	// Try attempts this operations and returns false if needs to be tried again.
	Try(alloc *Allocator) bool

	// Cancel aborts the operation, unblocking any caller waiting on its result.
	Cancel()

	// Does this operation pertain to the given container id?
	// Used for tidying up pending operations when containers die.
	ForContainer(ident string) bool
}
44

45
// This type is persisted hence all fields exported
type ownedData struct {
	IsContainer bool           // true when the owner ident is a container ID
	Cidrs       []address.CIDR // the addresses owned by that ident
}
50

51
// Allocator brings together Ring and space.Set, and does the
// necessary plumbing.  Runs as a single-threaded Actor, so no locks
// are used around data structures.
type Allocator struct {
	actionChan        chan<- func()            // closures are sent here to run on the actor goroutine
	stopChan          chan<- struct{}          // signals the actor goroutine to exit (tests only)
	ourName           mesh.PeerName            // this peer's name
	seed              []mesh.PeerName          // optional user supplied ring seed
	universe          address.CIDR             // superset of all ranges
	ring              *ring.Ring               // information on ranges owned by all peers
	space             space.Space              // more detail on ranges owned by us
	owned             map[string]ownedData     // who owns what addresses, indexed by container-ID
	nicknames         map[mesh.PeerName]string // so we can map nicknames for rmpeer
	pendingAllocates  []operation              // held until we get some free space
	pendingClaims     []operation              // held until we know who owns the space
	pendingPrimes     []operation              // held while our ring is empty
	dead              map[string]time.Time     // containers we heard were dead, and when
	db                db.DB                    // persistence
	gossip            mesh.Gossip              // our link to the outside world for sending messages
	paxos             paxos.Participant        // consensus protocol state; nil once a ring exists
	awaitingConsensus bool                     // true while we are proposing via paxos
	ticker            *time.Ticker             // drives periodic retries in the actor loop
	shuttingDown      bool // to avoid doing any requests while trying to shut down
	isKnownPeer       func(mesh.PeerName) bool // asks the router whether a peer is alive
	quorum            func() uint              // supplies the paxos quorum size
	now               func() time.Time         // indirection over time.Now for tests
	tracker           tracker.LocalRangeTracker
}
79

80
// PreClaims are IP addresses discovered before we could initialize IPAM
type PreClaim struct {
	Ident       string // a container ID, something like "weave:expose", or api.NoContainerID
	IsContainer bool   // true if Ident is a container ID
	Cidr        address.CIDR
}
86

87
// Config carries everything NewAllocator needs to construct an Allocator.
type Config struct {
	OurName     mesh.PeerName // this peer's name
	OurUID      mesh.PeerUID  // this peer's unique ID (used by paxos)
	OurNickname string        // human-readable name for this peer
	Seed        []mesh.PeerName // optional pre-agreed ring seed
	Universe    address.CIDR  // the whole allocation range
	IsObserver  bool          // true to observe paxos without voting
	PreClaims   []PreClaim    // addresses discovered before IPAM started
	Quorum      func() uint   // supplies the paxos quorum size
	Db          db.DB         // persistence backend
	IsKnownPeer func(name mesh.PeerName) bool // asks the router whether a peer is alive
	Tracker     tracker.LocalRangeTracker     // optional observer of locally-owned range changes
}
100

101
// NewAllocator creates and initialises a new Allocator
102
func NewAllocator(config Config) *Allocator {
431×
103
        var participant paxos.Participant
431×
104
        var alloc *Allocator
431×
105
        var onUpdate ring.OnUpdate
431×
106

431×
107
        if config.IsObserver {
433×
108
                participant = paxos.NewObserver()
2×
109
        } else {
431×
110
                participant = paxos.NewNode(config.OurName, config.OurUID, 0)
429×
111
        }
429×
112

113
        if config.Tracker != nil {
431×
114
                onUpdate = func(prev []address.Range, curr []address.Range, local bool) {
!
115
                        if err := config.Tracker.HandleUpdate(prev, curr, local); err != nil {
!
116
                                alloc.errorf("HandleUpdate failed: %s", err)
!
117
                        }
!
118
                }
119
        }
120

121
        alloc = &Allocator{
431×
122
                ourName:     config.OurName,
431×
123
                seed:        config.Seed,
431×
124
                universe:    config.Universe,
431×
125
                ring:        ring.New(config.Universe.Range().Start, config.Universe.Range().End, config.OurName, onUpdate),
431×
126
                owned:       make(map[string]ownedData),
431×
127
                db:          config.Db,
431×
128
                paxos:       participant,
431×
129
                nicknames:   map[mesh.PeerName]string{config.OurName: config.OurNickname},
431×
130
                isKnownPeer: config.IsKnownPeer,
431×
131
                quorum:      config.Quorum,
431×
132
                dead:        make(map[string]time.Time),
431×
133
                now:         time.Now,
431×
134
                tracker:     config.Tracker,
431×
135
        }
431×
136

431×
137
        alloc.pendingClaims = make([]operation, len(config.PreClaims))
431×
138
        for i, c := range config.PreClaims {
462×
139
                alloc.pendingClaims[i] = &claim{ident: c.Ident, cidr: c.Cidr}
31×
140
        }
31×
141

142
        return alloc
431×
143
}
144

145
func ParseCIDRSubnet(cidrStr string) (cidr address.CIDR, err error) {
126×
146
        cidr, err = address.ParseCIDR(cidrStr)
126×
147
        if err != nil {
126×
148
                return
!
149
        }
!
150
        if !cidr.IsSubnet() {
126×
UNCOV
151
                err = fmt.Errorf("invalid subnet - bits after network prefix are not all zero: %s", cidrStr)
!
UNCOV
152
        }
!
153
        if cidr.Size() < MinSubnetSize {
126×
UNCOV
154
                err = fmt.Errorf("invalid subnet - smaller than minimum size %d: %s", MinSubnetSize, cidrStr)
!
UNCOV
155
        }
!
156
        return
126×
157
}
158

159
// Start runs the allocator goroutine
func (alloc *Allocator) Start() {
	loadedPersistedData := alloc.loadPersistedData()
	// Decide how to initialise the ring: persisted data wins over a
	// supplied seed; otherwise electors defer to paxos consensus and
	// observers wait for data from another peer.
	switch {
	case loadedPersistedData && len(alloc.seed) != 0:
		alloc.infof("Found persisted IPAM data, ignoring supplied IPAM seed")
	case loadedPersistedData:
		alloc.infof("Initialising with persisted data")
	case len(alloc.seed) != 0:
		alloc.infof("Initialising with supplied IPAM seed")
		alloc.createRing(alloc.seed)
	case alloc.paxos.IsElector():
		alloc.infof("Initialising via deferred consensus")
	default:
		alloc.infof("Initialising as observer - awaiting IPAM data from another peer")
	}
	if loadedPersistedData { // do any pre-claims right away
		alloc.tryOps(&alloc.pendingClaims)
	}
	// Wire up the actor: all subsequent state changes are funnelled
	// through actionChan and executed by actorLoop.
	actionChan := make(chan func(), mesh.ChannelSize)
	stopChan := make(chan struct{})
	alloc.actionChan = actionChan
	alloc.stopChan = stopChan
	alloc.ticker = time.NewTicker(tickInterval)
	go alloc.actorLoop(actionChan, stopChan)
}
185

186
// Stop makes the actor routine exit, for test purposes ONLY because any
// calls after this is processed will hang. Async.
func (alloc *Allocator) Stop() {
	// Non-blocking send: if nothing is listening on stopChan (e.g. the
	// actor loop already exited) we drop the signal rather than block.
	select {
	case alloc.stopChan <- struct{}{}:
	default:
	}
}
194

195
// Operation life cycle
196

197
// Given an operation, try it, and add it to the pending queue if it didn't succeed
198
func (alloc *Allocator) doOperation(op operation, ops *[]operation) {
6,999×
199
        alloc.actionChan <- func() {
13,998×
200
                if alloc.shuttingDown {
7,000×
201
                        op.Cancel()
1×
202
                        return
1×
203
                }
1×
204
                if !op.Try(alloc) {
7,585×
205
                        *ops = append(*ops, op)
587×
206
                }
587×
207
        }
208
}
209

210
// Given an operation, remove it from the pending queue
211
//  Note the op may not be on the queue; it may have
212
//  already succeeded.  If it is on the queue, we call
213
//  cancel on it, allowing callers waiting for the resultChans
214
//  to unblock.
215
func (alloc *Allocator) cancelOp(opToCancel operation, ops *[]operation) {
!
216
        for i, op := range *ops {
!
217
                if op == opToCancel {
!
218
                        *ops = append((*ops)[:i], (*ops)[i+1:]...)
!
219
                        op.Cancel()
!
220
                        break
!
221
                }
222
        }
223
}
224

225
// Cancel all operations in a queue
226
func (alloc *Allocator) cancelOps(ops *[]operation) {
3×
227
        for _, op := range *ops {
3×
228
                op.Cancel()
!
229
        }
!
230
        *ops = []operation{}
3×
231
}
232

233
// Cancel all operations for a given container id, returns true
234
// if we found any.
235
func (alloc *Allocator) cancelOpsFor(ops *[]operation, ident string) bool {
1,422×
236
        var found bool
1,422×
237
        for i := 0; i < len(*ops); {
1,434×
238
                if op := (*ops)[i]; op.ForContainer(ident) {
14×
239
                        found = true
2×
240
                        op.Cancel()
2×
241
                        *ops = append((*ops)[:i], (*ops)[i+1:]...)
2×
242
                } else {
12×
243
                        i++
10×
244
                }
10×
245
        }
246
        return found
1,422×
247
}
248

249
// Try all operations in a queue
250
func (alloc *Allocator) tryOps(ops *[]operation) {
3,775×
251
        for i := 0; i < len(*ops); {
4,254×
252
                op := (*ops)[i]
479×
253
                if !op.Try(alloc) {
621×
254
                        i++
142×
255
                        continue
142×
256
                }
257
                *ops = append((*ops)[:i], (*ops)[i+1:]...)
337×
258
        }
259
}
260

261
// Try all pending operations
func (alloc *Allocator) tryPendingOps() {
	// Unblock pending primes first
	alloc.tryOps(&alloc.pendingPrimes)
	// Process existing claims before servicing new allocations
	alloc.tryOps(&alloc.pendingClaims)
	alloc.tryOps(&alloc.pendingAllocates)
}
1,250×
269

270
func (alloc *Allocator) havePendingOps() bool {
358×
271
        return len(alloc.pendingPrimes)+len(alloc.pendingClaims)+len(alloc.pendingAllocates) > 0
358×
272
}
358×
273

274
func (alloc *Allocator) spaceRequestDenied(sender mesh.PeerName, r address.Range) {
343×
275
        for i := 0; i < len(alloc.pendingClaims); {
663×
276
                claim := alloc.pendingClaims[i].(*claim)
320×
277
                if r.Contains(claim.cidr.Addr) {
597×
278
                        claim.deniedBy(alloc, sender)
277×
279
                        alloc.pendingClaims = append(alloc.pendingClaims[:i], alloc.pendingClaims[i+1:]...)
277×
280
                        continue
277×
281
                }
282
                i++
43×
283
        }
284
}
285

286
// errorCancelled is returned to callers whose pending request was
// cancelled before it could complete.
type errorCancelled struct {
	kind  string // the kind of request, e.g. "Allocate"
	ident string // the container identifier the request was for
}

// Error implements the error interface.
func (e *errorCancelled) Error() string {
	return e.kind + " request for " + e.ident + " cancelled"
}
!
294

295
// Actor client API
296

297
// Prime (Sync) - wait for consensus
298
func (alloc *Allocator) Prime() {
6×
299
        resultChan := make(chan struct{})
6×
300
        op := &prime{resultChan: resultChan}
6×
301
        alloc.doOperation(op, &alloc.pendingPrimes)
6×
302
        <-resultChan
6×
303
}
6×
304

305
// Allocate (Sync) - get new IP address for container with given name in range
306
// if there isn't any space in that range we block indefinitely
307
func (alloc *Allocator) Allocate(ident string, r address.CIDR, isContainer bool, hasBeenCancelled func() bool) (address.Address, error) {
6,442×
308
        resultChan := make(chan allocateResult)
6,442×
309
        op := &allocate{
6,442×
310
                resultChan:       resultChan,
6,442×
311
                ident:            ident,
6,442×
312
                r:                r,
6,442×
313
                isContainer:      isContainer,
6,442×
314
                hasBeenCancelled: hasBeenCancelled,
6,442×
315
        }
6,442×
316
        alloc.doOperation(op, &alloc.pendingAllocates)
6,442×
317
        result := <-resultChan
6,442×
318
        return result.addr, result.err
6,442×
319
}
6,442×
320

321
// Lookup (Sync) - get existing IP addresses for container with given name in range
322
func (alloc *Allocator) Lookup(ident string, r address.Range) ([]address.CIDR, error) {
12×
323
        resultChan := make(chan []address.CIDR)
12×
324
        alloc.actionChan <- func() {
24×
325
                resultChan <- alloc.ownedInRange(ident, r)
12×
326
        }
12×
327
        return <-resultChan, nil
12×
328
}
329

330
// Claim an address that we think we should own (Sync)
331
func (alloc *Allocator) Claim(ident string, cidr address.CIDR, isContainer, noErrorOnUnknown bool, hasBeenCancelled func() bool) error {
551×
332
        resultChan := make(chan error)
551×
333
        op := &claim{
551×
334
                resultChan:       resultChan,
551×
335
                ident:            ident,
551×
336
                cidr:             cidr,
551×
337
                isContainer:      isContainer,
551×
338
                noErrorOnUnknown: noErrorOnUnknown,
551×
339
                hasBeenCancelled: hasBeenCancelled,
551×
340
        }
551×
341
        alloc.doOperation(op, &alloc.pendingClaims)
551×
342
        return <-resultChan
551×
343
}
551×
344

345
// ContainerDied called from the updater interface.  Async.
func (alloc *Allocator) ContainerDied(ident string) {
	alloc.actionChan <- func() {
		// Record the time of death so removeDeadContainers can reclaim
		// the addresses after containerDiedTimeout.
		if alloc.hasOwnedByContainer(ident) {
			alloc.debugln("Container", ident, "died; noting to remove later")
			alloc.dead[ident] = alloc.now()
		}
		// Also remove any pending ops
		alloc.cancelOpsFor(&alloc.pendingAllocates, ident)
		alloc.cancelOpsFor(&alloc.pendingClaims, ident)
	}
}
357

358
// ContainerDestroyed called from the updater interface.  Async.
// Unlike ContainerDied, this frees the container's addresses immediately.
func (alloc *Allocator) ContainerDestroyed(ident string) {
	alloc.actionChan <- func() {
		if alloc.hasOwnedByContainer(ident) {
			alloc.debugln("Container", ident, "destroyed; removing addresses")
			alloc.delete(ident)
			delete(alloc.dead, ident)
		}
	}
}
368

369
func (alloc *Allocator) removeDeadContainers() {
365×
370
        cutoff := alloc.now().Add(-containerDiedTimeout)
365×
371
        for ident, timeOfDeath := range alloc.dead {
405×
372
                if timeOfDeath.Before(cutoff) {
41×
373
                        if err := alloc.delete(ident); err == nil {
2×
374
                                alloc.debugln("Removed addresses for container", ident)
1×
375
                        }
1×
376
                        delete(alloc.dead, ident)
1×
377
                }
378
        }
379
}
380

381
// ContainerStarted clears any record that the container was dead, so its
// addresses are no longer scheduled for reclamation. Async.
func (alloc *Allocator) ContainerStarted(ident string) {
	alloc.actionChan <- func() {
		delete(alloc.dead, ident) // delete is no-op if key not in map
	}
}
386

387
func (alloc *Allocator) PruneOwned(ids []string) {
107×
388
        idmap := make(map[string]struct{}, len(ids))
107×
389
        for _, id := range ids {
348×
390
                idmap[id] = struct{}{}
241×
391
        }
241×
392
        alloc.actionChan <- func() {
214×
393
                alloc.pruneOwned(idmap)
107×
394
        }
107×
395
}
396

397
// Delete (Sync) - release all IP addresses for container with given name
398
func (alloc *Allocator) Delete(ident string) error {
10×
399
        errChan := make(chan error)
10×
400
        alloc.actionChan <- func() {
20×
401
                errChan <- alloc.delete(ident)
10×
402
        }
10×
403
        return <-errChan
10×
404
}
405

406
func (alloc *Allocator) delete(ident string) error {
42×
407
        cidrs := alloc.removeAllOwned(ident)
42×
408
        if len(cidrs) == 0 {
42×
409
                return fmt.Errorf("Delete: no addresses for %s", ident)
!
410
        }
!
411
        for _, cidr := range cidrs {
87×
412
                alloc.space.Free(cidr.Addr)
45×
413
        }
45×
414
        return nil
42×
415
}
416

417
// Free (Sync) - release single IP address for container
418
func (alloc *Allocator) Free(ident string, addrToFree address.Address) error {
3,905×
419
        errChan := make(chan error)
3,905×
420
        alloc.actionChan <- func() {
7,810×
421
                if alloc.removeOwned(ident, addrToFree) {
7,810×
422
                        alloc.debugln("Freed", addrToFree, "for", ident)
3,905×
423
                        alloc.space.Free(addrToFree)
3,905×
424
                        errChan <- nil
3,905×
425
                        return
3,905×
426
                }
3,905×
427

428
                errChan <- fmt.Errorf("Free: address %s not found for %s", addrToFree, ident)
!
429
        }
430
        return <-errChan
3,905×
431
}
432

433
func (alloc *Allocator) pickPeerFromNicknames(isValid func(mesh.PeerName) bool) mesh.PeerName {
2×
434
        for name := range alloc.nicknames {
4×
435
                if name != alloc.ourName && isValid(name) {
2×
436
                        return name
!
437
                }
!
438
        }
439
        return mesh.UnknownPeerName
2×
440
}
441

442
func (alloc *Allocator) pickPeerForTransfer() mesh.PeerName {
1×
443
        // first try alive peers that actively participate in IPAM (i.e. have entries)
1×
444
        if heir := alloc.ring.PickPeerForTransfer(alloc.isKnownPeer); heir != mesh.UnknownPeerName {
1×
445
                return heir
!
446
        }
!
447
        // next try alive peers that have IPAM enabled but have no entries
448
        if heir := alloc.pickPeerFromNicknames(alloc.isKnownPeer); heir != mesh.UnknownPeerName {
1×
449
                return heir
!
450
        }
!
451
        // next try disappeared peers that still have entries
452
        t := func(mesh.PeerName) bool { return true }
1×
453
        if heir := alloc.ring.PickPeerForTransfer(t); heir != mesh.UnknownPeerName {
1×
454
                return heir
!
455
        }
!
456
        // finally, disappeared peers that passively participated in IPAM
457
        return alloc.pickPeerFromNicknames(t)
1×
458
}
459

460
// Shutdown (Sync)
// Cancels all pending work, hands our ring entries to another peer if
// one is available, and blocks until the actor has processed all of it.
func (alloc *Allocator) Shutdown() {
	alloc.infof("Shutdown")
	doneChan := make(chan struct{})
	alloc.actionChan <- func() {
		alloc.shuttingDown = true
		alloc.cancelOps(&alloc.pendingClaims)
		alloc.cancelOps(&alloc.pendingAllocates)
		alloc.cancelOps(&alloc.pendingPrimes)
		// Give away everything we own so the space is not lost.
		heir := alloc.pickPeerForTransfer()
		alloc.ring.Transfer(alloc.ourName, heir)
		alloc.space.Clear()
		// Only persist and broadcast if somebody actually inherited.
		if heir != mesh.UnknownPeerName {
			alloc.persistRing()
			alloc.gossip.GossipBroadcast(alloc.Gossip())
		}
		doneChan <- struct{}{}
	}
	<-doneChan
}
480

481
// AdminTakeoverRanges (Sync) - take over the ranges owned by a given
// peer, and return how much space was transferred in the process.
// Only done on administrator command.
func (alloc *Allocator) AdminTakeoverRanges(peerNameOrNickname string) address.Count {
	resultChan := make(chan address.Count)
	alloc.actionChan <- func() {
		peername, err := alloc.lookupPeername(peerNameOrNickname)
		if err != nil {
			alloc.warnf("attempt to take over range from unknown peer '%s'", peerNameOrNickname)
			resultChan <- address.Count(0)
			return
		}

		alloc.debugln("AdminTakeoverRanges:", peername)
		// Taking over from ourselves would be a no-op at best.
		if peername == alloc.ourName {
			alloc.warnf("attempt to take over range from ourself")
			resultChan <- address.Count(0)
			return
		}

		newRanges := alloc.ring.Transfer(peername, alloc.ourName)

		if len(newRanges) == 0 {
			resultChan <- address.Count(0)
			return
		}

		// Measure the free-space delta caused by the takeover and tell
		// everyone else about the new ring.
		before := alloc.space.NumFreeAddresses()
		alloc.ringUpdated()
		after := alloc.space.NumFreeAddresses()

		alloc.gossip.GossipBroadcast(alloc.Gossip())

		resultChan <- after - before
	}
	return <-resultChan
}
518

519
// Lookup a PeerName by nickname or stringified PeerName.  We can't
520
// call into the router for this because we are interested in peers
521
// that have gone away but are still in the ring, which is why we
522
// maintain our own nicknames map.
523
func (alloc *Allocator) lookupPeername(name string) (mesh.PeerName, error) {
3×
524
        for peername, nickname := range alloc.nicknames {
12×
525
                if nickname == name {
9×
526
                        return peername, nil
!
527
                }
!
528
        }
529

530
        return mesh.PeerNameFromString(name)
3×
531
}
532

533
// Restrict the peers in "nicknames" to those in the ring plus peers known to the router
534
func (alloc *Allocator) pruneNicknames() {
1,008×
535
        ringPeers := alloc.ring.PeerNames()
1,008×
536
        for name := range alloc.nicknames {
5,039×
537
                if _, ok := ringPeers[name]; !ok && !alloc.isKnownPeer(name) {
4,033×
538
                        delete(alloc.nicknames, name)
2×
539
                }
2×
540
        }
541
}
542

543
func (alloc *Allocator) annotatePeernames(names []mesh.PeerName) []string {
6×
544
        var res []string
6×
545
        for _, name := range names {
12×
546
                if nickname, found := alloc.nicknames[name]; found {
12×
547
                        res = append(res, fmt.Sprint(name, "(", nickname, ")"))
6×
548
                } else {
6×
549
                        res = append(res, name.String())
!
550
                }
!
551
        }
552
        return res
6×
553
}
554

555
// PeerGone removes nicknames of peers which are no longer mentioned
556
// in the ring. Async.
557
//
558
// NB: the function is invoked by the gossip library routines and should be
559
//     registered manually.
560
func (alloc *Allocator) PeerGone(peerName mesh.PeerName) {
76×
561
        alloc.debugf("PeerGone: peer %s", peerName)
76×
562

76×
563
        alloc.actionChan <- func() {
152×
564
                ringPeers := alloc.ring.PeerNames()
76×
565
                if _, ok := ringPeers[peerName]; !ok {
121×
566
                        delete(alloc.nicknames, peerName)
45×
567
                }
45×
568
        }
569
}
570

571
func decodeRange(msg []byte) (r address.Range, err error) {
939×
572
        decoder := gob.NewDecoder(bytes.NewReader(msg))
939×
573
        return r, decoder.Decode(&r)
939×
574
}
939×
575

576
// OnGossipUnicast (Sync)
// The first byte of msg selects the message kind; the remainder is the
// gob-encoded payload.
func (alloc *Allocator) OnGossipUnicast(sender mesh.PeerName, msg []byte) error {
	alloc.debugln("OnGossipUnicast from", sender, ": ", len(msg), "bytes")
	resultChan := make(chan error)
	alloc.actionChan <- func() {
		switch msg[0] {
		case msgSpaceRequest:
			alloc.debugln("Peer", sender, "asked me for space")
			r, err := decodeRange(msg[1:])
			// If we don't have a ring, just ignore a request for space.
			// They'll probably ask again later.
			if err == nil && !alloc.ring.Empty() {
				alloc.donateSpace(r, sender)
			}
			resultChan <- err
		case msgSpaceRequestDenied:
			r, err := decodeRange(msg[1:])
			if err == nil {
				alloc.spaceRequestDenied(sender, r)
			}
			resultChan <- err
		case msgRingUpdate:
			resultChan <- alloc.update(sender, msg[1:])
		}
	}
	return <-resultChan
}
603

604
// OnGossipBroadcast (Sync)
605
func (alloc *Allocator) OnGossipBroadcast(sender mesh.PeerName, msg []byte) (mesh.GossipData, error) {
3,230×
606
        alloc.debugln("OnGossipBroadcast from", sender, ":", len(msg), "bytes")
3,230×
607
        resultChan := make(chan error)
3,230×
608
        alloc.actionChan <- func() {
6,460×
609
                resultChan <- alloc.update(sender, msg)
3,230×
610
        }
3,230×
611
        return alloc.Gossip(), <-resultChan
3,230×
612
}
613

614
// gossipState is the unit of state exchanged with other peers.
type gossipState struct {
	// We send a timestamp along with the information to be
	// gossipped for backwards-compatibility (previously to detect skewed clocks)
	Now       int64
	Nicknames map[mesh.PeerName]string // peer name -> nickname

	Paxos paxos.GossipState // set only while the ring is empty (see encode)
	Ring  *ring.Ring        // set once a ring exists
}
623

624
// encode serialises the allocator's gossip state (nicknames plus either
// paxos state or the ring) with gob. Invoked via the actor goroutine
// (see Encode).
func (alloc *Allocator) encode() []byte {
	data := gossipState{
		Now:       alloc.now().Unix(),
		Nicknames: alloc.nicknames,
	}

	// We're only interested in Paxos until we have a Ring.
	// Non-electing participants (e.g. observers) return
	// a nil gossip state in order to provoke a unicast ring
	// update from electing peers who have reached consensus.
	if alloc.ring.Empty() {
		data.Paxos = alloc.paxos.GossipState()
	} else {
		data.Ring = alloc.ring
	}
	buf := new(bytes.Buffer)
	enc := gob.NewEncoder(buf)
	// Encoding our own state should never fail; treat failure as a bug.
	if err := enc.Encode(data); err != nil {
		panic(err)
	}
	return buf.Bytes()
}
646

647
// Encode (Sync)
648
func (alloc *Allocator) Encode() []byte {
3,411×
649
        resultChan := make(chan []byte)
3,411×
650
        alloc.actionChan <- func() {
6,822×
651
                resultChan <- alloc.encode()
3,411×
652
        }
3,411×
653
        return <-resultChan
3,411×
654
}
655

656
// OnGossip (Sync)
657
func (alloc *Allocator) OnGossip(msg []byte) (mesh.GossipData, error) {
187×
658
        alloc.debugln("Allocator.OnGossip:", len(msg), "bytes")
187×
659
        resultChan := make(chan error)
187×
660
        alloc.actionChan <- func() {
374×
661
                resultChan <- alloc.update(mesh.UnknownPeerName, msg)
187×
662
        }
187×
663
        return nil, <-resultChan // for now, we never propagate updates. TBD
187×
664
}
665

666
// GossipData implementation is trivial - we always gossip the latest
// data we have at time of sending
type ipamGossipData struct {
	alloc *Allocator // state is read from here at Encode time
}
671

672
func (d *ipamGossipData) Merge(other mesh.GossipData) mesh.GossipData {
1×
673
        return d // no-op
1×
674
}
1×
675

676
func (d *ipamGossipData) Encode() [][]byte {
3,403×
677
        return [][]byte{d.alloc.Encode()}
3,403×
678
}
3,403×
679

680
// Gossip returns a GossipData implementation, which in this case always
681
// returns the latest ring state (and does nothing on merge)
682
func (alloc *Allocator) Gossip() mesh.GossipData {
4,534×
683
        return &ipamGossipData{alloc}
4,534×
684
}
4,534×
685

686
// SetInterfaces gives the allocator its link to the gossip layer,
// used for sending messages to other peers.
func (alloc *Allocator) SetInterfaces(gossip mesh.Gossip) {
	alloc.gossip = gossip
}
431×
690

691
// ACTOR server
692

693
// actorLoop is the single goroutine through which all allocator state
// changes flow: it executes queued actions, retries pending work on each
// tick, and exits when stopChan fires.
func (alloc *Allocator) actorLoop(actionChan <-chan func(), stopChan <-chan struct{}) {
	defer alloc.ticker.Stop()
	for {
		select {
		case action := <-actionChan:
			action()
		case <-stopChan:
			return
		case <-alloc.ticker.C:
			// Retry things in case messages got lost between here and recipients
			if alloc.awaitingConsensus {
				alloc.propose()
			} else if alloc.havePendingOps() {
				if alloc.ring.Empty() {
					alloc.establishRing()
				} else {
					alloc.tryPendingOps()
				}
			}
			alloc.removeDeadContainers()
		}

		// Sanity-check and report after every event.
		alloc.assertInvariants()
		alloc.reportFreeSpace()
	}
}
719

720
// Helper functions
721

722
// Ensure we are making progress towards an established ring
func (alloc *Allocator) establishRing() {
	// Nothing to do if a ring already exists or a proposal is in flight.
	if !alloc.ring.Empty() || alloc.awaitingConsensus {
		return
	}

	alloc.awaitingConsensus = true
	alloc.paxos.SetQuorum(alloc.quorum())
	alloc.propose()
	if ok, cons := alloc.paxos.Consensus(); ok {
		// If the quorum was 1, then proposing immediately
		// leads to consensus
		alloc.createRing(cons.Value)
	}
}
737

738
func (alloc *Allocator) createRing(peers []mesh.PeerName) {
234×
739
        alloc.debugln("Paxos consensus:", peers)
234×
740
        alloc.ring.ClaimForPeers(normalizeConsensus(peers))
234×
741
        alloc.ringUpdated()
234×
742
        alloc.gossip.GossipBroadcast(alloc.Gossip())
234×
743
}
234×
744

745
// ringUpdated is called whenever our ring changes (created from
// consensus, merged from gossip, or restored from disk): it ends any
// Paxos round in progress, persists the new state, resyncs the
// free-space set, and retries operations that were blocked waiting
// for ring changes.
func (alloc *Allocator) ringUpdated() {
	// When we have a ring, we don't need paxos any more
	if alloc.awaitingConsensus {
		alloc.awaitingConsensus = false
		alloc.paxos = nil
	}

	alloc.persistRing()
	alloc.space.UpdateRanges(alloc.ring.OwnedRanges())
	alloc.tryPendingOps()
}
756

757
// For compatibility with sort.Interface, so a slice of peer names can
// be sorted by normalizeConsensus below.
type peerNames []mesh.PeerName

func (a peerNames) Len() int           { return len(a) }
func (a peerNames) Less(i, j int) bool { return a[i] < a[j] }
func (a peerNames) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
763

764
// When we get a consensus from Paxos, the peer names are not in a
765
// defined order and may contain duplicates.  This function sorts them
766
// and de-dupes.
767
func normalizeConsensus(consensus []mesh.PeerName) []mesh.PeerName {
240×
768
        if len(consensus) == 0 {
240×
769
                return nil
!
770
        }
!
771

772
        peers := make(peerNames, len(consensus))
240×
773
        copy(peers, consensus)
240×
774
        sort.Sort(peers)
240×
775

240×
776
        dst := 0
240×
777
        for src := 1; src < len(peers); src++ {
462×
778
                if peers[dst] != peers[src] {
442×
779
                        dst++
220×
780
                        peers[dst] = peers[src]
220×
781
                }
220×
782
        }
783

784
        return peers[:dst+1]
240×
785
}
786

787
// propose makes a Paxos proposal and broadcasts our updated state so
// the other electors see it.
func (alloc *Allocator) propose() {
	alloc.debugf("Paxos proposing")
	alloc.paxos.Propose()
	alloc.gossip.GossipBroadcast(alloc.Gossip())
}
792

793
func encodeRange(r address.Range) []byte {
946×
794
        buf := new(bytes.Buffer)
946×
795
        enc := gob.NewEncoder(buf)
946×
796
        if err := enc.Encode(r); err != nil {
946×
797
                panic(err)
!
798
        }
799
        return buf.Bytes()
946×
800
}
801

802
func (alloc *Allocator) sendSpaceRequest(dest mesh.PeerName, r address.Range) error {
604×
803
        msg := append([]byte{msgSpaceRequest}, encodeRange(r)...)
604×
804
        return alloc.gossip.GossipUnicast(dest, msg)
604×
805
}
604×
806

807
func (alloc *Allocator) sendSpaceRequestDenied(dest mesh.PeerName, r address.Range) error {
342×
808
        msg := append([]byte{msgSpaceRequestDenied}, encodeRange(r)...)
342×
809
        return alloc.gossip.GossipUnicast(dest, msg)
342×
810
}
342×
811

812
func (alloc *Allocator) sendRingUpdate(dest mesh.PeerName) {
680×
813
        msg := append([]byte{msgRingUpdate}, alloc.encode()...)
680×
814
        alloc.gossip.GossipUnicast(dest, msg)
680×
815
}
680×
816

817
// update processes a gossip payload from another peer (sender is
// mesh.UnknownPeerName for merges where the origin is not known). The
// payload always carries nicknames, plus at most one of: a ring, a
// Paxos proposal, or neither (a bare ring request).
func (alloc *Allocator) update(sender mesh.PeerName, msg []byte) error {
	reader := bytes.NewReader(msg)
	decoder := gob.NewDecoder(reader)
	var data gossipState

	if err := decoder.Decode(&data); err != nil {
		return err
	}

	// Merge nicknames
	for peer, nickname := range data.Nicknames {
		alloc.nicknames[peer] = nickname
	}

	switch {
	// If someone sent us a ring, merge it into ours. Note this will move us
	// out of the awaiting-consensus state if we didn't have a ring already.
	case data.Ring != nil:
		updated, err := alloc.ring.Merge(*data.Ring)
		switch err {
		case nil:
			if updated {
				alloc.pruneNicknames()
				alloc.ringUpdated()
			}
		case ring.ErrDifferentSeeds:
			return fmt.Errorf("IP allocation was seeded by different peers (received: %v, ours: %v)",
				alloc.annotatePeernames(data.Ring.Seeds), alloc.annotatePeernames(alloc.ring.Seeds))
		case ring.ErrDifferentRange:
			return fmt.Errorf("Incompatible IP allocation ranges (received: %s, ours: %s)",
				data.Ring.Range().AsCIDRString(), alloc.ring.Range().AsCIDRString())
		default:
			return err
		}

	// If we reach this point we know the sender is either an elector
	// broadcasting a paxos proposal to form a ring or a non-elector
	// broadcasting a ring request. If we have a ring already we can just send
	// it back regardless.
	case !alloc.ring.Empty():
		if sender != mesh.UnknownPeerName {
			alloc.sendRingUpdate(sender)
		}

	// Otherwise, we need to react according to whether or not we received a
	// paxos proposal.
	case data.Paxos != nil:
		// Process the proposal (this is a no-op if we're an observer)
		if alloc.paxos.Update(data.Paxos) {
			if alloc.paxos.Think() {
				// If something important changed, broadcast
				alloc.gossip.GossipBroadcast(alloc.Gossip())
			}

			if ok, cons := alloc.paxos.Consensus(); ok {
				alloc.createRing(cons.Value)
			}
		}

	// No paxos proposal present, so sender is a non-elector. We don't have a
	// ring to send, so attempt to establish one on their behalf. NB we only do
	// this:
	//
	// * On an explicit broadcast request triggered by a remote allocation attempt
	//   (if we did so on periodic gossip we would force consensus unnecessarily)
	// * If we are an elector (to avoid a broadcast storm of ring request messages)
	default:
		if alloc.paxos.IsElector() && sender != mesh.UnknownPeerName {
			alloc.establishRing()
		}
	}

	return nil
}
891

892
func (alloc *Allocator) donateSpace(r address.Range, to mesh.PeerName) {
595×
893
        // No matter what we do, we'll send a unicast gossip
595×
894
        // of our ring back to the chap who asked for space.
595×
895
        // This serves to both tell him of any space we might
595×
896
        // have given him, or tell him where he might find some
595×
897
        // more.
595×
898
        defer alloc.sendRingUpdate(to)
595×
899

595×
900
        chunk, ok := alloc.space.Donate(r)
595×
901
        if !ok {
937×
902
                free := alloc.space.NumFreeAddressesInRange(r)
342×
903
                common.Assert(free == 0)
342×
904
                alloc.debugln("No space to give to peer", to)
342×
905
                // separate message maintains backwards-compatibility:
342×
906
                // down-level peers will ignore this and still get the ring update.
342×
907
                alloc.sendSpaceRequestDenied(to, r)
342×
908
                return
342×
909
        }
342×
910
        alloc.debugln("Giving range", chunk, "to", to)
253×
911
        alloc.ring.GrantRangeToHost(chunk.Start, chunk.End, to)
253×
912
        alloc.persistRing()
253×
913
}
914

915
func (alloc *Allocator) assertInvariants() {
22,506×
916
        // We need to ensure all ranges the ring thinks we own have
22,506×
917
        // a corresponding space in the space set, and vice versa
22,506×
918
        checkSpace := space.New()
22,506×
919
        checkSpace.AddRanges(alloc.ring.OwnedRanges())
22,506×
920
        ranges := checkSpace.OwnedRanges()
22,506×
921
        spaces := alloc.space.OwnedRanges()
22,506×
922

22,506×
923
        common.Assert(len(ranges) == len(spaces))
22,506×
924

22,506×
925
        for i := 0; i < len(ranges); i++ {
217,984×
926
                r := ranges[i]
195,478×
927
                s := spaces[i]
195,478×
928
                common.Assert(s.Start == r.Start && s.End == r.End)
195,478×
929
        }
195,478×
930
}
931

932
func (alloc *Allocator) reportFreeSpace() {
22,506×
933
        ranges := alloc.ring.OwnedRanges()
22,506×
934
        if len(ranges) == 0 {
29,661×
935
                return
7,155×
936
        }
7,155×
937

938
        freespace := make(map[address.Address]address.Count)
15,351×
939
        for _, r := range ranges {
233,887×
940
                freespace[r.Start] = alloc.space.NumFreeAddressesInRange(r)
218,536×
941
        }
218,536×
942
        if alloc.ring.ReportFree(freespace) {
24,293×
943
                alloc.persistRing()
8,942×
944
        }
8,942×
945
}
946

947
// Persistent data: keys under which allocator state is stored in the DB.
const (
	ringIdent  = "ring"           // key for the persisted ring state
	ownedIdent = "ownedAddresses" // key for the persisted owned-address map
)
952

953
// persistRing saves our peer name and the current ring state to the DB
// so they survive a restart (see loadPersistedData).
func (alloc *Allocator) persistRing() {
	// It would be better if these two Save operations happened in the same transaction
	if err := alloc.db.Save(db.NameIdent, alloc.ourName); err != nil {
		alloc.fatalf("Error persisting ring data: %s", err)
		// defensive return in case fatalf does not terminate the process
		return
	}
	if err := alloc.db.Save(ringIdent, alloc.ring); err != nil {
		alloc.fatalf("Error persisting ring data: %s", err)
	}
}
963

964
// Returns true if persisted data is to be used, otherwise false.
// Persisted state is only adopted when it was written under our own
// peer name and covers the same IPAM range; otherwise the stale data is
// overwritten with our current (empty) state.
func (alloc *Allocator) loadPersistedData() bool {
	var checkPeerName mesh.PeerName
	nameFound, err := alloc.db.Load(db.NameIdent, &checkPeerName)
	if err != nil {
		alloc.fatalf("Error loading persisted peer name: %s", err)
	}
	var persistedRing *ring.Ring
	ringFound, err := alloc.db.Load(ringIdent, &persistedRing)
	if err != nil {
		alloc.fatalf("Error loading persisted IPAM data: %s", err)
	}
	var persistedOwned map[string]ownedData
	ownedFound, err := alloc.db.Load(ownedIdent, &persistedOwned)
	if err != nil {
		alloc.fatalf("Error loading persisted address data: %s", err)
	}

	// Log why we are discarding the persisted state, then replace it
	// with our current state.
	overwritePersisted := func(fmt string, args ...interface{}) {
		alloc.infof(fmt, args...)
		alloc.persistRing()
		alloc.persistOwned()
	}

	if !nameFound || !ringFound {
		overwritePersisted("No valid persisted data")
		return false
	}

	if checkPeerName != alloc.ourName {
		overwritePersisted("Deleting persisted data for peername %s", checkPeerName)
		return false
	}

	if persistedRing.Range() != alloc.universe.Range() {
		overwritePersisted("Deleting persisted data for IPAM range %s; our range is %s", persistedRing.Range(), alloc.universe)
		return false
	}

	// Persisted state looks good: adopt the ring and rebuild the
	// free-space set from the ranges it says we own.
	alloc.ring.Restore(persistedRing)
	alloc.space.UpdateRanges(alloc.ring.OwnedRanges())

	if ownedFound {
		// Re-claim every persisted owned address out of the space set.
		alloc.owned = persistedOwned
		for _, d := range alloc.owned {
			for _, cidr := range d.Cidrs {
				alloc.space.Claim(cidr.Addr)
			}
		}
	}
	return true
}
1016

1017
func (alloc *Allocator) persistOwned() {
9,335×
1018
        if err := alloc.db.Save(ownedIdent, alloc.owned); err != nil {
9,335×
1019
                alloc.fatalf("Error persisting address data: %s", err)
!
1020
        }
!
1021
}
1022

1023
// Owned addresses
1024

1025
func (alloc *Allocator) hasOwnedByContainer(ident string) bool {
1,353×
1026
        d, b := alloc.owned[ident]
1,353×
1027
        return b && d.IsContainer
1,353×
1028
}
1,353×
1029

1030
// NB: addr must not be owned by ident already
1031
func (alloc *Allocator) addOwned(ident string, cidr address.CIDR, isContainer bool) {
4,970×
1032
        d := alloc.owned[ident]
4,970×
1033
        d.IsContainer = isContainer
4,970×
1034
        d.Cidrs = append(d.Cidrs, cidr)
4,970×
1035
        alloc.owned[ident] = d
4,970×
1036
        alloc.persistOwned()
4,970×
1037
}
4,970×
1038

1039
func (alloc *Allocator) removeAllOwned(ident string) []address.CIDR {
42×
1040
        a := alloc.owned[ident]
42×
1041
        delete(alloc.owned, ident)
42×
1042
        alloc.persistOwned()
42×
1043
        return a.Cidrs
42×
1044
}
42×
1045

1046
func (alloc *Allocator) removeOwned(ident string, addrToFree address.Address) bool {
3,911×
1047
        d := alloc.owned[ident]
3,911×
1048
        for i, ownedCidr := range d.Cidrs {
7,823×
1049
                if ownedCidr.Addr == addrToFree {
7,823×
1050
                        if len(d.Cidrs) == 1 {
7,821×
1051
                                delete(alloc.owned, ident)
3,910×
1052
                        } else {
3,911×
1053
                                d.Cidrs = append(d.Cidrs[:i], d.Cidrs[i+1:]...)
1×
1054
                                alloc.owned[ident] = d
1×
1055
                        }
1×
1056
                        alloc.persistOwned()
3,911×
1057
                        return true
3,911×
1058
                }
1059
        }
1060
        return false
!
1061
}
1062

1063
func (alloc *Allocator) ownedInRange(ident string, r address.Range) []address.CIDR {
6,740×
1064
        var c []address.CIDR
6,740×
1065
        for _, cidr := range alloc.owned[ident].Cidrs {
8,433×
1066
                if r.Contains(cidr.Addr) {
3,368×
1067
                        c = append(c, cidr)
1,675×
1068
                }
1,675×
1069
        }
1070
        return c
6,740×
1071
}
1072

1073
func (alloc *Allocator) findOwner(addr address.Address) string {
304×
1074
        for ident, d := range alloc.owned {
21,947×
1075
                for _, candidate := range d.Cidrs {
43,286×
1076
                        if candidate.Addr == addr {
21,752×
1077
                                return ident
109×
1078
                        }
109×
1079
                }
1080
        }
1081
        return ""
195×
1082
}
1083

1084
// For each ID in the 'owned' map, remove the entry if it isn't in the map
1085
func (alloc *Allocator) pruneOwned(ids map[string]struct{}) {
107×
1086
        changed := false
107×
1087
        for ident, d := range alloc.owned {
140×
1088
                if !d.IsContainer {
47×
1089
                        continue
14×
1090
                }
1091
                if _, found := ids[ident]; !found {
25×
1092
                        for _, cidr := range d.Cidrs {
12×
1093
                                alloc.space.Free(cidr.Addr)
6×
1094
                        }
6×
1095
                        alloc.debugf("Deleting old entry %s: %v", ident, d.Cidrs)
6×
1096
                        delete(alloc.owned, ident)
6×
1097
                        changed = true
6×
1098
                }
1099
        }
1100
        if changed {
113×
1101
                alloc.persistOwned()
6×
1102
        }
6×
1103
}
1104

1105
// Logging
1106

1107
func (alloc *Allocator) fatalf(fmt string, args ...interface{}) {
!
1108
        alloc.logf(common.Log.Fatalf, fmt, args...)
!
1109
}
!
1110
func (alloc *Allocator) warnf(fmt string, args ...interface{}) {
!
1111
        alloc.logf(common.Log.Warnf, fmt, args...)
!
1112
}
!
1113
func (alloc *Allocator) errorf(fmt string, args ...interface{}) {
!
1114
        common.Log.Errorf("[allocator %s] "+fmt, append([]interface{}{alloc.ourName}, args...)...)
!
1115
}
!
1116
func (alloc *Allocator) infof(fmt string, args ...interface{}) {
932×
1117
        alloc.logf(common.Log.Infof, fmt, args...)
932×
1118
}
932×
1119
func (alloc *Allocator) debugf(fmt string, args ...interface{}) {
761×
1120
        alloc.logf(common.Log.Debugf, fmt, args...)
761×
1121
}
761×
1122
func (alloc *Allocator) logf(f func(string, ...interface{}), fmt string, args ...interface{}) {
1,693×
1123
        f("[allocator %s] "+fmt, append([]interface{}{alloc.ourName}, args...)...)
1,693×
1124
}
1,693×
1125
func (alloc *Allocator) debugln(args ...interface{}) {
15,551×
1126
        common.Log.Debugln(append([]interface{}{fmt.Sprintf("[allocator %s]:", alloc.ourName)}, args...)...)
15,551×
1127
}
15,551×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc