• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 26680513216

30 May 2026 09:34AM UTC coverage: 47.381% (-17.3%) from 64.66%
26680513216

Pull #107

github

llamerada-jp
wip: TLA+

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

362 of 2277 new or added lines in 24 files covered. (15.9%)

2 existing lines in 1 file now uncovered.

3030 of 6395 relevant lines covered (47.38%)

29.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.82
/node/internal/network/network.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package network
17

18
import (
19
        "context"
20
        "log/slog"
21
        "net/http"
22
        "time"
23

24
        service "github.com/llamerada-jp/colonio/api/colonio/v1alpha/v1alphaconnect"
25
        "github.com/llamerada-jp/colonio/node/internal/constants"
26
        "github.com/llamerada-jp/colonio/node/internal/geometry"
27
        "github.com/llamerada-jp/colonio/node/internal/network/node_accessor"
28
        "github.com/llamerada-jp/colonio/node/internal/network/routing"
29
        "github.com/llamerada-jp/colonio/node/internal/network/seed_accessor"
30
        "github.com/llamerada-jp/colonio/node/internal/network/signal"
31
        "github.com/llamerada-jp/colonio/node/internal/network/transferer"
32
        "github.com/llamerada-jp/colonio/node/observation"
33
        "github.com/llamerada-jp/colonio/types"
34
        networkTypes "github.com/llamerada-jp/colonio/types/network"
35
)
36

37
// Handler is the callback interface that the owner of a Network supplies
// through Config.Handler.
type Handler interface {
38
        // NetworkUpdateNextNodePosition is called from
        // Network.RoutingUpdateNextNodePositions with the positions map that
        // method was given, unchanged.
        NetworkUpdateNextNodePosition(map[types.NodeID]*geometry.Coordinate)
39
}
40

41
// Config carries the dependencies and settings read by NewNetwork.
type Config struct {
42
        // Logger is stored on the Network and also passed to the seed accessor,
        // node accessor, transferer and routing configs.
        Logger           *slog.Logger
43
        // Handler is stored on the Network and receives
        // NetworkUpdateNextNodePosition callbacks.
        Handler          Handler
44
        // Observation is stored on the Network and passed to the routing config.
        // Network itself only calls it when it is non-nil.
        Observation      observation.Caller
45
        // CoordinateSystem is passed through to the routing config.
        CoordinateSystem geometry.CoordinateSystem
46

47
        // Configuration parameters for the seed accessor.
48
        HttpClient *http.Client // optional
49
        SeedURL    string
50
        // Configuration parameters for the WebRTC node accessor.
51
        NLC *node_accessor.NodeLinkConfig
52

53
        // Maximum number of hops that a packet can be relayed.
54
        PacketHopLimit uint
55
}
56

57
// Network wires together the seed accessor, node accessor, transferer and
// routing components. NewNetwork registers the Network itself as the Handler
// of each of those components.
type Network struct {
58
        // logger is used for debug messages when relaying or dropping packets.
        logger      *slog.Logger
59
        // handler receives next-node position updates coming from routing.
        handler     Handler
60
        // observation, when non-nil, is told about connection set changes.
        observation observation.Caller
61

62
        // localNodeID is assigned in Start from the seed accessor's result.
        localNodeID *types.NodeID
63

64
        // Sub-components; all four are created in NewNetwork.
        seedAccessor *seed_accessor.SeedAccessor
65
        nodeAccessor *node_accessor.NodeAccessor
66
        transferer   *transferer.Transferer
67
        routing      *routing.Routing
68

69
        // packetHopLimit is copied from Config.PacketHopLimit and checked in
        // checkHopCount.
        packetHopLimit uint
70
}
71

72
func NewNetwork(config *Config) (*Network, error) {
10✔
73
        n := &Network{
10✔
74
                logger:         config.Logger,
10✔
75
                handler:        config.Handler,
10✔
76
                observation:    config.Observation,
10✔
77
                packetHopLimit: config.PacketHopLimit,
10✔
78
        }
10✔
79

10✔
80
        n.seedAccessor = seed_accessor.NewSeedAccessor(&seed_accessor.Config{
10✔
81
                Logger:     config.Logger,
10✔
82
                Handler:    n,
10✔
83
                URL:        config.SeedURL,
10✔
84
                HttpClient: config.HttpClient,
10✔
85
        })
10✔
86

10✔
87
        na, err := node_accessor.NewNodeAccessor(&node_accessor.Config{
10✔
88
                Logger:         config.Logger,
10✔
89
                Handler:        n,
10✔
90
                NodeLinkConfig: config.NLC,
10✔
91
        })
10✔
92
        if err != nil {
10✔
93
                return nil, err
×
94
        }
×
95
        n.nodeAccessor = na
10✔
96

10✔
97
        n.transferer = transferer.NewTransferer(&transferer.Config{
10✔
98
                Logger:  config.Logger,
10✔
99
                Handler: n,
10✔
100
        })
10✔
101

10✔
102
        n.routing = routing.NewRouting(&routing.Config{
10✔
103
                Logger:           config.Logger,
10✔
104
                Handler:          n,
10✔
105
                Observation:      config.Observation,
10✔
106
                Transferer:       n.transferer,
10✔
107
                CoordinateSystem: config.CoordinateSystem,
10✔
108
        })
10✔
109

10✔
110
        return n, nil
10✔
111
}
112

113
func (n *Network) Start(ctx context.Context) (*types.NodeID, error) {
10✔
114
        var err error
10✔
115
        n.localNodeID, err = n.seedAccessor.Start(ctx)
10✔
116
        if err != nil {
10✔
117
                return nil, err
×
118
        }
×
119

120
        n.nodeAccessor.Start(ctx, n.localNodeID)
10✔
121
        n.transferer.Start(ctx, n.localNodeID)
10✔
122
        n.routing.Start(ctx, n.localNodeID)
10✔
123

10✔
124
        go func() {
20✔
125
                ticker := time.NewTicker(1 * time.Second)
10✔
126
                defer ticker.Stop()
10✔
127

10✔
128
                for {
132✔
129
                        select {
122✔
130
                        case <-ctx.Done():
10✔
131
                                return
10✔
132
                        case <-ticker.C:
112✔
133
                                n.nodeAccessor.SetBeAlone(n.seedAccessor.IsAlone())
112✔
134
                        }
135
                }
136
        }()
137

138
        return n.localNodeID, nil
10✔
139
}
140

141
func (n *Network) IsOnline() bool {
123✔
142
        return n.seedAccessor.IsAlone() || n.nodeAccessor.IsOnline()
123✔
143
}
123✔
144

145
func (n *Network) GetStability() (bool, []*types.NodeID, []*types.NodeID) {
123✔
146
        return n.routing.GetStability()
123✔
147
}
123✔
148

NEW
149
func (n *Network) GetSeedClient() service.SeedServiceClient {
×
NEW
150
        return n.seedAccessor.GetClient()
×
NEW
151
}
×
152

153
func (n *Network) GetTransferer() *transferer.Transferer {
12✔
154
        return n.transferer
12✔
155
}
12✔
156

157
func (n *Network) UpdateLocalPosition(pos *geometry.Coordinate) error {
10✔
158
        return n.routing.UpdateLocalPosition(pos)
10✔
159
}
10✔
160

161
func (n *Network) GetNextStep2D(dst *geometry.Coordinate) *types.NodeID {
×
162
        return n.routing.GetNextStep2D(dst)
×
163
}
×
164

165
// Implementation of seed_accessor.Handler.
166
func (n *Network) SeedRecvSignalOffer(srcNodeID *types.NodeID, offer *signal.Offer) {
127✔
167
        n.nodeAccessor.SignalingOffer(srcNodeID, offer)
127✔
168
}
127✔
169

170
func (n *Network) SeedRecvSignalAnswer(srcNodeID *types.NodeID, answer *signal.Answer) {
127✔
171
        n.nodeAccessor.SignalingAnswer(srcNodeID, answer)
127✔
172
}
127✔
173

174
func (n *Network) SeedRecvSignalICE(srcNodeID *types.NodeID, ice *signal.ICE) {
254✔
175
        n.nodeAccessor.SignalingICE(srcNodeID, ice)
254✔
176
}
254✔
177

178
// Implementation of node_accessor.Handler.
179
func (n *Network) NodeAccessorRecvPacket(from *types.NodeID, packet *networkTypes.Packet) {
377✔
180
        if !n.checkHopCount(packet) {
377✔
181
                return
×
182
        }
×
183
        if from != nil {
754✔
184
                n.routing.CountRecvPacket(from, packet)
377✔
185
        }
377✔
186

187
        n.classifyPacket(packet)
377✔
188
}
189

190
func (n *Network) NodeAccessorChangeConnections(connections map[types.NodeID]struct{}) {
329✔
191
        // use go routine to avoid deadlock
329✔
192
        go n.routing.UpdateNodeConnections(connections)
329✔
193

329✔
194
        if n.observation != nil {
658✔
195
                n.observation.ChangeConnectedNodes(types.ConvertNodeIDSetToStringMap(connections))
329✔
196
        }
329✔
197
}
198

199
func (n *Network) NodeAccessorSendSignalOffer(dstNodeID *types.NodeID, offer *signal.Offer) error {
127✔
200
        return n.seedAccessor.SendSignalOffer(dstNodeID, offer)
127✔
201
}
127✔
202

203
func (n *Network) NodeAccessorSendSignalAnswer(dstNodeID *types.NodeID, answer *signal.Answer) error {
127✔
204
        return n.seedAccessor.SendSignalAnswer(dstNodeID, answer)
127✔
205
}
127✔
206

207
func (n *Network) NodeAccessorSendSignalICE(dstNodeID *types.NodeID, ice *signal.ICE) error {
254✔
208
        return n.seedAccessor.SendSignalICE(dstNodeID, ice)
254✔
209
}
254✔
210

211
// Implementation of transferer.Handler.
212
func (n *Network) TransfererSendPacket(packet *networkTypes.Packet) {
83✔
213
        n.classifyPacket(packet)
83✔
214
}
83✔
215

216
func (n *Network) TransfererRelayPacket(dstNodeID *types.NodeID, packet *networkTypes.Packet) {
×
217
        if !n.checkHopCount(packet) {
×
218
                return
×
219
        }
×
220

NEW
221
        if err := n.nodeAccessor.RelayPacket(dstNodeID, packet); err != nil {
×
222
                n.logger.Debug("failed to relay packet", slog.String("error", err.Error()))
×
223
        }
×
224
}
225

226
// Implementation of routing.Handler.
227
func (n *Network) RoutingReconcileNextNodes(nextNodeIDs, disconnectedNodeIDs []*types.NodeID) (bool, error) {
511✔
228
        return n.seedAccessor.ReconcileNextNodes(nextNodeIDs, disconnectedNodeIDs)
511✔
229
}
511✔
230

231
func (n *Network) RoutingUpdateConnection(required, keep map[types.NodeID]struct{}) {
62✔
232
        n.nodeAccessor.ConnectLinks(required, keep)
62✔
233
}
62✔
234

235
func (n *Network) RoutingUpdateNextNodePositions(positions map[types.NodeID]*geometry.Coordinate) {
112✔
236
        n.handler.NetworkUpdateNextNodePosition(positions)
112✔
237
}
112✔
238

239
func (n *Network) checkHopCount(packet *networkTypes.Packet) bool {
377✔
240
        if packet.HopCount > uint32(n.packetHopLimit) {
377✔
241
                return false
×
242
        }
×
243

244
        packet.HopCount++
377✔
245

377✔
246
        return true
377✔
247
}
248

249
func (n *Network) classifyPacket(packet *networkTypes.Packet) {
460✔
250
        nextNodeID := n.routing.GetNextStep1D(packet)
460✔
251
        if nextNodeID == nil {
460✔
252
                if (packet.Mode & networkTypes.PacketModeOneWay) == 0x0 {
×
253
                        n.transferer.Error(packet, constants.PacketErrorCodeNoOneReceive, "no one receive the packet")
×
254
                        return
×
255
                }
×
256
        } else if nextNodeID.Equal(&types.NodeLocal) || nextNodeID.Equal(n.localNodeID) {
837✔
257
                n.transferer.Receive(packet)
377✔
258
                return
377✔
259

377✔
260
        } else if nextNodeID.IsNormal() || nextNodeID.Equal(&types.NodeNeighborhoods) {
543✔
261
                if err := n.nodeAccessor.RelayPacket(nextNodeID, packet); err != nil {
83✔
262
                        n.logger.Debug("failed to relay packet", slog.String("error", err.Error()))
×
263
                }
×
264
                return
83✔
265
        }
266

267
        n.logger.Debug("drop packet")
×
268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc