• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 18451467117

13 Oct 2025 12:12AM UTC coverage: 62.183% (-6.8%) from 68.948%
18451467117

Pull #107

github

llamerada-jp
wip

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

297 of 923 new or added lines in 14 files covered. (32.18%)

5 existing lines in 2 files now uncovered.

3111 of 5003 relevant lines covered (62.18%)

34.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/internal/kvs/kvs.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package kvs
17

18
import (
19
        "context"
20
        "fmt"
21
        "log/slog"
22
        "sync"
23
        "time"
24

25
        "github.com/google/uuid"
26
        proto "github.com/llamerada-jp/colonio/api/colonio/v1alpha"
27
        "github.com/llamerada-jp/colonio/config"
28
        "github.com/llamerada-jp/colonio/internal/constants"
29
        "github.com/llamerada-jp/colonio/internal/network/transferer"
30
        "github.com/llamerada-jp/colonio/internal/shared"
31
)
32

33
type memberState int
34

35
const (
36
        memberStateNormal memberState = iota
37
        memberStateCreating
38
        memberStateAppendingRaft
39
        memberStateAppendingNode
40
        memberStateRemoving
41
)
42

43
const (
44
        coordinatingNodeSequence = uint64(1)
45
)
46

47
type Handler interface {
48
        KvsGetStability() (bool, []*shared.NodeID)
49
        KvsState(state constants.KvsState) (constants.KvsState, error)
50
}
51

52
type Config struct {
53
        Logger     *slog.Logger
54
        Handler    Handler
55
        Store      config.KvsStore
56
        Transferer *transferer.Transferer
57
}
58

59
type memberEntry struct {
60
        nodeID *shared.NodeID
61
        state  memberState
62
}
63

64
type KVS struct {
65
        logger       *slog.Logger
66
        ctx          context.Context
67
        handler      Handler
68
        store        config.KvsStore
69
        transferer   *transferer.Transferer
70
        localNodeID  *shared.NodeID
71
        mtx          sync.RWMutex
72
        sectors      map[config.KvsSectorKey]*sector
73
        coordinating *config.KvsSectorKey
74
        lastSequence uint64
75
        // currentNode's member state
76
        members map[uint64]*memberEntry
77
}
78

NEW
79
func NewKVS(conf *Config) *KVS {
×
NEW
80
        k := &KVS{
×
NEW
81
                logger:     conf.Logger,
×
NEW
82
                handler:    conf.Handler,
×
NEW
83
                store:      conf.Store,
×
NEW
84
                transferer: conf.Transferer,
×
NEW
85
                sectors:    make(map[config.KvsSectorKey]*sector),
×
NEW
86
                members:    make(map[uint64]*memberEntry),
×
NEW
87
        }
×
NEW
88

×
NEW
89
        transferer.SetRequestHandler[proto.PacketContent_KvsOperation](k.transferer, k.recvOperation)
×
NEW
90
        transferer.SetRequestHandler[proto.PacketContent_RaftConfig](k.transferer, k.recvConfig)
×
NEW
91
        transferer.SetRequestHandler[proto.PacketContent_RaftConfigResponse](k.transferer, k.recvConfigResponse)
×
NEW
92
        transferer.SetRequestHandler[proto.PacketContent_RaftMessage](k.transferer, k.recvMessage)
×
NEW
93

×
NEW
94
        return k
×
NEW
95
}
×
96

NEW
97
func (k *KVS) Start(ctx context.Context, localNodeID *shared.NodeID) {
×
NEW
98
        k.localNodeID = localNodeID
×
NEW
99
        k.ctx = ctx
×
NEW
100

×
NEW
101
        go func() {
×
NEW
102
                ticker := time.NewTicker(1 * time.Second)
×
NEW
103
                defer ticker.Stop()
×
NEW
104

×
NEW
105
                for {
×
NEW
106
                        select {
×
NEW
107
                        case <-ctx.Done():
×
NEW
108
                                return
×
109

NEW
110
                        case <-ticker.C:
×
NEW
111
                                k.subRoutine()
×
112
                        }
113
                }
114
        }()
115
}
116

NEW
117
func (k *KVS) Get(key string) chan *config.KvsGetResult {
×
NEW
118
        c := make(chan *config.KvsGetResult, 1)
×
NEW
119
        k.sendOperation(proto.KvsOperation_COMMAND_GET, key, nil, func(res *proto.KvsOperationResponse, err error) {
×
NEW
120
                defer close(c)
×
NEW
121

×
NEW
122
                if err != nil {
×
NEW
123
                        c <- &config.KvsGetResult{
×
NEW
124
                                Data: nil,
×
NEW
125
                                Err:  err,
×
NEW
126
                        }
×
NEW
127
                        return
×
NEW
128
                }
×
129

NEW
130
                switch res.Error {
×
NEW
131
                case proto.KvsOperationResponse_ERROR_NONE:
×
NEW
132
                        // ok
×
NEW
133
                        c <- &config.KvsGetResult{
×
NEW
134
                                Data: res.Value,
×
NEW
135
                                Err:  nil,
×
NEW
136
                        }
×
137

NEW
138
                case proto.KvsOperationResponse_ERROR_NOT_FOUND:
×
NEW
139
                        c <- &config.KvsGetResult{
×
NEW
140
                                Data: nil,
×
NEW
141
                                Err:  fmt.Errorf("key not found: %s", key),
×
NEW
142
                        }
×
143

NEW
144
                default:
×
NEW
145
                        c <- &config.KvsGetResult{
×
NEW
146
                                Data: nil,
×
NEW
147
                                Err:  fmt.Errorf("unknown error: %d", res.Error),
×
NEW
148
                        }
×
149
                }
150
        })
151

NEW
152
        return c
×
153
}
154

NEW
155
func (k *KVS) Set(key string, value []byte) chan error {
×
NEW
156
        c := make(chan error, 1)
×
NEW
157
        k.sendOperation(proto.KvsOperation_COMMAND_SET, key, value, func(res *proto.KvsOperationResponse, err error) {
×
NEW
158
                defer close(c)
×
NEW
159

×
NEW
160
                if err != nil {
×
NEW
161
                        c <- err
×
NEW
162
                        return
×
NEW
163
                }
×
164

NEW
165
                if res.Error != proto.KvsOperationResponse_ERROR_NONE {
×
NEW
166
                        c <- fmt.Errorf("kvs set error: %d", res.Error)
×
NEW
167
                        return
×
NEW
168
                }
×
169

NEW
170
                c <- nil
×
171
        })
172

NEW
173
        return c
×
174
}
175

NEW
176
func (k *KVS) Patch(key string, value []byte) chan error {
×
NEW
177
        c := make(chan error, 1)
×
NEW
178
        k.sendOperation(proto.KvsOperation_COMMAND_PATCH, key, value, func(res *proto.KvsOperationResponse, err error) {
×
NEW
179
                defer close(c)
×
NEW
180

×
NEW
181
                if err != nil {
×
NEW
182
                        c <- err
×
NEW
183
                        return
×
NEW
184
                }
×
185

NEW
186
                if res.Error != proto.KvsOperationResponse_ERROR_NONE {
×
NEW
187
                        c <- fmt.Errorf("kvs patch error: %d", res.Error)
×
NEW
188
                        return
×
NEW
189
                }
×
190

NEW
191
                c <- nil
×
192
        })
193

NEW
194
        return c
×
195
}
196

NEW
197
func (k *KVS) Delete(key string) chan error {
×
NEW
198
        c := make(chan error, 1)
×
NEW
199
        k.sendOperation(proto.KvsOperation_COMMAND_DELETE, key, nil, func(res *proto.KvsOperationResponse, err error) {
×
NEW
200
                defer close(c)
×
NEW
201

×
NEW
202
                if err != nil {
×
NEW
203
                        c <- err
×
NEW
204
                        return
×
NEW
205
                }
×
206

NEW
207
                if res.Error != proto.KvsOperationResponse_ERROR_NONE {
×
NEW
208
                        c <- fmt.Errorf("kvs delete error: %d", res.Error)
×
NEW
209
                        return
×
NEW
210
                }
×
211

NEW
212
                c <- nil
×
213
        })
214

NEW
215
        return c
×
216
}
217

NEW
218
func (k *KVS) subRoutine() {
×
NEW
219
        isStable, nextNodeIDs := k.handler.KvsGetStability()
×
NEW
220
        if !isStable {
×
NEW
221
                return
×
NEW
222
        }
×
223

NEW
224
        k.mtx.Lock()
×
NEW
225
        defer k.mtx.Unlock()
×
NEW
226

×
NEW
227
        if k.coordinating == nil {
×
NEW
228
                k.createCluster(nextNodeIDs)
×
NEW
229
                if k.coordinating == nil {
×
NEW
230
                        return
×
NEW
231
                }
×
232
        }
233

NEW
234
        toAppend, toRemove := k.getNodeDiff(nextNodeIDs)
×
NEW
235
        for _, nodeID := range toAppend {
×
NEW
236
                k.lastSequence++
×
NEW
237
                k.members[k.lastSequence] = &memberEntry{
×
NEW
238
                        nodeID: nodeID,
×
NEW
239
                        state:  memberStateAppendingRaft,
×
NEW
240
                }
×
NEW
241
        }
×
NEW
242
        for seq := range toRemove {
×
NEW
243
                k.members[seq].state = memberStateRemoving
×
NEW
244
        }
×
245

NEW
246
        if err := k.applyConfigRaft(); err != nil {
×
NEW
247
                k.logger.Warn("Failed to apply Raft config", "error", err)
×
NEW
248
        }
×
249

NEW
250
        k.sendSettingMessage()
×
251
}
252

NEW
253
func (k *KVS) getNodeDiff(nextNodeIDs []*shared.NodeID) ([]*shared.NodeID, map[uint64]*memberEntry) {
×
NEW
254
        nextNodeIDMap := make(map[shared.NodeID]struct{})
×
NEW
255
        for _, nodeID := range nextNodeIDs {
×
NEW
256
                nextNodeIDMap[*nodeID] = struct{}{}
×
NEW
257
        }
×
258

NEW
259
        memberMap := make(map[shared.NodeID]uint64)
×
NEW
260
        for seq, entry := range k.members {
×
NEW
261
                if entry.state == memberStateRemoving {
×
NEW
262
                        continue
×
263
                }
NEW
264
                memberMap[*entry.nodeID] = seq
×
265
        }
266

NEW
267
        toAppend := make([]*shared.NodeID, 0)
×
NEW
268
        toRemove := make(map[uint64]*memberEntry)
×
NEW
269
        for nodeID := range nextNodeIDMap {
×
NEW
270
                if _, ok := memberMap[nodeID]; !ok {
×
NEW
271
                        toAppend = append(toAppend, &nodeID)
×
NEW
272
                }
×
273
        }
NEW
274
        for seq, entry := range k.members {
×
NEW
275
                if entry.state == memberStateRemoving {
×
NEW
276
                        continue
×
277
                }
NEW
278
                if _, ok := nextNodeIDMap[*entry.nodeID]; !ok {
×
NEW
279
                        toRemove[seq] = entry
×
NEW
280
                }
×
281
        }
NEW
282
        return toAppend, toRemove
×
283
}
284

NEW
285
func (k *KVS) createCluster(nextNodeIDs []*shared.NodeID) {
×
NEW
286
        clusterID, err := uuid.NewV7()
×
NEW
287
        if err != nil {
×
NEW
288
                k.logger.Error("Failed to create new cluster ID", "error", err)
×
NEW
289
                return
×
NEW
290
        }
×
291

NEW
292
        k.lastSequence = coordinatingNodeSequence
×
NEW
293
        member := make(map[uint64]*shared.NodeID)
×
NEW
294
        member[coordinatingNodeSequence] = k.localNodeID
×
NEW
295
        for _, nodeID := range nextNodeIDs {
×
NEW
296
                k.lastSequence++
×
NEW
297
                member[k.lastSequence] = nodeID
×
NEW
298
                k.members[k.lastSequence] = &memberEntry{
×
NEW
299
                        nodeID: nodeID,
×
NEW
300
                        state:  memberStateCreating,
×
NEW
301
                }
×
NEW
302
        }
×
303

NEW
304
        sectorKey := &config.KvsSectorKey{
×
NEW
305
                SectorID: clusterID,
×
NEW
306
                Sequence: coordinatingNodeSequence,
×
NEW
307
        }
×
NEW
308
        k.coordinating = sectorKey
×
NEW
309
        k.allocateCluster(sectorKey, k.localNodeID, false, member)
×
310
}
311

NEW
312
func (k *KVS) allocateCluster(sectorKey *config.KvsSectorKey, head *shared.NodeID, append bool, members map[uint64]*shared.NodeID) {
×
NEW
313
        s := &sector{}
×
NEW
314
        k.sectors[*sectorKey] = s
×
NEW
315

×
NEW
316
        raftNode := newRaftNode(&raftNodeConfig{
×
NEW
317
                logger:    k.logger,
×
NEW
318
                manager:   k,
×
NEW
319
                store:     s,
×
NEW
320
                sectorKey: sectorKey,
×
NEW
321
                join:      append,
×
NEW
322
                member:    members,
×
NEW
323
        })
×
NEW
324

×
NEW
325
        store := newStore(&storeConfig{
×
NEW
326
                sectorKey: sectorKey,
×
NEW
327
                handler:   s,
×
NEW
328
                head:      head,
×
NEW
329
        })
×
NEW
330

×
NEW
331
        s.raft = raftNode
×
NEW
332
        s.store = store
×
NEW
333

×
NEW
334
        raftNode.start(k.ctx)
×
NEW
335
}
×
336

NEW
337
func (k *KVS) applyConfigRaft() error {
×
NEW
338
        raftNode := k.sectors[*k.coordinating].raft
×
NEW
339

×
NEW
340
        for sequence, member := range k.members {
×
NEW
341
                switch member.state {
×
NEW
342
                case memberStateAppendingRaft:
×
NEW
343
                        return raftNode.appendNode(sequence, member.nodeID)
×
344

NEW
345
                case memberStateRemoving:
×
NEW
346
                        return raftNode.removeNode(sequence)
×
347
                }
348
        }
349

NEW
350
        return nil
×
351
}
352

353
type kvsOperationHandler struct {
354
        receiver func(res *proto.KvsOperationResponse, err error)
355
}
356

NEW
357
func (h *kvsOperationHandler) OnResponse(packet *shared.Packet) {
×
NEW
358
        h.receiver(packet.Content.GetKvsOperationResponse(), nil)
×
NEW
359
}
×
360

NEW
361
func (h *kvsOperationHandler) OnError(code constants.PacketErrorCode, message string) {
×
NEW
362
        h.receiver(nil, fmt.Errorf("packet error %d: %s", code, message))
×
NEW
363
}
×
364

NEW
365
func (k *KVS) sendOperation(command proto.KvsOperation_Command, key string, value []byte, receiver func(res *proto.KvsOperationResponse, err error)) {
×
NEW
366
        dst := shared.NewHashedNodeID([]byte(key))
×
NEW
367

×
NEW
368
        content := &proto.PacketContent{
×
NEW
369
                Content: &proto.PacketContent_KvsOperation{
×
NEW
370
                        KvsOperation: &proto.KvsOperation{
×
NEW
371
                                Command: command,
×
NEW
372
                                Key:     key,
×
NEW
373
                                Value:   value,
×
NEW
374
                        },
×
NEW
375
                },
×
NEW
376
        }
×
NEW
377

×
NEW
378
        k.transferer.Request(dst, shared.PacketModeNone, content, &kvsOperationHandler{
×
NEW
379
                receiver: receiver,
×
NEW
380
        })
×
NEW
381
}
×
382

NEW
383
func (k *KVS) sendSettingMessage() {
×
NEW
384
        for sequence, member := range k.members {
×
NEW
385
                if member.state == memberStateRemoving {
×
NEW
386
                        continue
×
387
                }
388

NEW
389
                members := make(map[uint64]*proto.NodeID)
×
NEW
390
                members[coordinatingNodeSequence] = k.localNodeID.Proto()
×
NEW
391
                for ms, mm := range k.members {
×
NEW
392
                        members[ms] = mm.nodeID.Proto()
×
NEW
393
                }
×
394

NEW
395
                var command proto.RaftConfig_Command
×
NEW
396
                if member.state == memberStateCreating {
×
NEW
397
                        command = proto.RaftConfig_COMMAND_CREATE
×
NEW
398
                } else {
×
NEW
399
                        command = proto.RaftConfig_COMMAND_APPEND
×
NEW
400
                }
×
401

NEW
402
                k.transferer.RequestOneWay(
×
NEW
403
                        member.nodeID,
×
NEW
404
                        shared.PacketModeExplicit|shared.PacketModeNoRetry,
×
NEW
405
                        &proto.PacketContent{
×
NEW
406
                                Content: &proto.PacketContent_RaftConfig{
×
NEW
407
                                        RaftConfig: &proto.RaftConfig{
×
NEW
408
                                                SectorId: MustMarshalUUID(k.coordinating.SectorID),
×
NEW
409
                                                Sequence: sequence,
×
NEW
410
                                                Command:  command,
×
NEW
411
                                                Members:  members,
×
NEW
412
                                        },
×
NEW
413
                                },
×
NEW
414
                        },
×
NEW
415
                )
×
416
        }
417
}
418

NEW
419
func (k *KVS) recvOperation(packet *shared.Packet) {
×
NEW
420
        content := packet.Content.GetKvsOperation()
×
NEW
421
        command := content.Command
×
NEW
422
        key := content.Key
×
NEW
423
        value := content.Value
×
NEW
424
        store := k.sectors[*k.coordinating].store
×
NEW
425

×
NEW
426
        switch command {
×
NEW
427
        case proto.KvsOperation_COMMAND_GET:
×
NEW
428
                data, err := store.get(key)
×
NEW
429

×
NEW
430
                e := proto.KvsOperationResponse_ERROR_NONE
×
NEW
431
                if err != nil {
×
NEW
432
                        if err == config.ErrorKvsStoreKeyNotFound {
×
NEW
433
                                e = proto.KvsOperationResponse_ERROR_NOT_FOUND
×
NEW
434
                        } else {
×
NEW
435
                                e = proto.KvsOperationResponse_ERROR_UNKNOWN
×
NEW
436
                        }
×
437
                }
438

NEW
439
                k.transferer.Response(packet, &proto.PacketContent{
×
NEW
440
                        Content: &proto.PacketContent_KvsOperationResponse{
×
NEW
441
                                KvsOperationResponse: &proto.KvsOperationResponse{
×
NEW
442
                                        Error: e,
×
NEW
443
                                        Value: data,
×
NEW
444
                                },
×
NEW
445
                        },
×
NEW
446
                })
×
447

NEW
448
        case proto.KvsOperation_COMMAND_SET:
×
NEW
449
                err := store.set(key, value)
×
NEW
450
                e := proto.KvsOperationResponse_ERROR_NONE
×
NEW
451
                if err != nil {
×
NEW
452
                        e = proto.KvsOperationResponse_ERROR_UNKNOWN
×
NEW
453
                }
×
454

NEW
455
                k.transferer.Response(packet, &proto.PacketContent{
×
NEW
456
                        Content: &proto.PacketContent_KvsOperationResponse{
×
NEW
457
                                KvsOperationResponse: &proto.KvsOperationResponse{
×
NEW
458
                                        Error: e,
×
NEW
459
                                },
×
NEW
460
                        },
×
NEW
461
                })
×
462

NEW
463
        case proto.KvsOperation_COMMAND_PATCH:
×
NEW
464
                err := store.patch(key, value)
×
NEW
465
                e := proto.KvsOperationResponse_ERROR_NONE
×
NEW
466
                if err != nil {
×
NEW
467
                        e = proto.KvsOperationResponse_ERROR_UNKNOWN
×
NEW
468
                }
×
469

NEW
470
                k.transferer.Response(packet, &proto.PacketContent{
×
NEW
471
                        Content: &proto.PacketContent_KvsOperationResponse{
×
NEW
472
                                KvsOperationResponse: &proto.KvsOperationResponse{
×
NEW
473
                                        Error: e,
×
NEW
474
                                },
×
NEW
475
                        },
×
NEW
476
                })
×
477

NEW
478
        case proto.KvsOperation_COMMAND_DELETE:
×
NEW
479
                err := store.delete(key)
×
NEW
480
                e := proto.KvsOperationResponse_ERROR_NONE
×
NEW
481
                if err != nil {
×
NEW
482
                        e = proto.KvsOperationResponse_ERROR_UNKNOWN
×
NEW
483
                }
×
484

NEW
485
                k.transferer.Response(packet, &proto.PacketContent{
×
NEW
486
                        Content: &proto.PacketContent_KvsOperationResponse{
×
NEW
487
                                KvsOperationResponse: &proto.KvsOperationResponse{
×
NEW
488
                                        Error: e,
×
NEW
489
                                },
×
NEW
490
                        },
×
NEW
491
                })
×
492

NEW
493
        default:
×
NEW
494
                k.transferer.Response(packet, &proto.PacketContent{
×
NEW
495
                        Content: &proto.PacketContent_KvsOperationResponse{
×
NEW
496
                                KvsOperationResponse: &proto.KvsOperationResponse{
×
NEW
497
                                        Error: proto.KvsOperationResponse_ERROR_UNKNOWN,
×
NEW
498
                                },
×
NEW
499
                        },
×
NEW
500
                })
×
501
        }
502
}
503

NEW
504
func (k *KVS) recvConfig(packet *shared.Packet) {
×
NEW
505
        content := packet.Content.GetRaftConfig()
×
NEW
506
        sectorID, err := uuid.ParseBytes(content.SectorId)
×
NEW
507
        if err != nil {
×
NEW
508
                k.logger.Warn("Failed to parse promoter NodeID", "error", err)
×
NEW
509
                return
×
NEW
510
        }
×
NEW
511
        sequence := content.Sequence
×
NEW
512
        command := content.Command
×
NEW
513
        members := make(map[uint64]*shared.NodeID)
×
NEW
514
        for seq, nodeID := range content.Members {
×
NEW
515
                var err error
×
NEW
516
                members[seq], err = shared.NewNodeIDFromProto(nodeID)
×
NEW
517
                if err != nil {
×
NEW
518
                        k.logger.Warn("Failed to create NodeID from proto", "error", err, "nodeID", nodeID)
×
NEW
519
                        return
×
NEW
520
                }
×
521
        }
522

NEW
523
        k.mtx.Lock()
×
NEW
524
        defer k.mtx.Unlock()
×
NEW
525

×
NEW
526
        sectorKey := config.KvsSectorKey{
×
NEW
527
                SectorID: sectorID,
×
NEW
528
                Sequence: sequence,
×
NEW
529
        }
×
NEW
530
        switch command {
×
NEW
531
        case proto.RaftConfig_COMMAND_CREATE, proto.RaftConfig_COMMAND_APPEND:
×
NEW
532
                if _, ok := k.sectors[sectorKey]; !ok {
×
NEW
533
                        append := true
×
NEW
534
                        if command == proto.RaftConfig_COMMAND_CREATE {
×
NEW
535
                                append = false
×
NEW
536
                        }
×
NEW
537
                        k.allocateCluster(&sectorKey, packet.SrcNodeID, append, members)
×
538
                }
539

NEW
540
        default:
×
NEW
541
                k.logger.Warn("Unknown RaftConfig command", "command", command)
×
NEW
542
                return
×
543
        }
544

545
        // send response when the node is created or appended
NEW
546
        response := &proto.PacketContent{
×
NEW
547
                Content: &proto.PacketContent_RaftConfigResponse{
×
NEW
548
                        RaftConfigResponse: &proto.RaftConfigResponse{
×
NEW
549
                                SectorId: MustMarshalUUID(sectorID),
×
NEW
550
                                Sequence: sequence,
×
NEW
551
                        },
×
NEW
552
                },
×
NEW
553
        }
×
NEW
554
        k.transferer.RequestOneWay(packet.SrcNodeID, shared.PacketModeExplicit|shared.PacketModeNoRetry, response)
×
555
}
556

NEW
557
func (k *KVS) recvConfigResponse(packet *shared.Packet) {
×
NEW
558
        content := packet.Content.GetRaftConfigResponse()
×
NEW
559
        sectorID, err := uuid.ParseBytes(content.SectorId)
×
NEW
560
        if err != nil {
×
NEW
561
                k.logger.Warn("Failed to parse promoter NodeID", "error", err)
×
NEW
562
                return
×
NEW
563
        }
×
NEW
564
        sequence := content.Sequence
×
NEW
565

×
NEW
566
        k.mtx.Lock()
×
NEW
567
        defer k.mtx.Unlock()
×
NEW
568

×
NEW
569
        // ignore response if it does not match the current node
×
NEW
570
        if k.coordinating == nil || k.coordinating.SectorID != sectorID {
×
NEW
571
                return
×
NEW
572
        }
×
573

NEW
574
        if entry, ok := k.members[sequence]; ok && entry.nodeID.Equal(packet.SrcNodeID) {
×
NEW
575
                entry.state = memberStateNormal
×
NEW
576
        }
×
577
}
578

NEW
579
func (k *KVS) recvMessage(packet *shared.Packet) {
×
NEW
580
        content := packet.Content.GetRaftMessage()
×
NEW
581
        sectorID, err := uuid.ParseBytes(content.SectorId)
×
NEW
582
        if err != nil {
×
NEW
583
                k.logger.Warn("Failed to parse promoter NodeID", "error", err)
×
NEW
584
                return
×
NEW
585
        }
×
586

NEW
587
        key := config.KvsSectorKey{
×
NEW
588
                SectorID: sectorID,
×
NEW
589
                Sequence: content.Sequence,
×
NEW
590
        }
×
NEW
591

×
NEW
592
        k.mtx.RLock()
×
NEW
593
        n, ok := k.sectors[key]
×
NEW
594
        k.mtx.RUnlock()
×
NEW
595
        if !ok {
×
NEW
596
                return
×
NEW
597
        }
×
598

NEW
599
        if err := n.raft.processMessage(content); err != nil {
×
NEW
600
                k.logger.Error("Failed to process Raft message", "error", err)
×
NEW
601
        }
×
602
}
603

NEW
604
func (k *KVS) raftNodeError(sectorKey *config.KvsSectorKey, err error) {
×
NEW
605
        panic(fmt.Sprintf("raftNodeError not implemented: %s", err))
×
606
}
607

NEW
608
func (k *KVS) raftNodeSendMessage(dstNodeID *shared.NodeID, message *proto.RaftMessage) {
×
NEW
609
        k.transferer.RequestOneWay(
×
NEW
610
                dstNodeID,
×
NEW
611
                shared.PacketModeExplicit|shared.PacketModeNoRetry,
×
NEW
612
                &proto.PacketContent{
×
NEW
613
                        Content: &proto.PacketContent_RaftMessage{
×
NEW
614
                                RaftMessage: message,
×
NEW
615
                        },
×
NEW
616
                })
×
NEW
617
}
×
618

NEW
619
func (k *KVS) raftNodeApplyProposal(sectorKey *config.KvsSectorKey, proposal *proto.RaftProposalManagement) {
×
NEW
620
        _, ok := k.sectors[*sectorKey]
×
NEW
621
        if !ok {
×
NEW
622
                return
×
NEW
623
        }
×
624

NEW
625
        panic("raftNodeApplyProposal not implemented")
×
626
}
627

NEW
628
func (k *KVS) raftNodeAppendNode(sectorKey *config.KvsSectorKey, sequence uint64, nodeID *shared.NodeID) {
×
NEW
629
        if *sectorKey != *k.coordinating {
×
NEW
630
                return
×
NEW
631
        }
×
632

NEW
633
        k.mtx.Lock()
×
NEW
634
        defer k.mtx.Unlock()
×
NEW
635

×
NEW
636
        member, ok := k.members[sequence]
×
NEW
637
        // unknown
×
NEW
638
        if !ok {
×
NEW
639
                k.logger.Warn("Unknown node sequence for appending", "sequence", sequence, "nodeID", nodeID)
×
NEW
640
                k.members[sequence] = &memberEntry{
×
NEW
641
                        nodeID: nodeID,
×
NEW
642
                        state:  memberStateRemoving,
×
NEW
643
                }
×
NEW
644
                return
×
NEW
645
        }
×
646

NEW
647
        if member.state == memberStateAppendingRaft {
×
NEW
648
                member.state = memberStateAppendingNode
×
NEW
649
        } else {
×
NEW
650
                k.logger.Warn("Unexpected state for appending node", "sequence", sequence, "state", member.state, "nodeID", nodeID)
×
NEW
651
        }
×
652
}
653

NEW
654
func (k *KVS) raftNodeRemoveNode(sectorKey *config.KvsSectorKey, sequence uint64) {
×
NEW
655
        k.mtx.RLock()
×
NEW
656
        sector, ok := k.sectors[*sectorKey]
×
NEW
657
        k.mtx.RUnlock()
×
NEW
658
        if !ok {
×
NEW
659
                return
×
NEW
660
        }
×
661

NEW
662
        if *sectorKey == *k.coordinating {
×
NEW
663
                delete(k.members, sequence)
×
NEW
664
        }
×
665

NEW
666
        sector.raft.stop()
×
NEW
667
        k.mtx.Lock()
×
NEW
668
        defer k.mtx.Unlock()
×
NEW
669
        delete(k.sectors, *sectorKey)
×
670
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc