• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alexferl / zerohttp / 23093061092

14 Mar 2026 05:45PM UTC coverage: 93.164% (+0.5%) from 92.697%
23093061092

push

github

web-flow
feat: add AcceptsJSON and RenderAuto for content negotiation (#105)

* feat: add AcceptsJSON and RenderAuto for content negotiation

- Add AcceptsJSON function to detect JSON-capable clients
- Add RenderAuto method that returns JSON or plain text based on Accept header
- Add tests for both AcceptsJSON and RenderAuto
- Update middleware to use RenderAuto instead of Render

Signed-off-by: alexferl <me@alexferl.com>

* increase coverage

Signed-off-by: alexferl <me@alexferl.com>

---------

Signed-off-by: alexferl <me@alexferl.com>

64 of 67 new or added lines in 17 files covered. (95.52%)

7 existing lines in 2 files now uncovered.

8899 of 9552 relevant lines covered (93.16%)

76.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.94
/sse.go
1
package zerohttp
2

3
import (
4
        "context"
5
        "fmt"
6
        "io"
7
        "net/http"
8
        "strconv"
9
        "strings"
10
        "sync"
11
        "time"
12

13
        "github.com/alexferl/zerohttp/config"
14
)
15

16
// SSEConnection is an alias for config.SSEConnection.
17
type SSEConnection = config.SSEConnection
18

19
// SSEProvider is an alias for config.SSEProvider.
20
type SSEProvider = config.SSEProvider
21

22
// SSEEvent is an alias for config.SSEEvent.
23
type SSEEvent = config.SSEEvent
24

25
// SSE is the built-in SSE implementation using Go's standard library.
//
// A value is created by NewSSE; all methods serialize on mu.
type SSE struct {
	w       http.ResponseWriter // destination for events and comments
	flusher http.Flusher        // flushed after every successful write
	ctx     context.Context     // checked by Send and SendComment before writing
	cancel  context.CancelFunc  // invoked by Close
	closed  chan struct{}       // closed by Close
	done    chan struct{}       // closed when the monitor goroutine exits
	mu      sync.Mutex          // held by Send, SendComment, Close and SetRetry
	retry   time.Duration       // default retry used by Send when the event sets none
}
36

37
// sseLineEndingReplacer rewrites CRLF and bare CR to LF. The SSE wire format
// accepts CRLF, LF, or bare CR as a line terminator, so all three have to be
// treated as line breaks when splitting payloads. CRLF is listed first so it
// collapses to a single LF rather than two.
var sseLineEndingReplacer = strings.NewReplacer(
	"\r\n", "\n",
	"\r", "\n",
)

// normalizeLineEndings returns s with every CRLF or CR replaced by LF.
func normalizeLineEndings(s string) string {
	normalized := sseLineEndingReplacer.Replace(s)
	return normalized
}
1,289✔
44

45
// setupSSEResponse sets up the SSE headers and returns the flusher.
46
// This is a helper shared between NewSSE and NewSSEWriter.
47
//
48
// Note: The Content-Type check is a heuristic. It may false-positive if
49
// middleware sets Content-Type before the SSE handler runs. Consider avoiding
50
// Content-Type middleware on SSE routes.
51
func setupSSEResponse(w http.ResponseWriter) (http.Flusher, error) {
209✔
52
        if w.Header().Get(HeaderContentType) != "" {
211✔
53
                return nil, fmt.Errorf("sse: response headers already sent")
2✔
54
        }
2✔
55

56
        flusher, ok := w.(http.Flusher)
207✔
57
        if !ok {
207✔
58
                return nil, fmt.Errorf("sse: streaming not supported")
×
59
        }
×
60

61
        w.Header().Set(HeaderContentType, MIMETextEventStream)
207✔
62
        w.Header().Set(HeaderCacheControl, CacheControlNoCache)
207✔
63
        w.Header().Set(HeaderConnection, ConnectionKeepAlive)
207✔
64

207✔
65
        w.WriteHeader(http.StatusOK)
207✔
66
        flusher.Flush()
207✔
67

207✔
68
        return flusher, nil
207✔
69
}
70

71
// NewSSE creates a new SSE connection using stdlib.
72
// This sets the appropriate headers and prepares the connection for streaming.
73
func NewSSE(w http.ResponseWriter, r *http.Request) (*SSE, error) {
201✔
74
        flusher, err := setupSSEResponse(w)
201✔
75
        if err != nil {
202✔
76
                return nil, err
1✔
77
        }
1✔
78

79
        ctx, cancel := context.WithCancel(r.Context())
200✔
80

200✔
81
        stream := &SSE{
200✔
82
                w:       w,
200✔
83
                flusher: flusher,
200✔
84
                ctx:     ctx,
200✔
85
                cancel:  cancel,
200✔
86
                closed:  make(chan struct{}),
200✔
87
                done:    make(chan struct{}),
200✔
88
        }
200✔
89

200✔
90
        // Monitor context cancellation
200✔
91
        go func() {
400✔
92
                defer close(stream.done) // Signal goroutine exit
200✔
93
                select {
200✔
94
                case <-ctx.Done():
80✔
95
                        _ = stream.Close()
80✔
96
                case <-stream.closed:
102✔
97
                }
98
        }()
99

100
        return stream, nil
200✔
101
}
102

103
// Send writes an event to the client.
104
func (s *SSE) Send(event SSEEvent) error {
1,848✔
105
        s.mu.Lock()
1,848✔
106
        defer s.mu.Unlock()
1,848✔
107

1,848✔
108
        select {
1,848✔
109
        case <-s.closed:
286✔
110
                return fmt.Errorf("sse: connection closed")
286✔
111
        case <-s.ctx.Done():
272✔
112
                return fmt.Errorf("sse: context cancelled")
272✔
113
        default:
1,290✔
114
        }
115

116
        // Validate ID and Name per SSE spec: must not contain CR, LF, or NULL
117
        if strings.ContainsAny(event.ID, "\r\n\x00") {
1,293✔
118
                return fmt.Errorf("sse: event ID must not contain CR, LF, or NULL")
3✔
119
        }
3✔
120
        if strings.ContainsAny(event.Name, "\r\n") {
1,289✔
121
                return fmt.Errorf("sse: event name must not contain CR or LF")
2✔
122
        }
2✔
123

124
        var buf strings.Builder
1,285✔
125

1,285✔
126
        if event.ID != "" {
1,499✔
127
                buf.WriteString("id: ")
214✔
128
                buf.WriteString(event.ID)
214✔
129
                buf.WriteByte('\n')
214✔
130
        }
214✔
131

132
        if event.Name != "" {
1,286✔
133
                buf.WriteString("event: ")
1✔
134
                buf.WriteString(event.Name)
1✔
135
                buf.WriteByte('\n')
1✔
136
        }
1✔
137

138
        retry := event.Retry
1,285✔
139
        if retry == 0 {
2,569✔
140
                retry = s.retry
1,284✔
141
        }
1,284✔
142
        if retry > 0 {
1,287✔
143
                buf.WriteString("retry: ")
2✔
144
                buf.WriteString(strconv.FormatInt(retry.Milliseconds(), 10))
2✔
145
                buf.WriteByte('\n')
2✔
146
        }
2✔
147

148
        if len(event.Data) > 0 {
2,570✔
149
                lines := strings.Split(normalizeLineEndings(string(event.Data)), "\n")
1,285✔
150
                for _, line := range lines {
2,572✔
151
                        buf.WriteString("data: ")
1,287✔
152
                        buf.WriteString(line)
1,287✔
153
                        buf.WriteByte('\n')
1,287✔
154
                }
1,287✔
155
        } else {
×
156
                buf.WriteString("data: \n")
×
157
        }
×
158

159
        // Empty line terminates the event
160
        buf.WriteByte('\n')
1,285✔
161

1,285✔
162
        _, err := io.WriteString(s.w, buf.String())
1,285✔
163
        if err != nil {
1,288✔
164
                return fmt.Errorf("sse: write error: %w", err)
3✔
165
        }
3✔
166

167
        s.flusher.Flush()
1,282✔
168
        return nil
1,282✔
169
}
170

171
// SendComment sends a comment (heartbeat/keepalive).
172
func (s *SSE) SendComment(comment string) error {
3✔
173
        s.mu.Lock()
3✔
174
        defer s.mu.Unlock()
3✔
175

3✔
176
        select {
3✔
177
        case <-s.closed:
1✔
178
                return fmt.Errorf("sse: connection closed")
1✔
UNCOV
179
        case <-s.ctx.Done():
×
UNCOV
180
                return fmt.Errorf("sse: context cancelled")
×
181
        default:
2✔
182
        }
183

184
        // Comments start with colon
185
        lines := strings.Split(normalizeLineEndings(comment), "\n")
2✔
186
        var buf strings.Builder
2✔
187
        for _, line := range lines {
4✔
188
                buf.WriteString(": ")
2✔
189
                buf.WriteString(line)
2✔
190
                buf.WriteByte('\n')
2✔
191
        }
2✔
192

193
        _, err := io.WriteString(s.w, buf.String())
2✔
194
        if err != nil {
3✔
195
                return fmt.Errorf("sse: write error: %w", err)
1✔
196
        }
1✔
197

198
        s.flusher.Flush()
1✔
199
        return nil
1✔
200
}
201

202
// Close signals the SSE connection is done.
203
func (s *SSE) Close() error {
2,767✔
204
        s.mu.Lock()
2,767✔
205
        defer s.mu.Unlock()
2,767✔
206

2,767✔
207
        select {
2,767✔
208
        case <-s.closed:
2,583✔
209
                return nil
2,583✔
210
        default:
184✔
211
                close(s.closed)
184✔
212
                s.cancel()
184✔
213
                return nil
184✔
214
        }
215
}
216

217
// WaitDone blocks until the monitor goroutine exits.
218
// This is primarily used for testing to verify goroutine cleanup.
219
func (s *SSE) WaitDone() {
102✔
220
        <-s.done
102✔
221
}
102✔
222

223
// SetRetry sets the default reconnection time for this connection.
224
func (s *SSE) SetRetry(d time.Duration) error {
1✔
225
        s.mu.Lock()
1✔
226
        defer s.mu.Unlock()
1✔
227
        s.retry = d
1✔
228
        return nil
1✔
229
}
1✔
230

231
// DefaultProvider implements SSEProvider on top of the standard library.
// It carries no state.
type DefaultProvider struct{}

// NewDefaultProvider returns a new stdlib-based SSE provider.
func NewDefaultProvider() *DefaultProvider {
	return new(DefaultProvider)
}
3✔
238

239
// NewSSE creates a new SSE connection using the stdlib implementation.
240
func (p *DefaultProvider) NewSSE(w http.ResponseWriter, r *http.Request) (SSEConnection, error) {
1✔
241
        return NewSSE(w, r)
1✔
242
}
1✔
243

244
// SSEWriter wraps an http.ResponseWriter to provide SSE capabilities.
// It is a lower-level helper for callers who want to write SSE directly.
type SSEWriter struct {
	w       http.ResponseWriter // destination for events and comments
	flusher http.Flusher        // flushed after each write and by Flush
	ctx     context.Context     // request context; writes fail once it is done
	mu      sync.Mutex          // held by WriteEvent, WriteComment and Flush
}
252

253
// NewSSEWriter creates a new SSEWriter from an http.ResponseWriter.
254
// This sets SSE headers and prepares the connection.
255
func NewSSEWriter(w http.ResponseWriter, r *http.Request) (*SSEWriter, error) {
8✔
256
        flusher, err := setupSSEResponse(w)
8✔
257
        if err != nil {
9✔
258
                return nil, err
1✔
259
        }
1✔
260

261
        return &SSEWriter{
7✔
262
                w:       w,
7✔
263
                flusher: flusher,
7✔
264
                ctx:     r.Context(),
7✔
265
        }, nil
7✔
266
}
267

268
// WriteEvent writes an SSE event.
269
func (s *SSEWriter) WriteEvent(event SSEEvent) error {
3✔
270
        s.mu.Lock()
3✔
271
        defer s.mu.Unlock()
3✔
272

3✔
273
        select {
3✔
274
        case <-s.ctx.Done():
×
275
                return fmt.Errorf("sse: %w", s.ctx.Err())
×
276
        default:
3✔
277
        }
278

279
        // Validate ID and Name per SSE spec: must not contain CR, LF, or NULL
280
        if strings.ContainsAny(event.ID, "\r\n\x00") {
4✔
281
                return fmt.Errorf("sse: event ID must not contain CR, LF, or NULL")
1✔
282
        }
1✔
283
        if strings.ContainsAny(event.Name, "\r\n") {
3✔
284
                return fmt.Errorf("sse: event name must not contain CR or LF")
1✔
285
        }
1✔
286

287
        var buf strings.Builder
1✔
288

1✔
289
        if event.ID != "" {
2✔
290
                buf.WriteString("id: ")
1✔
291
                buf.WriteString(event.ID)
1✔
292
                buf.WriteByte('\n')
1✔
293
        }
1✔
294

295
        if event.Name != "" {
2✔
296
                buf.WriteString("event: ")
1✔
297
                buf.WriteString(event.Name)
1✔
298
                buf.WriteByte('\n')
1✔
299
        }
1✔
300

301
        if event.Retry > 0 {
2✔
302
                buf.WriteString("retry: ")
1✔
303
                buf.WriteString(strconv.FormatInt(event.Retry.Milliseconds(), 10))
1✔
304
                buf.WriteByte('\n')
1✔
305
        }
1✔
306

307
        if len(event.Data) > 0 {
2✔
308
                lines := strings.Split(normalizeLineEndings(string(event.Data)), "\n")
1✔
309
                for _, line := range lines {
2✔
310
                        buf.WriteString("data: ")
1✔
311
                        buf.WriteString(line)
1✔
312
                        buf.WriteByte('\n')
1✔
313
                }
1✔
314
        } else {
×
315
                buf.WriteString("data: \n")
×
316
        }
×
317

318
        buf.WriteByte('\n')
1✔
319

1✔
320
        _, err := io.WriteString(s.w, buf.String())
1✔
321
        if err != nil {
1✔
322
                return fmt.Errorf("sse: write error: %w", err)
×
323
        }
×
324
        s.flusher.Flush()
1✔
325
        return nil
1✔
326
}
327

328
// WriteComment writes an SSE comment.
329
func (s *SSEWriter) WriteComment(comment string) error {
2✔
330
        s.mu.Lock()
2✔
331
        defer s.mu.Unlock()
2✔
332

2✔
333
        select {
2✔
334
        case <-s.ctx.Done():
1✔
335
                return fmt.Errorf("sse: %w", s.ctx.Err())
1✔
336
        default:
1✔
337
        }
338

339
        lines := strings.Split(normalizeLineEndings(comment), "\n")
1✔
340
        var buf strings.Builder
1✔
341
        for _, line := range lines {
2✔
342
                buf.WriteString(": ")
1✔
343
                buf.WriteString(line)
1✔
344
                buf.WriteByte('\n')
1✔
345
        }
1✔
346

347
        _, err := io.WriteString(s.w, buf.String())
1✔
348
        if err != nil {
1✔
349
                return fmt.Errorf("sse: write error: %w", err)
×
350
        }
×
351
        s.flusher.Flush()
1✔
352
        return nil
1✔
353
}
354

355
// Flush flushes the underlying writer.
356
func (s *SSEWriter) Flush() {
1✔
357
        s.mu.Lock()
1✔
358
        defer s.mu.Unlock()
1✔
359
        s.flusher.Flush()
1✔
360
}
1✔
361

362
// IsClientDisconnected reports whether the request's context is done, which
// is how a client disconnect is observed. The check does not block.
func IsClientDisconnected(r *http.Request) bool {
	// Err is non-nil exactly when the context's Done channel is closed.
	return r.Context().Err() != nil
}
372

373
// SSEReplayer defines the interface for event replay storage.
374
// Implementations can use in-memory storage, Redis, databases, etc.
375
type SSEReplayer interface {
376
        // Store saves an event to the replay buffer and returns the event with assigned ID.
377
        Store(event SSEEvent) SSEEvent
378
        // Replay sends all events after the given ID to the provided send function.
379
        // Returns the number of events replayed and any error.
380
        Replay(afterID string, send func(SSEEvent) error) (int, error)
381
}
382

383
// Ensure InMemoryReplayer implements SSEReplayer
384
var _ SSEReplayer = (*InMemoryReplayer)(nil)
385

386
// InMemoryReplayer stores events in memory with a circular buffer.
387
// Events can be limited by max count and/or TTL.
388
type InMemoryReplayer struct {
389
        events    []storedEvent
390
        maxEvents int
391
        ttl       time.Duration
392
        mu        sync.RWMutex
393
        lastID    int64
394
}
395

396
type storedEvent struct {
397
        id        int64
398
        event     SSEEvent
399
        timestamp time.Time
400
}
401

402
// NewInMemoryReplayer creates a new in-memory event replayer.
403
// maxEvents is the maximum number of events to keep (0 = unlimited).
404
// ttl is how long to keep events (0 = no expiration).
405
func NewInMemoryReplayer(maxEvents int, ttl time.Duration) *InMemoryReplayer {
7✔
406
        if maxEvents < 0 {
7✔
407
                maxEvents = 0
×
408
        }
×
409
        return &InMemoryReplayer{
7✔
410
                events:    make([]storedEvent, 0),
7✔
411
                maxEvents: maxEvents,
7✔
412
                ttl:       ttl,
7✔
413
        }
7✔
414
}
415

416
// Store saves an event to the replay buffer with an auto-generated ID.
417
// Returns the event with the assigned ID so it can be used for broadcasting.
418
func (r *InMemoryReplayer) Store(event SSEEvent) SSEEvent {
11✔
419
        r.mu.Lock()
11✔
420
        defer r.mu.Unlock()
11✔
421

11✔
422
        now := time.Now()
11✔
423

11✔
424
        if r.ttl > 0 {
13✔
425
                valid := make([]storedEvent, 0, len(r.events))
2✔
426
                for _, e := range r.events {
3✔
427
                        if now.Sub(e.timestamp) < r.ttl {
1✔
428
                                valid = append(valid, e)
×
429
                        }
×
430
                }
431
                r.events = valid
2✔
432
        }
433

434
        r.lastID++
11✔
435
        event.ID = strconv.FormatInt(r.lastID, 10)
11✔
436

11✔
437
        r.events = append(r.events, storedEvent{
11✔
438
                id:        r.lastID,
11✔
439
                event:     event,
11✔
440
                timestamp: now,
11✔
441
        })
11✔
442

11✔
443
        if r.maxEvents > 0 && len(r.events) > r.maxEvents {
12✔
444
                r.events = r.events[len(r.events)-r.maxEvents:]
1✔
445
        }
1✔
446

447
        return event
11✔
448
}
449

450
// Replay sends all events after the given ID to the provided send function.
451
func (r *InMemoryReplayer) Replay(afterID string, send func(SSEEvent) error) (int, error) {
6✔
452
        startID := int64(0)
6✔
453
        if afterID != "" {
9✔
454
                var err error
3✔
455
                startID, err = strconv.ParseInt(afterID, 10, 64)
3✔
456
                if err != nil {
4✔
457
                        return 0, fmt.Errorf("sse: invalid Last-Event-ID: %w", err)
1✔
458
                }
1✔
459
        }
460

461
        // Snapshot events under lock, then release before I/O
462
        r.mu.RLock()
5✔
463
        var snapshot []SSEEvent
5✔
464
        for _, se := range r.events {
14✔
465
                if se.id > startID {
17✔
466
                        snapshot = append(snapshot, se.event)
8✔
467
                }
8✔
468
        }
469
        r.mu.RUnlock()
5✔
470

5✔
471
        count := 0
5✔
472
        for _, event := range snapshot {
13✔
473
                if err := send(event); err != nil {
8✔
474
                        return count, err
×
475
                }
×
476
                count++
8✔
477
        }
478
        return count, nil
5✔
479
}
480

481
// SSEWithReplay creates a new SSE connection and replays missed events if Last-Event-ID header is present.
482
// After replay completes, the connection is ready for new events.
483
func SSEWithReplay(w http.ResponseWriter, r *http.Request, replayer SSEReplayer) (*SSE, error) {
4✔
484
        stream, err := NewSSE(w, r)
4✔
485
        if err != nil {
4✔
486
                return nil, err
×
487
        }
×
488

489
        // Check for Last-Event-ID header for replay
490
        lastEventID := r.Header.Get(HeaderLastEventID)
4✔
491
        if lastEventID != "" {
7✔
492
                if replayer == nil {
4✔
493
                        _ = stream.Close()
1✔
494
                        return nil, fmt.Errorf("sse: Last-Event-ID header present but no replayer configured")
1✔
495
                }
1✔
496
                _, err := replayer.Replay(lastEventID, func(event SSEEvent) error {
4✔
497
                        return stream.Send(event)
2✔
498
                })
2✔
499
                if err != nil {
3✔
500
                        _ = stream.Close()
1✔
501
                        return nil, fmt.Errorf("sse: replay failed: %w", err)
1✔
502
                }
1✔
503
        }
504

505
        return stream, nil
2✔
506
}
507

508
// SSEHub manages multiple SSE connections for broadcasting.
509
type SSEHub struct {
510
        connections map[*SSE]struct{}
511
        topics      map[string]map[*SSE]struct{}
512
        mu          sync.RWMutex
513
}
514

515
// NewSSEHub creates a new SSE broadcast hub.
516
func NewSSEHub() *SSEHub {
10✔
517
        return &SSEHub{
10✔
518
                connections: make(map[*SSE]struct{}),
10✔
519
                topics:      make(map[string]map[*SSE]struct{}),
10✔
520
        }
10✔
521
}
10✔
522

523
// Register adds an SSE connection to the hub.
524
func (h *SSEHub) Register(s *SSE) {
905✔
525
        h.mu.Lock()
905✔
526
        defer h.mu.Unlock()
905✔
527
        h.connections[s] = struct{}{}
905✔
528
}
905✔
529

530
// Unregister removes an SSE connection from the hub.
531
func (h *SSEHub) Unregister(s *SSE) {
560✔
532
        h.mu.Lock()
560✔
533
        defer h.mu.Unlock()
560✔
534
        delete(h.connections, s)
560✔
535
        for topic, subs := range h.topics {
568✔
536
                delete(subs, s)
8✔
537
                if len(subs) == 0 {
10✔
538
                        delete(h.topics, topic)
2✔
539
                }
2✔
540
        }
541
}
542

543
// Subscribe adds an SSE connection to a topic.
544
func (h *SSEHub) Subscribe(s *SSE, topic string) {
17✔
545
        h.mu.Lock()
17✔
546
        defer h.mu.Unlock()
17✔
547
        if h.topics[topic] == nil {
24✔
548
                h.topics[topic] = make(map[*SSE]struct{})
7✔
549
        }
7✔
550
        h.topics[topic][s] = struct{}{}
17✔
551
}
552

553
// Unsubscribe removes an SSE connection from a topic.
554
func (h *SSEHub) Unsubscribe(s *SSE, topic string) {
1✔
555
        h.mu.Lock()
1✔
556
        defer h.mu.Unlock()
1✔
557
        if subs, ok := h.topics[topic]; ok {
2✔
558
                delete(subs, s)
1✔
559
                if len(subs) == 0 {
2✔
560
                        delete(h.topics, topic)
1✔
561
                }
1✔
562
        }
563
}
564

565
// Broadcast sends an event to all registered connections.
566
// Connections that fail to receive the event are automatically unregistered.
567
func (h *SSEHub) Broadcast(event SSEEvent) {
1,102✔
568
        h.mu.RLock()
1,102✔
569
        connections := make([]*SSE, 0, len(h.connections))
1,102✔
570
        for conn := range h.connections {
2,373✔
571
                connections = append(connections, conn)
1,271✔
572
        }
1,271✔
573
        h.mu.RUnlock()
1,102✔
574

1,102✔
575
        var failed []*SSE
1,102✔
576
        for _, conn := range connections {
2,373✔
577
                if err := conn.Send(event); err != nil {
1,823✔
578
                        failed = append(failed, conn)
552✔
579
                }
552✔
580
        }
581

582
        // Unregister failed connections
583
        for _, conn := range failed {
1,654✔
584
                h.Unregister(conn)
552✔
585
                _ = conn.Close()
552✔
586
        }
552✔
587
}
588

589
// BroadcastTo sends an event to all connections subscribed to a topic.
590
// Connections that fail to receive the event are automatically unregistered.
591
func (h *SSEHub) BroadcastTo(topic string, event SSEEvent) {
102✔
592
        h.mu.RLock()
102✔
593
        var connections []*SSE
102✔
594
        if subs, ok := h.topics[topic]; ok {
204✔
595
                connections = make([]*SSE, 0, len(subs))
102✔
596
                for conn := range subs {
615✔
597
                        connections = append(connections, conn)
513✔
598
                }
513✔
599
        }
600
        h.mu.RUnlock()
102✔
601

102✔
602
        var failed []*SSE
102✔
603
        for _, conn := range connections {
615✔
604
                if err := conn.Send(event); err != nil {
519✔
605
                        failed = append(failed, conn)
6✔
606
                }
6✔
607
        }
608

609
        // Unregister failed connections
610
        for _, conn := range failed {
108✔
611
                h.Unregister(conn)
6✔
612
                _ = conn.Close()
6✔
613
        }
6✔
614
}
615

616
// ConnectionCount returns the number of registered connections.
617
func (h *SSEHub) ConnectionCount() int {
5✔
618
        h.mu.RLock()
5✔
619
        defer h.mu.RUnlock()
5✔
620
        return len(h.connections)
5✔
621
}
5✔
622

623
// TopicCount returns the number of connections subscribed to a topic.
624
func (h *SSEHub) TopicCount(topic string) int {
7✔
625
        h.mu.RLock()
7✔
626
        defer h.mu.RUnlock()
7✔
627
        return len(h.topics[topic])
7✔
628
}
7✔
629

630
var (
631
        _ SSEConnection = (*SSE)(nil)
632
        _ SSEProvider   = (*DefaultProvider)(nil)
633
        _ SSEReplayer   = (*InMemoryReplayer)(nil)
634
)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc