• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umputun / ralphex / 21295900463

23 Jan 2026 05:55PM UTC coverage: 78.878% (-0.5%) from 79.395%
21295900463

Pull #17

github

melonamin
fix: address linting issues and add security hardening

- Fix variable shadowing in main.go (branchErr, gitignoreErr)
- Add path traversal validation for plan file requests
- Track and log dropped SSE events for slow clients
- Add warnings for invalid watch directories
- Update tests to use relative plan paths
Pull Request #17: feat: add web dashboard with real-time streaming and multi-session support

1513 of 1931 new or added lines in 19 files covered. (78.35%)

10 existing lines in 3 files now uncovered.

3066 of 3887 relevant lines covered (78.88%)

193.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.05
/pkg/web/server.go
1
package web
2

3
import (
4
        "context"
5
        "embed"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "html/template"
10
        "io/fs"
11
        "log"
12
        "net/http"
13
        "path/filepath"
14
        "sort"
15
        "strings"
16
        "sync"
17
        "time"
18
)
19

20
//go:embed templates static
21
var embeddedFS embed.FS
22

23
// sseHistoryFlushInterval is how many replayed history events are written between
// explicit flushes. flushing in chunks keeps a large event history from piling up
// in the response buffer before the client sees any of it.
const sseHistoryFlushInterval = 100
26

27
// ServerConfig carries the settings the web server is created with.
type ServerConfig struct {
	// Port is the TCP port to listen on.
	Port int
	// PlanName is the plan name shown in the dashboard.
	PlanName string
	// Branch is the git branch name.
	Branch string
	// PlanFile is the path to the plan file served by the /api/plan endpoint.
	PlanFile string
}
34

35
// Server provides HTTP server for the real-time dashboard.
36
type Server struct {
37
        cfg    ServerConfig
38
        hub    *Hub            // used for single-session mode (direct execution)
39
        buffer *Buffer         // used for single-session mode (direct execution)
40
        sm     *SessionManager // used for multi-session mode (dashboard)
41
        srv    *http.Server
42
        tmpl   *template.Template
43

44
        // plan caching - set after first successful load (single-session mode)
45
        planMu    sync.Mutex
46
        planCache *Plan
47
}
48

49
// NewServer creates a new web server for single-session mode (direct execution).
50
// returns an error if the embedded template fails to parse.
51
func NewServer(cfg ServerConfig, hub *Hub, buffer *Buffer) (*Server, error) {
17✔
52
        tmpl, err := template.ParseFS(embeddedFS, "templates/base.html")
17✔
53
        if err != nil {
17✔
NEW
54
                return nil, fmt.Errorf("parse template: %w", err)
×
NEW
55
        }
×
56

57
        return &Server{
17✔
58
                cfg:    cfg,
17✔
59
                hub:    hub,
17✔
60
                buffer: buffer,
17✔
61
                tmpl:   tmpl,
17✔
62
        }, nil
17✔
63
}
64

65
// NewServerWithSessions creates a new web server for multi-session mode (dashboard).
66
// returns an error if the embedded template fails to parse.
67
func NewServerWithSessions(cfg ServerConfig, sm *SessionManager) (*Server, error) {
11✔
68
        tmpl, err := template.ParseFS(embeddedFS, "templates/base.html")
11✔
69
        if err != nil {
11✔
NEW
70
                return nil, fmt.Errorf("parse template: %w", err)
×
NEW
71
        }
×
72

73
        return &Server{
11✔
74
                cfg:  cfg,
11✔
75
                sm:   sm,
11✔
76
                tmpl: tmpl,
11✔
77
        }, nil
11✔
78
}
79

80
// Start begins listening for HTTP requests.
81
// blocks until the server is stopped or an error occurs.
82
func (s *Server) Start(ctx context.Context) error {
1✔
83
        mux := http.NewServeMux()
1✔
84

1✔
85
        // register routes
1✔
86
        mux.HandleFunc("/", s.handleIndex)
1✔
87
        mux.HandleFunc("/events", s.handleEvents)
1✔
88
        mux.HandleFunc("/api/plan", s.handlePlan)
1✔
89
        mux.HandleFunc("/api/sessions", s.handleSessions)
1✔
90

1✔
91
        // static files
1✔
92
        staticFS, err := fs.Sub(embeddedFS, "static")
1✔
93
        if err != nil {
1✔
NEW
94
                return fmt.Errorf("static filesystem: %w", err)
×
NEW
95
        }
×
96
        mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.FS(staticFS))))
1✔
97

1✔
98
        s.srv = &http.Server{
1✔
99
                Addr:              fmt.Sprintf("127.0.0.1:%d", s.cfg.Port),
1✔
100
                Handler:           mux,
1✔
101
                ReadHeaderTimeout: 10 * time.Second,
1✔
102
        }
1✔
103

1✔
104
        // start shutdown listener
1✔
105
        go func() {
2✔
106
                <-ctx.Done()
1✔
107
                shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1✔
108
                defer cancel()
1✔
109
                _ = s.srv.Shutdown(shutdownCtx)
1✔
110
        }()
1✔
111

112
        err = s.srv.ListenAndServe()
1✔
113
        if errors.Is(err, http.ErrServerClosed) {
2✔
114
                return nil
1✔
115
        }
1✔
NEW
116
        return fmt.Errorf("http server: %w", err)
×
117
}
118

119
// Stop gracefully shuts down the server.
120
func (s *Server) Stop() error {
4✔
121
        if s.srv == nil {
8✔
122
                return nil
4✔
123
        }
4✔
NEW
124
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
×
NEW
125
        defer cancel()
×
NEW
126
        if err := s.srv.Shutdown(ctx); err != nil {
×
NEW
127
                return fmt.Errorf("shutdown server: %w", err)
×
NEW
128
        }
×
NEW
129
        return nil
×
130
}
131

132
// Hub returns the server's event hub.
133
func (s *Server) Hub() *Hub {
2✔
134
        return s.hub
2✔
135
}
2✔
136

137
// Buffer returns the server's event buffer.
138
func (s *Server) Buffer() *Buffer {
2✔
139
        return s.buffer
2✔
140
}
2✔
141

142
// templateData is the value passed to the dashboard template when the index page is rendered.
type templateData struct {
	PlanName string // taken from ServerConfig.PlanName
	Branch   string // taken from ServerConfig.Branch
}
147

148
// handleIndex serves the main dashboard page.
149
func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
3✔
150
        if r.URL.Path != "/" {
4✔
151
                http.NotFound(w, r)
1✔
152
                return
1✔
153
        }
1✔
154

155
        w.Header().Set("Content-Type", "text/html; charset=utf-8")
2✔
156

2✔
157
        data := templateData{
2✔
158
                PlanName: s.cfg.PlanName,
2✔
159
                Branch:   s.cfg.Branch,
2✔
160
        }
2✔
161

2✔
162
        if err := s.tmpl.Execute(w, data); err != nil {
2✔
NEW
163
                http.Error(w, "template execution error", http.StatusInternalServerError)
×
NEW
164
                return
×
NEW
165
        }
×
166
}
167

168
// handlePlan serves the parsed plan as JSON.
169
// in single-session mode, uses the server's configured plan file with caching.
170
// in multi-session mode, accepts ?session=<id> to load plan from session metadata.
171
func (s *Server) handlePlan(w http.ResponseWriter, r *http.Request) {
12✔
172
        if r.Method != http.MethodGet {
15✔
173
                w.Header().Set("Allow", http.MethodGet)
3✔
174
                http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
3✔
175
                return
3✔
176
        }
3✔
177

178
        sessionID := r.URL.Query().Get("session")
9✔
179

9✔
180
        // multi-session mode with session ID
9✔
181
        if s.sm != nil && sessionID != "" {
14✔
182
                s.handleSessionPlan(w, sessionID)
5✔
183
                return
5✔
184
        }
5✔
185

186
        // single-session mode - use cached server plan
187
        if s.cfg.PlanFile == "" {
5✔
188
                http.Error(w, "no plan file configured", http.StatusNotFound)
1✔
189
                return
1✔
190
        }
1✔
191

192
        plan, err := s.loadPlan()
3✔
193
        if err != nil {
4✔
194
                log.Printf("[WARN] failed to load plan file %s: %v", s.cfg.PlanFile, err)
1✔
195
                http.Error(w, "unable to load plan", http.StatusInternalServerError)
1✔
196
                return
1✔
197
        }
1✔
198

199
        data, err := plan.JSON()
2✔
200
        if err != nil {
2✔
NEW
201
                log.Printf("[WARN] failed to encode plan: %v", err)
×
NEW
202
                http.Error(w, "unable to encode plan", http.StatusInternalServerError)
×
NEW
203
                return
×
NEW
204
        }
×
205

206
        w.Header().Set("Content-Type", "application/json")
2✔
207
        _, _ = w.Write(data)
2✔
208
}
209

210
// handleSessionPlan handles plan requests for a specific session in multi-session mode.
211
func (s *Server) handleSessionPlan(w http.ResponseWriter, sessionID string) {
5✔
212
        session := s.sm.Get(sessionID)
5✔
213
        if session == nil {
6✔
214
                http.Error(w, "session not found: "+sessionID, http.StatusNotFound)
1✔
215
                return
1✔
216
        }
1✔
217

218
        meta := session.GetMetadata()
4✔
219
        if meta.PlanPath == "" {
5✔
220
                http.Error(w, "no plan file for session", http.StatusNotFound)
1✔
221
                return
1✔
222
        }
1✔
223

224
        // validate plan path to prevent path traversal attacks
225
        if err := validatePlanPath(meta.PlanPath); err != nil {
4✔
226
                log.Printf("[WARN] invalid plan path for session %s: %v", sessionID, err)
1✔
227
                http.Error(w, "invalid plan path", http.StatusBadRequest)
1✔
228
                return
1✔
229
        }
1✔
230

231
        plan, err := loadPlanWithFallback(meta.PlanPath)
2✔
232
        if err != nil {
2✔
NEW
233
                log.Printf("[WARN] failed to load plan file %s: %v", meta.PlanPath, err)
×
NEW
234
                http.Error(w, "unable to load plan", http.StatusInternalServerError)
×
NEW
235
                return
×
NEW
236
        }
×
237

238
        data, err := plan.JSON()
2✔
239
        if err != nil {
2✔
NEW
240
                log.Printf("[WARN] failed to encode plan: %v", err)
×
NEW
241
                http.Error(w, "unable to encode plan", http.StatusInternalServerError)
×
NEW
242
                return
×
NEW
243
        }
×
244

245
        w.Header().Set("Content-Type", "application/json")
2✔
246
        _, _ = w.Write(data)
2✔
247
}
248

249
// loadPlan returns a cached plan or loads it from disk (with completed/ fallback).
250
func (s *Server) loadPlan() (*Plan, error) {
3✔
251
        s.planMu.Lock()
3✔
252
        defer s.planMu.Unlock()
3✔
253

3✔
254
        if s.planCache != nil {
3✔
NEW
255
                return s.planCache, nil
×
NEW
256
        }
×
257

258
        plan, err := ParsePlanFile(s.cfg.PlanFile)
3✔
259
        if err != nil && errors.Is(err, fs.ErrNotExist) {
5✔
260
                completedPath := filepath.Join(filepath.Dir(s.cfg.PlanFile), "completed", filepath.Base(s.cfg.PlanFile))
2✔
261
                plan, err = ParsePlanFile(completedPath)
2✔
262
        }
2✔
263
        if err != nil {
4✔
264
                return nil, err
1✔
265
        }
1✔
266

267
        s.planCache = plan
2✔
268
        return plan, nil
2✔
269
}
270

271
// loadPlanWithFallback loads a plan from disk with completed/ directory fallback.
272
// does not cache - each call reads from disk.
273
func loadPlanWithFallback(path string) (*Plan, error) {
5✔
274
        plan, err := ParsePlanFile(path)
5✔
275
        if err != nil && errors.Is(err, fs.ErrNotExist) {
8✔
276
                completedPath := filepath.Join(filepath.Dir(path), "completed", filepath.Base(path))
3✔
277
                plan, err = ParsePlanFile(completedPath)
3✔
278
        }
3✔
279
        return plan, err
5✔
280
}
281

282
// validatePlanPath checks if a plan path is safe to read.
// rejects empty paths, absolute paths, and paths that resolve outside the directory
// they are evaluated in. plan paths are expected to be relative.
// names that merely begin with dots ("..drafts/plan.md", "docs/..plan.md") are
// accepted: only a whole ".." path element counts as traversal.
func validatePlanPath(path string) error {
	if path == "" {
		return errors.New("empty path")
	}

	// reject absolute paths - plan paths should always be relative
	if filepath.IsAbs(path) {
		return fmt.Errorf("absolute paths not allowed: %s", path)
	}

	// Clean resolves inner ".." elements, so in a relative path any that remain
	// are leading ones: the result is either ".." itself or starts with "../"
	cleaned := filepath.Clean(path)
	parentPrefix := ".." + string(filepath.Separator)
	if cleaned == ".." || strings.HasPrefix(cleaned, parentPrefix) {
		return fmt.Errorf("path traversal not allowed: %s", path)
	}

	return nil
}
303

304
// handleEvents serves the SSE stream.
305
// in multi-session mode, accepts ?session=<id> query parameter.
306
func (s *Server) handleEvents(w http.ResponseWriter, r *http.Request) {
8✔
307
        sessionID := r.URL.Query().Get("session")
8✔
308
        log.Printf("[SSE] connection start: session=%s", sessionID)
8✔
309

8✔
310
        // get session-specific hub and buffer
8✔
311
        hub, buffer, err := s.getSessionResources(r)
8✔
312
        if err != nil {
10✔
313
                log.Printf("[SSE] session not found: %s", sessionID)
2✔
314
                http.Error(w, err.Error(), http.StatusNotFound)
2✔
315
                return
2✔
316
        }
2✔
317

318
        // set SSE headers
319
        w.Header().Set("Content-Type", "text/event-stream")
6✔
320
        w.Header().Set("Cache-Control", "no-cache")
6✔
321
        w.Header().Set("Connection", "keep-alive")
6✔
322
        w.Header().Set("X-Accel-Buffering", "no") // disable nginx buffering
6✔
323

6✔
324
        // ensure we can flush
6✔
325
        flusher, ok := w.(http.Flusher)
6✔
326
        if !ok {
6✔
NEW
327
                http.Error(w, "streaming not supported", http.StatusInternalServerError)
×
NEW
328
                return
×
NEW
329
        }
×
330

331
        // subscribe to hub
332
        eventCh, err := hub.Subscribe()
6✔
333
        if err != nil {
7✔
334
                log.Printf("[SSE] subscribe failed: %v", err)
1✔
335
                http.Error(w, "too many connections", http.StatusServiceUnavailable)
1✔
336
                return
1✔
337
        }
1✔
338
        defer func() {
10✔
339
                hub.Unsubscribe(eventCh)
5✔
340
                log.Printf("[SSE] connection end: session=%s", sessionID)
5✔
341
        }()
5✔
342

343
        // send history first (with periodic flushes for large buffers)
344
        events := buffer.All()
5✔
345
        log.Printf("[SSE] sending %d history events: session=%s", len(events), sessionID)
5✔
346
        for i, event := range events {
10✔
347
                data, err := event.JSON()
5✔
348
                if err != nil {
5✔
NEW
349
                        continue
×
350
                }
351
                fmt.Fprintf(w, "data: %s\n\n", data)
5✔
352
                // flush periodically to avoid buffering too much
5✔
353
                if (i+1)%sseHistoryFlushInterval == 0 {
5✔
NEW
354
                        flusher.Flush()
×
NEW
355
                }
×
356
        }
357
        flusher.Flush()
5✔
358
        log.Printf("[SSE] history sent, entering event loop: session=%s", sessionID)
5✔
359

5✔
360
        // stream new events
5✔
361
        for {
11✔
362
                select {
6✔
363
                case event, ok := <-eventCh:
1✔
364
                        if !ok {
1✔
NEW
365
                                return // channel closed
×
NEW
366
                        }
×
367
                        data, err := event.JSON()
1✔
368
                        if err != nil {
1✔
NEW
369
                                continue
×
370
                        }
371
                        fmt.Fprintf(w, "data: %s\n\n", data)
1✔
372
                        flusher.Flush()
1✔
373

374
                case <-r.Context().Done():
5✔
375
                        return
5✔
376
                }
377
        }
378
}
379

380
// getSessionResources returns the hub and buffer for the request.
381
// in single-session mode, returns the server's hub/buffer.
382
// in multi-session mode, looks up the session by ID from query parameter.
383
func (s *Server) getSessionResources(r *http.Request) (*Hub, *Buffer, error) {
8✔
384
        sessionID := r.URL.Query().Get("session")
8✔
385

8✔
386
        // single-session mode (no session manager or no session ID)
8✔
387
        if s.sm == nil || sessionID == "" {
14✔
388
                if s.hub == nil || s.buffer == nil {
7✔
389
                        return nil, nil, errors.New("no session specified")
1✔
390
                }
1✔
391
                return s.hub, s.buffer, nil
5✔
392
        }
393

394
        // multi-session mode - look up session
395
        session := s.sm.Get(sessionID)
2✔
396
        if session == nil {
3✔
397
                log.Printf("[SSE] session lookup failed: %s (not in manager)", sessionID)
1✔
398
                return nil, nil, fmt.Errorf("session not found: %s", sessionID)
1✔
399
        }
1✔
400

401
        // defensive check for nil hub/buffer
402
        if session.Hub == nil || session.Buffer == nil {
1✔
NEW
403
                log.Printf("[SSE] session has nil hub/buffer: %s", sessionID)
×
NEW
404
                return nil, nil, fmt.Errorf("session not initialized: %s", sessionID)
×
NEW
405
        }
×
406

407
        return session.Hub, session.Buffer, nil
1✔
408
}
409

410
// SessionInfo represents session data for the API response.
411
type SessionInfo struct {
412
        ID           string       `json:"id"`
413
        State        SessionState `json:"state"`
414
        PlanPath     string       `json:"planPath,omitempty"`
415
        Branch       string       `json:"branch,omitempty"`
416
        Mode         string       `json:"mode,omitempty"`
417
        StartTime    time.Time    `json:"startTime"`
418
        LastModified time.Time    `json:"lastModified"`
419
}
420

421
// handleSessions returns a list of all discovered sessions.
422
func (s *Server) handleSessions(w http.ResponseWriter, r *http.Request) {
5✔
423
        if r.Method != http.MethodGet {
8✔
424
                w.Header().Set("Allow", http.MethodGet)
3✔
425
                http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
3✔
426
                return
3✔
427
        }
3✔
428

429
        // single-session mode - return empty list
430
        if s.sm == nil {
3✔
431
                w.Header().Set("Content-Type", "application/json")
1✔
432
                _, _ = w.Write([]byte("[]"))
1✔
433
                return
1✔
434
        }
1✔
435

436
        sessions := s.sm.All()
1✔
437

1✔
438
        // sort by last modified (most recent first)
1✔
439
        sort.Slice(sessions, func(i, j int) bool {
1✔
NEW
440
                return sessions[i].GetLastModified().After(sessions[j].GetLastModified())
×
NEW
441
        })
×
442

443
        // convert to API response format
444
        infos := make([]SessionInfo, 0, len(sessions))
1✔
445
        for _, session := range sessions {
2✔
446
                meta := session.GetMetadata()
1✔
447
                infos = append(infos, SessionInfo{
1✔
448
                        ID:           session.ID,
1✔
449
                        State:        session.GetState(),
1✔
450
                        PlanPath:     meta.PlanPath,
1✔
451
                        Branch:       meta.Branch,
1✔
452
                        Mode:         meta.Mode,
1✔
453
                        StartTime:    meta.StartTime,
1✔
454
                        LastModified: session.GetLastModified(),
1✔
455
                })
1✔
456
        }
1✔
457

458
        data, err := json.Marshal(infos)
1✔
459
        if err != nil {
1✔
NEW
460
                log.Printf("[WARN] failed to encode sessions: %v", err)
×
NEW
461
                http.Error(w, "unable to encode sessions", http.StatusInternalServerError)
×
NEW
462
                return
×
NEW
463
        }
×
464

465
        w.Header().Set("Content-Type", "application/json")
1✔
466
        _, _ = w.Write(data)
1✔
467
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc