• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 18451467117

13 Oct 2025 12:12AM UTC coverage: 62.183% (-6.8%) from 68.948%
18451467117

Pull #107

github

llamerada-jp
wip

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

297 of 923 new or added lines in 14 files covered. (32.18%)

5 existing lines in 2 files now uncovered.

3111 of 5003 relevant lines covered (62.18%)

34.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.74
/internal/network/node_accessor/node_accessor.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package node_accessor
17

18
import (
19
        "context"
20
        "errors"
21
        "fmt"
22
        "log/slog"
23
        "math/rand"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/google/uuid"
29
        "github.com/llamerada-jp/colonio/internal/network/signal"
30
        "github.com/llamerada-jp/colonio/internal/shared"
31
)
32

33
// Timing parameters used by NodeAccessor.
const (
34
        // defaultConnectionTimeout is the timeout NewNodeAccessor applies when
        // Config.connectionTimeout is zero; houseKeeping drops connecting links
        // older than the configured timeout.
        defaultConnectionTimeout = 30 * time.Second
35
        // nextConnectionInterval is added to nextConnectionTimestamp in subRoutine
        // to decide whether another OfferTypeNext connection may be attempted.
        nextConnectionInterval   = 180 * time.Second
36
)
37

38
// Handler is the set of callbacks NodeAccessor invokes on its owner.
//
//   - NodeAccessorRecvPacket is called from nodeLinkRecvPacket with the node ID
//     registered for the receiving link (nil when the link is not registered)
//     and the received packet.
//   - NodeAccessorChangeConnections is called from callChangeConnections with
//     the set of node IDs currently present in nodeID2link.
//   - NodeAccessorSendSignalOffer / Answer / ICE are called to hand signaling
//     messages to the owner for delivery; a non-nil error is treated as a
//     failed send by the callers in this file.
type Handler interface {
39
        NodeAccessorRecvPacket(*shared.NodeID, *shared.Packet)
40
        NodeAccessorChangeConnections(map[shared.NodeID]struct{})
41
        NodeAccessorSendSignalOffer(*shared.NodeID, *signal.Offer) error
42
        NodeAccessorSendSignalAnswer(*shared.NodeID, *signal.Answer) error
43
        NodeAccessorSendSignalICE(*shared.NodeID, *signal.ICE) error
44
}
45

46
// Config carries the dependencies NewNodeAccessor needs.
//
// Logger is stored on the accessor and also written into its private copy of
// NodeLinkConfig. Handler receives the accessor's callbacks. NodeLinkConfig is
// dereferenced and copied by NewNodeAccessor, so it must not be nil.
type Config struct {
47
        Logger         *slog.Logger
48
        Handler        Handler
49
        NodeLinkConfig *NodeLinkConfig
50

51
        // config parameters for testing
52
        // connectionTimeout overrides defaultConnectionTimeout when non-zero.
        connectionTimeout time.Duration
53
}
54

55
// NodeAccessor tracks links to other nodes: links that are still being
// negotiated (keyed by offer ID) and links that have come online (keyed by
// node ID). The methods in this file hold mtx while reading or modifying
// isAlone and the four maps below.
type NodeAccessor struct {
56
        logger                  *slog.Logger
57
        handler                 Handler
58
        nodeLinkConfig          *NodeLinkConfig
59
        // localNodeID is set by Start.
        localNodeID             *shared.NodeID
60
        // connectionTimeout is the maximum age of a connecting state before
        // houseKeeping disconnects its link.
        connectionTimeout       time.Duration
61
        // nextConnectionTimestamp is consulted by subRoutine together with
        // nextConnectionInterval to pace OfferTypeNext connection attempts.
        nextConnectionTimestamp time.Time
62

63
        mtx sync.RWMutex
64
        // isAlone is from seedAccessor
65
        // It starts as true and is changed through SetBeAlone; while true,
        // subRoutine does not start new connections.
        isAlone bool
66

67
        // a set of connected links
68
        nodeID2link map[shared.NodeID]*nodeLink
69
        link2nodeID map[*nodeLink]*shared.NodeID
70
        // a set of connecting link states
71
        offerID2state map[uint32]*connectingState
72
        link2offerID  map[*nodeLink]uint32
73
}
74

75
// connectingState describes one link whose signaling handshake is in progress.
//
//   - isPrime is true for states created by connect (this node sent the offer)
//     and false for states created by SignalingOffer (this node received it).
//   - ices buffers local ICE candidates appended by nodeLinkUpdateICE; once it
//     has been flushed and set to nil, later candidates are sent immediately.
//     icesMtx guards ices.
//   - nodeID is the remote node. connect leaves it nil for non-explicit offers
//     and SignalingAnswer fills it in from the answering node.
//   - creationTimestamp is compared against the connection timeout in
//     houseKeeping.
type connectingState struct {
76
        isPrime           bool
77
        icesMtx           sync.Mutex
78
        ices              []string
79
        link              *nodeLink
80
        nodeID            *shared.NodeID
81
        creationTimestamp time.Time
82
}
83

84
func NewNodeAccessor(config *Config) (*NodeAccessor, error) {
19✔
85
        ct := defaultConnectionTimeout
19✔
86
        if config.connectionTimeout != 0 {
20✔
87
                ct = config.connectionTimeout
1✔
88
        }
1✔
89

90
        nlc := *config.NodeLinkConfig
19✔
91
        nlc.logger = config.Logger
19✔
92
        na := &NodeAccessor{
19✔
93
                logger:            config.Logger,
19✔
94
                handler:           config.Handler,
19✔
95
                nodeLinkConfig:    &nlc,
19✔
96
                connectionTimeout: ct,
19✔
97
                isAlone:           true,
19✔
98
                nodeID2link:       make(map[shared.NodeID]*nodeLink),
19✔
99
                link2nodeID:       make(map[*nodeLink]*shared.NodeID),
19✔
100
                offerID2state:     make(map[uint32]*connectingState),
19✔
101
                link2offerID:      make(map[*nodeLink]uint32),
19✔
102
        }
19✔
103

19✔
104
        return na, nil
19✔
105
}
106

107
func (na *NodeAccessor) Start(ctx context.Context, localNodeID *shared.NodeID) {
19✔
108
        na.localNodeID = localNodeID
19✔
109
        na.nodeLinkConfig.ctx = ctx
19✔
110

19✔
111
        go func() {
38✔
112
                ticker := time.NewTicker(1 * time.Second)
19✔
113
                defer ticker.Stop()
19✔
114

19✔
115
                for {
105✔
116
                        select {
86✔
117
                        case <-ctx.Done():
19✔
118
                                na.mtx.Lock()
19✔
119
                                defer na.mtx.Unlock()
19✔
120
                                for link := range na.link2nodeID {
55✔
121
                                        na.disconnectLink(link, false)
36✔
122
                                }
36✔
123
                                return
19✔
124

125
                        case <-ticker.C:
67✔
126
                                na.subRoutine()
67✔
127
                        }
128
                }
129
        }()
130
}
131

132
func (na *NodeAccessor) SetBeAlone(isAlone bool) {
21✔
133
        na.mtx.Lock()
21✔
134
        defer na.mtx.Unlock()
21✔
135
        na.isAlone = isAlone
21✔
136
}
21✔
137

138
func (na *NodeAccessor) subRoutine() {
67✔
139
        na.mtx.Lock()
67✔
140
        defer na.mtx.Unlock()
67✔
141

67✔
142
        if err := na.houseKeeping(); err != nil {
67✔
143
                na.logger.Warn("failed to house keeping", slog.String("error", err.Error()))
×
144
        }
×
145

146
        // do not connect if any other node does not exist
147
        if na.isAlone {
99✔
148
                return
32✔
149
        }
32✔
150

151
        // try to connect to first node
152
        if len(na.link2nodeID) == 0 && len(na.offerID2state) == 0 {
54✔
153
                if err := na.connect(signal.OfferTypeNext, na.localNodeID); err != nil {
19✔
154
                        na.logger.Warn("failed to connect first link", slog.String("error", err.Error()))
×
155
                }
×
156
        }
157

158
        // try to connect to next node for redundancy
159
        if len(na.link2nodeID) != 0 && time.Now().After(na.nextConnectionTimestamp.Add(nextConnectionInterval)) {
39✔
160
                // the first trying will be run only after 10 sec.
4✔
161
                if na.nextConnectionTimestamp.IsZero() {
8✔
162
                        na.nextConnectionTimestamp = na.nextConnectionTimestamp.Add(-10*time.Second - nextConnectionInterval)
4✔
163
                        return
4✔
164
                }
4✔
165
                if err := na.connect(signal.OfferTypeNext, na.localNodeID); err != nil {
×
166
                        na.logger.Warn("failed to connect next link", slog.String("error", err.Error()))
×
167
                }
×
168
        }
169
}
170

171
func (na *NodeAccessor) ConnectLinks(requiredNodeIDs, keepNodeIDs map[shared.NodeID]struct{}) error {
22✔
172
        na.mtx.Lock()
22✔
173
        defer na.mtx.Unlock()
22✔
174

22✔
175
        for nodeID := range requiredNodeIDs {
54✔
176
                if link, ok := na.nodeID2link[nodeID]; ok {
60✔
177
                        if link.getLinkState() == nodeLinkStateDisabled {
28✔
178
                                na.disconnectLink(link, false)
×
179
                        } else {
28✔
180
                                continue
28✔
181
                        }
182
                }
183

184
                skip := false
4✔
185
                for _, state := range na.offerID2state {
6✔
186
                        if state.nodeID.Equal(&nodeID) {
3✔
187
                                skip = true
1✔
188
                                break
1✔
189
                        }
190
                }
191
                if skip {
5✔
192
                        continue
1✔
193
                }
194

195
                err := na.connect(signal.OfferTypeExplicit, &nodeID)
3✔
196
                if err != nil {
3✔
197
                        return err
×
198
                }
×
199
        }
200

201
        for nodeID, link := range na.nodeID2link {
60✔
202
                idToKeep := false
38✔
203

38✔
204
                if _, ok := requiredNodeIDs[nodeID]; ok {
66✔
205
                        idToKeep = true
28✔
206
                } else {
38✔
207
                        if _, ok := keepNodeIDs[nodeID]; ok {
18✔
208
                                idToKeep = true
8✔
209
                        }
8✔
210
                }
211

212
                if !idToKeep {
40✔
213
                        na.disconnectLink(link, false)
2✔
214
                }
2✔
215
        }
216

217
        return nil
22✔
218
}
219

220
func (na *NodeAccessor) SignalingOffer(srcNodeID *shared.NodeID, offer *signal.Offer) {
21✔
221
        na.mtx.Lock()
21✔
222
        defer na.mtx.Unlock()
21✔
223

21✔
224
        offerID := offer.OfferID
21✔
225

21✔
226
        if state, ok := na.offerID2state[offerID]; ok && !state.nodeID.Equal(srcNodeID) {
21✔
227
                na.sendAnswer(srcNodeID, offerID, signal.AnswerStatusReject, "")
×
228
                return
×
229
        }
×
230

231
        link, err := newNodeLink(na.nodeLinkConfig, na, false)
21✔
232
        if err != nil {
21✔
233
                na.logger.Error("failed to create link", slog.String("error", err.Error()))
×
234
                return
×
235
        }
×
236

237
        na.offerID2state[offerID] = &connectingState{
21✔
238
                isPrime:           false,
21✔
239
                ices:              make([]string, 0),
21✔
240
                link:              link,
21✔
241
                nodeID:            srcNodeID.Copy(),
21✔
242
                creationTimestamp: time.Now(),
21✔
243
        }
21✔
244
        na.link2offerID[link] = offerID
21✔
245

21✔
246
        go func() {
42✔
247
                if err := link.setRemoteSDP(offer.Sdp); err != nil {
21✔
248
                        na.logger.Error("failed to set remote SDP", slog.String("error", err.Error()))
×
249
                        na.disconnectLink(link, true)
×
250
                        return
×
251
                }
×
252

253
                sdp, err := link.getLocalSDP()
21✔
254
                if err != nil {
21✔
255
                        na.logger.Error("failed to get local SDP", slog.String("error", err.Error()))
×
256
                        na.disconnectLink(link, true)
×
257
                        return
×
258
                }
×
259

260
                if err = na.sendAnswer(srcNodeID, offerID, signal.AnswerStatusAccept, sdp); err != nil {
21✔
261
                        na.logger.Error("failed to send answer", slog.String("error", err.Error()))
×
262
                        na.disconnectLink(link, true)
×
263
                }
×
264
        }()
265
}
266

267
func (na *NodeAccessor) SignalingAnswer(srcNodeID *shared.NodeID, answer *signal.Answer) {
21✔
268
        na.mtx.Lock()
21✔
269
        defer na.mtx.Unlock()
21✔
270

21✔
271
        offerID := answer.OfferID
21✔
272
        state, ok := na.offerID2state[offerID]
21✔
273
        if !ok {
21✔
274
                return
×
275
        }
×
276

277
        if state.nodeID == nil {
39✔
278
                state.nodeID = srcNodeID.Copy()
18✔
279
        } else if !state.nodeID.Equal(srcNodeID) {
21✔
280
                return
×
281
        }
×
282

283
        if answer.Status == signal.AnswerStatusReject {
21✔
284
                na.disconnectLink(state.link, false)
×
285
                return
×
286
        }
×
287

288
        go func() {
42✔
289
                if err := state.link.setRemoteSDP(answer.Sdp); err != nil {
21✔
290
                        na.logger.Error("failed to set remote SDP", slog.String("error", err.Error()))
×
291
                        return
×
292
                }
×
293

294
                state.icesMtx.Lock()
21✔
295
                defer state.icesMtx.Unlock()
21✔
296
                if len(state.ices) != 0 {
42✔
297
                        na.sendICE(srcNodeID, offerID, state.ices)
21✔
298
                }
21✔
299
                state.ices = nil
21✔
300
        }()
301
}
302

303
func (na *NodeAccessor) SignalingICE(srcNodeID *shared.NodeID, ice *signal.ICE) {
61✔
304
        na.mtx.Lock()
61✔
305
        defer na.mtx.Unlock()
61✔
306

61✔
307
        offerID := ice.OfferID
61✔
308
        state, ok := na.offerID2state[offerID]
61✔
309
        if !ok {
61✔
310
                return
×
311
        }
×
312

313
        go func() {
122✔
314
                for _, i := range ice.Ices {
145✔
315
                        if err := state.link.updateICE(i); err != nil {
84✔
316
                                na.logger.Error("failed to update ICE", slog.String("error", err.Error()))
×
317
                                break
×
318
                        }
319
                }
320

321
                state.icesMtx.Lock()
61✔
322
                defer state.icesMtx.Unlock()
61✔
323
                if len(state.ices) != 0 {
73✔
324
                        na.sendICE(srcNodeID, offerID, state.ices)
12✔
325
                }
12✔
326
                state.ices = nil
61✔
327
        }()
328
}
329

330
func (na *NodeAccessor) connect(ot signal.OfferType, nodeID *shared.NodeID) error {
22✔
331
        if ot == signal.OfferTypeExplicit && na.assignedNodeID(nodeID) {
22✔
332
                return errors.New("node ID is already connected")
×
333
        }
×
334

335
        nlc := *na.nodeLinkConfig
22✔
336
        // make webrtc data channel label unique and ordered
22✔
337
        uuidV7, err := uuid.NewV7()
22✔
338
        if err != nil {
22✔
339
                return fmt.Errorf("failed to create uuid: %w", err)
×
340
        }
×
341
        nlc.label = uuidV7.String()
22✔
342
        link, err := newNodeLink(&nlc, na, true)
22✔
343
        if err != nil {
22✔
344
                return fmt.Errorf("failed to create link: %w", err)
×
345
        }
×
346

347
        state := &connectingState{
22✔
348
                isPrime:           true,
22✔
349
                ices:              make([]string, 0),
22✔
350
                link:              link,
22✔
351
                creationTimestamp: time.Now(),
22✔
352
        }
22✔
353
        if ot == signal.OfferTypeExplicit {
25✔
354
                state.nodeID = nodeID.Copy()
3✔
355
        } else {
22✔
356
                state.nodeID = nil
19✔
357
        }
19✔
358
        // assign unique offerID
359
        offerID := rand.Uint32()
22✔
360
        for _, ok := na.offerID2state[offerID]; ok; {
22✔
361
                offerID = rand.Uint32()
×
362
        }
×
363

364
        na.offerID2state[offerID] = state
22✔
365
        na.link2offerID[link] = offerID
22✔
366

22✔
367
        go func() {
44✔
368
                sdp, err := link.getLocalSDP()
22✔
369
                if err != nil {
22✔
370
                        na.logger.Error("failed to get local SDP", slog.String("error", err.Error()))
×
371
                        na.disconnectLink(link, true)
×
372
                        return
×
373
                }
×
374

375
                err = na.handler.NodeAccessorSendSignalOffer(nodeID, &signal.Offer{
22✔
376
                        OfferID:   offerID,
22✔
377
                        OfferType: ot,
22✔
378
                        Sdp:       sdp,
22✔
379
                })
22✔
380
                if err != nil {
22✔
381
                        na.logger.Error("failed to send offer", slog.String("error", err.Error()))
×
382
                        na.disconnectLink(link, true)
×
383
                }
×
384
        }()
385

386
        return nil
22✔
387
}
388

389
func (na *NodeAccessor) assignedNodeID(nodeID *shared.NodeID) bool {
3✔
390
        if _, ok := na.nodeID2link[*nodeID]; ok {
3✔
391
                return true
×
392
        }
×
393

394
        for _, state := range na.offerID2state {
4✔
395
                if state.nodeID.Equal(nodeID) {
1✔
396
                        return true
×
397
                }
×
398
        }
399

400
        return false
3✔
401
}
402

403
func (na *NodeAccessor) disconnectLink(link *nodeLink, lock bool) {
84✔
404
        if err := link.disconnect(); err != nil {
84✔
405
                na.logger.Error("failed to disconnect link", slog.String("error", err.Error()))
×
406
        }
×
407

408
        if lock {
84✔
409
                na.mtx.Lock()
×
410
                defer na.mtx.Unlock()
×
411
        }
×
412

413
        if nodeID := na.link2nodeID[link]; nodeID != nil {
124✔
414
                delete(na.nodeID2link, *nodeID)
40✔
415
                delete(na.link2nodeID, link)
40✔
416
                na.callChangeConnections()
40✔
417
        }
40✔
418

419
        if offerID, ok := na.link2offerID[link]; ok {
85✔
420
                delete(na.offerID2state, offerID)
1✔
421
                delete(na.link2offerID, link)
1✔
422
        }
1✔
423
}
424

425
func (na *NodeAccessor) houseKeeping() error {
67✔
426
        for link := range na.link2nodeID {
107✔
427
                if link.getLinkState() == nodeLinkStateDisabled {
40✔
428
                        na.disconnectLink(link, false)
×
429
                }
×
430
        }
431

432
        for _, state := range na.offerID2state {
69✔
433
                if time.Now().After(state.creationTimestamp.Add(na.connectionTimeout)) {
3✔
434
                        na.disconnectLink(state.link, false)
1✔
435
                }
1✔
436
        }
437

438
        return nil
67✔
439
}
440

441
func (na *NodeAccessor) IsOnline() bool {
77✔
442
        na.mtx.RLock()
77✔
443
        defer na.mtx.RUnlock()
77✔
444
        for _, link := range na.nodeID2link {
102✔
445
                if link.getLinkState() == nodeLinkStateOnline {
50✔
446
                        return true
25✔
447
                }
25✔
448
        }
449
        return false
52✔
450
}
451

452
func (na *NodeAccessor) RelayPacket(dstNodeID *shared.NodeID, packet *shared.Packet) error {
7✔
453
        if !na.IsOnline() {
7✔
454
                return errors.New("node accessor should be online before relaying packet")
×
455
        }
×
456

457
        na.mtx.RLock()
7✔
458
        defer na.mtx.RUnlock()
7✔
459

7✔
460
        if dstNodeID.Equal(&shared.NodeNeighborhoods) {
7✔
461
                if (packet.Mode & shared.PacketModeOneWay) == shared.PacketModeNone {
×
462
                        return errors.New("packet mode should be one way when multicast")
×
463
                }
×
464
                for nodeID, link := range na.nodeID2link {
×
465
                        copy := *packet
×
466
                        copy.DstNodeID = &nodeID
×
467
                        if link.getLinkState() == nodeLinkStateOnline {
×
468
                                if err := link.sendPacket(&copy); err != nil {
×
469
                                        return errors.New("failed to send packet")
×
470
                                }
×
471
                        }
472
                }
473

474
        } else {
7✔
475
                link, ok := na.nodeID2link[*dstNodeID]
7✔
476
                if !ok || link.getLinkState() != nodeLinkStateOnline {
7✔
477
                        return errors.New("link is not online")
×
478
                }
×
479
                if err := link.sendPacket(packet); err != nil {
7✔
480
                        return errors.New("failed to send packet")
×
481
                }
×
482
        }
483

484
        return nil
7✔
485
}
486

487
func (na *NodeAccessor) sendAnswer(dstNodeID *shared.NodeID, offerID uint32, status signal.AnswerStatus, sdp string) error {
21✔
488
        return na.handler.NodeAccessorSendSignalAnswer(dstNodeID, &signal.Answer{
21✔
489
                OfferID: offerID,
21✔
490
                Status:  status,
21✔
491
                Sdp:     sdp,
21✔
492
        })
21✔
493
}
21✔
494

495
func (na *NodeAccessor) sendICE(dstNodeID *shared.NodeID, offerID uint32, ices []string) error {
61✔
496
        return na.handler.NodeAccessorSendSignalICE(dstNodeID, &signal.ICE{
61✔
497
                OfferID: offerID,
61✔
498
                Ices:    ices,
61✔
499
        })
61✔
500
}
61✔
501

502
func (na *NodeAccessor) nodeLinkChangeState(link *nodeLink, linkState nodeLinkState) {
85✔
503
        go func() {
170✔
504
                na.mtx.Lock()
85✔
505
                defer na.mtx.Unlock()
85✔
506

85✔
507
                if linkState == nodeLinkStateOnline {
127✔
508
                        offerID, ok := na.link2offerID[link]
42✔
509
                        if ok {
84✔
510
                                state := na.offerID2state[offerID]
42✔
511
                                delete(na.offerID2state, offerID)
42✔
512
                                delete(na.link2offerID, link)
42✔
513
                                // There is a possibility that both nodes try to connect at the same time.
42✔
514
                                // Depending on the timing, links created by each other may interfere with each other,
42✔
515
                                // causing link creation to fail. If there are links to the same node,
42✔
516
                                // the link with the larger data channel label is retained.
42✔
517
                                if existingLink, ok := na.nodeID2link[*state.nodeID]; ok {
44✔
518
                                        if existingLink.getLinkState() != nodeLinkStateOnline ||
2✔
519
                                                strings.Compare(existingLink.getLabel(), link.getLabel()) > 0 {
2✔
UNCOV
520
                                                // disconnect existing link and keep new link
×
UNCOV
521
                                                na.disconnectLink(existingLink, false)
×
UNCOV
522
                                                na.nodeID2link[*state.nodeID] = link
×
UNCOV
523
                                                na.link2nodeID[link] = state.nodeID.Copy()
×
524
                                        } else {
2✔
525
                                                if strings.Compare(existingLink.getLabel(), link.getLabel()) == 0 {
2✔
526
                                                        panic(fmt.Sprintf("data channel label should be unique, %d, %s", offerID, existingLink.getLabel()))
×
527
                                                }
528
                                                // disconnect new link and keep existing link
529
                                                na.disconnectLink(link, false)
2✔
530
                                        }
531
                                } else {
40✔
532
                                        na.nodeID2link[*state.nodeID] = link
40✔
533
                                        na.link2nodeID[link] = state.nodeID.Copy()
40✔
534
                                }
40✔
535
                                na.callChangeConnections()
42✔
536
                        }
537
                }
538

539
                if linkState == nodeLinkStateDisabled {
128✔
540
                        na.disconnectLink(link, false)
43✔
541
                }
43✔
542
        }()
543
}
544

545
func (na *NodeAccessor) nodeLinkUpdateICE(link *nodeLink, ice string) {
86✔
546
        go func() {
172✔
547
                na.mtx.Lock()
86✔
548
                defer na.mtx.Unlock()
86✔
549

86✔
550
                offerID, ok := na.link2offerID[link]
86✔
551
                if !ok {
86✔
552
                        return
×
553
                }
×
554

555
                state := na.offerID2state[offerID]
86✔
556
                state.icesMtx.Lock()
86✔
557
                defer state.icesMtx.Unlock()
86✔
558
                if state.ices == nil {
114✔
559
                        na.sendICE(state.nodeID, offerID, []string{ice})
28✔
560
                } else {
86✔
561
                        state.ices = append(state.ices, ice)
58✔
562
                }
58✔
563
        }()
564
}
565

566
func (na *NodeAccessor) nodeLinkRecvPacket(link *nodeLink, packet *shared.Packet) {
7✔
567
        na.mtx.RLock()
7✔
568
        nodeID := na.link2nodeID[link]
7✔
569
        na.mtx.RUnlock()
7✔
570
        na.handler.NodeAccessorRecvPacket(nodeID, packet)
7✔
571
}
7✔
572

573
func (na *NodeAccessor) callChangeConnections() {
82✔
574
        connections := make(map[shared.NodeID]struct{})
82✔
575
        for nodeID := range na.nodeID2link {
174✔
576
                connections[nodeID] = struct{}{}
92✔
577
        }
92✔
578
        na.handler.NodeAccessorChangeConnections(connections)
82✔
579
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc