• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 27415771421

12 Jun 2026 12:30PM UTC coverage: 46.062% (-18.6%) from 64.66%
27415771421

Pull #107

github

llamerada-jp
wip

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

456 of 2649 new or added lines in 24 files covered. (17.21%)

6 existing lines in 2 files now uncovered.

3117 of 6767 relevant lines covered (46.06%)

28.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.77
/node/internal/network/node_accessor/node_link.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package node_accessor
17

18
import (
19
        "bytes"
20
        "context"
21
        "fmt"
22
        "log/slog"
23
        "math/bits"
24
        "sync"
25
        "time"
26

27
        proto "github.com/llamerada-jp/colonio/api/colonio/v1alpha"
28
        "github.com/llamerada-jp/colonio/types"
29
        networkTypes "github.com/llamerada-jp/colonio/types/network"
30
        proto3 "google.golang.org/protobuf/proto"
31
)
32

33
// nodeLinkState is the lifecycle state of a nodeLink.
type nodeLinkState int
34

35
const (
36
        // nodeLinkStateConnecting is the state a link is created with; it is also set
        // when the WebRTC link reports active but not online. flush keeps queued
        // packets untouched while the link is in this state.
        nodeLinkStateConnecting nodeLinkState = iota
37
        // nodeLinkStateOnline is set when the WebRTC link reports active and online.
        nodeLinkStateOnline
38
        // nodeLinkStateDisabled is set by disconnect or when the WebRTC link reports
        // inactive. flush drops queued packets while the link is in this state.
        nodeLinkStateDisabled
39
)
40

41
// NodeLinkConfig holds the settings shared by the links to other nodes.
type NodeLinkConfig struct {
42
        // ctx is the parent of the cancellable context created for each link.
        ctx    context.Context
43
        // logger receives the warnings emitted by a link.
        logger *slog.Logger
44

45
        // label is used as the WebRTC data channel's label when isOffer is true.
        label string
47

48
        // ICEServers is passed to the WebRTC link factory when a link is created.
        ICEServers []*networkTypes.ICEServer
49
        // SessionTimeout is the timeout of the WebRTC session between nodes: a link is
        // disconnected when no data has been received from the peer for longer than this.
        SessionTimeout time.Duration
51
        // KeepaliveInterval is the interval at which a keepalive (an empty packet list)
        // is sent to tell the peer that this node is alive.
        // No keepalive ticker is created when it is not positive.
        KeepaliveInterval time.Duration
53
        // BufferInterval is the maximum interval for buffering packets between nodes.
        // When it is 0, every packet is flushed immediately.
        BufferInterval time.Duration
55
        // PacketBaseBytes is a reference value for the packet size to be sent in WebRTC communication,
        // since a WebRTC data channel may fail to send packets that are too large.
        // Queued content above this size triggers a flush, and larger content is split into fragments.
        PacketBaseBytes int
58
}
59

60
// nodeLinkHandler receives the events raised by a nodeLink.
type nodeLinkHandler interface {
61
        // nodeLinkChangeState is called with the new state when the state of the link changes.
        nodeLinkChangeState(*nodeLink, nodeLinkState)
62
        // nodeLinkUpdateICE is called with the ICE string reported by the link's WebRTC layer.
        nodeLinkUpdateICE(*nodeLink, string)
63
        // nodeLinkRecvPacket is called for each packet received and reassembled by the link.
        nodeLinkRecvPacket(*nodeLink, *networkTypes.Packet)
64
}
65

66
// waiting is a queued outgoing packet together with its already marshaled content.
type waiting struct {
67
        // packet is the packet passed to sendPacket.
        packet  *networkTypes.Packet
68
        // content is the protobuf encoding of packet.Content.
        content []byte
69
}
70

71
// nodeLink is a link to another node on top of a webRTCLink. It queues outgoing
// packets, splits large contents into fragments, reassembles received fragments,
// and watches the keepalive timeout in a background routine.
type nodeLink struct {
72
        config             *NodeLinkConfig
73
        handler            nodeLinkHandler
74
        // ctx is derived from config.ctx; cancel is called by disconnect and ends routine.
        ctx                context.Context
75
        cancel             context.CancelFunc
76
        webrtc             webRTCLink
77
        // flushMtx serializes the sending part of flush.
        flushMtx           sync.Mutex
78
        // stateMtx guards state and keepaliveTimestamp.
        stateMtx           sync.RWMutex
79
        state              nodeLinkState
80
        // keepaliveTimestamp is the creation time of the link, updated each time
        // received data has been processed completely.
        keepaliveTimestamp time.Time
81
        // queueMtx guards queue, the packets waiting to be flushed.
        queueMtx           sync.Mutex
82
        queue              []*waiting
83
        // keepaliveTicker is nil when config.KeepaliveInterval is not positive.
        keepaliveTicker    *time.Ticker
84
        bufferTicker       *time.Ticker
85
        // stackMtx guards stackID and stack.
        stackMtx           sync.Mutex
86
        // stackID is the ID of the packet whose fragments are collected in stack.
        stackID            uint32
87
        // stack holds the fragments received so far; nil when no reassembly is in progress.
        stack              []*proto.NodePacket
88
}
89

90
func newNodeLink(config *NodeLinkConfig, handler nodeLinkHandler, isOffer bool) (*nodeLink, error) {
51✔
91
        ctx, cancel := context.WithCancel(config.ctx)
51✔
92

51✔
93
        // The bufferTicker fires for configured intervals when some packets are in the buffer,
51✔
94
        // otherwise it disabled immediately.
51✔
95
        bufferTicker := time.NewTicker(1 * time.Second)
51✔
96
        if config.BufferInterval == 0 {
52✔
97
                bufferTicker.Stop()
1✔
98
        }
1✔
99

100
        var keepaliveTicker *time.Ticker
51✔
101
        if config.KeepaliveInterval > 0 {
102✔
102
                keepaliveTicker = time.NewTicker(config.KeepaliveInterval)
51✔
103
        }
51✔
104

105
        link := &nodeLink{
51✔
106
                config:             config,
51✔
107
                ctx:                ctx,
51✔
108
                cancel:             cancel,
51✔
109
                handler:            handler,
51✔
110
                stateMtx:           sync.RWMutex{},
51✔
111
                state:              nodeLinkStateConnecting,
51✔
112
                keepaliveTimestamp: time.Now(),
51✔
113
                queueMtx:           sync.Mutex{},
51✔
114
                queue:              make([]*waiting, 0),
51✔
115
                keepaliveTicker:    keepaliveTicker,
51✔
116
                bufferTicker:       bufferTicker,
51✔
117
                stackMtx:           sync.Mutex{},
51✔
118
                stackID:            0,
51✔
119
                stack:              nil,
51✔
120
        }
51✔
121

51✔
122
        label := ""
51✔
123
        if isOffer {
77✔
124
                label = config.label
26✔
125
        }
26✔
126
        var err error
51✔
127
        link.webrtc, err = defaultWebRTCLinkFactory(&webRTCLinkConfig{
51✔
128
                iceServers: config.ICEServers,
51✔
129
                isOffer:    isOffer,
51✔
130
                label:      label,
51✔
131
        }, &webRTCLinkEventHandler{
51✔
132
                raiseError:      link.webrtcRaiseError,
51✔
133
                changeLinkState: link.webrtcChangeLinkState,
51✔
134
                updateICE:       link.webrtcUpdateICE,
51✔
135
                recvData:        link.webrtcRecvData,
51✔
136
        })
51✔
137
        if err != nil {
51✔
138
                return nil, fmt.Errorf("failed to create WebRTCLink %w", err)
×
139
        }
×
140

141
        go link.routine()
51✔
142

51✔
143
        return link, nil
51✔
144
}
145

146
func (n *nodeLink) getLabel() string {
8✔
147
        return n.webrtc.getLabel()
8✔
148
}
8✔
149

150
func (n *nodeLink) getLocalSDP() (string, error) {
50✔
151
        sdp, err := n.webrtc.getLocalSDP()
50✔
152
        if err != nil {
50✔
153
                n.webrtc.disconnect()
×
154
                return "", fmt.Errorf("failed to get local SDP %w", err)
×
155
        }
×
156
        return sdp, nil
50✔
157
}
158

159
func (n *nodeLink) setRemoteSDP(sdp string) error {
49✔
160
        err := n.webrtc.setRemoteSDP(sdp)
49✔
161
        if err != nil {
49✔
162
                n.webrtc.disconnect()
×
163
                return fmt.Errorf("failed to set remote SDP %w", err)
×
164
        }
×
165
        return nil
49✔
166
}
167

168
func (n *nodeLink) updateICE(ice string) error {
98✔
169
        err := n.webrtc.updateICE(ice)
98✔
170
        if err != nil {
98✔
171
                n.webrtc.disconnect()
×
172
                return fmt.Errorf("failed to update ICE %w", err)
×
173
        }
×
174
        return nil
98✔
175
}
176

177
func (n *nodeLink) disconnect() error {
151✔
178
        n.stateMtx.Lock()
151✔
179
        if n.state != nodeLinkStateDisabled {
196✔
180
                n.state = nodeLinkStateDisabled
45✔
181
                defer n.handler.nodeLinkChangeState(n, nodeLinkStateDisabled)
45✔
182
        }
45✔
183
        n.stateMtx.Unlock()
151✔
184

151✔
185
        if n.bufferTicker != nil {
302✔
186
                n.bufferTicker.Stop()
151✔
187
        }
151✔
188
        if n.keepaliveTicker != nil {
302✔
189
                n.keepaliveTicker.Stop()
151✔
190
        }
151✔
191

192
        n.cancel()
151✔
193
        return n.webrtc.disconnect()
151✔
194
}
195

196
func (n *nodeLink) getLinkState() nodeLinkState {
380✔
197
        n.stateMtx.RLock()
380✔
198
        defer n.stateMtx.RUnlock()
380✔
199
        return n.state
380✔
200
}
380✔
201

202
func (n *nodeLink) sendPacket(packet *networkTypes.Packet) error {
534✔
203
        content, err := proto3.Marshal(packet.Content)
534✔
204
        if err != nil {
534✔
205
                return fmt.Errorf("failed to marshal packet content %w", err)
×
206
        }
×
207

208
        waiting := &waiting{
534✔
209
                packet:  packet,
534✔
210
                content: content,
534✔
211
        }
534✔
212

534✔
213
        // send packet immediately if bufferInterval is 0
534✔
214
        if n.config.BufferInterval == 0 {
544✔
215
                n.queueMtx.Lock()
10✔
216
                n.queue = append(n.queue, waiting)
10✔
217
                n.queueMtx.Unlock()
10✔
218
                return n.flush()
10✔
219
        }
10✔
220

221
        // push packet to queue
222
        n.queueMtx.Lock()
524✔
223
        n.queue = append(n.queue, waiting)
524✔
224
        sum := 0
524✔
225
        count := len(n.queue)
524✔
226
        for _, w := range n.queue {
20,132✔
227
                sum += len(w.content)
19,608✔
228
        }
19,608✔
229
        n.queueMtx.Unlock()
524✔
230

524✔
231
        // send packet if passed bufferInterval or buffer size is over packetBaseBytes
524✔
232
        if sum > n.config.PacketBaseBytes {
533✔
233
                return n.flush()
9✔
234
        }
9✔
235

236
        if count == 1 {
532✔
237
                n.bufferTicker.Reset(n.config.BufferInterval)
17✔
238
        }
17✔
239

240
        return nil
515✔
241
}
242

243
func (n *nodeLink) flush() error {
131✔
244
        switch n.getLinkState() {
131✔
245
        case nodeLinkStateConnecting:
6✔
246
                return nil
6✔
247

248
        case nodeLinkStateDisabled:
×
249
                n.queueMtx.Lock()
×
250
                defer n.queueMtx.Unlock()
×
251
                n.queue = nil
×
252
                n.config.logger.Warn("link is disabled when flushing packet")
×
253
                return nil
×
254
        }
255

256
        n.queueMtx.Lock()
125✔
257
        queue := n.queue
125✔
258
        n.queue = make([]*waiting, 0)
125✔
259
        n.queueMtx.Unlock()
125✔
260

125✔
261
        n.flushMtx.Lock()
125✔
262
        defer n.flushMtx.Unlock()
125✔
263

125✔
264
        p := &proto.NodePackets{}
125✔
265
        contentSize := 0
125✔
266
        for _, w := range queue {
659✔
267
                if contentSize+len(w.content) > n.config.PacketBaseBytes {
543✔
268
                        count, r := bits.Div(0, uint(len(w.content)+contentSize), uint(n.config.PacketBaseBytes))
9✔
269
                        if r != 0 {
18✔
270
                                count++
9✔
271
                        }
9✔
272
                        send := 0
9✔
273
                        for i := int(count - 1); i >= 0; i-- {
36✔
274
                                size := len(w.content) - send
27✔
275
                                if size+contentSize > n.config.PacketBaseBytes {
45✔
276
                                        size = n.config.PacketBaseBytes - contentSize
18✔
277
                                }
18✔
278
                                if i == 0 {
36✔
279
                                        p.Packets = append(p.Packets, &proto.NodePacket{
9✔
280
                                                Head: &proto.NodePacketHead{
9✔
281
                                                        DstNodeId: w.packet.DstNodeID.Proto(),
9✔
282
                                                        SrcNodeId: w.packet.SrcNodeID.Proto(),
9✔
283
                                                        HopCount:  w.packet.HopCount,
9✔
284
                                                        Mode:      uint32(w.packet.Mode),
9✔
285
                                                },
9✔
286
                                                Id:      w.packet.ID,
9✔
287
                                                Index:   uint32(i),
9✔
288
                                                Content: w.content[send : send+size],
9✔
289
                                        })
9✔
290
                                        contentSize += size
9✔
291
                                } else {
27✔
292
                                        p.Packets = append(p.Packets, &proto.NodePacket{
18✔
293
                                                Id:      w.packet.ID,
18✔
294
                                                Index:   uint32(i),
18✔
295
                                                Content: w.content[send : send+size],
18✔
296
                                        })
18✔
297
                                        send += size
18✔
298
                                        if !n.send(p) {
18✔
299
                                                return nil
×
300
                                        }
×
301
                                        p = &proto.NodePackets{}
18✔
302
                                        contentSize = 0
18✔
303
                                }
304
                        }
305

306
                } else {
525✔
307
                        p.Packets = append(p.Packets, &proto.NodePacket{
525✔
308
                                Head: &proto.NodePacketHead{
525✔
309
                                        DstNodeId: w.packet.DstNodeID.Proto(),
525✔
310
                                        SrcNodeId: w.packet.SrcNodeID.Proto(),
525✔
311
                                        HopCount:  w.packet.HopCount,
525✔
312
                                        Mode:      uint32(w.packet.Mode),
525✔
313
                                },
525✔
314
                                Id:      w.packet.ID,
525✔
315
                                Index:   0,
525✔
316
                                Content: w.content,
525✔
317
                        })
525✔
318
                        contentSize += len(w.content)
525✔
319
                }
525✔
320
        }
321
        if len(p.Packets) != 0 {
154✔
322
                if !n.send(p) {
29✔
323
                        return nil
×
324
                }
×
325
        }
326

327
        if n.config.KeepaliveInterval != 0 {
250✔
328
                n.keepaliveTicker.Reset(n.config.KeepaliveInterval)
125✔
329
        }
125✔
330
        // wait next packet idly
331
        if n.config.BufferInterval > 0 {
239✔
332
                n.bufferTicker.Reset(1 * time.Second)
114✔
333
        }
114✔
334

335
        return nil
125✔
336
}
337

338
func (n *nodeLink) routine() {
51✔
339
        // ticker to check keepalive timeout
51✔
340
        interval := n.config.KeepaliveInterval / 2
51✔
341
        if interval > 1*time.Second {
57✔
342
                interval = 1 * time.Second
6✔
343
        }
6✔
344
        ticker := time.NewTicker(interval)
51✔
345

51✔
346
        for {
331✔
347
                select {
280✔
348
                case <-n.ctx.Done():
51✔
349
                        n.disconnect()
51✔
350
                        return
51✔
351

352
                case <-n.keepaliveTicker.C:
49✔
353
                        n.queueMtx.Lock()
49✔
354
                        count := len(n.queue)
49✔
355
                        n.queueMtx.Unlock()
49✔
356
                        if count != 0 {
49✔
357
                                err := n.flush()
×
358
                                if err != nil {
×
359
                                        n.config.logger.Warn("failed to flush packet", slog.String("err", err.Error()))
×
360
                                        n.disconnect()
×
361
                                }
×
362
                        } else {
49✔
363
                                n.sendKeepalive()
49✔
364
                        }
49✔
365

366
                case <-n.bufferTicker.C:
63✔
367
                        err := n.flush()
63✔
368
                        if err != nil {
63✔
369
                                n.config.logger.Warn("failed to flush packet", slog.String("err", err.Error()))
×
370
                                n.disconnect()
×
371
                        }
×
372

373
                case <-ticker.C:
117✔
374
                        // wake up to check keepalive timeout
375
                }
376

377
                n.stateMtx.RLock()
229✔
378
                timedOut := time.Now().After(n.keepaliveTimestamp.Add(n.config.SessionTimeout))
229✔
379
                n.stateMtx.RUnlock()
229✔
380
                if timedOut {
231✔
381
                        n.disconnect()
2✔
382
                }
2✔
383
        }
384
}
385

386
func (n *nodeLink) sendKeepalive() {
49✔
387
        if n.getLinkState() != nodeLinkStateOnline {
54✔
388
                return
5✔
389
        }
5✔
390

391
        if n.send(&proto.NodePackets{}) {
88✔
392
                n.keepaliveTicker.Reset(n.config.KeepaliveInterval)
44✔
393
        }
44✔
394
}
395

396
func (n *nodeLink) send(packet *proto.NodePackets) bool {
91✔
397
        data, err := proto3.Marshal(packet)
91✔
398
        if err != nil {
91✔
399
                panic(err)
×
400
        }
401

402
        err = n.webrtc.send(data)
91✔
403
        if err != nil {
91✔
404
                n.config.logger.Warn("failed to send packet", slog.String("err", err.Error()))
×
405
                err = n.disconnect()
×
406
                if err != nil {
×
407
                        n.config.logger.Warn("failed to disconnect", slog.String("err", err.Error()))
×
408
                }
×
409
                return false
×
410
        }
411
        return true
91✔
412
}
413

UNCOV
414
func (n *nodeLink) webrtcRaiseError(err string) {
×
UNCOV
415
        n.config.logger.Warn("webrtc error", slog.String("err", err))
×
UNCOV
416
        n.disconnect()
×
UNCOV
417
}
×
418

419
func (n *nodeLink) webrtcChangeLinkState(active, online bool) {
143✔
420
        n.stateMtx.Lock()
143✔
421
        prevState := n.state
143✔
422
        if active {
192✔
423
                if online {
98✔
424
                        n.state = nodeLinkStateOnline
49✔
425
                } else {
49✔
426
                        n.state = nodeLinkStateConnecting
×
427
                }
×
428
        } else {
94✔
429
                n.state = nodeLinkStateDisabled
94✔
430
        }
94✔
431

432
        if prevState != n.state {
198✔
433
                defer func(s nodeLinkState) {
110✔
434
                        n.handler.nodeLinkChangeState(n, s)
55✔
435
                        switch s {
55✔
436
                        case nodeLinkStateOnline:
49✔
437
                                err := n.flush()
49✔
438
                                if err != nil {
49✔
439
                                        n.config.logger.Warn("failed to flush packet", slog.String("err", err.Error()))
×
440
                                        n.disconnect()
×
441
                                }
×
442

443
                        case nodeLinkStateDisabled:
6✔
444
                                n.disconnect()
6✔
445
                        }
446
                }(n.state)
447
        }
448
        n.stateMtx.Unlock()
143✔
449
}
450

451
func (n *nodeLink) webrtcUpdateICE(ice string) {
100✔
452
        n.handler.nodeLinkUpdateICE(n, ice)
100✔
453
}
100✔
454

455
func (n *nodeLink) webrtcRecvData(data []byte) {
86✔
456
        p := &proto.NodePackets{}
86✔
457
        err := proto3.Unmarshal(data, p)
86✔
458
        if err != nil {
86✔
459
                n.config.logger.Warn("failed to unmarshal packet", slog.String("err", err.Error()))
×
460
                n.disconnect()
×
461
                return
×
462
        }
×
463

464
        n.stackMtx.Lock()
86✔
465
        defer n.stackMtx.Unlock()
86✔
466

86✔
467
        for _, packet := range p.Packets {
638✔
468
                if n.stack != nil && n.stackID != packet.Id {
552✔
469
                        n.config.logger.Warn("received packet id is not continuous")
×
470
                        n.disconnect()
×
471
                        return
×
472
                }
×
473

474
                var packets []*proto.NodePacket
552✔
475
                if packet.Index != 0 {
570✔
476
                        if n.stack == nil {
27✔
477
                                n.stack = []*proto.NodePacket{packet}
9✔
478
                                n.stackID = packet.Id
9✔
479
                        } else {
18✔
480
                                n.stack = append(n.stack, packet)
9✔
481
                        }
9✔
482
                        continue
18✔
483
                } else if n.stack != nil {
543✔
484
                        packets = append(n.stack, packet)
9✔
485
                        n.stack = nil
9✔
486
                } else {
534✔
487
                        packets = []*proto.NodePacket{packet}
525✔
488
                }
525✔
489

490
                contentsList := make([][]byte, 0)
534✔
491
                var head *proto.NodePacketHead
534✔
492
                for i, p := range packets {
1,086✔
493
                        // check packet format
552✔
494
                        if p.Index != uint32(len(packets)-i-1) {
552✔
495
                                n.config.logger.Warn("stacked packet index is not continuous")
×
496
                                n.disconnect()
×
497
                                return
×
498
                        }
×
499
                        if (p.Index == 0 && p.Head == nil) || (p.Index != 0 && p.Head != nil) {
552✔
500
                                n.config.logger.Warn("packet head is not set correctly")
×
501
                                n.disconnect()
×
502
                                return
×
503
                        }
×
504

505
                        contentsList = append(contentsList, p.Content)
552✔
506
                        if p.Index == 0 {
1,086✔
507
                                head = p.Head
534✔
508
                        }
534✔
509
                }
510

511
                content := &proto.PacketContent{}
534✔
512
                var contentBin []byte
534✔
513
                if len(contentsList) == 1 {
1,059✔
514
                        contentBin = contentsList[0]
525✔
515
                } else {
534✔
516
                        contentBin = bytes.Join(contentsList, []byte{})
9✔
517
                }
9✔
518
                err := proto3.Unmarshal(contentBin, content)
534✔
519
                if err != nil {
534✔
520
                        n.config.logger.Warn("failed to unmarshal packet content", slog.String("err", err.Error()))
×
521
                        n.disconnect()
×
522
                        return
×
523
                }
×
524
                dstNodeID, err := types.NewNodeIDFromProto(head.DstNodeId)
534✔
525
                if err != nil {
534✔
526
                        n.config.logger.Warn("failed to unmarshal destination node id", slog.String("err", err.Error()))
×
527
                        n.disconnect()
×
528
                        return
×
529
                }
×
530
                srcNodeID, err := types.NewNodeIDFromProto(head.SrcNodeId)
534✔
531
                if err != nil {
534✔
532
                        n.config.logger.Warn("failed to unmarshal source node id", slog.String("err", err.Error()))
×
533
                        n.disconnect()
×
534
                        return
×
535
                }
×
536
                packet := &networkTypes.Packet{
534✔
537
                        DstNodeID: dstNodeID,
534✔
538
                        SrcNodeID: srcNodeID,
534✔
539
                        ID:        packet.Id,
534✔
540
                        HopCount:  head.HopCount,
534✔
541
                        Mode:      networkTypes.PacketMode(head.Mode),
534✔
542
                        Content:   content,
534✔
543
                }
534✔
544
                n.handler.nodeLinkRecvPacket(n, packet)
534✔
545
        }
546

547
        n.stateMtx.Lock()
86✔
548
        n.keepaliveTimestamp = time.Now()
86✔
549
        n.stateMtx.Unlock()
86✔
550
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc