• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 27453982421

13 Jun 2026 02:32AM UTC coverage: 48.422% (-16.2%) from 64.66%
27453982421

Pull #107

github

llamerada-jp
wip: fixing

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

632 of 2695 new or added lines in 24 files covered. (23.45%)

2 existing lines in 1 file now uncovered.

3299 of 6813 relevant lines covered (48.42%)

28.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/node/internal/network/routing/routing.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package routing
17

18
import (
19
        "context"
20
        "fmt"
21
        "log/slog"
22
        "sync"
23
        "time"
24

25
        proto "github.com/llamerada-jp/colonio/api/colonio/v1alpha"
26
        "github.com/llamerada-jp/colonio/node/internal/geometry"
27
        "github.com/llamerada-jp/colonio/node/internal/network/transferer"
28
        observation "github.com/llamerada-jp/colonio/node/observation"
29
        "github.com/llamerada-jp/colonio/types"
30
        networkTypes "github.com/llamerada-jp/colonio/types/network"
31
)
32

33
const (
	// connectionUpdateInterval is the longest time the periodic routine lets
	// pass between two connection updates when none was explicitly requested.
	connectionUpdateInterval = 10 * time.Second

	// Bit flags accumulated in Routing.triggeredAction and consumed by the
	// periodic routine.
	requireUpdateConnections = 1 << 0 // recompute and publish the connection sets
	requireExchangeRouting   = 1 << 1 // send a routing packet to the neighborhood
)
38

39
type Handler interface {
40
        RoutingReconcileNextNodes(nextNodeIDs, disconnectedNodeIDs []*types.NodeID) (bool, error)
41
        RoutingUpdateConnection(required, keep map[types.NodeID]struct{})
42
        RoutingUpdateNextNodePositions(map[types.NodeID]*geometry.Coordinate)
43
}
44

45
type Config struct {
46
        Logger           *slog.Logger
47
        Handler          Handler
48
        Observation      observation.Caller
49
        Transferer       *transferer.Transferer
50
        CoordinateSystem geometry.CoordinateSystem
51
}
52

53
type Routing struct {
54
        logger           *slog.Logger
55
        handler          Handler
56
        observation      observation.Caller
57
        transferer       *transferer.Transferer
58
        coordinateSystem geometry.CoordinateSystem
59

60
        localNodeID          *types.NodeID
61
        r1d                  *routing1D
62
        r2d                  *routing2D
63
        mtx                  sync.Mutex
64
        lastRouteUpdate      time.Time
65
        triggeredAction      int
66
        lastConnectionUpdate time.Time
67
}
68

69
func NewRouting(config *Config) *Routing {
×
70
        r := &Routing{
×
71
                logger:               config.Logger,
×
72
                handler:              config.Handler,
×
73
                observation:          config.Observation,
×
74
                transferer:           config.Transferer,
×
75
                coordinateSystem:     config.CoordinateSystem,
×
76
                lastRouteUpdate:      time.Now(),
×
77
                lastConnectionUpdate: time.Now(),
×
78
        }
×
79

×
80
        transferer.SetRequestHandler[proto.PacketContent_Routing](config.Transferer, r.recvRouting)
×
81

×
82
        return r
×
83
}
×
84

85
func (r *Routing) Start(ctx context.Context, localNodeID *types.NodeID) {
×
86
        r.localNodeID = localNodeID
×
87

×
88
        r.r1d = newRouting1D(&routing1DConfig{
×
89
                logger:             r.logger,
×
90
                localNodeID:        localNodeID,
×
91
                reconcileNextNodes: r.handler.RoutingReconcileNextNodes,
×
92
        })
×
93

×
94
        if r.coordinateSystem != nil {
×
95
                r.r2d = newRouting2D(&routing2DConfig{
×
96
                        logger:      r.logger,
×
97
                        localNodeID: localNodeID,
×
98
                        geometry:    r.coordinateSystem,
×
99
                })
×
100
        }
×
101

102
        go func() {
×
103
                ticker := time.NewTicker(1 * time.Second)
×
104
                defer ticker.Stop()
×
105
                for {
×
106
                        select {
×
107
                        case <-ctx.Done():
×
108
                                return
×
109

110
                        case <-ticker.C:
×
111
                                r.subRoutine()
×
112
                        }
113
                }
114
        }()
115
}
116

NEW
117
func (r *Routing) GetStability() (bool, []*types.NodeID, []*types.NodeID) {
×
118
        return r.r1d.getStability()
×
119
}
×
120

121
func (r *Routing) GetNextStep1D(packet *networkTypes.Packet) *types.NodeID {
×
122
        return r.r1d.getNextStep(packet)
×
123
}
×
124

125
func (r *Routing) GetNextStep2D(dst *geometry.Coordinate) *types.NodeID {
×
126
        if r.r2d == nil {
×
127
                panic("routing 2D is not enabled")
×
128
        }
129

130
        return r.r2d.getNextStep(dst)
×
131
}
132

133
func (r *Routing) UpdateLocalPosition(pos *geometry.Coordinate) error {
×
134
        if r.r2d == nil {
×
135
                return fmt.Errorf("position based network is not enabled")
×
136
        }
×
137

138
        r.mtx.Lock()
×
139
        defer r.mtx.Unlock()
×
140

×
141
        r.triggeredAction = r.triggeredAction | r.r2d.updateLocalPosition(pos)
×
142
        return nil
×
143
}
144

145
func (r *Routing) UpdateNodeConnections(connections map[types.NodeID]struct{}) {
×
146
        r.mtx.Lock()
×
147
        defer r.mtx.Unlock()
×
148
        r.triggeredAction = r.triggeredAction | r.r1d.updateNodeConnections(connections)
×
149
        if r.r2d != nil {
×
150
                r.triggeredAction = r.triggeredAction | r.r2d.updateNodeConnections(connections)
×
151
        }
×
152
}
153

154
func (r *Routing) CountRecvPacket(from *types.NodeID, packet *networkTypes.Packet) {
×
155
        r.r1d.countRecvPacket(from)
×
156
}
×
157

158
func (r *Routing) subRoutine() {
×
159
        r.r1d.subRoutine()
×
160

×
161
        r.mtx.Lock()
×
162
        defer r.mtx.Unlock()
×
163

×
164
        // TODO: should be optimized
×
165
        if r.r2d != nil {
×
166
                r.handler.RoutingUpdateNextNodePositions(r.r2d.getNextNodePositions())
×
167
        }
×
168

169
        if (r.triggeredAction&requireUpdateConnections) != 0 ||
×
170
                time.Now().After(r.lastConnectionUpdate.Add(connectionUpdateInterval)) {
×
171
                required, keep := r.r1d.getConnections()
×
172
                if r.observation != nil {
×
173
                        r.observation.UpdateRequiredNodeIDs1D(types.ConvertNodeIDSetToStringMap(required))
×
174
                }
×
175
                if r.r2d != nil {
×
176
                        required2d := r.r2d.getConnections()
×
177
                        for nodeID := range required2d {
×
178
                                required[nodeID] = struct{}{}
×
179
                        }
×
180
                        if r.observation != nil {
×
181
                                r.observation.UpdateRequiredNodeIDs2D(types.ConvertNodeIDSetToStringMap(required2d))
×
182
                        }
×
183
                }
184
                r.handler.RoutingUpdateConnection(required, keep)
×
185
                r.triggeredAction = r.triggeredAction &^ requireUpdateConnections
×
186
                r.lastConnectionUpdate = time.Now()
×
187
        }
188

189
        if (r.triggeredAction & requireExchangeRouting) != 0 {
×
190
                r.sendRouting()
×
191
                r.triggeredAction = r.triggeredAction &^ requireExchangeRouting
×
192
                r.lastRouteUpdate = time.Now()
×
193
        }
×
194
}
195

196
func (r *Routing) sendRouting() {
×
197
        content := &proto.Routing{
×
198
                NodeRecords: make(map[string]*proto.RoutingNodeRecord),
×
199
        }
×
200
        r.r1d.setupRoutingPacket(content)
×
201
        if r.r2d != nil {
×
202
                r.r2d.setupRoutingPacket(content)
×
203
        }
×
204

205
        r.transferer.RequestOneWay(&types.NodeNeighborhoods, networkTypes.PacketModeNoRetry, &proto.PacketContent{
×
206
                Content: &proto.PacketContent_Routing{
×
207
                        Routing: content,
×
208
                },
×
209
        })
×
210
}
211

212
func (r *Routing) recvRouting(p *networkTypes.Packet) {
×
213
        src := p.SrcNodeID
×
214
        content := p.Content.GetRouting()
×
215

×
216
        r.mtx.Lock()
×
217
        defer r.mtx.Unlock()
×
218

×
219
        action, err := r.r1d.recvRoutingPacket(src, content)
×
220
        if err != nil {
×
221
                r.logger.Warn("error on processing routing packet", slog.String("error", err.Error()))
×
222
                return
×
223
        }
×
224
        r.triggeredAction = r.triggeredAction | action
×
225
        if r.r2d != nil {
×
226
                action, err = r.r2d.recvRoutingPacket(src, content)
×
227
                if err != nil {
×
228
                        r.logger.Warn("error on processing routing packet", slog.String("error", err.Error()))
×
229
                        return
×
230
                }
×
231
                r.triggeredAction = r.triggeredAction | action
×
232
        }
233
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc