• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 18451467117

13 Oct 2025 12:12AM UTC coverage: 62.183% (-6.8%) from 68.948%
18451467117

Pull #107

github

llamerada-jp
wip

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

297 of 923 new or added lines in 14 files covered. (32.18%)

5 existing lines in 2 files now uncovered.

3111 of 5003 relevant lines covered (62.18%)

34.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.82
/internal/kvs/raft_node.go
/*
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
package kvs
17

18
import (
	"context"
	"errors"
	"log/slog"
	"slices"
	"time"

	proto "github.com/llamerada-jp/colonio/api/colonio/v1alpha"
	"github.com/llamerada-jp/colonio/config"
	"github.com/llamerada-jp/colonio/internal/shared"
	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
	proto3 "google.golang.org/protobuf/proto"
)
31

32
// raftTickDuration is the period of the ticker that drives Tick calls on the
// underlying raft node from the loop started by raftNode.start.
const raftTickDuration = 100 * time.Millisecond
35

36
type raftNodeManager interface {
37
        raftNodeError(sectorKey *config.KvsSectorKey, err error)
38
        raftNodeSendMessage(dstNodeID *shared.NodeID, data *proto.RaftMessage)
39
        raftNodeApplyProposal(sectorKey *config.KvsSectorKey, proposal *proto.RaftProposalManagement)
40
        raftNodeAppendNode(sectorKey *config.KvsSectorKey, sequence uint64, nodeID *shared.NodeID)
41
        raftNodeRemoveNode(sectorKey *config.KvsSectorKey, sequence uint64)
42
}
43

44
type raftNodeStore interface {
45
        raftNodeApplyProposal(command *proto.RaftProposalStore)
46
        raftNodeGetSnapshot() ([]byte, error)
47
        raftNodeApplySnapshot(snapshot []byte) error
48
}
49

50
type raftNodeConfig struct {
51
        logger    *slog.Logger
52
        manager   raftNodeManager
53
        store     raftNodeStore
54
        sectorKey *config.KvsSectorKey
55
        join      bool
56
        member    map[uint64]*shared.NodeID
57
}
58

59
type raftNode struct {
60
        logger       *slog.Logger
61
        manager      raftNodeManager
62
        store        raftNodeStore
63
        sectorKey    config.KvsSectorKey
64
        ctx          context.Context
65
        etcdRaftNode raft.Node
66
        raftStorage  *raft.MemoryStorage
67

68
        snapshotCatchUpEntriesN uint64
69
        snapCount               uint64
70

71
        confState     raftpb.ConfState
72
        snapshotIndex uint64
73
        appliedIndex  uint64
74

75
        member map[uint64]*shared.NodeID
76
}
77

78
func newRaftNode(config *raftNodeConfig) *raftNode {
4✔
79
        n := &raftNode{
4✔
80
                logger:      config.logger,
4✔
81
                manager:     config.manager,
4✔
82
                store:       config.store,
4✔
83
                sectorKey:   *config.sectorKey,
4✔
84
                raftStorage: raft.NewMemoryStorage(),
4✔
85

4✔
86
                snapshotCatchUpEntriesN: 100,
4✔
87
                snapCount:               1000,
4✔
88

4✔
89
                member: config.member,
4✔
90
        }
4✔
91

4✔
92
        raftConfig := &raft.Config{
4✔
93
                ID:                        config.sectorKey.Sequence,
4✔
94
                ElectionTick:              10,
4✔
95
                HeartbeatTick:             1,
4✔
96
                Storage:                   n.raftStorage,
4✔
97
                MaxSizePerMsg:             1024 * 1024,
4✔
98
                MaxInflightMsgs:           256,
4✔
99
                MaxUncommittedEntriesSize: 1 << 30,
4✔
100
        }
4✔
101

4✔
102
        if !config.join {
7✔
103
                peers := make([]raft.Peer, len(config.member))
3✔
104
                i := 0
3✔
105
                for seq := range config.member {
12✔
106
                        peers[i] = raft.Peer{
9✔
107
                                ID: seq,
9✔
108
                        }
9✔
109
                        i++
9✔
110
                }
9✔
111
                n.etcdRaftNode = raft.StartNode(raftConfig, peers)
3✔
112
        } else {
1✔
113
                n.etcdRaftNode = raft.RestartNode(raftConfig)
1✔
114
        }
1✔
115

116
        return n
4✔
117
}
118

119
func (n *raftNode) start(ctx context.Context) {
4✔
120
        n.ctx = ctx
4✔
121

4✔
122
        go func() {
8✔
123
                ticker := time.NewTicker(raftTickDuration)
4✔
124
                defer ticker.Stop()
4✔
125

4✔
126
                for {
112✔
127
                        select {
108✔
128
                        case <-n.ctx.Done():
4✔
129
                                n.etcdRaftNode.Stop()
4✔
130
                                return
4✔
131

132
                        case <-ticker.C:
49✔
133
                                n.etcdRaftNode.Tick()
49✔
134

135
                        case rd := <-n.etcdRaftNode.Ready():
55✔
136
                                if !raft.IsEmptySnap(rd.Snapshot) {
55✔
NEW
137
                                        n.raftStorage.ApplySnapshot(rd.Snapshot)
×
NEW
138
                                        n.confState = rd.Snapshot.Metadata.ConfState
×
NEW
139
                                        n.snapshotIndex = rd.Snapshot.Metadata.Index
×
NEW
140
                                        n.appliedIndex = rd.Snapshot.Metadata.Index
×
NEW
141
                                        n.store.raftNodeApplySnapshot(rd.Snapshot.Data)
×
NEW
142
                                }
×
143

144
                                if err := n.raftStorage.Append(rd.Entries); err != nil {
55✔
NEW
145
                                        n.logger.Error("Failed to append entries to storage", "error", err)
×
NEW
146
                                }
×
147

148
                                if err := n.sendMessages(rd.Messages); err != nil {
55✔
NEW
149
                                        n.logger.Error("Failed to send raft messages", "error", err)
×
NEW
150
                                }
×
151

152
                                entries := rd.CommittedEntries
55✔
153
                                if err := n.publishEntries(entries); err != nil {
55✔
NEW
154
                                        n.logger.Error("Failed to publish committed entries", "error", err)
×
NEW
155
                                }
×
156

157
                                if err := n.maybeTriggerSnapshot(); err != nil {
55✔
NEW
158
                                        n.logger.Error("Failed to trigger snapshot", "error", err)
×
NEW
159
                                }
×
160

161
                                n.etcdRaftNode.Advance()
55✔
162
                        }
163
                }
164
        }()
165
}
166

167
func (n *raftNode) stop() {
1✔
168
        n.etcdRaftNode.Stop()
1✔
169
}
1✔
170

171
func (n *raftNode) appendNode(sequence uint64, nodeID *shared.NodeID) error {
1✔
172
        return n.etcdRaftNode.ProposeConfChange(n.ctx, raftpb.ConfChange{
1✔
173
                Type:    raftpb.ConfChangeAddNode,
1✔
174
                NodeID:  sequence,
1✔
175
                Context: []byte(nodeID.String()),
1✔
176
        })
1✔
177
}
1✔
178

179
func (n *raftNode) removeNode(sequence uint64) error {
2✔
180
        return n.etcdRaftNode.ProposeConfChange(n.ctx, raftpb.ConfChange{
2✔
181
                Type:   raftpb.ConfChangeRemoveNode,
2✔
182
                NodeID: sequence,
2✔
183
        })
2✔
184
}
2✔
185

186
func (n *raftNode) propose(p *proto.RaftProposal) {
3✔
187
        data, err := proto3.Marshal(p)
3✔
188
        if err != nil {
3✔
NEW
189
                panic("Failed to marshal Raft proposal: " + err.Error())
×
190
        }
191

192
        if err := n.etcdRaftNode.Propose(n.ctx, data); err != nil {
3✔
NEW
193
                n.manager.raftNodeError(&n.sectorKey, err)
×
NEW
194
        }
×
195
}
196

197
func (n *raftNode) processMessage(p *proto.RaftMessage) error {
83✔
198
        var msg raftpb.Message
83✔
199
        if err := msg.Unmarshal(p.Message); err != nil {
83✔
NEW
200
                return err
×
NEW
201
        }
×
202

203
        return n.etcdRaftNode.Step(n.ctx, msg)
83✔
204
}
205

206
func (n *raftNode) sendMessages(messages []raftpb.Message) error {
55✔
207
        for _, msg := range messages {
141✔
208
                if msg.To == 0 {
86✔
NEW
209
                        continue // skip messages without a target
×
210
                }
211

212
                // When there is a `raftpb.EntryConfChange` after creating the snapshot,
213
                // then the confState included in the snapshot is out of date. so We need
214
                // to update the confState before sending a snapshot to a follower.
215
                if msg.Type == raftpb.MsgSnap {
86✔
NEW
216
                        msg.Snapshot.Metadata.ConfState = n.confState
×
NEW
217
                }
×
218

219
                sequenceTo := uint64(msg.To)
86✔
220
                data, err := msg.Marshal()
86✔
221
                if err != nil {
86✔
NEW
222
                        return err
×
NEW
223
                }
×
224
                dstNodeID, ok := n.member[sequenceTo]
86✔
225
                if !ok {
86✔
NEW
226
                        panic("Unknown node sequence for sending Raft message")
×
227
                }
228

229
                n.manager.raftNodeSendMessage(dstNodeID, &proto.RaftMessage{
86✔
230
                        SectorId: MustMarshalUUID(n.sectorKey.SectorID),
86✔
231
                        Sequence: sequenceTo,
86✔
232
                        Message:  data,
86✔
233
                })
86✔
234
        }
235
        return nil
55✔
236
}
237

238
func (n *raftNode) publishEntries(entries []raftpb.Entry) error {
55✔
239
        proposals := make([]*proto.RaftProposal, 0)
55✔
240

55✔
241
        for _, entry := range entries {
90✔
242
                switch entry.Type {
35✔
243
                case raftpb.EntryNormal:
13✔
244
                        if len(entry.Data) > 0 {
22✔
245
                                p := &proto.RaftProposal{}
9✔
246
                                if err := proto3.Unmarshal(entry.Data, p); err != nil {
9✔
NEW
247
                                        return err
×
NEW
248
                                }
×
249
                                proposals = append(proposals, p)
9✔
250
                        }
251

252
                case raftpb.EntryConfChange:
22✔
253
                        var cc raftpb.ConfChange
22✔
254
                        if err := cc.Unmarshal(entry.Data); err != nil {
22✔
NEW
255
                                return err
×
NEW
256
                        }
×
257
                        n.confState = *n.etcdRaftNode.ApplyConfChange(cc)
22✔
258

22✔
259
                        switch cc.Type {
22✔
260
                        case raftpb.ConfChangeAddNode:
16✔
261
                                var nodeID *shared.NodeID
16✔
262
                                if len(cc.Context) != 0 {
20✔
263
                                        var err error
4✔
264
                                        nodeID, err = shared.NewNodeIDFromString(string(cc.Context))
4✔
265
                                        if err != nil {
4✔
NEW
266
                                                return err
×
NEW
267
                                        }
×
268
                                        n.member[cc.NodeID] = nodeID
4✔
269
                                } else {
12✔
270
                                        var ok bool
12✔
271
                                        nodeID, ok = n.member[cc.NodeID]
12✔
272
                                        if !ok {
12✔
NEW
273
                                                return errors.New("missing node ID in conf change")
×
NEW
274
                                        }
×
275
                                }
276
                                n.manager.raftNodeAppendNode(&n.sectorKey, cc.NodeID, nodeID)
16✔
277

278
                        case raftpb.ConfChangeRemoveNode:
6✔
279
                                delete(n.member, cc.NodeID)
6✔
280
                                n.manager.raftNodeRemoveNode(&n.sectorKey, cc.NodeID)
6✔
281

NEW
282
                        default:
×
NEW
283
                                n.logger.Warn("Unknown conf change type", "type", cc.Type)
×
284
                        }
285
                }
286
        }
287

288
        for _, proposal := range proposals {
64✔
289
                if p := proposal.GetManagement(); p != nil {
12✔
290
                        n.manager.raftNodeApplyProposal(&n.sectorKey, p)
3✔
291
                } else if p := proposal.GetStore(); p != nil {
15✔
292
                        n.store.raftNodeApplyProposal(p)
6✔
293
                }
6✔
294
        }
295

296
        return nil
55✔
297
}
298

299
func (n *raftNode) maybeTriggerSnapshot() error {
55✔
300
        // Trigger a snapshot if the number of applied entries exceeds the threshold
55✔
301
        if n.appliedIndex-n.snapshotIndex <= n.snapCount {
110✔
302
                return nil
55✔
303
        }
55✔
304

NEW
305
        data, err := n.store.raftNodeGetSnapshot()
×
NEW
306
        if err != nil {
×
NEW
307
                return err
×
NEW
308
        }
×
309

NEW
310
        _, err = n.raftStorage.CreateSnapshot(n.appliedIndex, &n.confState, data)
×
NEW
311
        if err != nil {
×
NEW
312
                return err
×
NEW
313
        }
×
314

NEW
315
        compactIndex := uint64(1)
×
NEW
316
        if n.appliedIndex > n.snapshotCatchUpEntriesN {
×
NEW
317
                compactIndex = n.appliedIndex - n.snapshotCatchUpEntriesN
×
NEW
318
        }
×
NEW
319
        if err := n.raftStorage.Compact(compactIndex); err != nil {
×
NEW
320
                if !errors.Is(err, raft.ErrCompacted) {
×
NEW
321
                        return err
×
NEW
322
                }
×
323
        }
324

NEW
325
        n.snapshotIndex = n.appliedIndex
×
NEW
326
        return nil
×
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc