• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

heathcliff26 / fleetlock / 25077654398

28 Apr 2026 09:05PM UTC coverage: 80.031% (-0.2%) from 80.237%
25077654398

push

github

bot-ahsoka[bot]
fix(container): update image docker.io/library/alpine to v3.23.4

1551 of 1938 relevant lines covered (80.03%)

30.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.37
/pkg/server/server.go
1
package server
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "log/slog"
8
        "net/http"
9
        "regexp"
10
        "strings"
11
        "time"
12

13
        "github.com/heathcliff26/fleetlock/pkg/api"
14
        "github.com/heathcliff26/fleetlock/pkg/k8s"
15
        lockmanager "github.com/heathcliff26/fleetlock/pkg/lock-manager"
16
        "github.com/heathcliff26/simple-fileserver/pkg/middleware"
17
)
18

19
// groupValidationPattern is the regular expression source a group name has to
// match in full: one or more ASCII letters, digits, dots or dashes.
const groupValidationPattern = "^[a-zA-Z0-9.-]+$"

// groupValidationRegex is groupValidationPattern compiled once at package
// initialisation, so request handling never has to recompile it.
var groupValidationRegex = regexp.MustCompile(groupValidationPattern)
22

23
type Server struct {
24
        cfg *ServerConfig
25
        lm  *lockmanager.LockManager
26
        k8s *k8s.Client
27

28
        httpServer *http.Server
29
}
30

31
// Create a new Server
32
func NewServer(cfg *ServerConfig, groups lockmanager.Groups, storageCfg lockmanager.StorageConfig, k8s *k8s.Client) (*Server, error) {
2✔
33
        lm, err := lockmanager.NewManager(groups, storageCfg)
2✔
34
        if err != nil {
3✔
35
                return nil, err
1✔
36
        }
1✔
37

38
        if k8s == nil {
1✔
39
                slog.Info("No kubernetes client available, will not drain nodes")
×
40
        }
×
41

42
        return &Server{
1✔
43
                cfg: cfg,
1✔
44
                lm:  lm,
1✔
45
                k8s: k8s,
1✔
46
        }, nil
1✔
47
}
48

49
// Main entrypoint for new requests
50
func (s *Server) requestHandler(rw http.ResponseWriter, req *http.Request) {
6✔
51
        var handleFunc func(http.ResponseWriter, api.FleetLockRequest)
6✔
52
        switch req.URL.String() {
6✔
53
        case "/v1/pre-reboot":
1✔
54
                handleFunc = s.handleReserve
1✔
55
        case "/v1/steady-state":
5✔
56
                handleFunc = s.handleRelease
5✔
57
        }
58

59
        // Verify FleetLock header is set
60
        if strings.ToLower(req.Header.Get("fleet-lock-protocol")) != "true" {
7✔
61
                slog.Debug("Received request with missing or wrong fleet-lock-protocol header", slog.String("remote", ReadUserIP(req)))
1✔
62
                rw.WriteHeader(http.StatusBadRequest)
1✔
63
                sendResponse(rw, msgMissingFleetLockHeader)
1✔
64
                return
1✔
65
        }
1✔
66

67
        params, err := api.ParseRequest(req.Body)
5✔
68
        if err != nil {
6✔
69
                slog.Debug("Failed to parse request", "error", err, slog.String("remote", ReadUserIP(req)))
1✔
70
                rw.WriteHeader(http.StatusBadRequest)
1✔
71
                sendResponse(rw, msgRequestParseFailed)
1✔
72
                return
1✔
73
        }
1✔
74

75
        if strings.Contains(params.Client.Group, "\n") || !groupValidationRegex.MatchString(params.Client.Group) {
5✔
76
                slog.Debug("Request contained invalid characters for group", slog.String("group", params.Client.Group), slog.String("remote", ReadUserIP(req)))
1✔
77
                rw.WriteHeader(http.StatusBadRequest)
1✔
78
                sendResponse(rw, msgInvalidGroupValue)
1✔
79
                return
1✔
80
        }
1✔
81

82
        if params.Client.ID == "" {
4✔
83
                slog.Debug("Request did not contain an id", slog.String("remote", ReadUserIP(req)))
1✔
84
                rw.WriteHeader(http.StatusBadRequest)
1✔
85
                sendResponse(rw, msgEmptyID)
1✔
86
                return
1✔
87
        }
1✔
88

89
        handleFunc(rw, params)
2✔
90
}
91

92
// Handle requests to reserve a slot
93
//
94
//        URL: /v1/pre-reboot
95
func (s *Server) handleReserve(rw http.ResponseWriter, params api.FleetLockRequest) {
7✔
96
        ok, err := s.lm.Reserve(params.Client.Group, params.Client.ID)
7✔
97
        if err != nil {
8✔
98
                slog.Error("Failed to reserve slot", "error", err, slog.String("group", params.Client.Group), slog.String("id", params.Client.ID))
1✔
99
                rw.WriteHeader(http.StatusInternalServerError)
1✔
100
                sendResponse(rw, msgUnexpectedError)
1✔
101
                return
1✔
102
        }
1✔
103

104
        if ok {
11✔
105
                slog.Info("Reserved slot", slog.String("group", params.Client.Group), slog.String("id", params.Client.ID))
5✔
106
                if s.k8s != nil && !s.drainNode(rw, params) {
6✔
107
                        return
1✔
108
                }
1✔
109
                sendResponse(rw, msgSuccess)
4✔
110
        } else {
1✔
111
                slog.Debug("Could not reserve slot, all slots where filled", slog.String("group", params.Client.Group), slog.String("id", params.Client.ID))
1✔
112
                rw.WriteHeader(http.StatusLocked)
1✔
113
                sendResponse(rw, msgSlotsFull)
1✔
114
        }
1✔
115
}
116

117
// Handle requests to release a slot
118
//
119
//        URL: /v1/steady-state
120
func (s *Server) handleRelease(rw http.ResponseWriter, params api.FleetLockRequest) {
3✔
121
        if s.k8s != nil && !s.uncordonNode(rw, params) {
3✔
122
                return
×
123
        }
×
124

125
        err := s.lm.Release(params.Client.Group, params.Client.ID)
3✔
126
        if err != nil {
4✔
127
                slog.Error("Failed to release slot", "error", err, slog.String("group", params.Client.Group), slog.String("id", params.Client.ID))
1✔
128
                rw.WriteHeader(http.StatusInternalServerError)
1✔
129
                sendResponse(rw, msgUnexpectedError)
1✔
130
                return
1✔
131
        }
1✔
132
        slog.Info("Released slot", slog.String("group", params.Client.Group), slog.String("id", params.Client.ID))
2✔
133
        sendResponse(rw, msgSuccess)
2✔
134
}
135

136
// Drain the node after reservation and before sending success to api.
137
// Requires k8s client to be non-nil.
138
func (s *Server) drainNode(rw http.ResponseWriter, params api.FleetLockRequest) bool {
3✔
139
        node, ok := s.matchNodeToId(rw, params)
3✔
140
        if node == "" {
4✔
141
                return ok
1✔
142
        }
1✔
143

144
        drained, err := s.k8s.IsDrained(node)
2✔
145
        if err != nil {
2✔
146
                slog.Error("Could not check if node has been drained", "error", err, slog.String("group", params.Client.Group), slog.String("id", params.Client.ID), slog.String("node", node))
×
147
                rw.WriteHeader(http.StatusInternalServerError)
×
148
                sendResponse(rw, msgUnexpectedError)
×
149
                return false
×
150
        }
×
151
        if drained {
3✔
152
                slog.Info("Node is drained, client can continue", slog.String("group", params.Client.Group), slog.String("id", params.Client.ID), slog.String("node", node))
1✔
153
                return true
1✔
154
        }
1✔
155

156
        go func() {
2✔
157
                err := s.k8s.DrainNode(node)
1✔
158
                if err != nil {
1✔
159
                        slog.Error("Failed to drain node", "error", err, slog.String("group", params.Client.Group), slog.String("id", params.Client.ID), slog.String("node", node))
×
160
                } else {
1✔
161
                        slog.Info("Node finished draining, waiting for client to call again", slog.String("group", params.Client.Group), slog.String("id", params.Client.ID), slog.String("node", node))
1✔
162
                }
1✔
163
        }()
164

165
        // Return non-200 status to indicate the request is successful but the client needs to wait as it is still being processed.
166
        rw.WriteHeader(http.StatusAccepted)
1✔
167
        sendResponse(rw, msgWaitingForNodeDrain)
1✔
168
        return false
1✔
169
}
170

171
// Uncordon the node before release.
172
// Requires k8s client to be non-nil.
173
func (s *Server) uncordonNode(rw http.ResponseWriter, params api.FleetLockRequest) bool {
2✔
174
        node, ok := s.matchNodeToId(rw, params)
2✔
175
        if node == "" {
3✔
176
                return ok
1✔
177
        }
1✔
178

179
        err := s.k8s.UncordonNode(node)
1✔
180
        if err != nil {
1✔
181
                slog.Error("Failed to uncordon node", "error", err, slog.String("group", params.Client.Group), slog.String("id", params.Client.ID), slog.String("node", node))
×
182
                rw.WriteHeader(http.StatusInternalServerError)
×
183
                sendResponse(rw, msgUnexpectedError)
×
184
                return false
×
185
        }
×
186
        slog.Info("Uncordoned node", slog.String("group", params.Client.Group), slog.String("id", params.Client.ID), slog.String("node", node))
1✔
187
        return true
1✔
188
}
189

190
func (s *Server) matchNodeToId(rw http.ResponseWriter, params api.FleetLockRequest) (string, bool) {
5✔
191
        node, err := s.k8s.FindNodeByZincatiID(params.Client.ID)
5✔
192
        if err != nil {
5✔
193
                slog.Error("An error occured when matching client id to node", "error", err, slog.String("group", params.Client.Group), slog.String("id", params.Client.ID))
×
194
                rw.WriteHeader(http.StatusInternalServerError)
×
195
                sendResponse(rw, msgUnexpectedError)
×
196
                return "", false
×
197
        }
×
198

199
        if node == "" {
7✔
200
                slog.Info("Did not find a matching node for id", slog.String("group", params.Client.Group), slog.String("id", params.Client.ID))
2✔
201
        }
2✔
202

203
        return node, true
5✔
204
}
205

206
// Return a health status of the server
207
// URL: /healthz
208
func (s *Server) handleHealthCheck(rw http.ResponseWriter, _ *http.Request) {
1✔
209
        rw.Header().Set("Content-Type", "application/json")
1✔
210
        status := api.FleetlockHealthResponse{
1✔
211
                Status: "ok",
1✔
212
        }
1✔
213
        sendResponse(rw, status)
1✔
214
}
1✔
215

216
// Prepare the http server for usage.
217
// This is in a separate function to allow testing the handler without running the server.
218
func (s *Server) createHTTPServer() {
1✔
219
        router := http.NewServeMux()
1✔
220
        router.HandleFunc("POST /v1/pre-reboot", s.requestHandler)
1✔
221
        router.HandleFunc("POST /v1/steady-state", s.requestHandler)
1✔
222
        router.HandleFunc("GET /healthz", s.handleHealthCheck)
1✔
223

1✔
224
        s.httpServer = &http.Server{
1✔
225
                Addr:         s.cfg.Listen,
1✔
226
                Handler:      middleware.Logging(router),
1✔
227
                ReadTimeout:  10 * time.Second,
1✔
228
                WriteTimeout: 10 * time.Second,
1✔
229
        }
1✔
230
}
1✔
231

232
// Starts the server and exits with error if that fails
233
func (s *Server) Run() error {
×
234
        if s.httpServer != nil {
×
235
                return fmt.Errorf("server already started")
×
236
        }
×
237

238
        s.createHTTPServer()
×
239
        defer func() {
×
240
                s.httpServer = nil
×
241
        }()
×
242

243
        var err error
×
244
        if s.cfg.SSL.Enabled {
×
245
                slog.Info("Starting server with SSL", slog.String("address", s.cfg.Listen))
×
246
                err = s.httpServer.ListenAndServeTLS(s.cfg.SSL.Cert, s.cfg.SSL.Key)
×
247
        } else {
×
248
                slog.Info("Starting server", slog.String("address", s.cfg.Listen))
×
249
                err = s.httpServer.ListenAndServe()
×
250
        }
×
251
        // This just means the server was closed after running
252
        if errors.Is(err, http.ErrServerClosed) {
×
253
                slog.Info("Server closed, exiting")
×
254
                return nil
×
255
        }
×
256
        return fmt.Errorf("failed to start server: %w", err)
×
257
}
258

259
func (s *Server) Shutdown() error {
2✔
260
        if s.httpServer == nil {
3✔
261
                return nil
1✔
262
        }
1✔
263

264
        slog.Info("Shutting down server")
1✔
265
        err := s.httpServer.Shutdown(context.Background())
1✔
266
        if err != nil {
1✔
267
                return fmt.Errorf("failed to shutdown server: %w", err)
×
268
        }
×
269
        slog.Info("Server shutdown complete")
1✔
270
        return nil
1✔
271
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc