• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cameri / nostream / 25207542759

01 May 2026 08:07AM UTC coverage: 62.181% (-2.4%) from 64.58%
25207542759

Pull #591

github

web-flow
Merge 29f984796 into d8f62b496
Pull Request #591: fix: add husky install fallback

1679 of 3060 branches covered (54.87%)

Branch coverage included in aggregate %.

3944 of 5983 relevant lines covered (65.92%)

7.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.54
/src/adapters/web-socket-adapter.ts
1
import cluster from 'cluster'
1✔
2
import { EventEmitter } from 'stream'
1✔
3
import { IncomingMessage as IncomingHttpMessage } from 'http'
4
import { WebSocket } from 'ws'
1✔
5
import { ZodError } from 'zod'
1✔
6

7
import { ContextMetadata, Factory } from '../@types/base'
8
import { createNoticeMessage, createOutgoingEventMessage } from '../utils/messages'
1✔
9
import { IAbortable, IMessageHandler } from '../@types/message-handlers'
10
import { IncomingMessage, OutgoingMessage } from '../@types/messages'
11
import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters'
12
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
13
import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter'
1✔
14
import { attemptValidation } from '../utils/validation'
1✔
15
import { ContextMetadataKey } from '../constants/base'
1✔
16
import { createLogger } from '../factories/logger-factory'
1✔
17
import { Event } from '../@types/event'
18
import { getRemoteAddress } from '../utils/http'
1✔
19
import { IRateLimiter } from '../@types/utils'
20
import { isEventMatchingFilter } from '../utils/event'
1✔
21
import { messageSchema } from '../schemas/message-schema'
1✔
22
import { Settings } from '../@types/settings'
23
import { SocketAddress } from 'net'
1✔
24

25
// Module logger; `debugHeartbeat` is derived from it via `extend('heartbeat')`
// and is used below for the ping/pong trace messages.
const logger = createLogger('web-socket-adapter')
const debugHeartbeat = logger.extend('heartbeat')

// Message handlers that expose an `abort` function, tracked per client socket
// so they can be aborted when that socket closes.
const abortableMessageHandlers = new WeakMap<WebSocket, IAbortable[]>()
1✔
29

30
/**
 * Adapter around one client WebSocket connection.
 *
 * It forwards raw client messages to message handlers created by the injected
 * factory, keeps the client's subscriptions, relays matching events back to
 * the client, and takes part in the heartbeat (ping/pong) check.
 */
export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter {
  public clientId: string
  private clientAddress: SocketAddress
  private alive: boolean
  private subscriptions: Map<SubscriptionId, SubscriptionFilter[]>

  /**
   * Derives the client id and address from the upgrade request and wires the
   * socket events and the adapter events to their handlers.
   *
   * @param client - The underlying client socket.
   * @param request - The HTTP request that opened the connection.
   * @param webSocketServer - Server adapter that receives broadcast events.
   * @param createMessageHandler - Factory producing a handler for an incoming message.
   * @param rateLimiter - Factory producing the rate limiter used for messages.
   * @param settings - Factory producing the current settings.
   */
  public constructor(
    private readonly client: WebSocket,
    private readonly request: IncomingHttpMessage,
    private readonly webSocketServer: IWebSocketServerAdapter,
    private readonly createMessageHandler: Factory<IMessageHandler, [IncomingMessage, IWebSocketAdapter]>,
    private readonly rateLimiter: Factory<IRateLimiter>,
    private readonly settings: Factory<Settings>,
  ) {
    super()
    this.alive = true
    this.subscriptions = new Map()

    // The client id is the hex form of the base64-encoded `sec-websocket-key` header.
    const webSocketKey = this.request.headers['sec-websocket-key'] as string
    this.clientId = Buffer.from(webSocketKey, 'base64').toString('hex')

    const address = getRemoteAddress(this.request, this.settings())
    // An address containing a colon is treated as IPv6, anything else as IPv4.
    const family = address.includes(':') ? 'ipv6' : 'ipv4'
    this.clientAddress = new SocketAddress({ address, family })

    this.client
      .on('error', (error) => {
        const who = `${this.clientId} (${this.getClientAddress()})`
        const isRangeError = error.name === 'RangeError'

        if (isRangeError && error.message === 'Max payload size exceeded') {
          logger.error(`web-socket-adapter: client ${who} sent payload too large`)
        } else if (isRangeError && error.message === 'Invalid WebSocket frame: RSV1 must be clear') {
          logger(`client ${who} enabled compression`)
        } else {
          logger.error(`web-socket-adapter: client error ${who}:`, error)
        }

        // Whatever the error was, the connection is closed afterwards.
        this.client.close()
      })
      .on('message', this.onClientMessage.bind(this))
      .on('close', this.onClientClose.bind(this))
      .on('pong', this.onClientPong.bind(this))
      .on('ping', this.onClientPing.bind(this))

    this.on(WebSocketAdapterEvent.Heartbeat, this.onHeartbeat.bind(this))
    this.on(WebSocketAdapterEvent.Subscribe, this.onSubscribed.bind(this))
    this.on(WebSocketAdapterEvent.Unsubscribe, this.onUnsubscribed.bind(this))
    this.on(WebSocketAdapterEvent.Event, this.onSendEvent.bind(this))
    this.on(WebSocketAdapterEvent.Broadcast, this.onBroadcast.bind(this))
    this.on(WebSocketAdapterEvent.Message, this.sendMessage.bind(this))

    logger('client %s connected from %s', this.clientId, this.clientAddress.address)
  }

  /** @returns The id derived from the client's `sec-websocket-key` header. */
  public getClientId(): string {
    return this.clientId
  }

  /** @returns The client's remote address as a string. */
  public getClientAddress(): string {
    return this.clientAddress.address
  }

  /**
   * Forgets the filters stored under the given subscription id.
   *
   * @param subscriptionId - Id of the subscription to remove.
   */
  public onUnsubscribed(subscriptionId: string): void {
    logger('client %s unsubscribed %s', this.clientId, subscriptionId)
    this.subscriptions.delete(subscriptionId)
  }

  /**
   * Stores (or replaces) the filters for the given subscription id.
   *
   * @param subscriptionId - Id chosen for the subscription.
   * @param filters - Filters an event must match to be sent for this subscription.
   */
  public onSubscribed(subscriptionId: string, filters: SubscriptionFilter[]): void {
    logger('client %s subscribed %s to %o', this.clientId, subscriptionId, filters)
    this.subscriptions.set(subscriptionId, filters)
  }

  /**
   * Emits the event as a broadcast on the server adapter and, when this
   * process is a cluster worker with `process.send` available, also passes it
   * to `process.send`.
   *
   * @param event - The event to broadcast.
   */
  public onBroadcast(event: Event): void {
    this.webSocketServer.emit(WebSocketServerAdapterEvent.Broadcast, event)

    const canNotifyViaIpc = cluster.isWorker && typeof process.send === 'function'
    if (canNotifyViaIpc) {
      process.send({
        eventName: WebSocketServerAdapterEvent.Broadcast,
        event,
      })
    }
  }

  /**
   * Sends the event to the client once for every subscription that has at
   * least one filter matching it.
   *
   * @param event - The event to offer to this client's subscriptions.
   */
  public onSendEvent(event: Event): void {
    this.subscriptions.forEach((filters, subscriptionId) => {
      const matchers = filters.map(isEventMatchingFilter)
      if (!matchers.some((matches) => matches(event))) {
        return
      }

      logger('sending event to client %s: %o', this.clientId, event)
      this.sendMessage(createOutgoingEventMessage(subscriptionId, event))
    })
  }

  /**
   * Serializes the message as JSON and sends it, but only while the socket is
   * open; otherwise the message is dropped.
   *
   * @param message - The outgoing message.
   */
  private sendMessage(message: OutgoingMessage): void {
    if (this.client.readyState === WebSocket.OPEN) {
      this.client.send(JSON.stringify(message))
    }
  }

  /**
   * Heartbeat tick. If the client has been marked alive since the previous
   * tick (by a message, ping or pong), the flag is cleared and a ping is sent.
   * Otherwise a pong timeout is logged and the socket is closed.
   */
  public onHeartbeat(): void {
    if (this.alive) {
      this.alive = false
      this.client.ping()
      debugHeartbeat('client %s ping', this.clientId)
      return
    }

    logger.error(`web-socket-adapter: pong timeout for client ${this.clientId} (${this.getClientAddress()})`)
    this.client.close()
  }

  /** @returns A shallow copy of the subscription map, so callers cannot mutate the original. */
  public getSubscriptions(): Map<string, SubscriptionFilter[]> {
    return new Map(this.subscriptions)
  }

  /**
   * Handles one raw message from the client: applies rate limiting, parses
   * and validates the payload, attaches the remote address as context
   * metadata, and delegates to the handler produced by the factory.
   *
   * Parsing or validation failures are answered with an `invalid: ...` notice;
   * other failures are only logged. Nothing is rethrown.
   *
   * @param raw - The raw message payload, decoded as UTF-8 JSON.
   */
  private async onClientMessage(raw: Buffer) {
    this.alive = true
    let abortable = false
    let messageHandler: (IMessageHandler & IAbortable) | undefined = undefined

    try {
      const rateLimited = await this.isRateLimited(this.clientAddress.address)
      if (rateLimited) {
        this.sendMessage(createNoticeMessage('rate limited'))
        return
      }

      const validate = attemptValidation(messageSchema)
      const message = validate(JSON.parse(raw.toString('utf8')))

      message[ContextMetadataKey] = {
        remoteAddress: this.clientAddress,
      } as ContextMetadata

      messageHandler = this.createMessageHandler([message, this]) as IMessageHandler & IAbortable
      if (!messageHandler) {
        logger.error('web-socket-adapter: unhandled message: no handler found:', message)
        return
      }

      abortable = typeof messageHandler.abort === 'function'
      if (abortable) {
        // Register the handler so it can be aborted if the socket closes mid-flight.
        let pending = abortableMessageHandlers.get(this.client)
        if (!pending) {
          pending = []
          abortableMessageHandlers.set(this.client, pending)
        }
        pending.push(messageHandler)
      }

      await messageHandler.handleMessage(message)
    } catch (error) {
      if (error instanceof Error && error.name === 'AbortError') {
        logger.error(`web-socket-adapter: abort from client ${this.clientId} (${this.getClientAddress()})`)
      } else if (error instanceof Error && (error.name === 'SyntaxError' || error instanceof ZodError)) {
        logger('invalid message client %s (%s): %s', this.clientId, this.getClientAddress(), error.message)

        // For validation errors prefer the first issue's message over the aggregate one.
        const reason = error instanceof ZodError ? error.issues[0]?.message ?? error.message : error.message
        this.sendMessage(createNoticeMessage(`invalid: ${reason}`))
      } else {
        logger.error('web-socket-adapter: unable to handle message:', error)
      }
    } finally {
      // Unregister the handler whether it finished, failed or was aborted.
      if (abortable && messageHandler) {
        const pending = abortableMessageHandlers.get(this.client)
        const position = pending?.indexOf(messageHandler) ?? -1
        if (position >= 0) {
          pending?.splice(position, 1)
        }
      }
    }
  }

  /**
   * Records one message hit against every configured rate limit for the
   * client and reports whether any of them was exceeded.
   *
   * No hit is recorded when no message rate limits are configured or when the
   * client is listed in the message IP whitelist.
   *
   * @param client - The client address used as the rate-limit key.
   * @returns `true` when at least one configured limit was exceeded.
   */
  private async isRateLimited(client: string): Promise<boolean> {
    const { rateLimits, ipWhitelist = [] } = this.settings().limits?.message ?? {}

    if (!Array.isArray(rateLimits) || rateLimits.length === 0 || ipWhitelist.includes(client)) {
      return false
    }

    const limiter = this.rateLimiter()

    let limited = false
    // Every limit is hit even after one is exceeded, so each window is counted.
    for (const { rate, period } of rateLimits) {
      const exceeded = await limiter.hit(`${client}:message:${period}`, 1, { period, rate })
      if (!exceeded) {
        continue
      }

      logger('rate limited %s: %d messages / %d ms exceeded', client, rate, period)
      limited = true
    }

    return limited
  }

  /** Marks the client alive when it answers a ping. */
  private onClientPong() {
    debugHeartbeat('client %s pong', this.clientId)
    this.alive = true
  }

  /**
   * Answers a client ping with a pong carrying the same payload and marks the
   * client alive.
   *
   * @param data - The payload received with the ping.
   */
  private onClientPing(data: any) {
    debugHeartbeat('client %s ping', this.clientId)
    this.client.pong(data)
    this.alive = true
  }

  /**
   * Cleans up after the socket closes: clears subscriptions, aborts any
   * in-flight abortable handlers (a failing abort is logged, not rethrown),
   * and removes all listeners from both the adapter and the socket.
   */
  private onClientClose() {
    this.alive = false
    this.subscriptions.clear()

    const pending = abortableMessageHandlers.get(this.client) ?? []
    for (const handler of pending) {
      try {
        handler.abort()
      } catch (error) {
        logger.error('Unable to abort message handler', error)
      }
    }

    this.removeAllListeners()
    this.client.removeAllListeners()
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc