Permify / permify, build 13012512557

28 Jan 2025 02:22PM UTC coverage: 79.602% (-0.3%) from 79.891%

Pull Request #1980 (tolgaOzen): refactor: enhance distributed consistent hashing configuration and er…
Commit: refactor(pkg/balancer): Refactor Config struct default value checks

288 of 548 new or added lines in 4 files covered (52.55%)
6 existing lines in 2 files now uncovered
8324 of 10457 relevant lines covered (79.6%)
147.35 hits per line

Source file: /pkg/consistent/consistent.go (93.8% of lines covered)
package consistent

import (
        "encoding/binary"
        "errors"
        "fmt"
        "math"
        "sort"
        "sync"
)

const (
        // DefaultPartitionCount defines the default number of virtual partitions in the hash ring.
        // This helps balance the load distribution among members, even with a small number of members.
        DefaultPartitionCount int = 271

        // DefaultReplicationFactor specifies the default number of virtual nodes placed on the
        // ring for each member. More virtual nodes per member smooths the distribution of
        // partitions across members.
        DefaultReplicationFactor int = 20

        // DefaultLoad defines the default maximum load factor for each member.
        // A higher value allows members to handle more load before being considered full.
        DefaultLoad float64 = 1.25

        // DefaultPickerWidth determines the default range of candidates considered when picking members.
        // This can influence the selection logic in advanced configurations.
        DefaultPickerWidth int = 1
)

// Hasher computes a 64-bit hash of the given byte slice.
type Hasher func([]byte) uint64

// Member represents a node in the hash ring, identified by its String() value.
type Member interface {
        String() string
}
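
// A minimal sketch of what satisfying these two types might look like; the
// node type and fnvHash function below are illustrative names, not part of
// this package:
//
//    // node is a string-backed Member.
//    type node string
//
//    func (n node) String() string { return string(n) }
//
//    // fnvHash adapts the standard library's FNV-1a hash to the Hasher
//    // signature (import "hash/fnv").
//    func fnvHash(data []byte) uint64 {
//            h := fnv.New64a()
//            _, _ = h.Write(data)
//            return h.Sum64()
//    }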

// Config holds the configuration settings for a consistent hashing instance:
// hashing, partitioning, replication, load balancing, and picker width.
type Config struct {
        // Hasher is the hash function used to place partitions and members on the ring.
        // It should distribute inputs uniformly across the uint64 space.
        Hasher Hasher

        // PartitionCount defines the number of partitions in the system.
        // This value affects how data is distributed and processed.
        PartitionCount int

        // ReplicationFactor specifies the number of virtual nodes (replicas) placed on
        // the ring for each member, improving the evenness of the distribution.
        ReplicationFactor int

        // Load is the maximum load factor for each member: a member accepts at most
        // Ceil((PartitionCount / number of members) * Load) partitions.
        Load float64

        // PickerWidth determines the width or range of candidates considered by the
        // picker mechanism when selections are made.
        PickerWidth int
}
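
// Only Hasher is required: New panics on a nil Hasher and fills every other
// zero-valued field with its default. A sketch, reusing the hypothetical
// fnvHash from above:
//
//    cfg := Config{Hasher: fnvHash}
//    // PartitionCount -> 271, ReplicationFactor -> 20,
//    // Load -> 1.25, PickerWidth -> 1 (applied inside New).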

// Consistent implements a consistent hashing mechanism with partitioning and load balancing.
// It is used for distributing data across a dynamic set of members efficiently.
type Consistent struct {
        // mu is a read-write mutex used to protect shared resources from concurrent access.
        mu sync.RWMutex

        // config holds the configuration settings for the consistent hashing instance.
        config Config

        // hasher is the hash function used for generating hash values.
        hasher Hasher

        // sortedSet maintains a sorted slice of hash values to represent the hash ring.
        sortedSet []uint64

        // partitionCount specifies the number of partitions in the hash ring.
        partitionCount uint64

        // loads tracks the load distribution for each member in the hash ring.
        // The key is the member's identifier, and the value is the load.
        loads map[string]float64

        // members maps member identifiers to their corresponding Member values.
        members map[string]*Member

        // partitions maps each partition index to the corresponding member.
        partitions map[int]*Member

        // ring is a map that associates each hash value in the ring with a specific member.
        ring map[uint64]*Member
}

// New initializes and returns a new instance of the Consistent struct.
// It takes a Config parameter and applies default values for any unset fields.
func New(config Config) *Consistent {
        // Ensure the Hasher is not nil; a nil Hasher would make consistent hashing unusable.
        if config.Hasher == nil {
                panic("Hasher cannot be nil")
        }

        // Set default values for partition count, replication factor, load, and picker width if not provided.
        if config.PartitionCount == 0 {
                config.PartitionCount = DefaultPartitionCount
        }
        if config.ReplicationFactor == 0 {
                config.ReplicationFactor = DefaultReplicationFactor
        }
        if config.Load == 0 {
                config.Load = DefaultLoad
        }
        if config.PickerWidth == 0 {
                config.PickerWidth = DefaultPickerWidth
        }

        // Initialize a new Consistent instance with the provided configuration.
        c := &Consistent{
                config:         config,
                members:        make(map[string]*Member),
                partitionCount: uint64(config.PartitionCount),
                ring:           make(map[uint64]*Member),
        }

        // Assign the provided Hasher implementation to the instance.
        c.hasher = config.Hasher
        return c
}
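
// A minimal end-to-end sketch, assuming the hypothetical node and fnvHash
// helpers shown earlier:
//
//    c := New(Config{Hasher: fnvHash})
//    c.Add(node("node-1"))
//    c.Add(node("node-2"))
//    c.Add(node("node-3"))
//
//    // The same key always maps to the same partition, and therefore to a
//    // stable owner until the membership changes.
//    owner := c.LocateKey([]byte("user:42"))
//    fmt.Println(owner.String())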

// Members returns a slice of all the members currently in the consistent hash ring.
// It safely retrieves the members using a read lock to prevent data races while
// accessing the shared `members` map.
func (c *Consistent) Members() []Member {
        // Acquire a read lock to ensure thread-safe access to the members map.
        c.mu.RLock()
        defer c.mu.RUnlock()

        // Create a slice to hold the members, pre-allocating its capacity to avoid resizing.
        members := make([]Member, 0, len(c.members))

        // Iterate over the members map and append each member to the slice.
        for _, member := range c.members {
                members = append(members, *member)
        }

        // Return the slice of members.
        return members
}

// GetAverageLoad calculates and returns the current average load across all members.
// It is a public method that provides thread-safe access to the load calculation.
func (c *Consistent) GetAverageLoad() float64 {
        // Acquire a read lock to ensure thread-safe access to shared resources.
        c.mu.RLock()
        defer c.mu.RUnlock()

        // Delegate the actual load calculation to the internal helper method.
        return c.calculateAverageLoad()
}

// calculateAverageLoad is a private helper method that performs the actual calculation
// of the average load across all members. It is not thread-safe and should be called
// only from within methods that already manage locking.
func (c *Consistent) calculateAverageLoad() float64 {
        // If there are no members, return an average load of 0 to prevent division by zero.
        if len(c.members) == 0 {
                return 0
        }

        // Calculate the average load by dividing the total partition count by the number
        // of members (note: integer division, so the quotient is truncated) and
        // multiplying by the configured load factor.
        avgLoad := float64(c.partitionCount/uint64(len(c.members))) * c.config.Load

        // Use math.Ceil to round up the average load to the nearest whole number.
        return math.Ceil(avgLoad)
}
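
// A worked example of the formula: with the default 271 partitions, a load
// factor of 1.25, and 3 members, 271/3 truncates to 90 under integer
// division, 90 * 1.25 = 112.5, and math.Ceil rounds that up to 113. Each
// member may therefore own at most 113 of the 271 partitions.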

// assignPartitionWithLoad distributes a partition to a member based on the load factor.
// It ensures that no member exceeds the calculated average load while distributing partitions.
// If the distribution fails due to insufficient capacity, it panics with an error message.
func (c *Consistent) assignPartitionWithLoad(
        partitionID, startIndex int,
        partitionAssignments map[int]*Member,
        memberLoads map[string]float64,
) {
        // Calculate the average load to determine the maximum load a member can handle.
        averageLoad := c.calculateAverageLoad()
        var attempts int

        // Iterate to find a suitable member for the partition.
        for {
                attempts++

                // If the number of attempts reaches the number of entries in the sorted set,
                // the partition cannot be distributed with the current configuration.
                if attempts >= len(c.sortedSet) {
                        panic("not enough capacity to distribute partitions: consider decreasing the partition count, increasing the member count, or increasing the load factor")
                }

                // Get the current hash value from the sorted set.
                currentHash := c.sortedSet[startIndex]

                // Retrieve the member associated with the hash value.
                currentMember := *c.ring[currentHash]

                // Check the current load of the member.
                currentLoad := memberLoads[currentMember.String()]

                // If the member's load is within the acceptable range, assign the partition.
                if currentLoad+1 <= averageLoad {
                        partitionAssignments[partitionID] = &currentMember
                        memberLoads[currentMember.String()]++
                        return
                }

                // Move to the next member in the sorted set.
                startIndex++
                if startIndex >= len(c.sortedSet) {
                        // Loop back to the beginning of the sorted set if we reach the end.
                        startIndex = 0
                }
        }
}

// distributePartitions evenly distributes partitions among members while respecting the load factor.
// It ensures that partitions are assigned to members based on consistent hashing and load constraints.
func (c *Consistent) distributePartitions() {
        // Initialize maps to track the load for each member and partition assignments.
        memberLoads := make(map[string]float64)
        partitionAssignments := make(map[int]*Member)

        // Create a buffer for converting partition IDs into byte slices for hashing.
        partitionKeyBuffer := make([]byte, 8)

        // Iterate over all partition IDs to distribute them among members.
        for partitionID := uint64(0); partitionID < c.partitionCount; partitionID++ {
                // Convert the partition ID into a byte slice for hashing.
                binary.LittleEndian.PutUint64(partitionKeyBuffer, partitionID)

                // Generate a hash key for the partition using the configured hasher.
                hashKey := c.hasher(partitionKeyBuffer)

                // Find the index of the member in the sorted set where the hash key should be placed.
                index := sort.Search(len(c.sortedSet), func(i int) bool {
                        return c.sortedSet[i] >= hashKey
                })

                // If the index is beyond the end of the sorted set, wrap around to the beginning.
                if index >= len(c.sortedSet) {
                        index = 0
                }

                // Assign the partition to a member, ensuring the load factor is respected.
                c.assignPartitionWithLoad(int(partitionID), index, partitionAssignments, memberLoads)
        }

        // Update the Consistent instance with the new partition assignments and member loads.
        c.partitions = partitionAssignments
        c.loads = memberLoads
}
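
// For a single partition, the homing step above reduces to the following
// sketch (using partition ID 42 as an arbitrary example):
//
//    var buf [8]byte
//    binary.LittleEndian.PutUint64(buf[:], 42)
//    h := c.hasher(buf[:])
//    i := sort.Search(len(c.sortedSet), func(j int) bool {
//            return c.sortedSet[j] >= h
//    })
//    // i, wrapped to 0 when it runs off the end, is where
//    // assignPartitionWithLoad starts walking the ring.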

// addMemberToRing adds a member to the consistent hash ring and updates the sorted set of hashes.
func (c *Consistent) addMemberToRing(member Member) {
        // Add replication factor entries for the member in the hash ring.
        for replicaIndex := 0; replicaIndex < c.config.ReplicationFactor; replicaIndex++ {
                // Generate a unique key for each replica of the member.
                replicaKey := []byte(fmt.Sprintf("%s%d", member.String(), replicaIndex))
                hashValue := c.hasher(replicaKey)

                // Add the hash value to the ring and associate it with the member.
                c.ring[hashValue] = &member

                // Append the hash value to the sorted set of hashes.
                c.sortedSet = append(c.sortedSet, hashValue)
        }

        // Sort the hash values to maintain the ring's order.
        sort.Slice(c.sortedSet, func(i, j int) bool {
                return c.sortedSet[i] < c.sortedSet[j]
        })

        // Add the member to the members map.
        c.members[member.String()] = &member
}
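
// Replica keys are simply the member name with the replica index appended:
// a member "node-1" under the default ReplicationFactor of 20 is hashed as
// "node-10", "node-11", ..., "node-119", scattering 20 virtual nodes across
// the ring so that load spreads evenly.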

// Add safely adds a new member to the consistent hash circle.
// It ensures thread safety and redistributes partitions after adding the member.
func (c *Consistent) Add(member Member) {
        // Acquire a write lock to ensure thread safety.
        c.mu.Lock()
        defer c.mu.Unlock()

        // Check if the member already exists in the ring. If it does, exit early.
        if _, exists := c.members[member.String()]; exists {
                return
        }

        // Add the member to the ring and redistribute partitions.
        c.addMemberToRing(member)
        c.distributePartitions()
}

// removeFromSortedSet removes a hash value from the sorted set of hashes.
func (c *Consistent) removeFromSortedSet(hashValue uint64) {
        for i := 0; i < len(c.sortedSet); i++ {
                if c.sortedSet[i] == hashValue {
                        // Remove the hash value by slicing the sorted set.
                        c.sortedSet = append(c.sortedSet[:i], c.sortedSet[i+1:]...)
                        break
                }
        }
}

// Remove deletes a member from the consistent hash circle and redistributes partitions.
// If the member does not exist, the method exits early.
func (c *Consistent) Remove(memberName string) {
        // Acquire a write lock to ensure thread-safe access.
        c.mu.Lock()
        defer c.mu.Unlock()

        // Check if the member exists in the hash ring. If not, exit early.
        if _, exists := c.members[memberName]; !exists {
                return
        }

        // Remove all replicas of the member from the hash ring and sorted set.
        for replicaIndex := 0; replicaIndex < c.config.ReplicationFactor; replicaIndex++ {
                // Generate the unique key for each replica of the member.
                replicaKey := []byte(fmt.Sprintf("%s%d", memberName, replicaIndex))
                hashValue := c.hasher(replicaKey)

                // Remove the hash value from the hash ring.
                delete(c.ring, hashValue)

                // Remove the hash value from the sorted set.
                c.removeFromSortedSet(hashValue)
        }

        // Remove the member from the members map.
        delete(c.members, memberName)

        // If no members remain, reset the partition table and exit.
        if len(c.members) == 0 {
                c.partitions = make(map[int]*Member)
                return
        }

        // Redistribute partitions among the remaining members.
        c.distributePartitions()
}

// GetLoadDistribution provides a thread-safe snapshot of the current load distribution across members.
// It returns a map where the keys are member identifiers and the values are their respective loads.
func (c *Consistent) GetLoadDistribution() map[string]float64 {
        // Acquire a read lock to ensure thread-safe access to the loads map.
        c.mu.RLock()
        defer c.mu.RUnlock()

        // Create a copy of the loads map to avoid exposing internal state.
        loadDistribution := make(map[string]float64)
        for memberName, memberLoad := range c.loads {
                loadDistribution[memberName] = memberLoad
        }

        return loadDistribution
}

// GetPartitionID calculates and returns the partition ID for a given key.
// The partition ID is determined by hashing the key and applying a modulo operation with the partition count.
func (c *Consistent) GetPartitionID(key []byte) int {
        // Generate a hash value for the given key using the configured hasher.
        hashValue := c.hasher(key)

        // Calculate the partition ID by taking the modulus of the hash value with the partition count.
        return int(hashValue % c.partitionCount)
}
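
// For example, with the default 271 partitions, a key whose hash came out as
// 1000003 (a hypothetical value) would land in partition 1000003 % 271 = 13.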

// GetPartitionOwner retrieves the owner of the specified partition in a thread-safe manner.
// It ensures that the access to shared resources is synchronized.
func (c *Consistent) GetPartitionOwner(partitionID int) Member {
        // Acquire a read lock to ensure thread-safe access to the partitions map.
        c.mu.RLock()
        defer c.mu.RUnlock()

        // Delegate the actual lookup to the non-thread-safe internal helper function.
        return c.getPartitionOwnerInternal(partitionID)
}

// getPartitionOwnerInternal retrieves the owner of the specified partition without thread safety.
// This function assumes that synchronization has been handled by the caller.
func (c *Consistent) getPartitionOwnerInternal(partitionID int) Member {
        // Look up the member associated with the given partition ID.
        member, exists := c.partitions[partitionID]
        if !exists {
                // If the partition ID does not exist, return a nil Member.
                return nil
        }

        // Return a copy of the member to ensure thread safety.
        return *member
}

// LocateKey determines the owner of the partition corresponding to the given key.
// It calculates the partition ID for the key and retrieves the associated member in a thread-safe manner.
func (c *Consistent) LocateKey(key []byte) Member {
        // Calculate the partition ID based on the hash of the key.
        partitionID := c.GetPartitionID(key)

        // Retrieve the owner of the partition using the thread-safe method.
        return c.GetPartitionOwner(partitionID)
}

// closestN retrieves the closest N members to the given partition ID in the consistent hash ring.
// It ensures thread-safe access and validates that the requested count of members can be satisfied.
func (c *Consistent) closestN(partitionID, count int) ([]Member, error) {
        // Acquire a read lock to ensure thread-safe access to the members map.
        c.mu.RLock()
        defer c.mu.RUnlock()

        // Validate that the requested number of members can be satisfied.
        if count > len(c.members) {
                return nil, errors.New("not enough members to satisfy the request")
        }

        // Prepare a result slice to store the closest members.
        var closestMembers []Member

        // Get the owner of the given partition.
        partitionOwner := c.getPartitionOwnerInternal(partitionID)
        var partitionOwnerHash uint64

        // Build a hash ring by hashing all member names.
        var memberHashes []uint64
        hashToMember := make(map[uint64]*Member)
        for memberName, member := range c.members {
                // Compute the hash value for each member name.
                hash := c.hasher([]byte(memberName))

                // Track the hash for the partition owner.
                if memberName == partitionOwner.String() {
                        partitionOwnerHash = hash
                }

                // Append the hash value and map it to the corresponding member.
                memberHashes = append(memberHashes, hash)
                hashToMember[hash] = member
        }

        // Sort the hash values to create a consistent hash ring.
        sort.Slice(memberHashes, func(i, j int) bool {
                return memberHashes[i] < memberHashes[j]
        })

        // Find the index of the partition owner's hash in the sorted hash ring.
        ownerIndex := -1
        for i, hash := range memberHashes {
                if hash == partitionOwnerHash {
                        ownerIndex = i
                        closestMembers = append(closestMembers, *hashToMember[hash])
                        break
                }
        }

        // If the partition owner's hash is not found (unexpected), return an error.
        if ownerIndex == -1 {
                return nil, errors.New("partition owner not found in hash ring")
        }

        // Find the additional closest members by iterating around the hash ring.
        currentIndex := ownerIndex
        for len(closestMembers) < count {
                // Move to the next hash in the ring, wrapping around if necessary.
                currentIndex++
                if currentIndex >= len(memberHashes) {
                        currentIndex = 0
                }

                // Add the member corresponding to the current hash to the result.
                hash := memberHashes[currentIndex]
                closestMembers = append(closestMembers, *hashToMember[hash])
        }

        return closestMembers, nil
}

// ClosestN calculates the closest N members to a given key in the consistent hash ring.
// It uses the key to determine the partition ID and then retrieves the closest members.
// This is useful for identifying members for replication or redundancy.
func (c *Consistent) ClosestN(key []byte, count int) ([]Member, error) {
        // Calculate the partition ID based on the hash of the key.
        partitionID := c.GetPartitionID(key)

        // Retrieve the closest N members for the calculated partition ID.
        return c.closestN(partitionID, count)
}
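
// A usage sketch for replication, continuing the hypothetical setup from
// earlier: fetch the owner of a key plus its two nearest followers on the
// ring.
//
//    replicas, err := c.ClosestN([]byte("user:42"), 3)
//    if err != nil {
//            fmt.Println(err) // e.g. fewer than 3 members in the ring
//    } else {
//            for _, m := range replicas {
//                    fmt.Println(m.String())
//            }
//    }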