• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gmaclennan / rpc-reflector / 5280286787

pending completion
5280286787

push

github

gmaclennan
chore(release): 1.3.4

190 of 203 branches covered (93.6%)

Branch coverage included in aggregate %.

320 of 328 relevant lines covered (97.56%)

749.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.27
/server.js
1
const assert = require('assert')
9✔
2
const { serializeError } = require('serialize-error')
9✔
3

4
const isStream = require('is-stream')
9✔
5
const { msgType } = require('./lib/constants')
9✔
6
const isValidMessage = require('./lib/validate-message')
9✔
7
const { parse, stringify } = require('./lib/prop-array-utils')
9✔
8
const MessageStream = require('./lib/message-stream')
9✔
9
const isMessagePortLike = require('./lib/is-message-port-like')
9✔
10
const { EventEmitter } = require('events')
9✔
11

12
/** @typedef {import('./lib/types').MsgRequest} MsgRequest */
13
/** @typedef {import('./lib/types').MsgResponse} MsgResponse */
14
/** @typedef {import('./lib/types').MsgOn} MsgOn */
15
/** @typedef {import('./lib/types').MsgOff} MsgOff */
16
/** @typedef {import('./lib/types').MsgEmit} MsgEmit */
17
/** @typedef {import('./lib/types').Message} Message */
18
/** @typedef {import('./lib/types').NonEmptyArray<string>} NonEmptyStringArray */
19
/** @typedef {import('./lib/types').MessagePortLike} MessagePortLike */
20
/** @typedef {import('worker_threads').MessagePort} MessagePortNode */
21

22
module.exports = createServer
9✔
23

24
/**
25
 * @public
26
 * Create an RPC server that will receive messages via `receiver`, call the
27
 * matching method on `handler`, and send the reply via `send`.
28
 *
29
 * @param {{[method: string]: any}} handler Any method called on the client
30
 * object will be called on this object. Methods can return a value, a Promise,
31
 * or a ReadableStream. Your transport stream must be able to encode/decode any
32
 * values that your handler returns
33
 * @param {MessagePort | MessagePortLike | MessagePortNode | import('stream').Duplex} channel A Duplex Stream with objectMode=true or a MessagePort-like object that must implement an `.on('message')` event handler and a `.postMessage()` method.
34
 *
35
 * @returns {{ close: () => void }} An object with a single method `close()`
36
 * that will stop the server listening to and sending any more messages
37
 */
38
function createServer(handler, channel) {
39
  assert(typeof handler === 'object', 'Missing handler object.')
135✔
40
  const channelIsStream = isStream.duplex(channel)
135✔
41
  assert(
135✔
42
    isMessagePortLike(channel) || channelIsStream,
204✔
43
    'Must pass a Duplex Stream or a browser MessagePort, node worker.MessagePort, or MessagePort-like object'
44
  )
45

46
  /** @type {Map<string, (...args: any[]) => void>} */
47
  let subscriptions = new Map()
135✔
48

49
  if (channelIsStream) {
135✔
50
    channel.on('data', handleMessage)
69✔
51
  } else if ('on' in channel) {
66!
52
    channel.on('message', handleMessage)
66✔
53
  } else {
54
    channel.addEventListener('message', handleMessage)
×
55
  }
56

57
  /** @param {MsgResponse | MsgEmit} msg */
58
  function send(msg) {
59
    // TODO: Do we need back pressure here? Would just result in buffering here
60
    // vs. buffering in the stream, so probably no
61
    if (channelIsStream) {
4,380✔
62
      channel.write(msg)
108✔
63
    } else {
64
      channel.postMessage(msg)
4,272✔
65
    }
66
  }
67

68
  /**
69
   * Handles an incoming message.
70
   * @param {unknown} msg Can be any type, but we only process messages types that
71
   * we understand, other messages are ignored
72
   */
73
  function handleMessage(msg) {
74
    // When using a MessagePort in a browser or electron environment, the
75
    // actual data is in `event.data`
76
    if (typeof msg === 'object' && msg && 'data' in msg) {
441!
77
      msg = msg.data
×
78
    }
79
    if (!isValidMessage(msg)) return
441✔
80
    switch (msg[0]) {
348✔
81
      case msgType.REQUEST:
82
        return handleRequest(/** @type {MsgRequest} */ (msg))
210✔
83
      case msgType.ON:
84
        return handleOn(/** @type {MsgOn} */ (msg))
84✔
85
      case msgType.OFF:
86
        return handleOff(/** @type {MsgOff} */ (msg))
30✔
87
      default:
88
        console.warn(`Unhandled message type: ${msg[0]}. (Message was ignored)`)
24✔
89
    }
90
  }
91

92
  /** @param {MsgRequest} msg */
93
  async function handleRequest([, msgId, propArray, args]) {
94
    /** @type {MsgResponse} */
95
    let response
96

97
    try {
210✔
98
      const result = await Promise.resolve(
210✔
99
        applyNestedMethod(handler, propArray, args)
100
      )
101
      if (isStream.readable(result)) {
126✔
102
        return handleStream(result)
36✔
103
      }
104
      response = [msgType.RESPONSE, msgId, null, result]
90✔
105
    } catch (error) {
106
      response = [msgType.RESPONSE, msgId, serializeError(error)]
84✔
107
    }
108

109
    send(response)
174✔
110

111
    /** @param {import('stream').Readable} stream */
112
    function handleStream(stream) {
113
      const rs = stream.pipe(new MessageStream(msgId))
36✔
114
      if (channelIsStream) {
36✔
115
        rs.pipe(channel, { end: false })
18✔
116
      } else {
117
        rs.on('data', (chunk) => send(chunk))
4,164✔
118
      }
119
      rs.on('error', (err) =>
36✔
120
        send([msgType.RESPONSE, msgId, serializeError(err)])
×
121
      )
122
    }
123
  }
124

125
  /** @param {MsgOn} msg */
126
  function handleOn([, eventName, propArray]) {
127
    let emitter
128
    try {
84✔
129
      emitter = getNestedEventEmitter(handler, propArray)
84✔
130
    } catch (e) {
131
      return console.warn(e + '. (Subscription from client was ignored)')
12✔
132
    }
133

134
    const encodedEventName = stringify(propArray, eventName)
72✔
135

136
    // If we are already emitting for this event, we can ignore
137
    if (subscriptions.has(encodedEventName)) return
72✔
138

139
    /** @type {(...args: any[]) => void} */
140
    const listener = (...args) => {
66✔
141
      if (args.length === 1 && args[0] instanceof Error) {
42✔
142
        send([msgType.EMIT, eventName, propArray, serializeError(args[0])])
6✔
143
      } else {
144
        send([msgType.EMIT, eventName, propArray, null, args])
36✔
145
      }
146
    }
147
    subscriptions.set(encodedEventName, listener)
66✔
148
    emitter.on(eventName, listener)
66✔
149
  }
150

151
  /** @param {MsgOff} msg */
152
  function handleOff([, eventName, propArray]) {
153
    let emitter
154
    try {
30✔
155
      emitter = getNestedEventEmitter(handler, propArray)
30✔
156
    } catch (e) {
157
      return console.warn(e + '. (Subscription from client was ignored)')
12✔
158
    }
159

160
    const encodedEventName = stringify(propArray, eventName)
18✔
161

162
    // Fail silently if there is nothing to unsubscribe
163
    if (!subscriptions.has(encodedEventName)) return
18✔
164

165
    const listener = subscriptions.get(encodedEventName)
12✔
166
    listener && emitter.removeListener(eventName, listener)
12✔
167
    subscriptions.delete(encodedEventName)
12✔
168
  }
169

170
  return {
135✔
171
    close: () => {
172
      if (channelIsStream) {
18✔
173
        channel.off('data', handleMessage)
9✔
174
      } else if ('off' in channel) {
9!
175
        channel.off('message', handleMessage)
9✔
176
      } else {
177
        channel.removeEventListener('message', handleMessage)
×
178
      }
179
      for (const [encodedEventName, listener] of subscriptions.entries()) {
18✔
180
        const [propArray, eventName] = parse(encodedEventName)
24✔
181
        try {
24✔
182
          const emitter = getNestedEventEmitter(handler, propArray)
24✔
183
          emitter.removeListener(eventName, listener)
24✔
184
        } catch (e) {}
185
      }
186
      subscriptions = new Map()
18✔
187
    },
188
  }
189
}
190

191
/**
192
 * @private
193
 * Calls a deeply nested property function. Throws a TypeError if not a function
194
 *
195
 * @param {{[propertyKey: string]: any}} target
196
 * @param {NonEmptyStringArray} propArray
197
 * @param {ArrayLike<any>} args
198
 * @returns {any}
199
 */
200
function applyNestedMethod(target, propArray, args) {
201
  let nested = target
210✔
202
  for (const propertyKey of propArray.slice(0, -1)) {
210✔
203
    if (!Reflect.has(nested, propertyKey)) {
96✔
204
      throw new ReferenceError(`${propertyKey} is not defined`)
6✔
205
    }
206
    nested = nested[propertyKey]
90✔
207
  }
208
  const propertyKey = propArray[propArray.length - 1]
204✔
209
  if (nested === null) {
204✔
210
    throw new TypeError(`Cannot read property '${propertyKey}' of null`)
6✔
211
  }
212
  if (typeof nested === 'object') {
198✔
213
    if (!Reflect.has(nested, propertyKey)) {
168✔
214
      throw new ReferenceError(`${propertyKey} is not defined`)
30✔
215
    }
216
  } else if (typeof nested[propertyKey] === 'undefined') {
30!
217
    throw new ReferenceError(`${propertyKey} is not defined`)
24✔
218
  }
219
  if (typeof nested[propertyKey] === 'function') {
138✔
220
    return Reflect.apply(nested[propertyKey], nested, args)
84✔
221
  }
222
  if (typeof nested[propertyKey] === 'symbol') {
54✔
223
    throw new TypeError(`Property '${propertyKey}' is a Symbol`)
6✔
224
  }
225
  return nested[propertyKey]
48✔
226
}
227

228
/**
229
 * @private
230
 * Returns a deeply nested event emitter
231
 *
232
 * @param {{[propertyKey: string]: any}} target
233
 * @param {string[]} propArray
234
 * @returns {EventEmitter}
235
 */
236
function getNestedEventEmitter(target, propArray) {
237
  let nested = target
138✔
238
  for (const propertyKey of propArray) {
138✔
239
    if (!Reflect.has(nested, propertyKey)) {
42✔
240
      throw new ReferenceError(`${propertyKey} is not defined`)
12✔
241
    }
242
    nested = nested[propertyKey]
30✔
243
  }
244
  if (!(nested instanceof EventEmitter)) {
126✔
245
    throw new TypeError(
12✔
246
      `${
247
        propArray.length === 0 ? '[target]' : propArray[propArray.length - 1]
12!
248
      } is not an EventEmitter`
249
    )
250
  }
251
  return nested
114✔
252
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc