• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 18451467117

13 Oct 2025 12:12AM UTC coverage: 62.183% (-6.8%) from 68.948%
18451467117

Pull #107

github

llamerada-jp
wip

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

297 of 923 new or added lines in 14 files covered. (32.18%)

5 existing lines in 2 files now uncovered.

3111 of 5003 relevant lines covered (62.18%)

34.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.43
/internal/network/seed_accessor/seed_accessor.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package seed_accessor
17

18
import (
19
        "context"
20
        "errors"
21
        "fmt"
22
        "log/slog"
23
        "net/http"
24
        "net/http/cookiejar"
25
        "sync"
26
        "time"
27

28
        "connectrpc.com/connect"
29
        proto "github.com/llamerada-jp/colonio/api/colonio/v1alpha"
30
        service "github.com/llamerada-jp/colonio/api/colonio/v1alpha/v1alphaconnect"
31
        "github.com/llamerada-jp/colonio/internal/constants"
32
        "github.com/llamerada-jp/colonio/internal/network/signal"
33
        "github.com/llamerada-jp/colonio/internal/shared"
34
)
35

36
type Handler interface {
37
        SeedRecvSignalOffer(*shared.NodeID, *signal.Offer)
38
        SeedRecvSignalAnswer(*shared.NodeID, *signal.Answer)
39
        SeedRecvSignalICE(*shared.NodeID, *signal.ICE)
40
}
41

42
type Config struct {
43
        Logger     *slog.Logger
44
        Handler    Handler
45
        URL        string
46
        HttpClient *http.Client // optional
47
}
48

49
type SeedAccessor struct {
50
        logger      *slog.Logger
51
        ctx         context.Context
52
        handler     Handler
53
        client      service.SeedServiceClient
54
        localNodeID *shared.NodeID
55

56
        // mtx is for waiting and isAlone
57
        mtx sync.RWMutex
58
        // isAlone is true if this node is the only online node of the seed
59
        isAlone bool
60
}
61

62
func NewSeedAccessor(config *Config) *SeedAccessor {
9✔
63
        if len(config.URL) == 0 {
9✔
64
                panic("URL should not be empty")
×
65
        }
66

67
        sa := &SeedAccessor{
9✔
68
                logger:  config.Logger,
9✔
69
                handler: config.Handler,
9✔
70
                isAlone: false,
9✔
71
        }
9✔
72

9✔
73
        httpClient := config.HttpClient
9✔
74
        if httpClient == nil {
9✔
75
                jar, err := cookiejar.New(nil)
×
76
                if err != nil {
×
77
                        panic(err)
×
78
                }
79

80
                httpClient = &http.Client{
×
81
                        Transport: &http.Transport{},
×
82
                        Jar:       jar,
×
83
                }
×
84
        }
85

86
        sa.client = service.NewSeedServiceClient(httpClient, config.URL)
9✔
87

9✔
88
        return sa
9✔
89
}
90

91
func (sa *SeedAccessor) Start(ctx context.Context) (*shared.NodeID, error) {
9✔
92
        sa.ctx = ctx
9✔
93

9✔
94
        res, err := sa.client.AssignNode(sa.ctx, &connect.Request[proto.AssignNodeRequest]{})
9✔
95
        if err != nil {
10✔
96
                if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
1✔
97
                        return nil, fmt.Errorf("context is canceled before starting: %w", err)
×
98
                }
×
99
                return nil, fmt.Errorf("failed to assign node ID: %w", err)
1✔
100
        }
101

102
        sa.mtx.Lock()
8✔
103
        defer sa.mtx.Unlock()
8✔
104
        sa.localNodeID, err = shared.NewNodeIDFromProto(res.Msg.GetNodeId())
8✔
105
        if err != nil {
8✔
106
                return nil, fmt.Errorf("failed to parse node id: %w", err)
×
107
        }
×
108
        sa.isAlone = res.Msg.GetIsAlone()
8✔
109

8✔
110
        go func() {
16✔
111
                // try to unassign the node when the context is done
8✔
112
                defer sa.unassign()
8✔
113

8✔
114
                // call round every second or finish when context is done
8✔
115
                for {
16✔
116
                        select {
8✔
117
                        case <-sa.ctx.Done():
×
118
                                return
×
119

120
                        default:
8✔
121
                                if err := sa.poll(); err != nil {
15✔
122
                                        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
14✔
123
                                                return
7✔
124
                                        }
7✔
125
                                        sa.logger.Warn("failed to poll", slog.String("error", err.Error()))
×
126
                                        time.Sleep(10 * time.Second)
×
127
                                }
128
                        }
129
                }
130
        }()
131

132
        go func() {
16✔
133
                for {
17✔
134
                        select {
9✔
135
                        case <-sa.ctx.Done():
1✔
136
                                return
1✔
137

138
                        default:
8✔
139
                                if err := sa.keepalive(); err != nil {
8✔
140
                                        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
×
141
                                                return
×
142
                                        }
×
143
                                        sa.logger.Warn("failed to keepalive", slog.String("error", err.Error()))
×
144
                                }
145
                                time.Sleep(10 * time.Second)
7✔
146
                        }
147
                }
148
        }()
149

150
        return sa.localNodeID, nil
8✔
151
}
152

153
// IsAlone indicates whether the node is the only one online of the seed.
154
func (sa *SeedAccessor) IsAlone() bool {
12✔
155
        sa.mtx.RLock()
12✔
156
        defer sa.mtx.RUnlock()
12✔
157
        return sa.isAlone
12✔
158
}
12✔
159

160
func (sa *SeedAccessor) SendSignalOffer(dstNodeID *shared.NodeID, offer *signal.Offer) error {
22✔
161
        var offerType proto.SignalOffer_Type
22✔
162
        switch offer.OfferType {
22✔
163
        case signal.OfferTypeExplicit:
19✔
164
                offerType = proto.SignalOffer_TYPE_EXPLICIT
19✔
165
        case signal.OfferTypeNext:
3✔
166
                offerType = proto.SignalOffer_TYPE_NEXT
3✔
167
        default:
×
168
                return fmt.Errorf("unknown offer type: %d", offer.OfferType)
×
169
        }
170

171
        _, err := sa.client.SendSignal(sa.ctx, &connect.Request[proto.SendSignalRequest]{
22✔
172
                Msg: &proto.SendSignalRequest{
22✔
173
                        Signal: &proto.Signal{
22✔
174
                                DstNodeId: dstNodeID.Proto(),
22✔
175
                                SrcNodeId: sa.localNodeID.Proto(),
22✔
176
                                Content: &proto.Signal_Offer{
22✔
177
                                        Offer: &proto.SignalOffer{
22✔
178
                                                OfferId: offer.OfferID,
22✔
179
                                                Type:    offerType,
22✔
180
                                                Sdp:     offer.Sdp,
22✔
181
                                        },
22✔
182
                                },
22✔
183
                        },
22✔
184
                },
22✔
185
        })
22✔
186

22✔
187
        if err != nil {
22✔
188
                if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
×
189
                        return nil // context is canceled, just return
×
190
                }
×
191
                return fmt.Errorf("failed to send signal offer: %w", err)
×
192
        }
193

194
        return nil
22✔
195
}
196

197
func (sa *SeedAccessor) SendSignalAnswer(dstNodeID *shared.NodeID, answer *signal.Answer) error {
1✔
198
        _, err := sa.client.SendSignal(sa.ctx, &connect.Request[proto.SendSignalRequest]{
1✔
199
                Msg: &proto.SendSignalRequest{
1✔
200
                        Signal: &proto.Signal{
1✔
201
                                DstNodeId: dstNodeID.Proto(),
1✔
202
                                SrcNodeId: sa.localNodeID.Proto(),
1✔
203
                                Content: &proto.Signal_Answer{
1✔
204
                                        Answer: &proto.SignalAnswer{
1✔
205
                                                OfferId: answer.OfferID,
1✔
206
                                                Status:  uint32(answer.Status),
1✔
207
                                                Sdp:     answer.Sdp,
1✔
208
                                        },
1✔
209
                                },
1✔
210
                        },
1✔
211
                },
1✔
212
        })
1✔
213

1✔
214
        if err != nil {
1✔
215
                if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
×
216
                        return nil // context is canceled, just return
×
217
                }
×
218
                return fmt.Errorf("failed to send signal answer: %w", err)
×
219
        }
220

221
        return nil
1✔
222
}
223

224
func (sa *SeedAccessor) SendSignalICE(dstNodeID *shared.NodeID, ices *signal.ICE) error {
1✔
225
        _, err := sa.client.SendSignal(sa.ctx, &connect.Request[proto.SendSignalRequest]{
1✔
226
                Msg: &proto.SendSignalRequest{
1✔
227
                        Signal: &proto.Signal{
1✔
228
                                DstNodeId: dstNodeID.Proto(),
1✔
229
                                SrcNodeId: sa.localNodeID.Proto(),
1✔
230
                                Content: &proto.Signal_Ice{
1✔
231
                                        Ice: &proto.SignalICE{
1✔
232
                                                OfferId: ices.OfferID,
1✔
233
                                                Ices:    ices.Ices,
1✔
234
                                        },
1✔
235
                                },
1✔
236
                        },
1✔
237
                },
1✔
238
        })
1✔
239

1✔
240
        if err != nil {
1✔
241
                if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
×
242
                        return nil // context is canceled, just return
×
243
                }
×
244
                return fmt.Errorf("failed to send signal ICE: %w", err)
×
245
        }
246

247
        return nil
1✔
248
}
249

250
func (sa *SeedAccessor) ReconcileNextNodes(nextNodeIDs, disconnectedNodeIDs []*shared.NodeID) (bool, error) {
1✔
251
        res, err := sa.client.ReconcileNextNodes(sa.ctx, &connect.Request[proto.ReconcileNextNodesRequest]{
1✔
252
                Msg: &proto.ReconcileNextNodesRequest{
1✔
253
                        NextNodeIds:         shared.ConvertNodeIDsToProto(nextNodeIDs),
1✔
254
                        DisconnectedNodeIds: shared.ConvertNodeIDsToProto(disconnectedNodeIDs),
1✔
255
                },
1✔
256
        })
1✔
257

1✔
258
        if err != nil {
1✔
259
                if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
×
260
                        return false, nil
×
261
                }
×
262
                return false, fmt.Errorf("failed to reconcile next nodes: %w", err)
×
263
        }
264

265
        return res.Msg.Matched, nil
1✔
266
}
267

NEW
268
func (sa *SeedAccessor) StateKvs(state constants.KvsState) (constants.KvsState, error) {
×
NEW
269
        if state == constants.KvsStateUnknown {
×
NEW
270
                panic("logic error: state should not be KvsStateUnknown")
×
271
        }
272

NEW
273
        res, err := sa.client.StateKvs(sa.ctx, &connect.Request[proto.StateKvsRequest]{
×
NEW
274
                Msg: &proto.StateKvsRequest{
×
NEW
275
                        State: proto.KvsState(state),
×
NEW
276
                },
×
NEW
277
        })
×
NEW
278

×
NEW
279
        if err != nil {
×
NEW
280
                if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
×
NEW
281
                        return constants.KvsStateUnknown, nil
×
NEW
282
                }
×
NEW
283
                return constants.KvsStateUnknown, fmt.Errorf("failed to set/get state KVS: %w", err)
×
284
        }
285

NEW
286
        return constants.KvsState(res.Msg.EntireState), nil
×
287
}
288

289
func (sa *SeedAccessor) unassign() {
7✔
290
        // use context.Background() because sa.ctx may be already canceled when call this
7✔
291
        _, err := sa.client.UnassignNode(context.Background(), &connect.Request[proto.UnassignNodeRequest]{})
7✔
292
        if err != nil {
12✔
293
                if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
5✔
294
                        return // context is canceled, just return
×
295
                }
×
296
                sa.logger.Warn("failed to unassign node", slog.String("error", err.Error()))
5✔
297
        }
298
}
299

300
func (sa *SeedAccessor) keepalive() error {
8✔
301
        res, err := sa.client.Keepalive(sa.ctx, &connect.Request[proto.KeepaliveRequest]{})
8✔
302
        if err != nil {
14✔
303
                if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
12✔
304
                        return nil // context is canceled, just return
6✔
305
                }
6✔
306
                return err
×
307
        }
308
        sa.mtx.Lock()
1✔
309
        defer sa.mtx.Unlock()
1✔
310
        sa.isAlone = res.Msg.GetIsAlone()
1✔
311
        return nil
1✔
312
}
313

314
func (sa *SeedAccessor) poll() error {
8✔
315
        res, err := sa.client.PollSignal(sa.ctx, &connect.Request[proto.PollSignalRequest]{
8✔
316
                Msg: &proto.PollSignalRequest{},
8✔
317
        })
8✔
318
        if err != nil {
8✔
319
                if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
×
320
                        return nil
×
321
                }
×
322
                return fmt.Errorf("failed to poll signal: %w", err)
×
323
        }
324

325
        for res.Receive() {
23✔
326
                msg := res.Msg()
16✔
327

16✔
328
                for _, s := range msg.GetSignals() {
25✔
329
                        from, err := shared.NewNodeIDFromProto(s.GetSrcNodeId())
9✔
330
                        if err != nil {
9✔
331
                                return fmt.Errorf("failed to parse node id: %w", err)
×
332
                        }
×
333

334
                        switch content := s.GetContent().(type) {
9✔
335
                        case *proto.Signal_Offer:
7✔
336
                                var offerType signal.OfferType
7✔
337
                                switch content.Offer.Type {
7✔
338
                                case proto.SignalOffer_TYPE_EXPLICIT:
4✔
339
                                        offerType = signal.OfferTypeExplicit
4✔
340
                                case proto.SignalOffer_TYPE_NEXT:
3✔
341
                                        offerType = signal.OfferTypeNext
3✔
342
                                default:
×
343
                                        sa.logger.Warn("unknown offer type")
×
344
                                        continue
×
345
                                }
346
                                go sa.handler.SeedRecvSignalOffer(from, &signal.Offer{
7✔
347
                                        OfferID:   content.Offer.OfferId,
7✔
348
                                        OfferType: offerType,
7✔
349
                                        Sdp:       content.Offer.Sdp,
7✔
350
                                })
7✔
351

352
                        case *proto.Signal_Answer:
1✔
353
                                go sa.handler.SeedRecvSignalAnswer(from, &signal.Answer{
1✔
354
                                        OfferID: content.Answer.OfferId,
1✔
355
                                        Status:  signal.AnswerStatus(content.Answer.Status),
1✔
356
                                        Sdp:     content.Answer.Sdp,
1✔
357
                                })
1✔
358

359
                        case *proto.Signal_Ice:
1✔
360
                                go sa.handler.SeedRecvSignalICE(from, &signal.ICE{
1✔
361
                                        OfferID: content.Ice.OfferId,
1✔
362
                                        Ices:    content.Ice.Ices,
1✔
363
                                })
1✔
364

365
                        default:
×
366
                                sa.logger.Warn("unknown signal type", slog.String("type", fmt.Sprintf("%T", content)))
×
367
                        }
368
                }
369
        }
370

371
        return res.Err()
7✔
372
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc