• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 24928251196

25 Apr 2026 09:54AM UTC coverage: 64.66% (-0.1%) from 64.788%
24928251196

push

github

llamerada-jp
fix datarace on ci

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>

2675 of 4137 relevant lines covered (64.66%)

43.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.45
/node/internal/network/node_accessor/node_accessor.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package node_accessor
17

18
import (
19
        "context"
20
        "errors"
21
        "fmt"
22
        "log/slog"
23
        "math/rand"
24
        "strings"
25
        "sync"
26
        "time"
27

28
        "github.com/google/uuid"
29
        "github.com/llamerada-jp/colonio/node/internal/network/signal"
30
        "github.com/llamerada-jp/colonio/types"
31
        networkTypes "github.com/llamerada-jp/colonio/types/network"
32
)
33

34
const (
35
        defaultConnectionTimeout = 30 * time.Second
36
        nextConnectionInterval   = 180 * time.Second
37
)
38

39
type Handler interface {
40
        NodeAccessorRecvPacket(*types.NodeID, *networkTypes.Packet)
41
        NodeAccessorChangeConnections(map[types.NodeID]struct{})
42
        NodeAccessorSendSignalOffer(*types.NodeID, *signal.Offer) error
43
        NodeAccessorSendSignalAnswer(*types.NodeID, *signal.Answer) error
44
        NodeAccessorSendSignalICE(*types.NodeID, *signal.ICE) error
45
}
46

47
type Config struct {
48
        Logger         *slog.Logger
49
        Handler        Handler
50
        NodeLinkConfig *NodeLinkConfig
51

52
        // config parameters for testing
53
        connectionTimeout time.Duration
54
}
55

56
type NodeAccessor struct {
57
        logger                  *slog.Logger
58
        handler                 Handler
59
        nodeLinkConfig          *NodeLinkConfig
60
        localNodeID             *types.NodeID
61
        connectionTimeout       time.Duration
62
        nextConnectionTimestamp time.Time
63

64
        mtx sync.RWMutex
65
        // isAlone is from seedAccessor
66
        isAlone bool
67

68
        // a set of connected links
69
        nodeID2link map[types.NodeID]*nodeLink
70
        link2nodeID map[*nodeLink]*types.NodeID
71
        // a set of connecting link states
72
        offerID2state map[uint32]*connectingState
73
        link2offerID  map[*nodeLink]uint32
74
}
75

76
type connectingState struct {
77
        isPrime           bool
78
        icesMtx           sync.Mutex
79
        ices              []string
80
        link              *nodeLink
81
        nodeID            *types.NodeID
82
        creationTimestamp time.Time
83
}
84

85
func NewNodeAccessor(config *Config) (*NodeAccessor, error) {
19✔
86
        ct := defaultConnectionTimeout
19✔
87
        if config.connectionTimeout != 0 {
20✔
88
                ct = config.connectionTimeout
1✔
89
        }
1✔
90

91
        nlc := *config.NodeLinkConfig
19✔
92
        nlc.logger = config.Logger
19✔
93
        na := &NodeAccessor{
19✔
94
                logger:            config.Logger,
19✔
95
                handler:           config.Handler,
19✔
96
                nodeLinkConfig:    &nlc,
19✔
97
                connectionTimeout: ct,
19✔
98
                isAlone:           true,
19✔
99
                nodeID2link:       make(map[types.NodeID]*nodeLink),
19✔
100
                link2nodeID:       make(map[*nodeLink]*types.NodeID),
19✔
101
                offerID2state:     make(map[uint32]*connectingState),
19✔
102
                link2offerID:      make(map[*nodeLink]uint32),
19✔
103
        }
19✔
104

19✔
105
        return na, nil
19✔
106
}
107

108
func (na *NodeAccessor) Start(ctx context.Context, localNodeID *types.NodeID) {
19✔
109
        na.localNodeID = localNodeID
19✔
110
        na.nodeLinkConfig.ctx = ctx
19✔
111

19✔
112
        go func() {
38✔
113
                ticker := time.NewTicker(1 * time.Second)
19✔
114
                defer ticker.Stop()
19✔
115

19✔
116
                for {
105✔
117
                        select {
86✔
118
                        case <-ctx.Done():
19✔
119
                                na.mtx.Lock()
19✔
120
                                defer na.mtx.Unlock()
19✔
121
                                for link := range na.link2nodeID {
55✔
122
                                        na.disconnectLink(link, false)
36✔
123
                                }
36✔
124
                                return
19✔
125

126
                        case <-ticker.C:
67✔
127
                                na.subRoutine()
67✔
128
                        }
129
                }
130
        }()
131
}
132

133
func (na *NodeAccessor) SetBeAlone(isAlone bool) {
21✔
134
        na.mtx.Lock()
21✔
135
        defer na.mtx.Unlock()
21✔
136
        na.isAlone = isAlone
21✔
137
}
21✔
138

139
func (na *NodeAccessor) subRoutine() {
67✔
140
        na.mtx.Lock()
67✔
141
        defer na.mtx.Unlock()
67✔
142

67✔
143
        if err := na.houseKeeping(); err != nil {
67✔
144
                na.logger.Warn("failed to house keeping", slog.String("error", err.Error()))
×
145
        }
×
146

147
        // do not connect if any other node does not exist
148
        if na.isAlone {
99✔
149
                return
32✔
150
        }
32✔
151

152
        // try to connect to first node
153
        if len(na.link2nodeID) == 0 && len(na.offerID2state) == 0 {
54✔
154
                if err := na.connect(signal.OfferTypeNext, na.localNodeID); err != nil {
19✔
155
                        na.logger.Warn("failed to connect first link", slog.String("error", err.Error()))
×
156
                }
×
157
        }
158

159
        // try to connect to next node for redundancy
160
        if len(na.link2nodeID) != 0 && time.Now().After(na.nextConnectionTimestamp.Add(nextConnectionInterval)) {
39✔
161
                // the first trying will be run only after 10 sec.
4✔
162
                if na.nextConnectionTimestamp.IsZero() {
8✔
163
                        na.nextConnectionTimestamp = na.nextConnectionTimestamp.Add(-10*time.Second - nextConnectionInterval)
4✔
164
                        return
4✔
165
                }
4✔
166
                if err := na.connect(signal.OfferTypeNext, na.localNodeID); err != nil {
×
167
                        na.logger.Warn("failed to connect next link", slog.String("error", err.Error()))
×
168
                }
×
169
        }
170
}
171

172
func (na *NodeAccessor) ConnectLinks(requiredNodeIDs, keepNodeIDs map[types.NodeID]struct{}) error {
22✔
173
        na.mtx.Lock()
22✔
174
        defer na.mtx.Unlock()
22✔
175

22✔
176
        for nodeID := range requiredNodeIDs {
54✔
177
                if link, ok := na.nodeID2link[nodeID]; ok {
61✔
178
                        if link.getLinkState() == nodeLinkStateDisabled {
29✔
179
                                na.disconnectLink(link, false)
×
180
                        } else {
29✔
181
                                continue
29✔
182
                        }
183
                }
184

185
                skip := false
3✔
186
                for _, state := range na.offerID2state {
3✔
187
                        if state.nodeID.Equal(&nodeID) {
×
188
                                skip = true
×
189
                                break
×
190
                        }
191
                }
192
                if skip {
3✔
193
                        continue
×
194
                }
195

196
                err := na.connect(signal.OfferTypeExplicit, &nodeID)
3✔
197
                if err != nil {
3✔
198
                        return err
×
199
                }
×
200
        }
201

202
        for nodeID, link := range na.nodeID2link {
63✔
203
                idToKeep := false
41✔
204

41✔
205
                if _, ok := requiredNodeIDs[nodeID]; ok {
70✔
206
                        idToKeep = true
29✔
207
                } else {
41✔
208
                        if _, ok := keepNodeIDs[nodeID]; ok {
22✔
209
                                idToKeep = true
10✔
210
                        }
10✔
211
                }
212

213
                if !idToKeep {
43✔
214
                        na.disconnectLink(link, false)
2✔
215
                }
2✔
216
        }
217

218
        return nil
22✔
219
}
220

221
func (na *NodeAccessor) SignalingOffer(srcNodeID *types.NodeID, offer *signal.Offer) {
21✔
222
        na.mtx.Lock()
21✔
223
        defer na.mtx.Unlock()
21✔
224

21✔
225
        offerID := offer.OfferID
21✔
226

21✔
227
        if state, ok := na.offerID2state[offerID]; ok && !state.nodeID.Equal(srcNodeID) {
21✔
228
                na.sendAnswer(srcNodeID, offerID, signal.AnswerStatusReject, "")
×
229
                return
×
230
        }
×
231

232
        link, err := newNodeLink(na.nodeLinkConfig, na, false)
21✔
233
        if err != nil {
21✔
234
                na.logger.Error("failed to create link", slog.String("error", err.Error()))
×
235
                return
×
236
        }
×
237

238
        na.offerID2state[offerID] = &connectingState{
21✔
239
                isPrime:           false,
21✔
240
                ices:              make([]string, 0),
21✔
241
                link:              link,
21✔
242
                nodeID:            srcNodeID.Copy(),
21✔
243
                creationTimestamp: time.Now(),
21✔
244
        }
21✔
245
        na.link2offerID[link] = offerID
21✔
246

21✔
247
        go func() {
42✔
248
                if err := link.setRemoteSDP(offer.Sdp); err != nil {
21✔
249
                        na.logger.Error("failed to set remote SDP", slog.String("error", err.Error()))
×
250
                        na.disconnectLink(link, true)
×
251
                        return
×
252
                }
×
253

254
                sdp, err := link.getLocalSDP()
21✔
255
                if err != nil {
21✔
256
                        na.logger.Error("failed to get local SDP", slog.String("error", err.Error()))
×
257
                        na.disconnectLink(link, true)
×
258
                        return
×
259
                }
×
260

261
                if err = na.sendAnswer(srcNodeID, offerID, signal.AnswerStatusAccept, sdp); err != nil {
21✔
262
                        na.logger.Error("failed to send answer", slog.String("error", err.Error()))
×
263
                        na.disconnectLink(link, true)
×
264
                }
×
265
        }()
266
}
267

268
func (na *NodeAccessor) SignalingAnswer(srcNodeID *types.NodeID, answer *signal.Answer) {
21✔
269
        na.mtx.Lock()
21✔
270
        defer na.mtx.Unlock()
21✔
271

21✔
272
        offerID := answer.OfferID
21✔
273
        state, ok := na.offerID2state[offerID]
21✔
274
        if !ok {
21✔
275
                return
×
276
        }
×
277

278
        if state.nodeID == nil {
39✔
279
                state.nodeID = srcNodeID.Copy()
18✔
280
        } else if !state.nodeID.Equal(srcNodeID) {
21✔
281
                return
×
282
        }
×
283

284
        if answer.Status == signal.AnswerStatusReject {
21✔
285
                na.disconnectLink(state.link, false)
×
286
                return
×
287
        }
×
288

289
        go func() {
42✔
290
                if err := state.link.setRemoteSDP(answer.Sdp); err != nil {
21✔
291
                        na.logger.Error("failed to set remote SDP", slog.String("error", err.Error()))
×
292
                        return
×
293
                }
×
294

295
                state.icesMtx.Lock()
21✔
296
                defer state.icesMtx.Unlock()
21✔
297
                if len(state.ices) != 0 {
41✔
298
                        na.sendICE(srcNodeID, offerID, state.ices)
20✔
299
                }
20✔
300
                state.ices = nil
21✔
301
        }()
302
}
303

304
func (na *NodeAccessor) SignalingICE(srcNodeID *types.NodeID, ice *signal.ICE) {
61✔
305
        na.mtx.Lock()
61✔
306
        defer na.mtx.Unlock()
61✔
307

61✔
308
        offerID := ice.OfferID
61✔
309
        state, ok := na.offerID2state[offerID]
61✔
310
        if !ok {
61✔
311
                return
×
312
        }
×
313

314
        go func() {
122✔
315
                for _, i := range ice.Ices {
145✔
316
                        if err := state.link.updateICE(i); err != nil {
84✔
317
                                na.logger.Error("failed to update ICE", slog.String("error", err.Error()))
×
318
                                break
×
319
                        }
320
                }
321

322
                state.icesMtx.Lock()
61✔
323
                defer state.icesMtx.Unlock()
61✔
324
                if len(state.ices) != 0 {
72✔
325
                        na.sendICE(srcNodeID, offerID, state.ices)
11✔
326
                }
11✔
327
                state.ices = nil
61✔
328
        }()
329
}
330

331
func (na *NodeAccessor) connect(ot signal.OfferType, nodeID *types.NodeID) error {
22✔
332
        if ot == signal.OfferTypeExplicit && na.assignedNodeID(nodeID) {
22✔
333
                return errors.New("node ID is already connected")
×
334
        }
×
335

336
        nlc := *na.nodeLinkConfig
22✔
337
        // make webrtc data channel label unique and ordered
22✔
338
        uuidV7, err := uuid.NewV7()
22✔
339
        if err != nil {
22✔
340
                return fmt.Errorf("failed to create uuid: %w", err)
×
341
        }
×
342
        nlc.label = uuidV7.String()
22✔
343
        link, err := newNodeLink(&nlc, na, true)
22✔
344
        if err != nil {
22✔
345
                return fmt.Errorf("failed to create link: %w", err)
×
346
        }
×
347

348
        state := &connectingState{
22✔
349
                isPrime:           true,
22✔
350
                ices:              make([]string, 0),
22✔
351
                link:              link,
22✔
352
                creationTimestamp: time.Now(),
22✔
353
        }
22✔
354
        if ot == signal.OfferTypeExplicit {
25✔
355
                state.nodeID = nodeID.Copy()
3✔
356
        } else {
22✔
357
                state.nodeID = nil
19✔
358
        }
19✔
359
        // assign unique offerID
360
        offerID := rand.Uint32()
22✔
361
        for _, ok := na.offerID2state[offerID]; ok; {
22✔
362
                offerID = rand.Uint32()
×
363
        }
×
364

365
        na.offerID2state[offerID] = state
22✔
366
        na.link2offerID[link] = offerID
22✔
367

22✔
368
        go func() {
44✔
369
                sdp, err := link.getLocalSDP()
22✔
370
                if err != nil {
22✔
371
                        na.logger.Error("failed to get local SDP", slog.String("error", err.Error()))
×
372
                        na.disconnectLink(link, true)
×
373
                        return
×
374
                }
×
375

376
                err = na.handler.NodeAccessorSendSignalOffer(nodeID, &signal.Offer{
22✔
377
                        OfferID:   offerID,
22✔
378
                        OfferType: ot,
22✔
379
                        Sdp:       sdp,
22✔
380
                })
22✔
381
                if err != nil {
22✔
382
                        na.logger.Error("failed to send offer", slog.String("error", err.Error()))
×
383
                        na.disconnectLink(link, true)
×
384
                }
×
385
        }()
386

387
        return nil
22✔
388
}
389

390
func (na *NodeAccessor) assignedNodeID(nodeID *types.NodeID) bool {
3✔
391
        if _, ok := na.nodeID2link[*nodeID]; ok {
3✔
392
                return true
×
393
        }
×
394

395
        for _, state := range na.offerID2state {
3✔
396
                if state.nodeID.Equal(nodeID) {
×
397
                        return true
×
398
                }
×
399
        }
400

401
        return false
3✔
402
}
403

404
func (na *NodeAccessor) disconnectLink(link *nodeLink, lock bool) {
84✔
405
        if err := link.disconnect(); err != nil {
84✔
406
                na.logger.Error("failed to disconnect link", slog.String("error", err.Error()))
×
407
        }
×
408

409
        if lock {
84✔
410
                na.mtx.Lock()
×
411
                defer na.mtx.Unlock()
×
412
        }
×
413

414
        if nodeID := na.link2nodeID[link]; nodeID != nil {
124✔
415
                delete(na.nodeID2link, *nodeID)
40✔
416
                delete(na.link2nodeID, link)
40✔
417
                na.callChangeConnections()
40✔
418
        }
40✔
419

420
        if offerID, ok := na.link2offerID[link]; ok {
85✔
421
                delete(na.offerID2state, offerID)
1✔
422
                delete(na.link2offerID, link)
1✔
423
        }
1✔
424
}
425

426
func (na *NodeAccessor) houseKeeping() error {
67✔
427
        for link := range na.link2nodeID {
107✔
428
                if link.getLinkState() == nodeLinkStateDisabled {
40✔
429
                        na.disconnectLink(link, false)
×
430
                }
×
431
        }
432

433
        for _, state := range na.offerID2state {
69✔
434
                if time.Now().After(state.creationTimestamp.Add(na.connectionTimeout)) {
3✔
435
                        na.disconnectLink(state.link, false)
1✔
436
                }
1✔
437
        }
438

439
        return nil
67✔
440
}
441

442
func (na *NodeAccessor) IsOnline() bool {
191✔
443
        na.mtx.RLock()
191✔
444
        defer na.mtx.RUnlock()
191✔
445
        for _, link := range na.nodeID2link {
224✔
446
                if link.getLinkState() == nodeLinkStateOnline {
66✔
447
                        return true
33✔
448
                }
33✔
449
        }
450
        return false
158✔
451
}
452

453
func (na *NodeAccessor) RelayPacket(dstNodeID *types.NodeID, packet *networkTypes.Packet) error {
7✔
454
        if !na.IsOnline() {
7✔
455
                return errors.New("node accessor should be online before relaying packet")
×
456
        }
×
457

458
        na.mtx.RLock()
7✔
459
        defer na.mtx.RUnlock()
7✔
460

7✔
461
        if dstNodeID.Equal(&types.NodeNeighborhoods) {
7✔
462
                if (packet.Mode & networkTypes.PacketModeOneWay) == networkTypes.PacketModeNone {
×
463
                        return errors.New("packet mode should be one way when multicast")
×
464
                }
×
465
                for nodeID, link := range na.nodeID2link {
×
466
                        copy := *packet
×
467
                        copy.DstNodeID = &nodeID
×
468
                        if link.getLinkState() == nodeLinkStateOnline {
×
469
                                if err := link.sendPacket(&copy); err != nil {
×
470
                                        return errors.New("failed to send packet")
×
471
                                }
×
472
                        }
473
                }
474

475
        } else {
7✔
476
                link, ok := na.nodeID2link[*dstNodeID]
7✔
477
                if !ok || link.getLinkState() != nodeLinkStateOnline {
7✔
478
                        return errors.New("link is not online")
×
479
                }
×
480
                if err := link.sendPacket(packet); err != nil {
7✔
481
                        return errors.New("failed to send packet")
×
482
                }
×
483
        }
484

485
        return nil
7✔
486
}
487

488
func (na *NodeAccessor) sendAnswer(dstNodeID *types.NodeID, offerID uint32, status signal.AnswerStatus, sdp string) error {
21✔
489
        return na.handler.NodeAccessorSendSignalAnswer(dstNodeID, &signal.Answer{
21✔
490
                OfferID: offerID,
21✔
491
                Status:  status,
21✔
492
                Sdp:     sdp,
21✔
493
        })
21✔
494
}
21✔
495

496
func (na *NodeAccessor) sendICE(dstNodeID *types.NodeID, offerID uint32, ices []string) error {
61✔
497
        return na.handler.NodeAccessorSendSignalICE(dstNodeID, &signal.ICE{
61✔
498
                OfferID: offerID,
61✔
499
                Ices:    ices,
61✔
500
        })
61✔
501
}
61✔
502

503
func (na *NodeAccessor) nodeLinkChangeState(link *nodeLink, linkState nodeLinkState) {
85✔
504
        go func() {
170✔
505
                na.mtx.Lock()
85✔
506
                defer na.mtx.Unlock()
85✔
507

85✔
508
                if linkState == nodeLinkStateOnline {
127✔
509
                        offerID, ok := na.link2offerID[link]
42✔
510
                        if ok {
84✔
511
                                state := na.offerID2state[offerID]
42✔
512
                                delete(na.offerID2state, offerID)
42✔
513
                                delete(na.link2offerID, link)
42✔
514
                                // There is a possibility that both nodes try to connect at the same time.
42✔
515
                                // Depending on the timing, links created by each other may interfere with each other,
42✔
516
                                // causing link creation to fail. If there are links to the same node,
42✔
517
                                // the link with the larger data channel label is retained.
42✔
518
                                if existingLink, ok := na.nodeID2link[*state.nodeID]; ok {
44✔
519
                                        if existingLink.getLinkState() != nodeLinkStateOnline ||
2✔
520
                                                strings.Compare(existingLink.getLabel(), link.getLabel()) > 0 {
2✔
521
                                                // disconnect existing link and keep new link
×
522
                                                na.disconnectLink(existingLink, false)
×
523
                                                na.nodeID2link[*state.nodeID] = link
×
524
                                                na.link2nodeID[link] = state.nodeID.Copy()
×
525
                                        } else {
2✔
526
                                                if strings.Compare(existingLink.getLabel(), link.getLabel()) == 0 {
2✔
527
                                                        panic(fmt.Sprintf("data channel label should be unique, %d, %s", offerID, existingLink.getLabel()))
×
528
                                                }
529
                                                // disconnect new link and keep existing link
530
                                                na.disconnectLink(link, false)
2✔
531
                                        }
532
                                } else {
40✔
533
                                        na.nodeID2link[*state.nodeID] = link
40✔
534
                                        na.link2nodeID[link] = state.nodeID.Copy()
40✔
535
                                }
40✔
536
                                na.callChangeConnections()
42✔
537
                        }
538
                }
539

540
                if linkState == nodeLinkStateDisabled {
128✔
541
                        na.disconnectLink(link, false)
43✔
542
                }
43✔
543
        }()
544
}
545

546
func (na *NodeAccessor) nodeLinkUpdateICE(link *nodeLink, ice string) {
86✔
547
        go func() {
172✔
548
                na.mtx.Lock()
86✔
549
                defer na.mtx.Unlock()
86✔
550

86✔
551
                offerID, ok := na.link2offerID[link]
86✔
552
                if !ok {
86✔
553
                        return
×
554
                }
×
555

556
                state := na.offerID2state[offerID]
86✔
557
                state.icesMtx.Lock()
86✔
558
                defer state.icesMtx.Unlock()
86✔
559
                if state.ices == nil {
116✔
560
                        na.sendICE(state.nodeID, offerID, []string{ice})
30✔
561
                } else {
86✔
562
                        state.ices = append(state.ices, ice)
56✔
563
                }
56✔
564
        }()
565
}
566

567
func (na *NodeAccessor) nodeLinkRecvPacket(link *nodeLink, packet *networkTypes.Packet) {
7✔
568
        na.mtx.RLock()
7✔
569
        nodeID := na.link2nodeID[link]
7✔
570
        na.mtx.RUnlock()
7✔
571
        na.handler.NodeAccessorRecvPacket(nodeID, packet)
7✔
572
}
7✔
573

574
func (na *NodeAccessor) callChangeConnections() {
82✔
575
        connections := make(map[types.NodeID]struct{})
82✔
576
        for nodeID := range na.nodeID2link {
174✔
577
                connections[nodeID] = struct{}{}
92✔
578
        }
92✔
579
        na.handler.NodeAccessorChangeConnections(connections)
82✔
580
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc