• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 18451467117

13 Oct 2025 12:12AM UTC coverage: 62.183% (-6.8%) from 68.948%
18451467117

Pull #107

github

llamerada-jp
wip

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

297 of 923 new or added lines in 14 files covered. (32.18%)

5 existing lines in 2 files now uncovered.

3111 of 5003 relevant lines covered (62.18%)

34.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.85
/seed/controller/controller.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package controller
17

18
import (
19
        "context"
20
        "fmt"
21
        "log/slog"
22
        "sync"
23
        "time"
24

25
        proto "github.com/llamerada-jp/colonio/api/colonio/v1alpha"
26
        "github.com/llamerada-jp/colonio/internal/constants"
27
        "github.com/llamerada-jp/colonio/internal/shared"
28
        "github.com/llamerada-jp/colonio/seed/gateway"
29
        "github.com/llamerada-jp/colonio/seed/misc"
30
)
31

32
type Options struct {
33
        Logger         *slog.Logger
34
        Gateway        gateway.Gateway
35
        NormalLifespan time.Duration // lifetime for everyday keepalive
36
        ShortLifespan  time.Duration // lifetime for reconcile to explicitly confirming
37
        Interval       time.Duration // interval for checking node status
38
}
39

40
type Controller interface {
41
        AssignNode(ctx context.Context) (*shared.NodeID, bool, error)
42
        UnassignNode(ctx context.Context, nodeID *shared.NodeID) error
43
        Keepalive(ctx context.Context, nodeID *shared.NodeID) (bool, error)
44
        ReconcileNextNodes(ctx context.Context, nodeID *shared.NodeID, nextNodeIDs, disconnectedIDs []*shared.NodeID) (bool, error)
45
        SendSignal(ctx context.Context, nodeID *shared.NodeID, signal *proto.Signal) error
46
        PollSignal(ctx context.Context, nodeID *shared.NodeID, send func(*proto.Signal) error) error
47
        StateKvs(ctx context.Context, nodeID *shared.NodeID, active bool) (constants.KvsState, error)
48
}
49

50
type ControllerImpl struct {
51
        logger            *slog.Logger
52
        gateway           gateway.Gateway
53
        normalLifespan    time.Duration
54
        shortLifespan     time.Duration
55
        interval          time.Duration
56
        mtx               sync.Mutex
57
        signalChannels    map[shared.NodeID]*misc.Channel[*proto.Signal]
58
        keepaliveChannels map[shared.NodeID]*misc.Channel[bool]
59
}
60

61
var _ gateway.Handler = &ControllerImpl{}
62
var _ Controller = &ControllerImpl{}
63

64
func NewController(options *Options) *ControllerImpl {
19✔
65
        return &ControllerImpl{
19✔
66
                logger:            options.Logger,
19✔
67
                gateway:           options.Gateway,
19✔
68
                normalLifespan:    options.NormalLifespan,
19✔
69
                shortLifespan:     options.ShortLifespan,
19✔
70
                interval:          options.Interval,
19✔
71
                signalChannels:    make(map[shared.NodeID]*misc.Channel[*proto.Signal]),
19✔
72
                keepaliveChannels: make(map[shared.NodeID]*misc.Channel[bool]),
19✔
73
        }
19✔
74
}
19✔
75

76
func (c *ControllerImpl) Run(ctx context.Context) error {
1✔
77
        ticker := time.NewTicker(c.interval)
1✔
78
        defer ticker.Stop()
1✔
79

1✔
80
        for {
12✔
81
                select {
11✔
82
                case <-ctx.Done():
1✔
83
                        return nil
1✔
84
                case <-ticker.C:
10✔
85
                        if err := c.cleanup(ctx); err != nil {
10✔
86
                                c.logger.Error("failed to cleanup nodes", slog.Any("error", err))
×
87
                        }
×
88
                }
89
        }
90
}
91

92
func (c *ControllerImpl) HandleUnassignNode(ctx context.Context, nodeID *shared.NodeID) error {
2✔
93
        c.mtx.Lock()
2✔
94
        defer c.mtx.Unlock()
2✔
95
        if ch, exists := c.signalChannels[*nodeID]; exists {
2✔
96
                ch.Close()
×
97
                delete(c.signalChannels, *nodeID)
×
98
        }
×
99
        if ch, exists := c.keepaliveChannels[*nodeID]; exists {
3✔
100
                ch.Close()
1✔
101
                delete(c.keepaliveChannels, *nodeID)
1✔
102
        }
1✔
103
        return nil
2✔
104
}
105

106
func (c *ControllerImpl) HandleKeepaliveRequest(ctx context.Context, nodeID *shared.NodeID) error {
2✔
107
        if err := c.gateway.UpdateNodeLifespan(ctx, nodeID, time.Now().Add(c.shortLifespan)); err != nil {
2✔
108
                return fmt.Errorf("failed to update node lifespan: %w", err)
×
109
        }
×
110

111
        c.mtx.Lock()
2✔
112
        defer c.mtx.Unlock()
2✔
113

2✔
114
        if ch, exists := c.keepaliveChannels[*nodeID]; exists {
3✔
115
                ch.SendWhenNotFull(true) // send a keepalive signal if channel is not full
1✔
116
        }
1✔
117

118
        return nil
2✔
119
}
120

121
func (c *ControllerImpl) HandleSignal(ctx context.Context, signal *proto.Signal, relayToNext bool) error {
3✔
122
        dstNodeID, err := shared.NewNodeIDFromProto(signal.GetDstNodeId())
3✔
123
        if err != nil {
3✔
124
                // dst node id in the signal is already checked in the validateSignal method,
×
125
                panic(fmt.Sprintf("invalid dst node id in the packet: %v", err))
×
126
        }
127

128
        c.mtx.Lock()
3✔
129
        defer c.mtx.Unlock()
3✔
130

3✔
131
        var ch *misc.Channel[*proto.Signal]
3✔
132
        if relayToNext {
4✔
133
                _, ch = misc.GetNextByMap(dstNodeID, c.signalChannels, nil)
1✔
134
        } else {
3✔
135
                ch = c.signalChannels[*dstNodeID]
2✔
136
        }
2✔
137
        if ch == nil {
4✔
138
                return fmt.Errorf("node not found: %s", dstNodeID)
1✔
139
        }
1✔
140

141
        ch.Send(signal)
2✔
142

2✔
143
        return nil
2✔
144
}
145

146
func (c *ControllerImpl) AssignNode(ctx context.Context) (*shared.NodeID, bool, error) {
9✔
147
        logger := misc.NewLogger(ctx, c.logger)
9✔
148
        nodeID, err := c.gateway.AssignNode(ctx, time.Now().Add(c.normalLifespan))
9✔
149
        if err != nil {
10✔
150
                return nil, false, fmt.Errorf("failed to assign node: %w", err)
1✔
151
        }
1✔
152

153
        isAlone, err := c.isAlone(ctx, nodeID)
8✔
154
        if err != nil {
9✔
155
                return nil, false, fmt.Errorf("failed to check if node is alone: %w", err)
1✔
156
        }
1✔
157

158
        logger.Info("node assigned", slog.String("nodeID", nodeID.String()))
7✔
159

7✔
160
        return nodeID, isAlone, nil
7✔
161
}
162

163
func (c *ControllerImpl) UnassignNode(ctx context.Context, nodeID *shared.NodeID) error {
3✔
164
        logger := misc.NewLogger(ctx, c.logger)
3✔
165

3✔
166
        if err := c.gateway.UnassignNode(ctx, nodeID); err != nil {
4✔
167
                return fmt.Errorf("failed to unassign node %s: %w", nodeID.String(), err)
1✔
168
        }
1✔
169

170
        logger.Info("node unassigned", slog.String("nodeID", nodeID.String()))
2✔
171
        return nil
2✔
172
}
173

174
func (c *ControllerImpl) Keepalive(ctx context.Context, nodeID *shared.NodeID) (bool, error) {
4✔
175
        c.mtx.Lock()
4✔
176
        if _, exists := c.keepaliveChannels[*nodeID]; exists {
5✔
177
                c.mtx.Unlock()
1✔
178
                return false, fmt.Errorf("node %s already subscribed to keepalive", nodeID.String())
1✔
179
        }
1✔
180
        ch := misc.NewChannel[bool](1) // buffered channel to avoid blocking
3✔
181
        c.keepaliveChannels[*nodeID] = ch
3✔
182
        c.mtx.Unlock()
3✔
183

3✔
184
        if err := c.gateway.UpdateNodeLifespan(ctx, nodeID, time.Now().Add(c.normalLifespan)); err != nil {
3✔
185
                return false, fmt.Errorf("failed to update node lifespan: %w", err)
×
186
        }
×
187

188
        if err := c.gateway.SubscribeKeepalive(ctx, nodeID); err != nil {
3✔
189
                return false, fmt.Errorf("failed to subscribe keepalive request: %w", err)
×
190
        }
×
191

192
        timer := time.NewTimer(c.normalLifespan / 2)
3✔
193

3✔
194
        defer func() {
6✔
195
                ch.Close()
3✔
196
                timer.Stop()
3✔
197
                c.mtx.Lock()
3✔
198
                delete(c.keepaliveChannels, *nodeID)
3✔
199
                c.mtx.Unlock()
3✔
200

3✔
201
                if err := c.gateway.UnsubscribeKeepalive(ctx, nodeID); err != nil {
3✔
202
                        logger := misc.NewLogger(ctx, c.logger)
×
203
                        logger.Error("failed to unsubscribe from keepalive", slog.String("nodeID", nodeID.String()), slog.Any("error", err))
×
204
                }
×
205
        }()
206

207
        cc := ch.C()
3✔
208
        if cc == nil {
3✔
209
                return false, nil // channel closed
×
210
        }
×
211
        select {
3✔
212
        case <-ctx.Done():
×
213
                return false, nil
×
214

215
        case ok := <-cc:
2✔
216
                if ok {
3✔
217
                        isAlone, err := c.isAlone(ctx, nodeID)
1✔
218
                        if err != nil {
1✔
219
                                return false, fmt.Errorf("failed to check if node is alone: %w", err)
×
220
                        }
×
221
                        return isAlone, nil
1✔
222
                } else {
1✔
223
                        return false, fmt.Errorf("keepalive channel closed unexpectedly for node %s", nodeID.String())
1✔
224
                }
1✔
225

226
        case <-timer.C:
1✔
227
                isAlone, err := c.isAlone(ctx, nodeID)
1✔
228
                if err != nil {
1✔
229
                        return false, fmt.Errorf("failed to check if node is alone: %w", err)
×
230
                }
×
231
                return isAlone, nil
1✔
232
        }
233
}
234

235
func (c *ControllerImpl) ReconcileNextNodes(ctx context.Context, nodeID *shared.NodeID, nextNodeIDs, disconnectedIDs []*shared.NodeID) (bool, error) {
6✔
236
        for _, disconnectedID := range disconnectedIDs {
8✔
237
                if err := c.gateway.PublishKeepaliveRequest(ctx, disconnectedID); err != nil {
2✔
238
                        return false, fmt.Errorf("failed to publish keepalive request for disconnected node %s: %w", disconnectedID.String(), err)
×
239
                }
×
240
        }
241

242
        if len(nextNodeIDs) == 0 {
8✔
243
                isAlone, err := c.isAlone(ctx, nodeID)
2✔
244
                if err != nil {
2✔
245
                        return false, fmt.Errorf("failed to check if node is alone: %w", err)
×
246
                }
×
247
                return isAlone, nil
2✔
248
        }
249

250
        nodeIDs, err := c.gateway.GetNodesByRange(ctx, nextNodeIDs[0], nextNodeIDs[len(nextNodeIDs)-1])
4✔
251
        if err != nil {
4✔
252
                return false, fmt.Errorf("failed to get nodes by range: %w", err)
×
253
        }
×
254

255
        nodeIDMap := make(map[shared.NodeID]struct{})
4✔
256
        for _, n := range nodeIDs {
17✔
257
                if n.Compare(nodeID) == 0 {
17✔
258
                        continue // skip the current node
4✔
259
                }
260
                nodeIDMap[*n] = struct{}{}
9✔
261
        }
262

263
        if len(nodeIDMap) != len(nextNodeIDs) {
6✔
264
                return false, nil
2✔
265
        }
2✔
266

267
        for _, nextNodeID := range nextNodeIDs {
7✔
268
                if _, exists := nodeIDMap[*nextNodeID]; !exists {
5✔
269
                        return false, nil // nextNodeID not found in the map, meaning it is not connected
×
270
                }
×
271
                // Remove matched node from the map to avoid double counting
272
                delete(nodeIDMap, *nextNodeID)
5✔
273
        }
274

275
        return len(nodeIDMap) == 0, nil
2✔
276
}
277

278
func (c *ControllerImpl) SendSignal(ctx context.Context, nodeID *shared.NodeID, signal *proto.Signal) error {
3✔
279
        isAlone, err := c.isAlone(ctx, nodeID)
3✔
280
        if err != nil {
3✔
281
                return fmt.Errorf("failed to check if node is alone: %w", err)
×
282
        }
×
283

284
        if !isAlone {
5✔
285
                if signal == nil {
2✔
286
                        return fmt.Errorf("request error (signal is required)")
×
287
                }
×
288

289
                if err := c.validateSignal(signal, nodeID); err != nil {
2✔
290
                        return err
×
291
                }
×
292

293
                relayToNext := false
2✔
294
                if signal.GetOffer() != nil && signal.GetOffer().GetType() == proto.SignalOffer_TYPE_NEXT {
3✔
295
                        relayToNext = true
1✔
296
                }
1✔
297

298
                if err := c.gateway.PublishSignal(ctx, signal, relayToNext); err != nil {
2✔
299
                        return fmt.Errorf("failed to publish signal: %w", err)
×
300
                }
×
301
        }
302

303
        return nil
3✔
304
}
305

306
func (c *ControllerImpl) PollSignal(ctx context.Context, nodeID *shared.NodeID, send func(*proto.Signal) error) error {
2✔
307
        logger := misc.NewLogger(ctx, c.logger)
2✔
308

2✔
309
        var ch *misc.Channel[*proto.Signal]
2✔
310
        err := func() error {
4✔
311
                c.mtx.Lock()
2✔
312
                defer c.mtx.Unlock()
2✔
313
                if _, exists := c.signalChannels[*nodeID]; exists {
3✔
314
                        return fmt.Errorf("node %s already subscribed to signal", nodeID.String())
1✔
315
                }
1✔
316
                if err := c.gateway.SubscribeSignal(ctx, nodeID); err != nil {
1✔
NEW
317
                        return fmt.Errorf("failed to subscribe to signal: %w", err)
×
NEW
318
                }
×
319
                ch = misc.NewChannel[*proto.Signal](100)
1✔
320
                c.signalChannels[*nodeID] = ch
1✔
321
                return nil
1✔
322
        }()
323
        if err != nil {
3✔
324
                return err
1✔
325
        }
1✔
326

327
        defer func() {
2✔
328
                if err := c.gateway.UnsubscribeSignal(ctx, nodeID); err != nil {
1✔
329
                        logger.Error("failed to unsubscribe from signal", slog.String("nodeID", nodeID.String()), slog.Any("error", err))
×
330
                }
×
331
                c.mtx.Lock()
1✔
332
                defer c.mtx.Unlock()
1✔
333
                ch.Close()
1✔
334
                delete(c.signalChannels, *nodeID)
1✔
335
        }()
336

337
        for {
4✔
338
                cc := ch.C()
3✔
339
                if cc == nil {
3✔
340
                        // channel closed
×
341
                        return nil
×
342
                }
×
343
                select {
3✔
344
                case <-ctx.Done():
1✔
345
                        return nil
1✔
346
                case signal, ok := <-cc:
2✔
347
                        if !ok || signal == nil {
2✔
348
                                return nil // channel closed, exit the loop
×
349
                        }
×
350
                        if err := send(signal); err != nil {
2✔
351
                                logger.Warn("failed to send signal", slog.String("nodeID", nodeID.String()), slog.Any("error", err))
×
352
                        }
×
353
                }
354
        }
355
}
356

357
func (c *ControllerImpl) StateKvs(ctx context.Context, nodeID *shared.NodeID, active bool) (constants.KvsState, error) {
6✔
358
        if active {
7✔
359
                _ = c.gateway.UnsetKvsFirstActiveCandidate(ctx)
1✔
360
        }
1✔
361

362
        if err := c.gateway.SetKvsState(ctx, nodeID, active); err != nil {
6✔
NEW
363
                return constants.KvsStateUnknown, fmt.Errorf("failed to set KVS state: %w", err)
×
NEW
364
        }
×
365

366
        active, err := c.gateway.ExistsKvsActiveNode(ctx)
6✔
367
        if err != nil {
6✔
NEW
368
                return constants.KvsStateUnknown, fmt.Errorf("failed to check if KVS active node exists: %w", err)
×
NEW
369
        }
×
370
        if active {
8✔
371
                return constants.KvsStateActive, nil
2✔
372
        }
2✔
373

374
        err = c.gateway.SetKvsFirstActiveCandidate(ctx, nodeID)
4✔
375
        if err != nil {
6✔
376
                return constants.KvsStateUnknown, nil
2✔
377
        }
2✔
378

379
        return constants.KvsStateInactive, nil
2✔
380
}
381

382
func (c *ControllerImpl) cleanup(ctx context.Context) error {
10✔
383
        nodes, err := c.gateway.GetNodes(ctx)
10✔
384
        if err != nil {
10✔
385
                return fmt.Errorf("failed to get nodes: %w", err)
×
386
        }
×
387

388
        for nodeID, lifetime := range nodes {
30✔
389
                if time.Now().After(lifetime) {
30✔
390
                        c.gateway.UnassignNode(ctx, &nodeID)
10✔
391
                }
10✔
392
        }
393

394
        return nil
10✔
395
}
396

397
func (c *ControllerImpl) isAlone(ctx context.Context, nodeID *shared.NodeID) (bool, error) {
15✔
398
        nodeCount, err := c.gateway.GetNodeCount(ctx)
15✔
399
        if err != nil {
16✔
400
                return false, fmt.Errorf("failed to get node count: %w", err)
1✔
401
        }
1✔
402
        if nodeCount == 0 {
15✔
403
                return true, nil
1✔
404
        }
1✔
405
        if nodeCount != 1 {
21✔
406
                return false, nil
8✔
407
        }
8✔
408

409
        nodes, err := c.gateway.GetNodesByRange(ctx, nil, nil)
5✔
410
        if err != nil {
5✔
411
                return false, fmt.Errorf("failed to get nodes by range: %w", err)
×
412
        }
×
413
        if len(nodes) == 0 {
5✔
414
                return true, nil // no nodes found, consider it as alone
×
415
        }
×
416
        if len(nodes) == 1 && nodes[0].Equal(nodeID) {
9✔
417
                return true, nil // only this node is found, consider it as alone
4✔
418
        }
4✔
419

420
        return false, nil
1✔
421
}
422

423
func (c *ControllerImpl) validateSignal(signal *proto.Signal, srcNodeID *shared.NodeID) error {
2✔
424
        signalSrcNodeID, err := shared.NewNodeIDFromProto(signal.GetSrcNodeId())
2✔
425
        if err != nil {
2✔
426
                return fmt.Errorf("failed to parse src node id in signal packet: %w", err)
×
427
        }
×
428
        if !srcNodeID.Equal(signalSrcNodeID) {
2✔
429
                return fmt.Errorf("src node id is invalid expected:%s, got: %s", srcNodeID.String(), signalSrcNodeID.String())
×
430
        }
×
431
        _, err = shared.NewNodeIDFromProto(signal.GetDstNodeId())
2✔
432
        if err != nil {
2✔
433
                return fmt.Errorf("failed to parse dst node id in signal packet: %w", err)
×
434
        }
×
435
        if signal.GetOffer() == nil && signal.GetAnswer() == nil && signal.GetIce() == nil {
2✔
436
                return fmt.Errorf("signal content is required")
×
437
        }
×
438
        return nil
2✔
439
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc