• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cameri / nostream / 24597013493

18 Apr 2026 04:35AM UTC coverage: 47.213% (-1.6%) from 48.812%
24597013493

push

github

web-flow
feat: add opt-in event retention purge (#359) (#412)

Co-authored-by: Ricardo Cabral <me@ricardocabral.io>

461 of 1107 branches covered (41.64%)

Branch coverage included in aggregate %.

55 of 83 new or added lines in 5 files covered. (66.27%)

71 existing lines in 8 files now uncovered.

1343 of 2714 relevant lines covered (49.48%)

3.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.11
/src/adapters/web-socket-adapter.ts
1
import cluster from 'cluster'
1✔
2
import { EventEmitter } from 'stream'
1✔
3
import { IncomingMessage as IncomingHttpMessage } from 'http'
4
import { WebSocket } from 'ws'
1✔
5

6
import { ContextMetadata, Factory } from '../@types/base'
7
import { createNoticeMessage, createOutgoingEventMessage } from '../utils/messages'
1✔
8
import { IAbortable, IMessageHandler } from '../@types/message-handlers'
9
import { IncomingMessage, OutgoingMessage } from '../@types/messages'
10
import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters'
11
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
12
import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter'
1✔
13
import { attemptValidation } from '../utils/validation'
1✔
14
import { ContextMetadataKey } from '../constants/base'
1✔
15
import { createLogger } from '../factories/logger-factory'
1✔
16
import { Event } from '../@types/event'
17
import { getRemoteAddress } from '../utils/http'
1✔
18
import { IRateLimiter } from '../@types/utils'
19
import { isEventMatchingFilter } from '../utils/event'
1✔
20
import { messageSchema } from '../schemas/message-schema'
1✔
21
import { Settings } from '../@types/settings'
22
import { SocketAddress } from 'net'
1✔
23

24

25
// Logger for this module, created under the 'web-socket-adapter' name.
const debug = createLogger('web-socket-adapter')
1✔
26
// Logger derived from `debug` with the 'heartbeat' extension; used for ping/pong tracing.
const debugHeartbeat = debug.extend('heartbeat')
1✔
27

28
// Abortable message handlers that are currently running, keyed by the client socket
// they belong to. Being a WeakMap, an entry does not keep its socket alive.
const abortableMessageHandlers: WeakMap<WebSocket, IAbortable[]> = new WeakMap()
1✔
29

30
/**
 * Adapter around one client WebSocket connection.
 *
 * It listens to the socket's error/message/close/pong/ping events and to its own
 * adapter events (heartbeat, subscribe, unsubscribe, event, broadcast, message),
 * and keeps the per-connection state declared below.
 */
export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter {
1✔
31
  // Hex encoding of the request's base64 `sec-websocket-key` header.
  public clientId: string
32
  // Remote address as resolved by getRemoteAddress() for the upgrade request.
  private clientAddress: SocketAddress
33
  // Liveness flag: cleared when a heartbeat ping is sent, set again on pong, ping or message.
  private alive: boolean
34
  // Filters of every open subscription, keyed by subscription id.
  private subscriptions: Map<SubscriptionId, SubscriptionFilter[]>
35

36
  /**
   * Sets up the per-connection state and wires all event listeners.
   *
   * @param client the WebSocket of the connected client
   * @param request the HTTP request that initiated the connection; its
   *   `sec-websocket-key` header and remote address are read here
   * @param webSocketServer server adapter that receives broadcast events
   * @param createMessageHandler factory called with `[message, adapter]` for each incoming message
   * @param slidingWindowRateLimiter factory for the rate limiter used on incoming messages
   * @param settings factory returning the current settings
   */
  public constructor(
37
    private readonly client: WebSocket,
1✔
38
    private readonly request: IncomingHttpMessage,
1✔
39
    private readonly webSocketServer: IWebSocketServerAdapter,
1✔
40
    private readonly createMessageHandler: Factory<IMessageHandler, [IncomingMessage, IWebSocketAdapter]>,
1✔
41
    private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
1✔
42
    private readonly settings: Factory<Settings>,
1✔
43
  ) {
44
    super()
1✔
45
    this.alive = true
1✔
46
    this.subscriptions = new Map()
1✔
47

48
    // The client id is the base64 `sec-websocket-key` header re-encoded as hex.
    this.clientId = Buffer.from(this.request.headers['sec-websocket-key'] as string, 'base64').toString('hex')
1✔
49

50
    const address = getRemoteAddress(this.request, this.settings())
1✔
51

52
    this.clientAddress = new SocketAddress({
1✔
53
      address: address,
54
      // An address containing ':' is treated as IPv6, anything else as IPv4.
      family: address.indexOf(':') >= 0 ? 'ipv6' : 'ipv4',
1!
55
    })
56

57
    this.client
1✔
58
      // Every socket error is logged according to its kind and then the socket is closed.
      .on('error', (error) => {
59
        if (error.name === 'RangeError' && error.message === 'Max payload size exceeded') {
×
60
          console.error(`web-socket-adapter: client ${this.clientId} (${this.getClientAddress()}) sent payload too large`)
×
61
        } else if (error.name === 'RangeError' && error.message === 'Invalid WebSocket frame: RSV1 must be clear') {
×
62
          debug(`client ${this.clientId} (${this.getClientAddress()}) enabled compression`)
×
63
        } else {
64
          console.error(`web-socket-adapter: client error ${this.clientId} (${this.getClientAddress()}):`, error)
×
65
        }
66

67
        this.client.close()
×
68
      })
69
      .on('message', this.onClientMessage.bind(this))
70
      .on('close', this.onClientClose.bind(this))
71
      .on('pong', this.onClientPong.bind(this))
72
      .on('ping', this.onClientPing.bind(this))
73

74
    // Events emitted on the adapter itself are routed to the matching handler methods.
    this
1✔
75
      .on(WebSocketAdapterEvent.Heartbeat, this.onHeartbeat.bind(this))
76
      .on(WebSocketAdapterEvent.Subscribe, this.onSubscribed.bind(this))
77
      .on(WebSocketAdapterEvent.Unsubscribe, this.onUnsubscribed.bind(this))
78
      .on(WebSocketAdapterEvent.Event, this.onSendEvent.bind(this))
79
      .on(WebSocketAdapterEvent.Broadcast, this.onBroadcast.bind(this))
80
      .on(WebSocketAdapterEvent.Message, this.sendMessage.bind(this))
81

82
    debug('client %s connected from %s', this.clientId, this.clientAddress.address)
1✔
83
  }
84

85
  /**
   * @returns the client id assigned in the constructor
   */
  public getClientId(): string {
86
    return this.clientId
×
87
  }
88

89
  /**
   * @returns the `address` string of the client's SocketAddress
   */
  public getClientAddress(): string {
90
    return this.clientAddress.address
×
91
  }
92

93
  /**
   * Forgets a subscription; handler for WebSocketAdapterEvent.Unsubscribe.
   *
   * @param subscriptionId id of the subscription to remove
   */
  public onUnsubscribed(subscriptionId: string): void {
UNCOV
94
    debug('client %s unsubscribed %s', this.clientId, subscriptionId)
×
UNCOV
95
    this.subscriptions.delete(subscriptionId)
×
96
  }
97

98
  /**
   * Records a subscription; handler for WebSocketAdapterEvent.Subscribe.
   * Filters already stored under the same id are replaced.
   *
   * @param subscriptionId id of the subscription
   * @param filters filters that events are matched against for this subscription
   */
  public onSubscribed(subscriptionId: string, filters: SubscriptionFilter[]): void {
UNCOV
99
    debug('client %s subscribed %s to %o', this.clientId, subscriptionId, filters)
×
UNCOV
100
    this.subscriptions.set(subscriptionId, filters)
×
101
  }
102

103
  /**
   * Hands an event to the server adapter for broadcasting; handler for
   * WebSocketAdapterEvent.Broadcast.
   *
   * @param event the event to broadcast
   */
  public onBroadcast(event: Event): void {
UNCOV
104
    this.webSocketServer.emit(WebSocketServerAdapterEvent.Broadcast, event)
×
UNCOV
105
    // In a cluster worker with an IPC channel, the event is also passed on via process.send.
    if (cluster.isWorker && typeof process.send === 'function') {
×
106
      process.send({
×
107
        eventName: WebSocketServerAdapterEvent.Broadcast,
108
        event,
109
      })
110
    }
111
  }
112

113
  /**
   * Sends an event to this client once for each subscription that has at least
   * one filter matching it; handler for WebSocketAdapterEvent.Event.
   *
   * @param event the event to deliver
   */
  public onSendEvent(event: Event): void {
    this.subscriptions.forEach((filters, subscriptionId) => {
      // Build and test one matcher at a time, stopping at the first match. This avoids
      // allocating an intermediate array of matchers per subscription on every event,
      // and passes the matcher factory only the filter (not map's index/array arguments).
      if (filters.some((filter) => isEventMatchingFilter(filter)(event))) {
        debug('sending event to client %s: %o', this.clientId, event)
        this.sendMessage(createOutgoingEventMessage(subscriptionId, event))
      }
    })
  }
123

124
  /**
   * Serializes a message as JSON and sends it to the client.
   * The message is dropped without error when the socket is not in the OPEN state.
   *
   * @param message the outgoing message
   */
  private sendMessage(message: OutgoingMessage): void {
UNCOV
125
    if (this.client.readyState !== WebSocket.OPEN) {
×
126
      return
×
127
    }
UNCOV
128
    this.client.send(JSON.stringify(message))
×
129
  }
130

131
  /**
   * Heartbeat tick; handler for WebSocketAdapterEvent.Heartbeat.
   *
   * Closes the connection when the client has shown no sign of life since the
   * previous tick and holds no subscriptions; otherwise clears the liveness
   * flag and sends a ping.
   */
  public onHeartbeat(): void {
132
    // NOTE(review): a client with at least one subscription is never closed here,
    // even if it did not answer the last ping — confirm this is intended.
    if (!this.alive && !this.subscriptions.size) {
×
133
      console.error(`web-socket-adapter: pong timeout for client ${this.clientId} (${this.getClientAddress()})`)
×
134
      this.client.close()
×
135
      return
×
136
    }
137

138
    // Cleared now; set again by a pong, a ping or any message from the client.
    this.alive = false
×
139
    this.client.ping()
×
140
    debugHeartbeat('client %s ping', this.clientId)
×
141
  }
142

143
  /**
   * @returns a new Map with the current subscriptions; the Map is a copy, but the
   *   filter arrays inside it are the same objects the adapter holds
   */
  public getSubscriptions(): Map<string, SubscriptionFilter[]> {
UNCOV
144
    return new Map(this.subscriptions)
×
145
  }
146

147
  /**
   * Handles one raw message received from the client.
   *
   * Steps: mark the client alive, apply rate limiting, parse and validate the
   * payload, attach the remote address as context metadata, create a handler
   * and await it. Handlers exposing an `abort` function are tracked in
   * `abortableMessageHandlers` while they run. Errors are logged here and not
   * rethrown; syntax and validation errors are also reported to the client as
   * a notice.
   *
   * @param raw the message payload as received from the socket
   */
  private async onClientMessage(raw: Buffer) {
UNCOV
148
    this.alive = true
×
UNCOV
149
    let abortable = false
×
UNCOV
150
    let messageHandler: IMessageHandler & IAbortable | undefined = undefined
×
UNCOV
151
    try {
×
UNCOV
152
      if (await this.isRateLimited(this.clientAddress.address)) {
×
153
        this.sendMessage(createNoticeMessage('rate limited'))
×
154
        return
×
155
      }
156

UNCOV
157
      // JSON.parse failures and validation failures both land in the catch block below.
      const message = attemptValidation(messageSchema)(JSON.parse(raw.toString('utf8')))
×
158

UNCOV
159
      message[ContextMetadataKey] = {
×
160
        remoteAddress: this.clientAddress,
161
      } as ContextMetadata
162

UNCOV
163
      messageHandler = this.createMessageHandler([message, this]) as IMessageHandler & IAbortable
×
UNCOV
164
      if (!messageHandler) {
×
165
        console.error('web-socket-adapter: unhandled message: no handler found:', message)
×
166
        return
×
167
      }
168

UNCOV
169
      abortable = typeof messageHandler.abort === 'function'
×
170

UNCOV
171
      // Register the handler before awaiting it so onClientClose can abort it mid-flight.
      if (abortable) {
×
UNCOV
172
        const handlers = abortableMessageHandlers.get(this.client) ?? []
×
UNCOV
173
        handlers.push(messageHandler)
×
UNCOV
174
        abortableMessageHandlers.set(this.client, handlers)
×
175
      }
176

UNCOV
177
      await messageHandler.handleMessage(message)
×
178
    } catch (error) {
179
      if (error instanceof Error) {
×
180
        if (error.name === 'AbortError') {
×
181
          console.error(`web-socket-adapter: abort from client ${this.clientId} (${this.getClientAddress()})`)
×
182
        } else if (error.name === 'SyntaxError' || error.name === 'ValidationError') {
×
183
          // Errors offering annotate() are logged through debug; others go to console.error.
          if (typeof (error as any).annotate === 'function') {
×
184
            debug('invalid message client %s (%s): %o', this.clientId, this.getClientAddress(), (error as any).annotate())
×
185
          } else {
186
            console.error(`web-socket-adapter: malformed message from client ${this.clientId} (${this.getClientAddress()}):`, error.message)
×
187
          }
188
          this.sendMessage(createNoticeMessage(`invalid: ${error.message}`))
×
189
        } else {
190
          console.error('web-socket-adapter: unable to handle message:', error)
×
191
        }
192
      } else {
193
        console.error('web-socket-adapter: unable to handle message:', error)
×
194
      }
195
    } finally {
UNCOV
196
      // Unregister the handler once it has finished, whether it succeeded or failed.
      if (abortable && messageHandler) {
×
UNCOV
197
        const handlers = abortableMessageHandlers.get(this.client)
×
UNCOV
198
        if (handlers) {
×
UNCOV
199
          const index = handlers.indexOf(messageHandler)
×
UNCOV
200
          if (index >= 0) {
×
UNCOV
201
            handlers.splice(index, 1)
×
202
          }
203
        }
204
      }
205
    }
206
  }
207

208
  /**
   * Checks the client against the message rate limits found in
   * `settings().limits?.message`.
   *
   * Returns false without touching the rate limiter when no rate limits are
   * configured or the client is listed in `ipWhitelist`. Otherwise every
   * configured limit is hit, one after another, and the result is true if any
   * of them reported the client as limited.
   *
   * @param client client address, used in the rate limiter key and for the whitelist check
   * @returns whether the client exceeded at least one configured limit
   */
  private async isRateLimited(client: string): Promise<boolean> {
209
    const {
210
      rateLimits,
211
      ipWhitelist = [],
×
UNCOV
212
    } = this.settings().limits?.message ?? {}
×
213

UNCOV
214
    if (!Array.isArray(rateLimits) || !rateLimits.length || ipWhitelist.includes(client)) {
×
UNCOV
215
      return false
×
216
    }
217

218
    const rateLimiter = this.slidingWindowRateLimiter()
×
219

220
    // Each period gets its own key, so limits with different periods are counted separately.
    const hit = (period: number, rate: number) =>
×
221
      rateLimiter.hit(
×
222
        `${client}:message:${period}`,
223
        1,
224
        { period, rate },
225
      )
226

227
    let limited = false
×
228
    // The loop does not stop at the first exceeded limit: the remaining limits are still hit.
    for (const { rate, period } of rateLimits) {
×
229
      const isRateLimited = await hit(period, rate)
×
230

231

232
      if (isRateLimited) {
×
233
        debug('rate limited %s: %d messages / %d ms exceeded', client, rate, period)
×
234

235
        limited = true
×
236
      }
237
    }
238

239
    return limited
×
240
  }
241

242
  /**
   * Socket 'pong' listener: marks the client as alive again.
   */
  private onClientPong() {
243
    debugHeartbeat('client %s pong', this.clientId)
×
244
    this.alive = true
×
245
  }
246

247
  /**
   * Socket 'ping' listener: answers with a pong carrying the same data and
   * marks the client as alive.
   *
   * @param data payload received with the ping
   */
  private onClientPing(data: any) {
248
    debugHeartbeat('client %s ping', this.clientId)
×
249
    this.client.pong(data)
×
250
    this.alive = true
×
251
  }
252

253
  /**
   * Socket 'close' listener: tears down the connection state.
   *
   * Clears the liveness flag and all subscriptions, aborts every handler still
   * registered for this socket, then removes all listeners from both the
   * adapter and the socket.
   */
  private onClientClose() {
UNCOV
254
    this.alive = false
×
UNCOV
255
    this.subscriptions.clear()
×
256

UNCOV
257
    const handlers = abortableMessageHandlers.get(this.client)
×
UNCOV
258
    if (Array.isArray(handlers) && handlers.length) {
×
259
      for (const handler of handlers) {
×
260
        // A failing abort is logged and does not prevent the remaining handlers from being aborted.
        try {
×
261
          handler.abort()
×
262
        } catch (error) {
263
          console.error('Unable to abort message handler', error)
×
264
        }
265
      }
266
    }
267

UNCOV
268
    this.removeAllListeners()
×
UNCOV
269
    this.client.removeAllListeners()
×
270
  }
271
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc