• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepstreamIO / deepstream.io-client-js / 5157468952

pending completion
5157468952

push

github

jaime-ez
7.0.2

880 of 1746 branches covered (50.4%)

1984 of 3089 relevant lines covered (64.23%)

17.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.01
/src/record/record-handler.ts
1
import * as utils from '../util/utils'
2
import { EVENT, RECORD_ACTION, RecordMessage, TOPIC, RecordData, RecordPathData, ListenMessage, Message } from '../constants'
1✔
3
import { Services } from '../deepstream-client'
×
4
import { Options } from '../client-options'
×
5
import { RecordCore, WriteAckCallback } from './record-core'
×
6
import { Record } from './record'
×
7
import { AnonymousRecord } from './anonymous-record'
×
8
import { List } from './list'
9
import { Dequeue } from './dequeue'
×
10
import { Listener, ListenCallback } from '../util/listener'
11
import { SingleNotifier } from './single-notifier'
×
12
import { WriteAcknowledgementService } from './write-ack-service'
×
13
import { DirtyService } from './dirty-service'
14
import { MergeStrategyService } from './merge-strategy-service'
×
15
import { MergeStrategy } from './merge-strategy'
16
import {BulkSubscriptionService} from '../util/bulk-subscription-service'
×
17

18
// type SubscribeActions = RECORD_ACTION.SUBSCRIBEANDHEAD | RECORD_ACTION.SUBSCRIBECREATEANDREAD
1✔
19

×
20
/**
 * Bundle of collaborating services that a RecordHandler shares with every
 * RecordCore it creates. A ready-made bundle can be injected through the
 * RecordHandler constructor; otherwise the handler builds one itself.
 */
export interface RecordServices {
  /** Bulk subscription services, looked up by the RECORD_ACTION used to subscribe. */
  bulkSubscriptionService: Record<number, BulkSubscriptionService<RECORD_ACTION>>
  /** Sends writes that expect an acknowledgement and receives the matching acks. */
  writeAckService: WriteAcknowledgementService
  /** Notifier used for one-off READ requests (e.g. snapshots). */
  readRegistry: SingleNotifier<RecordMessage>
  /** Notifier used for one-off HEAD (version) requests. */
  headRegistry: SingleNotifier<RecordMessage>
  /** Tracks which records are flagged as dirty. */
  dirtyService: DirtyService
  /** Holds the merge strategies registered by record name or by pattern. */
  mergeStrategy: MergeStrategyService
}
×
28

×
29
/**
 * Client-side entry point for everything on the RECORD topic.
 *
 * The handler keeps one RecordCore per record name, hands out the public
 * wrappers around it (Record, List, Dequeue, AnonymousRecord), offers one-off
 * operations that do not need a subscription (snapshot, has, head, setData,
 * notify) and routes incoming RECORD messages to whoever is waiting for them.
 */
export class RecordHandler {
  /** Record cores currently alive, keyed by record name. */
  private recordCores = new Map<string, RecordCore>()
  /** Pending notify() callbacks, keyed by the correlationId sent with the NOTIFY message. */
  private notifyCallbacks = new Map<string, Function>()
  private recordServices: RecordServices
  private dirtyService: DirtyService

  /**
   * @param services       shared client services (connection, storage, logger, timeoutRegistry, ...)
   * @param options        client options
   * @param recordServices optional pre-built service bundle; a default bundle is created when omitted
   * @param listener       listener used for listen()/unlisten(); defaults to a new Listener on TOPIC.RECORD
   */
  constructor (private services: Services, private options: Options, recordServices?: RecordServices, private listener: Listener = new Listener(TOPIC.RECORD, services)) {
    // Bind before anything captures these methods: getBulkSubscriptionService()
    // below passes this.onBulkSubscriptionSent to each BulkSubscriptionService,
    // so binding afterwards would leave them holding the unbound method.
    this.sendUpdatedData = this.sendUpdatedData.bind(this)
    this.onMergeCompleted = this.onMergeCompleted.bind(this)
    this.getRecordCore = this.getRecordCore.bind(this)
    this.removeRecord = this.removeRecord.bind(this)
    this.onBulkSubscriptionSent = this.onBulkSubscriptionSent.bind(this)

    this.recordServices = recordServices || {
      bulkSubscriptionService: {
        [RECORD_ACTION.SUBSCRIBECREATEANDREAD]: this.getBulkSubscriptionService(RECORD_ACTION.SUBSCRIBECREATEANDREAD),
        [RECORD_ACTION.SUBSCRIBEANDREAD]: this.getBulkSubscriptionService(RECORD_ACTION.SUBSCRIBEANDREAD),
        [RECORD_ACTION.SUBSCRIBEANDHEAD]: this.getBulkSubscriptionService(RECORD_ACTION.SUBSCRIBEANDHEAD)
      },
      writeAckService: new WriteAcknowledgementService(services),
      readRegistry: new SingleNotifier(services, RECORD_ACTION.READ, options.recordReadTimeout),
      headRegistry: new SingleNotifier(services, RECORD_ACTION.HEAD, options.recordReadTimeout),
      dirtyService: new DirtyService(services.storage, options.dirtyStorageName),
      mergeStrategy: new MergeStrategyService(services, options.mergeStrategy)
    } as RecordServices
    this.dirtyService = this.recordServices.dirtyService

    this.services.connection.registerHandler(TOPIC.RECORD, this.handle.bind(this))
    this.services.connection.onReestablished(this.syncDirtyRecords.bind(this))

    // Already online at construction time: sync dirty records right away
    // instead of waiting for the next re-establishment.
    if (this.services.connection.isConnected) {
      this.syncDirtyRecords()
    }
  }

  /**
   * Returns all the available data-sync names.
   *
   * Please note: Lists, AnonymousRecords and Records are all essentially
   * the same thing within the SDK, so this array will contain a list of
   * everything.
   *
   * Due to how records work, even after a discard this list will take a
   * while to update. This is intentional, as there is an option for how long
   * a record will survive before being discarded! You can change that
   * via the `recordDiscardTimeout: milliseconds` option.
   */
  public names (): string[] {
    return [...this.recordCores.keys()]
  }

  /**
   * Registers a merge strategy for a single record name.
   *
   * @param   {String}   recordName    the name of the record the strategy applies to
   * @param   {Function} mergeStrategy the merge strategy function
   * @throws  {Error} if mergeStrategy is not a function
   */
  public setMergeStrategy (recordName: string, mergeStrategy: MergeStrategy): void {
    if (typeof mergeStrategy === 'function') {
      this.recordServices.mergeStrategy.setMergeStrategyByName(recordName, mergeStrategy)
    } else {
      throw new Error('Invalid merge strategy: Must be a Function')
    }
  }

  /**
   * Registers a merge strategy for every record whose name matches a pattern.
   *
   * @param   {RegExp}   regexp        the pattern the strategy is registered under
   * @param   {Function} mergeStrategy the merge strategy function
   * @throws  {Error} if mergeStrategy is not a function
   */
  public setMergeStrategyRegExp (regexp: RegExp, mergeStrategy: MergeStrategy): void {
    if (typeof mergeStrategy === 'function') {
      this.recordServices.mergeStrategy.setMergeStrategyByPattern(regexp, mergeStrategy)
    } else {
      throw new Error('Invalid merge strategy: Must be a Function')
    }
  }

  /**
   * Returns an existing record or creates a new one.
   *
   * @param   {String} name the unique name of the record
   */
  public getRecord (name: string): Record {
    return new Record(this.getRecordCore(name))
  }

  /**
   * Returns an existing List or creates a new one. A list is a specialised
   * type of record that holds an array of recordNames.
   *
   * @param   {String} name       the unique name of the list
   */
  public getList (name: string): List {
    return new List(this.getRecordCore(name))
  }

  /**
   * Returns an existing Dequeue or creates a new one. A Dq is a specialised
   * type of record that provides a double-ended queue interface.
   *
   * @param   {String} name       the unique name of the Dq
   */
  public getDq (name: string): Dequeue {
    return new Dequeue(this.getRecordCore(name))
  }

  /**
   * Returns an anonymous record. An anonymous record is effectively
   * a wrapper that mimics the API of a record, but allows for the
   * underlying record to be swapped without losing subscriptions etc.
   *
   * This is particularly useful when selecting from a number of similarly
   * structured records. E.g. a list of users that can be chosen from a list
   *
   * The only API difference to a normal record is an additional setName( name ) method.
   */
  public getAnonymousRecord (): AnonymousRecord {
    // getRecordCore is bound in the constructor, so it can be handed over as-is.
    return new AnonymousRecord(this.getRecordCore)
  }

  /**
   * Allows to listen for record subscriptions made by this or other clients. This
   * is useful to create "active" data providers, e.g. providers that only provide
   * data for a particular record if a user is actually interested in it
   *
   * @param   {String}   pattern  A combination of alpha numeric characters and wildcards( * )
   * @param   {Function} callback
   */
  public listen (pattern: string, callback: ListenCallback): void {
    this.listener.listen(pattern, callback)
  }

  /**
   * Removes a listener that was previously registered with listen
   *
   * @param   {String}   pattern  A combination of alpha numeric characters and wildcards( * )
   */
  public unlisten (pattern: string): void {
    this.listener.unlisten(pattern)
  }

  /**
   * Retrieve the current record data without subscribing to changes.
   *
   * If a record core for `name` already exists locally, its data is delivered
   * once that core is ready; otherwise a one-off read is requested through the
   * read registry.
   *
   * @param   {String}    name     the unique name of the record
   * @param   {Function}  callback optional; when omitted a Promise is returned instead
   * @throws  {Error} if name is not a non-empty string or callback is not a function
   */
  public snapshot (name: string): Promise<RecordData>
  public snapshot (name: string, callback: (error: string | null, data: RecordData) => void): void
  public snapshot (name: string, callback?: (error: string | null, data: RecordData) => void): void | Promise<RecordData> {
    if (typeof name !== 'string' || name.length === 0) {
      throw new Error('invalid argument: name')
    }
    if (callback !== undefined && typeof callback !== 'function') {
      throw new Error('invalid argument: callback')
    }

    const recordCore = this.recordCores.get(name)
    if (recordCore) {
      if (callback) {
        recordCore.whenReady(null, () => {
          callback(null, recordCore.get())
        })
      } else {
        return new Promise(resolve => {
          recordCore.whenReady(null, () => {
            resolve(recordCore.get())
          })
        })
      }
      return
    }

    if (callback) {
      this.recordServices.readRegistry.request(name, callback)
    } else {
      return new Promise((resolve, reject) => {
        this.recordServices.readRegistry.request(name, (error, data) => error ? reject(error) : resolve(data))
      })
    }
  }

  /**
   * Allows the user to query to see whether or not the record exists.
   *
   * Implemented on top of head(): the record is reported as existing when
   * the version returned is anything other than -1.
   *
   * @param   {String}    name     the unique name of the record
   * @param   {Function}  callback optional; when omitted a Promise is returned instead
   * @throws  {Error} if name is not a non-empty string or callback is not a function
   */
  public has (name: string): Promise<boolean>
  public has (name: string, callback: (error: string | null, has: boolean | null) => void): void
  public has (name: string, callback?: (error: string | null, has: boolean | null) => void): Promise<boolean> | void {
    if (typeof name !== 'string' || name.length === 0) {
      throw new Error('invalid argument: name')
    }
    if (callback !== undefined && typeof callback !== 'function') {
      throw new Error('invalid argument: callback')
    }

    if (!callback) {
      return new Promise((resolve, reject) => {
        this.head(name, (error: string | null, version: number) => error ? reject(error) : resolve(version !== -1))
      })
    }
    this.head(name, (error: string | null, version: number) => error ? callback(error, null) : callback(null, version !== -1))
  }

  /**
   * Allows the user to query for the version number of a record.
   *
   * If a record core for `name` already exists locally, its version is
   * delivered once that core is ready; otherwise a one-off head request is
   * made through the head registry.
   *
   * @param   {String}    name     the unique name of the record
   * @param   {Function}  callback optional; when omitted a Promise is returned instead
   * @throws  {Error} if name is not a non-empty string or callback is not a function
   */
  public head (name: string): Promise<number>
  public head (name: string, callback: (error: string | null, version: number) => void): void
  public head (name: string, callback?: (error: string | null, version: number) => void): void | Promise<number> {
    if (typeof name !== 'string' || name.length === 0) {
      throw new Error('invalid argument: name')
    }
    if (callback !== undefined && typeof callback !== 'function') {
      throw new Error('invalid argument: callback')
    }

    const recordCore = this.recordCores.get(name)
    if (recordCore) {
      if (callback) {
        recordCore.whenReady(null, () => {
          callback(null, recordCore.version as number)
        })
      } else {
        return new Promise(resolve => {
          recordCore.whenReady(null, () => {
            resolve(recordCore.version as number)
          })
        })
      }
      return
    }

    if (callback) {
      this.recordServices.headRegistry.request(name, callback)
    } else {
      return new Promise((resolve, reject) => {
        this.recordServices.headRegistry.request(name, (error, data) => error ? reject(error) : resolve(data))
      })
    }
  }

  /**
   * A wrapper function around setData. The function works exactly
   * the same however when a callback is omitted a Promise will be
   * returned.
   *
   * @param {String}          recordName     the name of the record to set
   * @param {String|Object}   pathOrData     the path to set or the data to write
   * @param {Object|Function} dataOrCallback the data to write or the write acknowledgement
   *                                         callback
   * @param {Function}        callback       the callback that will be called with the result
   *                                         of the write
   * @returns {Promise} if a callback is omitted a Promise will be returned that resolves
   *                    when the write callback reports a `null` error and rejects with
   *                    the error otherwise
   */
  public setDataWithAck (recordName: string, data: RecordData | undefined, callback?: WriteAckCallback): Promise<string | void> | void
  public setDataWithAck (recordName: string, path: string, data: RecordData | undefined, callback?: WriteAckCallback): Promise<string | void> | void
  public setDataWithAck (recordName: string, ...rest: any[]): Promise<string | void> | void {
    // Everything after recordName (index 1 onwards) is normalised into { path, data, callback }.
    const args = utils.normalizeSetArguments(arguments, 1)
    if (!args.callback) {
      return new Promise((resolve, reject) => {
        args.callback = error => error === null ? resolve() : reject(error)
        this.sendSetData(recordName, -1, args)
      })
    }
    this.sendSetData(recordName, -1, args)
  }

  /**
   * Allows setting the data for a record without being subscribed to it. If
   * the client is subscribed to the record locally, the update will be proxied
   * through the record object like a normal call to Record.set. Otherwise a force
   * write will be performed that overwrites any remote data.
   *
   * @param {String} recordName the name of the record to write to
   * @param {String|Object} pathOrData either the path to write data to or the data to
   *                                   set the record to
   * @param {Object|Primitive|Function} dataOrCallback either the data to write to the
   *                                                   record or a callback function
   *                                                   indicating write success
   * @param {Function} callback if provided this will be called with the result of the
   *                            write
   */
  public setData (recordName: string, data: RecordData): void
  public setData (recordName: string, path: string, data: RecordPathData | undefined, callback: WriteAckCallback): void
  public setData (recordName: string, pathOrData: string | RecordData, dataOrCallback: RecordPathData | WriteAckCallback | undefined, callback?: WriteAckCallback): void
  public setData (recordName: string): void {
    // The remaining arguments are read from `arguments` and normalised into { path, data, callback }.
    const args = utils.normalizeSetArguments(arguments, 1)
    this.sendSetData(recordName, -1, args)
  }

  /**
   * Not implemented: always throws. Deleting currently requires a Record instance.
   *
   * @throws {Error} always
   */
  public delete (recordName: string, callback?: (error: string | null) => void): void | Promise<void> {
    // TODO: Use a delete service to make the logic in record core and here common
    throw Error('Delete is not yet supported without use of a Record')
  }

  /**
   * Sends a NOTIFY message for the given record names and reports the outcome.
   *
   * When the connection is offline nothing is sent: the callback is invoked
   * with (or the returned Promise rejects with) EVENT.CLIENT_OFFLINE.
   * Otherwise the callback / Promise is settled by handle() once the matching
   * acknowledgement or notify error arrives.
   *
   * @param {Array}    recordNames the names of the records to notify about
   * @param {Function} callback    optional; when omitted a Promise is returned instead
   */
  public notify (recordNames: string[], callback?: (error: string) => void): void | Promise<void> {
    if (!this.services.connection.isConnected) {
      if (callback) {
        callback(EVENT.CLIENT_OFFLINE)
        return
      }
      return Promise.reject(EVENT.CLIENT_OFFLINE)
    }

    // The correlationId ties the eventual response back to this call.
    const correlationId = utils.getUid()

    this.services.connection.sendMessage({
      topic: TOPIC.RECORD,
      action: RECORD_ACTION.NOTIFY,
      names: recordNames,
      correlationId
    })

    if (callback) {
      this.notifyCallbacks.set(correlationId, callback)
    } else {
      return new Promise((resolve, reject) => {
        this.notifyCallbacks.set(correlationId, (error: string) => error ? reject(error) : resolve())
      })
    }
  }

  /**
   * Writes to a record. If a record core exists locally the write is proxied
   * through it; otherwise a message is sent directly: ERASE when a path is
   * given without data, CREATEANDPATCH when a path is given with data,
   * CREATEANDUPDATE when no path is given.
   *
   * @param recordName the name of the record to write to
   * @param version    the version to send with the message
   * @param args       normalised { path, data, callback }
   * @throws {Error} if recordName is not a non-empty string, or if no path is
   *                 given and data is not an object
   */
  private sendSetData (recordName: string, version: number, args: utils.RecordSetArguments): void {
    const { path, data, callback } = args
    if (!recordName || typeof recordName !== 'string' || recordName.length === 0) {
      throw new Error('invalid argument: recordName must be an non empty string')
    }
    if (!path && (data === null || typeof data !== 'object')) {
      throw new Error('invalid argument: data must be an object when no path is provided')
    }

    const recordCore = this.recordCores.get(recordName)
    if (recordCore) {
      recordCore.set({ path, data, callback })
      return
    }

    let action
    if (path) {
      if (data === undefined) {
        action = RECORD_ACTION.ERASE
      } else {
        action = RECORD_ACTION.CREATEANDPATCH
      }
    } else {
      action = RECORD_ACTION.CREATEANDUPDATE
    }

    const message = {
      topic: TOPIC.RECORD,
      action,
      name: recordName,
      path,
      version,
      parsedData: data
    }

    // Only route through the write-ack service when someone wants the result.
    if (callback) {
      this.recordServices.writeAckService.send(message, callback)
    } else {
      this.services.connection.sendMessage(message)
    }
  }

  /**
   * Asks every live record core to save its record to offline storage.
   */
  public saveToOfflineStorage (): void {
    this.recordCores.forEach(recordCore => recordCore.saveRecordToOffline())
  }

  /**
   * Resets the client storage.
   *
   * @param {Function} callback optional; when omitted a Promise is returned that
   *                            rejects if the reset reports an error
   */
  public clearOfflineStorage (): Promise<void>
  public clearOfflineStorage (callback: (error: string | null) => void): void
  public clearOfflineStorage (callback?: (error: string | null) => void): Promise<void> | void {
    if (callback) {
      this.services.storage.reset(callback)
    } else {
      return new Promise((resolve, reject) => {
        this.services.storage.reset(error => error ? reject(error) : resolve())
      })
    }
  }

  /**
   * Will be called by the client for incoming messages on the RECORD topic.
   * The checks below are order-sensitive: each one returns as soon as it has
   * dealt with the message.
   *
   * @param   {Object} message parsed and validated deepstream message
   */
  private handle (message: RecordMessage): void {
    // Outcome of a notify(): settle the callback stored under the correlationId.
    if (
      (message.action === RECORD_ACTION.NOTIFY && message.isAck) ||
      (message.isError && message.action === RECORD_ACTION.RECORD_NOTIFY_ERROR)
    ) {
      const callback = this.notifyCallbacks.get(message.correlationId!)
      if (callback) {
        callback(message.data || null)
        this.notifyCallbacks.delete(message.correlationId!)
      } else {
        this.services.logger.error(message, RECORD_ACTION.NOTIFY)
      }
      return
    }

    // Any other ack only needs its pending timeout cleared.
    if (message.isAck) {
      this.services.timeoutRegistry.remove(message)
      return
    }

    // Listen-related traffic belongs to the listener.
    if (
      message.action === RECORD_ACTION.SUBSCRIPTION_FOR_PATTERN_FOUND ||
      message.action === RECORD_ACTION.SUBSCRIPTION_FOR_PATTERN_REMOVED ||
      message.action === RECORD_ACTION.LISTEN ||
      message.action === RECORD_ACTION.UNLISTEN
    ) {
      this.listener.handle(message as ListenMessage)
      return
    }

    // Write acknowledgements, except VERSION_EXISTS which falls through to the record core.
    if (message.isWriteAck && message.action !== RECORD_ACTION.VERSION_EXISTS) {
      this.recordServices.writeAckService.recieve(message)
      return
    }

    // Responses (or errors) for one-off reads.
    if (message.action === RECORD_ACTION.READ_RESPONSE || message.originalAction === RECORD_ACTION.READ) {
      if (message.isError) {
        this.recordServices.readRegistry.recieve(message, RECORD_ACTION[message.action])
      } else {
        this.recordServices.readRegistry.recieve(message, null, message.parsedData)
      }
      return
    }

    // A bulk head response is fanned out into one HEAD_RESPONSE per record name.
    if (
      message.action === RECORD_ACTION.HEAD_RESPONSE_BULK
    ) {
      Object.keys(message.versions!).forEach(name => {
        this.recordServices.headRegistry.recieve({
          topic: TOPIC.RECORD,
          action: RECORD_ACTION.HEAD_RESPONSE,
          originalAction: RECORD_ACTION.HEAD,
          name,
          version: message.versions![name]
        }, null, message.versions![name])
      })
      return
    }

    // Responses (or errors) for one-off head requests.
    if (
      message.action === RECORD_ACTION.HEAD_RESPONSE ||
      message.originalAction === RECORD_ACTION.HEAD
    ) {
      if (message.isError) {
        this.recordServices.headRegistry.recieve(message, RECORD_ACTION[message.action])
      } else {
        this.recordServices.headRegistry.recieve(message, null, message.version)
      }
      return
    }

    // Everything else is addressed to a specific record.
    const recordCore = this.recordCores.get(message.name)
    if (recordCore) {
      recordCore.handle(message)
      return
    }

    // No record core for this name: VERSION_EXISTS is dropped silently.
    if (
      message.action === RECORD_ACTION.VERSION_EXISTS
    ) {
      return
    }

    if (
      message.action === RECORD_ACTION.SUBSCRIPTION_HAS_PROVIDER ||
      message.action === RECORD_ACTION.SUBSCRIPTION_HAS_NO_PROVIDER
    ) {
      // record can receive a HAS_PROVIDER after discarding the record
      return
    }

    if (message.isError) {
      this.services.logger.error(message)
      return
    }

    this.services.logger.error(message, EVENT.UNSOLICITED_MESSAGE)
  }

  /**
   * Callback for 'deleted' and 'discard' events from a record. Removes the record from
   * the registry
   */
  private removeRecord (recordName: string): void {
    this.recordCores.delete(recordName)
  }

  /**
   * Returns the record core registered under recordName, creating and
   * registering a new one if none exists yet.
   */
  private getRecordCore (recordName: string): RecordCore<any> {
    let recordCore = this.recordCores.get(recordName)
    if (!recordCore) {
      recordCore = new RecordCore(recordName, this.services, this.options, this.recordServices, this.removeRecord)
      this.recordCores.set(recordName, recordCore)
    }
    return recordCore
  }

  /**
   * Schedules _syncDirtyRecords to run once the dirty service has loaded.
   */
  private syncDirtyRecords (): void {
    this.dirtyService.whenLoaded(this, this._syncDirtyRecords)
  }

  // TODO: Expose issues here, as there isn't a reason why a record core needs to exist in
  // order to sync up
  /**
   * Loads every dirty record from storage and passes it to sendUpdatedData,
   * skipping records whose core still has references.
   */
  private _syncDirtyRecords (): void {
    const dirtyRecords = this.dirtyService.getAll()
    for (const [recordName] of dirtyRecords) {
      const recordCore = this.recordCores.get(recordName)
      if (recordCore && recordCore.references.size > 0) {
        // if it isn't zero the record core takes care of it
        continue
      }
      this.services.storage.get(recordName, this.sendUpdatedData)
    }
  }

  /**
   * Storage callback used while syncing dirty records: writes the stored data
   * with its stored version. On success the dirty flag is cleared; on error a
   * read is registered and its response is handed to the merge strategy.
   */
  private sendUpdatedData (recordName: string, version: number, data: RecordData): void {
    if (version === -1) {
      // deleted locally, how to merge?
      this.services.logger.warn({ topic: TOPIC.RECORD }, RECORD_ACTION.DELETE, "Deleted record while offline, can't resolve")
      return
    }

    const callback = (error: string | null, name: string) => {
      if (!error) {
        this.dirtyService.setDirty(name, false)
      } else {
        this.recordServices.readRegistry.register(name, this, message => {
          this.recordServices.mergeStrategy.merge(
            message,
            version,
            data,
            this.onMergeCompleted,
            this
          )
        })
      }
    }
    this.sendSetData(recordName, version, { data, callback })
  }

  /**
   * Merge-strategy callback used while syncing dirty records. Writes the
   * merged data as the version following the one in the received message.
   * A failed merge is logged and nothing is written, so unmerged data is
   * never sent.
   */
  private onMergeCompleted (error: string | null, { name, version }: RecordMessage, mergeData: RecordData): void {
    if (error) {
      this.services.logger.warn({ topic: TOPIC.RECORD }, RECORD_ACTION.VERSION_EXISTS, `Failed to merge dirty record "${name}": ${error}`)
      return
    }
    this.sendSetData(name, version! + 1, { data: mergeData })
  }

  /**
   * Creates a bulk subscription service on TOPIC.RECORD that subscribes with
   * the given action, unsubscribes with RECORD_ACTION.UNSUBSCRIBE and reports
   * sent messages to onBulkSubscriptionSent.
   */
  private getBulkSubscriptionService (bulkSubscribe: RECORD_ACTION) {
    return new BulkSubscriptionService<RECORD_ACTION>(
        this.services, this.options.subscriptionInterval, TOPIC.RECORD,
        bulkSubscribe, RECORD_ACTION.UNSUBSCRIBE,
        this.onBulkSubscriptionSent
      )
  }

  /**
   * Registers a timeout for a sent subscription message, but only when the
   * message carries no `names` field.
   */
  private onBulkSubscriptionSent (message: Message): void {
    if (!message.names) {
      this.services.timeoutRegistry.add({ message })
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc