• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cameri / nostream / 24597013493

18 Apr 2026 04:35AM UTC coverage: 47.213% (-1.6%) from 48.812%
24597013493

push

github

web-flow
feat: add opt-in event retention purge (#359) (#412)

Co-authored-by: Ricardo Cabral <me@ricardocabral.io>

461 of 1107 branches covered (41.64%)

Branch coverage included in aggregate %.

55 of 83 new or added lines in 5 files covered. (66.27%)

71 existing lines in 8 files now uncovered.

1343 of 2714 relevant lines covered (49.48%)

3.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.27
/src/adapters/web-socket-server-adapter.ts
1
import { IncomingMessage, Server } from 'http'
2
import WebSocket, { OPEN, WebSocketServer } from 'ws'
1✔
3
import { propEq } from 'ramda'
1✔
4

5
import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters'
6
import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter'
1✔
7
import { createLogger } from '../factories/logger-factory'
1✔
8
import { Event } from '../@types/event'
9
import { Factory } from '../@types/base'
10
import { getRemoteAddress } from '../utils/http'
1✔
11
import { isRateLimited } from '../handlers/request-handlers/rate-limiter-middleware'
1✔
12
import { Settings } from '../@types/settings'
13
import { WebServerAdapter } from './web-server-adapter'
1✔
14

15
// Module-level logger, created via createLogger under the 'web-socket-server-adapter' name.
const debug = createLogger('web-socket-server-adapter')
1✔
16

17
// Delay between client health probes; passed to setInterval, so expressed in milliseconds (2 minutes).
const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 2 * 60 * 1000
1✔
18

19
/**
 * Web server adapter that also owns a `ws` WebSocketServer.
 *
 * For every accepted connection it creates an `IWebSocketAdapter` through the
 * injected factory and remembers it per socket. It relays broadcast events to
 * the adapters of all open sockets and emits a heartbeat event to every known
 * adapter on a fixed interval.
 */
export class WebSocketServerAdapter extends WebServerAdapter implements IWebSocketServerAdapter {
  /** Adapter created for each accepted socket, keyed by the socket itself. */
  private readonly webSocketsAdapters: WeakMap<WebSocket, IWebSocketAdapter>

  /** Handle of the periodic heartbeat timer; cleared in `close()`. */
  private readonly heartbeatInterval: NodeJS.Timeout

  /**
   * @param webServer HTTP server handed to the base adapter.
   * @param webSocketServer WebSocket server whose connections are managed here.
   * @param createWebSocketAdapter Factory invoked with `[client, request, this]`
   *   for each accepted connection.
   * @param settings Getter returning the current settings; it is called on
   *   every new connection.
   */
  public constructor(
    webServer: Server,
    private readonly webSocketServer: WebSocketServer,
    private readonly createWebSocketAdapter: Factory<
      IWebSocketAdapter,
      [WebSocket, IncomingMessage, IWebSocketServerAdapter]
    >,
    private readonly settings: () => Settings,
  ) {
    debug('created')
    super(webServer)

    this.webSocketsAdapters = new WeakMap()

    this
      .on(WebSocketServerAdapterEvent.Broadcast, this.onBroadcast.bind(this))

    this.webSocketServer
      .on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this))
      .on('error', (error) => {
        debug('error: %o', error)
      })
    this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL)
  }

  /**
   * Closes the base adapter and, from its close callback, stops the heartbeat,
   * terminates every connected client and closes the WebSocket server.
   *
   * Listeners registered on this adapter are removed right away, without
   * waiting for the base close callback.
   *
   * @param callback Optional function invoked once the WebSocket server has
   *   closed.
   */
  public close(callback?: () => void): void {
    super.close(() => {
      debug('closing')
      clearInterval(this.heartbeatInterval)
      this.webSocketServer.clients.forEach((webSocket: WebSocket) => {
        const webSocketAdapter = this.webSocketsAdapters.get(webSocket)
        if (webSocketAdapter) {
          debug('terminating client %s: %s', webSocketAdapter.getClientId(), webSocketAdapter.getClientAddress())
        }
        webSocket.terminate()
      })
      debug('closing web socket server')
      this.webSocketServer.close(() => {
        this.webSocketServer.removeAllListeners()
        if (typeof callback !== 'undefined') {
          callback()
        }
        debug('closed')
      })
    })
    this.removeAllListeners()
  }

  /**
   * Forwards a broadcast event to the adapter of every socket that is
   * currently open. Sockets that are not open, or that have no adapter
   * registered, are skipped.
   */
  private onBroadcast(event: Event) {
    this.webSocketServer.clients.forEach((webSocket: WebSocket) => {
      if (!propEq('readyState', OPEN)(webSocket)) {
        return
      }
      // No type assertion here: the lookup can legitimately return undefined
      // (e.g. a rate-limited client that never got an adapter).
      const webSocketAdapter = this.webSocketsAdapters.get(webSocket)
      if (!webSocketAdapter) {
        return
      }
      webSocketAdapter.emit(WebSocketAdapterEvent.Event, event)
    })
  }

  /**
   * @returns The number of clients whose socket is in the OPEN state.
   */
  public getConnectedClients(): number {
    return Array.from(this.webSocketServer.clients).filter(propEq('readyState', OPEN)).length
  }

  /**
   * Handles a new WebSocket connection: terminates rate-limited clients and
   * otherwise creates and registers an adapter for the socket.
   *
   * This method is registered as an event listener, so nothing awaits the
   * promise it returns. Any failure is therefore caught here; otherwise it
   * would surface as an unhandled promise rejection and leave the socket open
   * with no adapter attached.
   */
  private async onConnection(client: WebSocket, req: IncomingMessage): Promise<void> {
    try {
      const currentSettings = this.settings()
      const remoteAddress = getRemoteAddress(req, currentSettings)

      debug('client %s connected: %o', remoteAddress, req.headers)

      if (await isRateLimited(remoteAddress, currentSettings)) {
        debug('client %s terminated: rate-limited', remoteAddress)
        client.terminate()
        return
      }

      this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this]))
    } catch (error: unknown) {
      debug('unable to handle connection, terminating client: %o', error)
      client.terminate()
    }
  }

  /**
   * Emits a heartbeat event on the adapter of every connected client that has
   * one registered.
   */
  private onHeartbeat() {
    this.webSocketServer.clients.forEach((webSocket) => {
      const webSocketAdapter = this.webSocketsAdapters.get(webSocket)
      if (webSocketAdapter) {
        webSocketAdapter.emit(WebSocketAdapterEvent.Heartbeat)
      }
    })
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc