• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 27453982421

13 Jun 2026 02:32AM UTC coverage: 48.422% (-16.2%) from 64.66%
27453982421

Pull #107

github

llamerada-jp
wip: fixing

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

632 of 2695 new or added lines in 24 files covered. (23.45%)

2 existing lines in 1 file now uncovered.

3299 of 6813 relevant lines covered (48.42%)

28.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/node/node.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package node
17

18
import (
19
        "context"
20
        "fmt"
21
        "log/slog"
22
        "net/http"
23
        "net/http/cookiejar"
24
        "time"
25

26
        "github.com/llamerada-jp/colonio/node/internal/geometry"
27
        "github.com/llamerada-jp/colonio/node/internal/kvs"
28
        "github.com/llamerada-jp/colonio/node/internal/kvs/activation"
29
        "github.com/llamerada-jp/colonio/node/internal/kvs/hosting"
30
        "github.com/llamerada-jp/colonio/node/internal/kvs/sector/consensus"
31
        "github.com/llamerada-jp/colonio/node/internal/messaging"
32
        "github.com/llamerada-jp/colonio/node/internal/network"
33
        "github.com/llamerada-jp/colonio/node/internal/network/node_accessor"
34
        "github.com/llamerada-jp/colonio/node/internal/spread"
35
        "github.com/llamerada-jp/colonio/node/observation"
36
        "github.com/llamerada-jp/colonio/types"
37
        kvsTypes "github.com/llamerada-jp/colonio/types/kvs"
38
        networkTypes "github.com/llamerada-jp/colonio/types/network"
39
)
40

41
type MessagingOptions messaging.Options
42
type MessagingOptionSetter func(*MessagingOptions)
43
type MessagingResponseWriter messaging.ResponseWriter
44
type MessagingRequest struct {
45
        SourceNodeID string
46
        Message      []byte
47
        Options      *MessagingOptions
48
}
49

50
type SpreadOptions spread.Options
51
type SpreadOptionSetter func(*SpreadOptions)
52
type SpreadRequest struct {
53
        SourceNodeID string
54
        Message      []byte
55
        Options      *SpreadOptions
56
}
57

58
type Node interface {
59
        Start(ctx context.Context) error
60
        Stop()
61
        IsOnline() bool
62
        IsStable() bool
63
        GetLocalNodeID() string
64
        UpdateLocalPosition(x, y float64) error
65
        // kvs
66
        KvsGet(key string) chan *kvsTypes.GetResult
67
        KvsSet(key string, value []byte) chan error
68
        KvsPatch(key string, value []byte) chan error
69
        KvsDelete(key string) chan error
70
        // messaging
71
        MessagingPost(dst, name string, val []byte, setters ...MessagingOptionSetter) ([]byte, error)
72
        MessagingSetHandler(name string, handler func(*MessagingRequest, MessagingResponseWriter))
73
        MessagingUnsetHandler(name string)
74
        // spread
75
        SpreadPost(x, y, r float64, name string, val []byte, setters ...SpreadOptionSetter) error
76
        SpreadSetHandler(name string, handler func(*SpreadRequest))
77
        SpreadUnsetHandler(name string)
78
}
79

80
type Config struct {
81
        Logger             *slog.Logger
82
        EnableRaftLogging  bool
83
        ObservationHandler *observation.Handler
84
        HttpClient         *http.Client
85
        SeedURL            string
86
        ICEServers         []*networkTypes.ICEServer
87
        CoordinateSystem   geometry.CoordinateSystem
88

89
        // PacketHopLimit is the maximum number of hops that a packet can be relayed.
90
        // If you set 0, the default value of 64 will be set.
91
        PacketHopLimit uint
92

93
        // KvsStore is an actual data store for KVS.
94
        KvsStore kvsTypes.Store
95

96
        // CacheLifetime is the lifetime of the cache. The spread algorithm is
97
        // so simple that the same packet may be received multiple times;
98
        // if the same packet is received within the cache lifetime, it can be suppressed
99
        // for reprocessing. This costs more memory.
100
        SpreadCacheLifetime time.Duration
101

102
        // Before sending a large payload, you can check whether a cache exists
103
        // at the other node using knock packet. For small packets, it is more efficient
104
        // to send the packet without knocking.
105
        // If you set 0, disable the use of knock packets.
106
        SpreadSizeToUseKnock uint
107
}
108
type ConfigSetter func(*Config)
109

110
func WithLogger(logger *slog.Logger) ConfigSetter {
×
111
        return func(c *Config) {
×
112
                c.Logger = logger
×
113
        }
×
114
}
115

NEW
116
func WithRaftLogging() ConfigSetter {
×
NEW
117
        return func(c *Config) {
×
NEW
118
                c.EnableRaftLogging = true
×
NEW
119
        }
×
120
}
121

122
func WithObservation(handler *observation.Handler) ConfigSetter {
×
123
        return func(c *Config) {
×
124
                c.ObservationHandler = handler
×
125
        }
×
126
}
127

128
func WithHttpClient(client *http.Client) ConfigSetter {
×
129
        return func(c *Config) {
×
130
                c.HttpClient = client
×
131
        }
×
132
}
133

134
func WithSeedURL(url string) ConfigSetter {
×
135
        return func(c *Config) {
×
136
                c.SeedURL = url
×
137
        }
×
138
}
139

140
// IceServers is a list of ICE servers.
141
// In Colonio, multiple ICE servers can be used to establish a WebRTC connection.
142
// Each entry contains the URLs of the ICE server, the username, and the credential.
143
func WithICEServers(servers []*networkTypes.ICEServer) ConfigSetter {
×
144
        return func(c *Config) {
×
145
                c.ICEServers = servers
×
146
        }
×
147
}
148

149
// Plane is a configuration for the plane geometry.
150
// If this configuration is set, the 2D position based network will be setup as a plane space.
151
func WithPlaneGeometry(xMin, xMax, yMin, yMax float64) ConfigSetter {
×
152
        return func(c *Config) {
×
153
                c.CoordinateSystem = geometry.NewPlaneCoordinateSystem(xMin, xMax, yMin, yMax)
×
154
        }
×
155
}
156

NEW
157
func WithKvsStore(store kvsTypes.Store) ConfigSetter {
×
NEW
158
        return func(c *Config) {
×
NEW
159
                c.KvsStore = store
×
NEW
160
        }
×
161
}
162

163
// Sphere is a configuration for the sphere geometry.
164
// If this configuration is set, the 2D position based network will be setup as a sphere space.
165
func WithSphereGeometry(radius float64) ConfigSetter {
×
166
        return func(c *Config) {
×
167
                c.CoordinateSystem = geometry.NewSphereCoordinateSystem(radius)
×
168
        }
×
169
}
170

171
type colonioImpl struct {
172
        logger      *slog.Logger
173
        ctx         context.Context
174
        cancel      context.CancelFunc
175
        localNodeID *types.NodeID
176
        network     *network.Network
177
        kvs         *kvs.KVS
178
        messaging   *messaging.Messaging
179
        spread      *spread.Spread
180
}
181

182
func NewNode(setters ...ConfigSetter) (Node, error) {
×
183
        config := &Config{
×
184
                Logger:             slog.Default(),
×
185
                ObservationHandler: nil,
×
186
                PacketHopLimit:     64,
×
187

×
188
                SpreadCacheLifetime:  1 * time.Minute,
×
189
                SpreadSizeToUseKnock: 4096,
×
190
        }
×
191
        for _, setter := range setters {
×
192
                setter(config)
×
193
        }
×
194

195
        if len(config.SeedURL) == 0 {
×
196
                return nil, fmt.Errorf("seed url should be set")
×
197
        }
×
198

199
        if len(config.ICEServers) == 0 {
×
200
                return nil, fmt.Errorf("ICE servers should be set")
×
201
        }
×
202

203
        if config.CoordinateSystem == nil {
×
204
                return nil, fmt.Errorf("coordinate system should be set")
×
205
        }
×
206

207
        if config.HttpClient == nil {
×
208
                jar, err := cookiejar.New(nil)
×
209
                if err != nil {
×
210
                        return nil, err
×
211
                }
×
212

213
                config.HttpClient = &http.Client{
×
214
                        Jar: jar,
×
215
                }
×
216
        }
217

NEW
218
        if config.KvsStore == nil {
×
NEW
219
                config.KvsStore = kvs.NewSimpleStore()
×
NEW
220
        }
×
221

222
        impl := &colonioImpl{
×
223
                logger: config.Logger,
×
224
        }
×
225

×
226
        // create network module
×
227
        net, err := network.NewNetwork(&network.Config{
×
228
                Logger:           config.Logger,
×
229
                Handler:          impl,
×
230
                Observation:      config.ObservationHandler,
×
231
                CoordinateSystem: config.CoordinateSystem,
×
232
                HttpClient:       config.HttpClient,
×
233
                SeedURL:          config.SeedURL,
×
234
                NLC: &node_accessor.NodeLinkConfig{
×
235
                        ICEServers: config.ICEServers,
×
236

×
237
                        // SessionTimeout is used to determine the timeout of the WebRTC session between nodes.
×
238
                        SessionTimeout: 5 * time.Minute,
×
239

×
240
                        // KeepaliveInterval is the interval to send a ping packet to tell living the node for each nodes.
×
241
                        // Keepalive packet is be tried to send  when no packet with content has been sent.
×
242
                        // The value should be less than `sessionTimeout`.
×
243
                        KeepaliveInterval: 1 * time.Minute,
×
244

×
245
                        // BufferInterval is maximum interval for buffering packets between nodes.
×
246
                        // If packets exceeding WebRTCPacketBaseBytes are stored in the buffer even if it is less than interval,
×
247
                        // the packets will be flush.
×
248
                        // If you set 0, disables packet buffering and tries to transport the packet as immediately as possible.
×
249
                        BufferInterval: 10 * time.Millisecond,
×
250

×
251
                        // PacketBaseBytes is a reference value for the packet size to be sent in WebRTC communication,
×
252
                        // since WebRTC data channel may fail to send too large packets.
×
253
                        // If you set 0, 512KiB will be set as the default value.
×
254
                        // For simplification of the internal implementation, the packet size actually sent may be
×
255
                        // larger than this value. Therefore, please set this value with a margin.
×
256
                        // This value is provided as a fallback, although it may not be necessary depending
×
257
                        // on the WebRTC library implementation. In such a case, you can disable this value
×
258
                        // the pseudo setting by setting a very large value.
×
259
                        PacketBaseBytes: 4096,
×
260
                },
×
261
                PacketHopLimit: config.PacketHopLimit,
×
262
        })
×
263
        if err != nil {
×
264
                return nil, err
×
265
        }
×
266
        impl.network = net
×
267

×
NEW
268
        activationResolver := activation.NewResolver(&activation.Config{
×
NEW
269
                Outbound: activation.NewOutbound(net.GetSeedClient()),
×
NEW
270
                CacheTTL: 1 * time.Minute,
×
NEW
271
        })
×
NEW
272

×
NEW
273
        hostingManager := hosting.NewManager(&hosting.Config{
×
NEW
274
                Logger:   config.Logger,
×
NEW
275
                Outbound: hosting.NewOutbound(net.GetTransferer()),
×
NEW
276
        })
×
NEW
277
        hosting.SetupInbound(impl.logger, net.GetTransferer(), hostingManager)
×
NEW
278

×
NEW
279
        impl.kvs = kvs.NewKVS(&kvs.Config{
×
NEW
280
                Logger:             config.Logger,
×
NEW
281
                EnableRaftLogging:  config.EnableRaftLogging,
×
NEW
282
                Handler:            impl,
×
NEW
283
                Outbound:           kvs.NewOutbound(net.GetTransferer()),
×
NEW
284
                ConsensusOutbound:  consensus.NewOutbound(net.GetTransferer()),
×
NEW
285
                ActivationResolver: activationResolver,
×
NEW
286
                HostingManager:     hostingManager,
×
NEW
287
                Observation:        config.ObservationHandler,
×
NEW
288
                Store:              config.KvsStore,
×
NEW
289
        })
×
NEW
290
        kvs.SetupInbound(impl.logger, net.GetTransferer(), impl.kvs)
×
NEW
291

×
292
        impl.messaging = messaging.NewMessaging(&messaging.Config{
×
293
                Logger:     config.Logger,
×
294
                Transferer: net.GetTransferer(),
×
295
        })
×
296

×
297
        impl.spread = spread.NewSpread(&spread.Config{
×
298
                Logger:           config.Logger,
×
299
                Handler:          impl,
×
300
                Transferer:       net.GetTransferer(),
×
301
                CoordinateSystem: config.CoordinateSystem,
×
302
                CacheLifetime:    config.SpreadCacheLifetime,
×
303
                SizeToUseKnock:   config.SpreadSizeToUseKnock,
×
304
        })
×
305

×
306
        return impl, nil
×
307
}
308

309
func (c *colonioImpl) Start(ctx context.Context) error {
×
310
        if c.ctx != nil {
×
311
                return fmt.Errorf("cannot call Start() twice")
×
312
        }
×
313
        c.ctx, c.cancel = context.WithCancel(ctx)
×
314

×
315
        var err error
×
316
        c.localNodeID, err = c.network.Start(c.ctx)
×
317
        if err != nil {
×
318
                return err
×
319
        }
×
320

NEW
321
        c.kvs.Start(c.ctx, c.localNodeID)
×
322
        c.spread.Start(c.ctx, c.localNodeID)
×
323

×
324
        return nil
×
325
}
326

327
func (c *colonioImpl) Stop() {
×
328
        c.cancel()
×
329
}
×
330

331
func (c *colonioImpl) IsOnline() bool {
×
332
        return c.network.IsOnline()
×
333
}
×
334

335
func (c *colonioImpl) IsStable() bool {
×
NEW
336
        s, _, _ := c.network.GetStability()
×
337
        return s
×
338
}
×
339

340
func (c *colonioImpl) GetLocalNodeID() string {
×
341
        return c.localNodeID.String()
×
342
}
×
343

344
func (c *colonioImpl) UpdateLocalPosition(x, y float64) error {
×
345
        position := geometry.NewCoordinate(x, y)
×
346
        c.spread.UpdateLocalPosition(position)
×
347
        return c.network.UpdateLocalPosition(position)
×
348
}
×
349

350
// kvs
351

NEW
352
func (c *colonioImpl) KvsGetStability() (bool, []*types.NodeID, []*types.NodeID) {
×
NEW
353
        return c.network.GetStability()
×
NEW
354
}
×
355

NEW
356
func (c *colonioImpl) KvsGet(key string) chan *kvsTypes.GetResult {
×
NEW
357
        return c.kvs.Get(key)
×
NEW
358
}
×
359

NEW
360
func (c *colonioImpl) KvsSet(key string, value []byte) chan error {
×
NEW
361
        return c.kvs.Set(key, value)
×
NEW
362
}
×
363

NEW
364
func (c *colonioImpl) KvsPatch(key string, value []byte) chan error {
×
NEW
365
        return c.kvs.Patch(key, value)
×
NEW
366
}
×
367

NEW
368
func (c *colonioImpl) KvsDelete(key string) chan error {
×
NEW
369
        return c.kvs.Delete(key)
×
NEW
370
}
×
371

372
// messaging
373

374
func MessagingWithAcceptNearby() MessagingOptionSetter {
×
375
        return func(o *MessagingOptions) {
×
376
                o.AcceptNearby = true
×
377
        }
×
378
}
379

380
func MessagingWithIgnoreResponse() MessagingOptionSetter {
×
381
        return func(o *MessagingOptions) {
×
382
                o.IgnoreResponse = true
×
383
        }
×
384
}
385

386
func (c *colonioImpl) MessagingPost(dst, name string, val []byte, setters ...MessagingOptionSetter) ([]byte, error) {
×
387
        dstNodeID, err := types.NewNodeIDFromString(dst)
×
388
        if err != nil {
×
389
                return nil, fmt.Errorf("invalid node id")
×
390
        }
×
391

392
        opt := &MessagingOptions{}
×
393
        for _, setter := range setters {
×
394
                setter(opt)
×
395
        }
×
396

397
        return c.messaging.Post(dstNodeID, name, val, (*messaging.Options)(opt))
×
398
}
399

400
func (c *colonioImpl) MessagingSetHandler(name string, handler func(*MessagingRequest, MessagingResponseWriter)) {
×
401
        c.messaging.SetHandler(name, func(req *messaging.Request, res messaging.ResponseWriter) {
×
402
                handler(&MessagingRequest{
×
403
                        SourceNodeID: req.SourceNodeID.String(),
×
404
                        Message:      req.Message,
×
405
                        Options:      (*MessagingOptions)(req.Options),
×
406
                }, MessagingResponseWriter(res))
×
407
        })
×
408
}
409

410
func (c *colonioImpl) MessagingUnsetHandler(name string) {
×
411
        c.messaging.UnsetHandler(name)
×
412
}
×
413

414
// spread
415

416
func SpreadWithSomeoneMustExists() SpreadOptionSetter {
×
417
        return func(o *SpreadOptions) {
×
418
                o.SomeoneMustExists = true
×
419
        }
×
420
}
421

422
func (c *colonioImpl) SpreadPost(x, y, r float64, name string, val []byte, setters ...SpreadOptionSetter) error {
×
423
        opt := &SpreadOptions{}
×
424
        for _, setter := range setters {
×
425
                setter(opt)
×
426
        }
×
427

428
        return c.spread.Post(geometry.NewCoordinate(x, y), r, name, val, (*spread.Options)(opt))
×
429
}
430

431
func (c *colonioImpl) SpreadSetHandler(name string, handler func(*SpreadRequest)) {
×
432
        c.spread.SetHandler(name, func(r *spread.Request) {
×
433
                handler(&SpreadRequest{
×
434
                        SourceNodeID: r.SourceNodeID.String(),
×
435
                        Message:      r.Message,
×
436
                        Options:      (*SpreadOptions)(r.Options),
×
437
                })
×
438
        })
×
439
}
440

441
func (c *colonioImpl) SpreadUnsetHandler(name string) {
×
442
        c.spread.UnsetHandler(name)
×
443
}
×
444

445
func (c *colonioImpl) NetworkUpdateNextNodePosition(positions map[types.NodeID]*geometry.Coordinate) {
×
446
        c.spread.UpdateNextNodePosition(positions)
×
447
}
×
448

449
func (c *colonioImpl) SpreadGetRelayNodeID(position *geometry.Coordinate) *types.NodeID {
×
450
        return c.network.GetNextStep2D(position)
×
451
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc