• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cameri / nostream / 24630512047

19 Apr 2026 01:43PM UTC coverage: 68.518% (+0.2%) from 68.308%
24630512047

Pull #511

github

web-flow
Merge 4a7e41095 into f5093cb11
Pull Request #511: chore: migrate to pino 456

945 of 1497 branches covered (63.13%)

Branch coverage included in aggregate %.

269 of 396 new or added lines in 50 files covered. (67.93%)

1 existing line in 1 file now uncovered.

2472 of 3490 relevant lines covered (70.83%)

18.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.04
/src/adapters/web-socket-server-adapter.ts
1
import { IncomingMessage, Server } from 'http'
2
import WebSocket, { OPEN, WebSocketServer } from 'ws'
2✔
3
import { propEq } from 'ramda'
2✔
4

5
import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters'
6
import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter'
2✔
7
import { createLogger } from '../factories/logger-factory'
2✔
8
import { Event } from '../@types/event'
9
import { Factory } from '../@types/base'
10
import { getRemoteAddress } from '../utils/http'
2✔
11
import { isRateLimited } from '../handlers/request-handlers/rate-limiter-middleware'
2✔
12
import { Settings } from '../@types/settings'
13
import { WebServerAdapter } from './web-server-adapter'
2✔
14

15
const logger = createLogger('web-socket-server-adapter')
2✔
16

17
const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 120000
2✔
18

19
export class WebSocketServerAdapter extends WebServerAdapter implements IWebSocketServerAdapter {
2✔
20
  private webSocketsAdapters: WeakMap<WebSocket, IWebSocketAdapter>
21

22
  private heartbeatInterval: NodeJS.Timeout
23

24
  public constructor(
25
    webServer: Server,
26
    private readonly webSocketServer: WebSocketServer,
17✔
27
    private readonly createWebSocketAdapter: Factory<
17✔
28
      IWebSocketAdapter,
29
      [WebSocket, IncomingMessage, IWebSocketServerAdapter]
30
    >,
31
    private readonly settings: () => Settings,
17✔
32
  ) {
33
    logger('created')
17✔
34
    super(webServer)
17✔
35

36
    this.webSocketsAdapters = new WeakMap()
17✔
37

38
    this.on(WebSocketServerAdapterEvent.Broadcast, this.onBroadcast.bind(this))
17✔
39

40
    this.webSocketServer
17✔
41
      .on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this))
42
      .on('error', (error) => {
NEW
43
        logger('error: %o', error)
×
44
      })
45
    this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL)
17✔
46
  }
47

48
  public close(callback?: () => void): void {
49
    super.close(() => {
22✔
50
      logger('closing')
21✔
51
      clearInterval(this.heartbeatInterval)
21✔
52
      this.webSocketServer.clients.forEach((webSocket: WebSocket) => {
21✔
53
        const webSocketAdapter = this.webSocketsAdapters.get(webSocket)
2✔
54
        if (webSocketAdapter) {
2!
NEW
55
          logger('terminating client %s: %s', webSocketAdapter.getClientId(), webSocketAdapter.getClientAddress())
×
56
        }
57
        webSocket.terminate()
2✔
58
      })
59
      logger('closing web socket server')
21✔
60
      this.webSocketServer.close(() => {
21✔
61
        this.webSocketServer.removeAllListeners()
21✔
62
        if (typeof callback !== 'undefined') {
21✔
63
          callback()
3✔
64
        }
65
        logger('closed')
21✔
66
      })
67
    })
68
    this.removeAllListeners()
22✔
69
  }
70

71
  private onBroadcast(event: Event) {
72
    this.webSocketServer.clients.forEach((webSocket: WebSocket) => {
57✔
73
      if (!propEq('readyState', OPEN)(webSocket)) {
85✔
74
        return
1✔
75
      }
76
      const webSocketAdapter = this.webSocketsAdapters.get(webSocket) as IWebSocketAdapter
84✔
77
      if (!webSocketAdapter) {
84!
78
        return
×
79
      }
80
      webSocketAdapter.emit(WebSocketAdapterEvent.Event, event)
84✔
81
    })
82
  }
83

84
  public getConnectedClients(): number {
85
    return Array.from(this.webSocketServer.clients).filter(propEq('readyState', OPEN)).length
2✔
86
  }
87

88
  private async onConnection(client: WebSocket, req: IncomingMessage) {
89
    const currentSettings = this.settings()
55✔
90
    const remoteAddress = getRemoteAddress(req, currentSettings)
55✔
91

92
    logger('client %s connected: %o', remoteAddress, req.headers)
55✔
93

94
    if (await isRateLimited(remoteAddress, currentSettings)) {
55✔
95
      logger('client %s terminated: rate-limited', remoteAddress)
1✔
96
      client.terminate()
1✔
97
      return
1✔
98
    }
99

100
    this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this]))
54✔
101
  }
102

103
  private onHeartbeat() {
104
    this.webSocketServer.clients.forEach((webSocket) => {
1✔
105
      const webSocketAdapter = this.webSocketsAdapters.get(webSocket) as IWebSocketAdapter
1✔
106
      if (webSocketAdapter) {
1!
107
        webSocketAdapter.emit(WebSocketAdapterEvent.Heartbeat)
1✔
108
      }
109
    })
110
  }
111
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc