• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llamerada-jp / colonio / 18451467117

13 Oct 2025 12:12AM UTC coverage: 62.183% (-6.8%) from 68.948%
18451467117

Pull #107

github

llamerada-jp
wip

Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
Pull Request #107: implement KVS feature

297 of 923 new or added lines in 14 files covered. (32.18%)

5 existing lines in 2 files now uncovered.

3111 of 5003 relevant lines covered (62.18%)

34.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.67
/seed/server/server.go
1
/*
2
 * Copyright 2017- Yuji Ito <llamerada.jp@gmail.com>
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package server
17

18
import (
19
        "context"
20
        "log/slog"
21
        "net/http"
22

23
        "connectrpc.com/connect"
24
        "github.com/gorilla/sessions"
25
        proto "github.com/llamerada-jp/colonio/api/colonio/v1alpha"
26
        service "github.com/llamerada-jp/colonio/api/colonio/v1alpha/v1alphaconnect"
27
        "github.com/llamerada-jp/colonio/internal/shared"
28
        "github.com/llamerada-jp/colonio/seed/controller"
29
        "github.com/llamerada-jp/colonio/seed/misc"
30
)
31

32
// ContextKey is the type of the keys this package uses to attach values to a
// request context.
type ContextKey int
33

34
const (
35
        // ContextKeySession is the key under which ServeHTTP stores the
        // request's session; the RPC handlers read it back from the context.
        ContextKeySession ContextKey = iota
36
)
37

38
// Options carries the dependencies that NewServer copies into a Server.
type Options struct {
39
        // Logger is the base logger used for request and error logging.
        Logger       *slog.Logger
40
        // SessionStore is the store passed to newSession for every request.
        SessionStore sessions.Store
41
        // Controller performs the node operations behind the RPC handlers.
        Controller   controller.Controller
42
}
43

44
// Server is the seed's HTTP entry point. It implements http.Handler through
// ServeHTTP and the seed service RPCs through its exported methods.
type Server struct {
45
        logger         *slog.Logger
46
        sessionStore   sessions.Store
47
        controller     controller.Controller
48
        // connectHandler is set by RegisterService; ServeHTTP delegates to it.
        connectHandler http.Handler
49
        // NOTE(review): presumably supplies default implementations for RPCs
        // this type does not define itself — confirm against the generated code.
        service.UnimplementedSeedServiceHandler
50
}
51

52
/**
53
 * NewServer creates an http handler to provide colonio's seed.
54
 */
55
func NewServer(options Options) *Server {
7✔
56
        return &Server{
7✔
57
                logger:       options.Logger,
7✔
58
                sessionStore: options.SessionStore,
7✔
59
                controller:   options.Controller,
7✔
60
        }
7✔
61
}
7✔
62

63
func (c *Server) RegisterService(mux *http.ServeMux) {
7✔
64
        path, handler := service.NewSeedServiceHandler(c)
7✔
65
        c.connectHandler = handler
7✔
66
        mux.Handle(path, c)
7✔
67
}
7✔
68

69
// bypass wraps an http.ResponseWriter and records the status code and the
// number of body bytes that pass through it, so they can be logged once the
// request has been handled.
type bypass struct {
	http.ResponseWriter
	http.Flusher
	// statusCode is the recorded response status.
	statusCode int
	// bodySize is the number of body bytes the underlying writer accepted.
	bodySize int
	// wroteHeader reports whether a final status has already been recorded;
	// later WriteHeader calls no longer change statusCode.
	wroteHeader bool
}

// WriteHeader records statusCode and forwards the call to the wrapped writer.
//
// Only the first final status is recorded. Informational 1xx codes (other
// than 101 Switching Protocols) are forwarded without being recorded, because
// a final status still follows them.
func (b *bypass) WriteHeader(statusCode int) {
	informational := statusCode >= 100 && statusCode <= 199 &&
		statusCode != http.StatusSwitchingProtocols
	if !b.wroteHeader && !informational {
		b.statusCode = statusCode
		b.wroteHeader = true
	}
	b.ResponseWriter.WriteHeader(statusCode)
}

// Write forwards data to the wrapped writer and adds the number of bytes that
// were actually written to bodySize. It returns the wrapped writer's result
// unchanged.
func (b *bypass) Write(data []byte) (int, error) {
	if !b.wroteHeader {
		// Writing the body without an explicit WriteHeader implies 200 OK.
		b.statusCode = http.StatusOK
		b.wroteHeader = true
	}

	n, err := b.ResponseWriter.Write(data)
	if n > 0 {
		b.bodySize += n
	}
	return n, err
}
28✔
85

86
func (c *Server) ServeHTTP(response http.ResponseWriter, request *http.Request) {
20✔
87
        // create session
20✔
88
        session, err := newSession(c.sessionStore, request, response)
20✔
89
        if err != nil {
20✔
90
                c.logger.Error("failed to create session", slog.String("error", err.Error()))
×
91
                http.Error(response, "session error", http.StatusInternalServerError)
×
92
                return
×
93
        }
×
94
        ctxWithSession := context.WithValue(request.Context(), ContextKeySession, session)
20✔
95
        nodeID := session.getNodeID()
20✔
96
        ctx := misc.NewLoggerContext(ctxWithSession, nodeID)
20✔
97
        logger := misc.NewLogger(ctx, c.logger)
20✔
98

20✔
99
        logger.Info("request",
20✔
100
                slog.String("from", request.RemoteAddr),
20✔
101
                slog.String("method", request.Method),
20✔
102
                slog.String("uri", request.RequestURI),
20✔
103
                slog.Int("contentLength", int(request.ContentLength)))
20✔
104

20✔
105
        bypass := &bypass{
20✔
106
                ResponseWriter: response,
20✔
107
                Flusher:        response.(http.Flusher),
20✔
108
                statusCode:     http.StatusOK,
20✔
109
        }
20✔
110

20✔
111
        defer func() {
40✔
112
                logger.Info("response",
20✔
113
                        slog.Int("statusCode", bypass.statusCode),
20✔
114
                        slog.Int("bodySize", bypass.bodySize))
20✔
115
        }()
20✔
116

117
        c.connectHandler.ServeHTTP(bypass, request.WithContext(ctx))
20✔
118
}
119

120
func (c *Server) AssignNode(ctx context.Context, _ *connect.Request[proto.AssignNodeRequest]) (*connect.Response[proto.AssignNodeResponse], error) {
7✔
121
        logger := misc.NewLogger(ctx, c.logger)
7✔
122
        session := ctx.Value(ContextKeySession).(*session)
7✔
123

7✔
124
        // check if the session is already assigned
7✔
125
        if nodeID := session.getNodeID(); nodeID != nil {
7✔
126
                if err := c.controller.UnassignNode(ctx, nodeID); err != nil {
×
127
                        logger.Warn("failed to unassign node", slog.String("error", err.Error()))
×
128
                        return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
129
                }
×
130
        }
131

132
        // assign a new node
133
        nodeID, isAlone, err := c.controller.AssignNode(ctx)
7✔
134
        if err != nil {
7✔
135
                logger.Warn("failed to assign node", slog.String("error", err.Error()))
×
136
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
137
        }
×
138

139
        session.setNodeID(nodeID)
7✔
140
        if err = session.write(); err != nil {
7✔
141
                logger.Warn("failed to write session", slog.String("error", err.Error()))
×
142
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
143
        }
×
144

145
        return &connect.Response[proto.AssignNodeResponse]{
7✔
146
                Msg: &proto.AssignNodeResponse{
7✔
147
                        NodeId:  nodeID.Proto(),
7✔
148
                        IsAlone: isAlone,
7✔
149
                },
7✔
150
        }, nil
7✔
151
}
152

153
func (c *Server) UnassignNode(ctx context.Context, _ *connect.Request[proto.UnassignNodeRequest]) (*connect.Response[proto.UnassignNodeResponse], error) {
3✔
154
        logger := misc.NewLogger(ctx, c.logger)
3✔
155
        session := ctx.Value(ContextKeySession).(*session)
3✔
156
        defer func() {
6✔
157
                if session != nil {
6✔
158
                        session.delete()
3✔
159
                }
3✔
160
        }()
161

162
        nodeID := session.getNodeID()
3✔
163
        if nodeID == nil {
5✔
164
                return &connect.Response[proto.UnassignNodeResponse]{}, nil
2✔
165
        }
2✔
166

167
        if err := c.controller.UnassignNode(ctx, nodeID); err != nil {
1✔
168
                logger.Warn("failed to unassign node", slog.String("error", err.Error()))
×
169
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
170
        }
×
171

172
        return &connect.Response[proto.UnassignNodeResponse]{}, nil
1✔
173
}
174

175
func (c *Server) Keepalive(ctx context.Context, _ *connect.Request[proto.KeepaliveRequest]) (*connect.Response[proto.KeepaliveResponse], error) {
2✔
176
        logger := misc.NewLogger(ctx, c.logger)
2✔
177
        session := ctx.Value(ContextKeySession).(*session)
2✔
178
        nodeID := session.getNodeID()
2✔
179
        if nodeID == nil {
3✔
180
                logger.Warn("session error (node id is not found)")
1✔
181
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
1✔
182
        }
1✔
183

184
        isAlone, err := c.controller.Keepalive(ctx, nodeID)
1✔
185
        if err != nil {
1✔
186
                logger.Warn("failed to keepalive", slog.String("error", err.Error()))
×
187
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
188
        }
×
189

190
        // update session lifetime
191
        if err = session.write(); err != nil {
1✔
192
                logger.Warn("failed to write session", slog.String("error", err.Error()))
×
193
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
194
        }
×
195

196
        return &connect.Response[proto.KeepaliveResponse]{
1✔
197
                Msg: &proto.KeepaliveResponse{
1✔
198
                        IsAlone: isAlone,
1✔
199
                },
1✔
200
        }, nil
1✔
201
}
202

203
func (c *Server) ReconcileNextNodes(ctx context.Context, request *connect.Request[proto.ReconcileNextNodesRequest]) (*connect.Response[proto.ReconcileNextNodesResponse], error) {
2✔
204
        logger := misc.NewLogger(ctx, c.logger)
2✔
205
        session := ctx.Value(ContextKeySession).(*session)
2✔
206
        nodeID := session.getNodeID()
2✔
207
        if nodeID == nil {
3✔
208
                logger.Warn("session error (node id is not found)")
1✔
209
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
1✔
210
        }
1✔
211

212
        nextNodeIDs, err := shared.ConvertNodeIDsFromProto(request.Msg.GetNextNodeIds())
1✔
213
        if err != nil {
1✔
214
                logger.Warn("next_node_ids contains invalid", slog.String("error", err.Error()))
×
215
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
216
        }
×
217

218
        disconnectedIDs, err := shared.ConvertNodeIDsFromProto(request.Msg.GetDisconnectedNodeIds())
1✔
219
        if err != nil {
1✔
220
                logger.Warn("disconnected_node_ids contains invalid", slog.String("error", err.Error()))
×
221
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
222
        }
×
223

224
        matched, err := c.controller.ReconcileNextNodes(ctx, nodeID, nextNodeIDs, disconnectedIDs)
1✔
225
        if err != nil {
1✔
226
                logger.Warn("failed to reconcile next nodes", slog.String("error", err.Error()))
×
227
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
228
        }
×
229

230
        return &connect.Response[proto.ReconcileNextNodesResponse]{
1✔
231
                Msg: &proto.ReconcileNextNodesResponse{
1✔
232
                        Matched: matched,
1✔
233
                },
1✔
234
        }, nil
1✔
235
}
236

237
func (c *Server) SendSignal(ctx context.Context, request *connect.Request[proto.SendSignalRequest]) (*connect.Response[proto.SendSignalResponse], error) {
2✔
238
        logger := misc.NewLogger(ctx, c.logger)
2✔
239
        session := ctx.Value(ContextKeySession).(*session)
2✔
240

2✔
241
        nodeID := session.getNodeID()
2✔
242
        if nodeID == nil {
3✔
243
                logger.Warn("session error (node id is not found)")
1✔
244
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
1✔
245
        }
1✔
246

247
        if err := c.controller.SendSignal(ctx, nodeID, request.Msg.GetSignal()); err != nil {
1✔
248
                logger.Warn("failed to send signal", slog.String("error", err.Error()))
×
249
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
250
        }
×
251

252
        return &connect.Response[proto.SendSignalResponse]{
1✔
253
                Msg: &proto.SendSignalResponse{},
1✔
254
        }, nil
1✔
255
}
256

257
func (c *Server) PollSignal(ctx context.Context, _ *connect.Request[proto.PollSignalRequest], stream *connect.ServerStream[proto.PollSignalResponse]) error {
2✔
258
        logger := misc.NewLogger(ctx, c.logger)
2✔
259
        session := ctx.Value(ContextKeySession).(*session)
2✔
260
        nodeID := session.getNodeID()
2✔
261
        if nodeID == nil {
3✔
262
                logger.Warn("session error (node id is not found)")
1✔
263
                return connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
1✔
264
        }
1✔
265

266
        // send the first packet immediately to notify the connection
267
        if err := stream.Send(&proto.PollSignalResponse{
1✔
268
                Signals: []*proto.Signal{},
1✔
269
        }); err != nil {
1✔
270
                logger.Warn("failed to send initial response", slog.String("error", err.Error()))
×
271
                return connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
272
        }
×
273

274
        if err := c.controller.PollSignal(ctx, nodeID, func(s *proto.Signal) error {
3✔
275
                return stream.Send(&proto.PollSignalResponse{
2✔
276
                        Signals: []*proto.Signal{s},
2✔
277
                })
2✔
278
        }); err != nil {
2✔
279
                logger.Warn("failed to poll signal", slog.String("error", err.Error()))
×
280
                return connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
281
        }
×
282

283
        return nil
1✔
284
}
285

286
func (c *Server) StateKvs(ctx context.Context, request *connect.Request[proto.StateKvsRequest]) (*connect.Response[proto.StateKvsResponse], error) {
2✔
287
        logger := misc.NewLogger(ctx, c.logger)
2✔
288
        session := ctx.Value(ContextKeySession).(*session)
2✔
289

2✔
290
        nodeID := session.getNodeID()
2✔
291
        if nodeID == nil {
3✔
292
                logger.Warn("session error (node id is not found)")
1✔
293
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
1✔
294
        }
1✔
295

296
        var active bool
1✔
297
        switch request.Msg.GetState() {
1✔
298
        case proto.KvsState_KVS_STATE_ACTIVE:
1✔
299
                active = true
1✔
NEW
300
        case proto.KvsState_KVS_STATE_INACTIVE:
×
NEW
301
                active = false
×
NEW
302
        default:
×
NEW
303
                logger.Warn("invalid KVS state", slog.Int("state", int(request.Msg.GetState())))
×
NEW
304
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
305
        }
306

307
        entireState, err := c.controller.StateKvs(ctx, nodeID, active)
1✔
308
        if err != nil {
1✔
NEW
309
                logger.Warn("failed to set KVS state", slog.String("error", err.Error()))
×
NEW
310
                return nil, connect.NewError(connect.CodeInternal, misc.ErrorByContext(ctx))
×
NEW
311
        }
×
312

313
        return &connect.Response[proto.StateKvsResponse]{
1✔
314
                Msg: &proto.StateKvsResponse{
1✔
315
                        EntireState: proto.KvsState(entireState),
1✔
316
                },
1✔
317
        }, nil
1✔
318
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc