• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepstreamIO / deepstream.io-client-js / 26456165204

26 May 2026 02:52PM UTC coverage: 66.989% (+3.2%) from 63.776%
26456165204

push

github

jaime-ez
7.1.0

738 of 1431 branches covered (51.57%)

2145 of 3202 relevant lines covered (66.99%)

18.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.96
/src/record/dequeue.ts
1
import * as utils from '../util/utils'
2
import { EVENT, RecordMessage } from '../constants'
1✔
3
import { RecordCore, WriteAckCallback } from './record-core'
1✔
4
import { Emitter } from '../util/emitter'
1!
5

×
6
type ListSubscriptionCallback = (entries: string[]) => void
×
7
type Head = {p: string, n: string}
1✔
8
type Node = {d: string, p: string, n: string}
9

1✔
10
export class Dequeue extends Emitter {
1!
11
    /** Debug identifier reported by the underlying record. */
    public debugId = this.record.getDebugId()

    /** Maps each user-supplied subscription callback to the wrapper registered on the record. */
    private wrappedFunctions = new Map<Function, Function>()
    /** The record's own applyUpdate (bound to the record), captured in the constructor. */
    private originalApplyUpdate: Function

    /** Structure snapshot taken by beforeChange(); null when no entry listeners are attached. */
    private beforeStructure: any
    /** Record path under which the head pointers are stored. */
    private headPath = 'h'
    /** Head value of an empty list: no first node (n) and no last node (p). */
    private emptyHead: Head = { p: '', n: '' }

    // Listener flags refreshed by beforeChange() and read by afterChange()
    private hasAddListener = false
    private hasRemoveListener = false
    private hasMoveListener = false

    /** Arguments of every subscription forwarded to the record. */
    private subscriptions: utils.RecordSubscribeArguments[] = []

    // Memoized result of getEntries() and the record version it was computed for
    private lastEntries: string[] = []
    private lastVersion = 0
×
25

26
    /**
     * Wraps a record as a dequeue: intercepts the record's applyUpdate so list
     * events can be computed around every update, registers this instance as a
     * reference on the record and re-emits the record's discard / delete / error events.
     */
    constructor (private record: RecordCore<Dequeue>) {
        super()

        // Keep the record's own handler, then route updates through this class
        this.originalApplyUpdate = record.applyUpdate.bind(record)
        record.applyUpdate = this.applyUpdate.bind(this)

        record.addReference(this)

        // Forward lifecycle events of the record to listeners of this dequeue
        record.on('discard', () => this.emit('discard', this), this)
        record.on('delete', () => this.emit('delete', this), this)
        record.on('error', (...details: any[]) => this.emit('error', ...details), this)
    }
35

×
36
    /** Name of the underlying record. */
    get name (): string {
        const { name } = this.record
        return name
    }
×
39

40
    /** Ready flag of the underlying record. */
    get isReady (): boolean {
        const { isReady } = this.record
        return isReady
    }
×
43

44
    /** Version of the underlying record, typed as a number. */
    get version (): number {
        const current = this.record.version
        return current as number
    }
×
47

×
48
    /**
     * Delegates to the record's whenReady for this dequeue. With a callback the
     * callback is passed on and nothing is returned; without one the record's
     * promise is returned.
     */
    public whenReady (): Promise<Dequeue>
    public whenReady (callback: ((dq: Dequeue) => void)): void
    public whenReady (callback?: ((dq: Dequeue) => void)): void | Promise<Dequeue> {
        if (!callback) {
            return this.record.whenReady(this)
        }
        this.record.whenReady(this, callback)
    }
1✔
57

1✔
58
    /**
     * Drops all subscriptions held by this dequeue and removes its reference
     * from the underlying record.
     */
    public discard (): void {
        this.destroy()
        this.record.removeReference(this)
    }
7✔
62

7✔
63
    /**
     * Drops all subscriptions held by this dequeue and deletes the underlying
     * record, returning whatever the record's delete returns for the given
     * (optional) callback.
     */
    public delete (callback: (error: string | null) => void): void
    public delete (): Promise<void>
    public delete (callback?: (error: string | null) => void): void | Promise<void> {
        this.destroy()
        const outcome = this.record.delete(callback)
        return outcome
    }
7✔
69

7✔
70
    /**
     * Returns the list entries in order, or an empty array if the list
     * hasn't been populated yet.
     *
     * The walk over the linked nodes is memoized per record version. A copy of
     * the memoized array is returned so callers can mutate the result without
     * corrupting the cache.
     *
     * @returns a fresh array with the `d` value of every node, first to last
     */
    public getEntries (): string[] {
      // Already computed for this version?
      if (this.version === this.lastVersion) {
        return this.lastEntries.slice()
      }

      const head = this.record.get(this.headPath) as Head || this.emptyHead
      const entries: string[] = []
      const visited = new Set<string>()
      let next = head.n
      // Stop on a dangling pointer or a cycle instead of crashing / looping forever
      while (next && !visited.has(next)) {
        visited.add(next)
        const node = this.record.get(next) as Node | undefined
        if (!node) {
          break
        }
        entries.push(node.d)
        next = node.n
      }

      // memoize
      this.lastVersion = this.version
      this.lastEntries = entries

      return entries.slice()
    }
93

94
    /**
     * Tells whether the list is empty, i.e. the head points to neither a
     * first (n) nor a last (p) node.
     */
    public isEmpty (): boolean {
      const { n, p } = (this.record.get(this.headPath) as Head) || this.emptyHead
      return !(n || p)
    }
101

1✔
102
    /**
     * Replaces the list content via setEntries and reports the write result:
     * through the given callback, or through the returned promise (rejected
     * with the error, resolved otherwise) when no callback is passed.
     */
    public setEntriesWithAck (entries: string[]): Promise<void>
    public setEntriesWithAck (entries: string[], callback: WriteAckCallback): void
    public setEntriesWithAck (entries: string[], callback?: WriteAckCallback): Promise<void> | void {
        if (callback) {
            this.setEntries(entries, callback)
            return
        }
        return new Promise<void>((resolve, reject) => {
            this.setEntries(entries, (error: string | null) => {
                if (!error) {
                    resolve()
                } else {
                    reject(error)
                }
            })
        })
    }
1✔
121

×
122
    /**
     * Replaces the whole list with a new set of entries.
     *
     * Entry i is stored under key `${i}` as { d, n, p } with n / p holding the
     * keys of the next / previous node ('' at the ends); the head stores the
     * first key in n and the last key in p. An empty array writes an empty head.
     * Entry listeners are notified for every call, including when the list is
     * cleared.
     *
     * @param entries  the new content, first to last
     * @param callback optional write acknowledgement callback passed to record.set
     * @throws Error when entries is not an array
     */
    public setEntries (entries: string[], callback?: WriteAckCallback) {
        if (!Array.isArray(entries)) {
            throw new Error('entries must be an array')
        }

        const last = entries.length - 1
        const data: Record<string, Head | Node> = {
          // Copy emptyHead so the shared default object never ends up inside record data
          [this.headPath]: last < 0 ? { ...this.emptyHead } : { n: '0', p: `${last}` }
        }
        for (let index = 0; index <= last; index++) {
          data[index] = {
            d: entries[index],
            n: index === last ? '' : `${index + 1}`,
            p: index === 0 ? '' : `${index - 1}`
          }
        }

        this.beforeChange()
        this.record.set({ data, callback })
        this.afterChange()
    }
161

162
    /**
     * Returns the value of the last node, or '' when the head has no last pointer.
     */
    public getLast () : string {
      const { p: lastKey } = (this.record.get(this.headPath) as Head) || this.emptyHead
      return lastKey ? (this.record.get(lastKey) as Node).d : ''
    }
171

172
    /**
     * Returns the value of the first node, or '' when the head has no first pointer.
     */
    public getFirst () : string {
      const { n: firstKey } = (this.record.get(this.headPath) as Head) || this.emptyHead
      return firstKey ? (this.record.get(firstKey) as Node).d : ''
    }
×
181

182
    /**
     * Removes the last value and returns it.
     *
     * Lists with fewer than 3 entries are rewritten through setEntries, which
     * avoids special-casing the head pointers. Longer lists update the head,
     * the new last node and erase the old last node with three record writes.
     *
     * @returns the removed value, or '' when the list is empty
     */
    public pop () : string {
      // Work on a copy: the array returned by getEntries may be the memoized one,
      // and mutating it would hide the removal from the before/after comparison
      const entries = this.getEntries().slice()
      if (entries.length === 0) {
        // Nothing to remove, so skip the redundant write
        return ''
      }
      if (entries.length < 3) {
        const popped = entries.pop() || ''
        this.setEntries(entries)
        return popped
      }

      const head = this.record.get(this.headPath) as Head || this.emptyHead
      if (!head.p) return ''
      // Capture everything needed up front so the writes below cannot affect
      // the values read (in case record.get hands out live references — TODO confirm)
      const lastKey = head.p
      const last = this.record.get(lastKey) as Node
      const newLastKey = last.p
      const value = last.d

      this.beforeChange()
      // update head
      this.record.set({path: `${this.headPath}.p`, data: newLastKey})
      // the previous node becomes the last one
      if (newLastKey) this.record.set({path: `${newLastKey}.n`, data: ''})
      // erase node
      this.record.set({path: lastKey, data: undefined})
      this.afterChange()

      return value
    }
208

1✔
209
    /**
     * Removes the first value and returns it.
     *
     * Lists with fewer than 3 entries are rewritten through setEntries, which
     * avoids special-casing the head pointers. Longer lists update the head,
     * the new first node and erase the old first node with three record writes.
     *
     * @returns the removed value, or '' when the list is empty
     */
    public shift () : string {
      // Work on a copy: the array returned by getEntries may be the memoized one,
      // and mutating it would hide the removal from the before/after comparison
      const entries = this.getEntries().slice()
      if (entries.length === 0) {
        // Nothing to remove, so skip the redundant write
        return ''
      }
      if (entries.length < 3) {
        const shifted = entries.shift() || ''
        this.setEntries(entries)
        return shifted
      }

      const head = this.record.get(this.headPath) as Head || this.emptyHead
      if (!head.n) return ''
      // Capture everything needed up front so the writes below cannot affect
      // the values read (in case record.get hands out live references — TODO confirm)
      const firstKey = head.n
      const first = this.record.get(firstKey) as Node
      const newFirstKey = first.n
      const value = first.d

      this.beforeChange()
      // update head
      this.record.set({path: `${this.headPath}.n`, data: newFirstKey})
      // the next node becomes the first one
      if (newFirstKey) this.record.set({path: `${newFirstKey}.p`, data: ''})
      // erase node
      this.record.set({path: firstKey, data: undefined})
      this.afterChange()

      return value
    }
×
234

×
235
    /**
     * Returns the first available key for inserting a node: the smallest
     * non-negative integer (as a string) that is not currently a key of the
     * record data.
     *
     * Checking membership directly (instead of comparing sorted keys with their
     * array position) guarantees the result never collides with a live node,
     * even after low keys such as '0' have been freed.
     */
    private getNodeKey () : string {
      const data = (this.record.get() as Record<string, Node> | undefined) || {}
      const usedKeys = new Set(Object.keys(data))
      let candidate = 0
      while (usedKeys.has(`${candidate}`)) {
        candidate++
      }
      return `${candidate}`
    }
×
249

×
250
    /**
     * Adds a value at the first position. On a non-empty list the change is
     * sent as one setMulti batch (a single atomic PATCH_MULTI message: one
     * version bump, no race window between sub-writes); an empty list is
     * written through setEntries instead.
     * Pass a callback (or use unshiftWithAck) to detect errors on old servers
     * that reject PATCH_MULTI as INVALID_MESSAGE_DATA — the caller can then
     * fall back to setEntries with the desired ordering.
     *
     * @param entry    value to add
     * @param callback optional write acknowledgement callback
     */
    public unshift (entry: string, callback?: WriteAckCallback) : void {
      if (this.isEmpty()) {
        this.setEntries([entry], callback)
        return
      }

      const { n: firstKey } = (this.record.get(this.headPath) as Head) || this.emptyHead
      const key = this.getNodeKey()
      const patches: Array<{ path: string, data: any }> = [
        // the new node, linked in front of the current first node
        { path: key, data: { d: entry, p: '', n: firstKey } },
        // back-link of the former first node, if there is one
        ...(firstKey ? [{ path: `${firstKey}.p`, data: key }] : []),
        // head now starts at the new node
        { path: `${this.headPath}.n`, data: key }
      ]

      this.beforeChange()
      this.record.setMulti({ patches, callback })
      this.afterChange()
    }
278

279
    /**
     * unshift() with a write result: reported through the given callback, or
     * through the returned promise (rejected with the error, resolved
     * otherwise) when no callback is passed.
     */
    public unshiftWithAck (entry: string): Promise<void>
    public unshiftWithAck (entry: string, callback: WriteAckCallback): void
    public unshiftWithAck (entry: string, callback?: WriteAckCallback): Promise<void> | void {
      if (callback) {
        this.unshift(entry, callback)
        return
      }
      return new Promise<void>((resolve, reject) => {
        this.unshift(entry, (error: string | null) => {
          if (error) {
            reject(error)
          } else {
            resolve()
          }
        })
      })
    }
289

290
    /**
     * Adds a value at the last position. See unshift() for atomicity semantics:
     * a non-empty list is changed with one setMulti batch, an empty list is
     * written through setEntries.
     *
     * @param entry    value to add
     * @param callback optional write acknowledgement callback
     */
    public push (entry: string, callback?: WriteAckCallback) : void {
      if (this.isEmpty()) {
        this.setEntries([entry], callback)
        return
      }

      const { p: lastKey } = (this.record.get(this.headPath) as Head) || this.emptyHead
      const key = this.getNodeKey()
      const patches: Array<{ path: string, data: any }> = [
        // the new node, linked behind the current last node
        { path: key, data: { d: entry, p: lastKey, n: '' } },
        // forward-link of the former last node, if there is one
        ...(lastKey ? [{ path: `${lastKey}.n`, data: key }] : []),
        // head now ends at the new node
        { path: `${this.headPath}.p`, data: key }
      ]

      this.beforeChange()
      this.record.setMulti({ patches, callback })
      this.afterChange()
    }
4✔
314

315
    /**
     * push() with a write result: reported through the given callback, or
     * through the returned promise (rejected with the error, resolved
     * otherwise) when no callback is passed.
     */
    public pushWithAck (entry: string): Promise<void>
    public pushWithAck (entry: string, callback: WriteAckCallback): void
    public pushWithAck (entry: string, callback?: WriteAckCallback): Promise<void> | void {
      if (callback) {
        this.push(entry, callback)
        return
      }
      return new Promise<void>((resolve, reject) => {
        this.push(entry, (error: string | null) => {
          if (error) {
            reject(error)
          } else {
            resolve()
          }
        })
      })
    }
2✔
325

326
    /**
     * Unlinks and erases one node with a single setMulti batch (atomic
     * PATCH_MULTI): neighbour pointers are re-joined, head pointers are moved
     * when the node was first or last, and the node itself is set to undefined.
     *
     * @param nodeKey  record key of the node to remove
     * @param node     the node stored under nodeKey
     * @param callback optional write acknowledgement callback
     */
    private removeNode (nodeKey: string, node: Node, callback?: WriteAckCallback) {
      const head = (this.record.get(this.headPath) as Head) || this.emptyHead
      const { p: prevKey, n: nextKey } = node

      const patches: Array<{ path: string, data: any }> = []
      const patch = (path: string, data: any) => { patches.push({ path, data }) }

      // re-join the neighbours around the removed node
      if (prevKey) patch(`${prevKey}.n`, nextKey)
      if (nextKey) patch(`${nextKey}.p`, prevKey)
      // move the head pointers when the node was last / first
      if (head.p === nodeKey) patch(`${this.headPath}.p`, prevKey)
      if (head.n === nodeKey) patch(`${this.headPath}.n`, nextKey)
      // erase the node itself
      patch(nodeKey, undefined)

      this.beforeChange()
      this.record.setMulti({ patches, callback })
      this.afterChange()
    }
1✔
350

1✔
351
    /**
     * Removes an entry from the list.
     *
     * Without an index the first occurrence of the entry is removed. With an
     * index (including 0) the entry is removed only if it sits at exactly that
     * position.
     *
     * @param entry value to remove
     * @param index optional zero-based position the entry must occupy
     * @throws Error('no entry removed') when no matching node is found
     */
    public removeEntry (entry: string, index?: number) {
      const head = this.record.get(this.headPath) as Head || this.emptyHead
      let key = head.n
      let position = 0
      while (key) {
        const node = this.record.get(key) as Node
        // Compare against undefined explicitly: a truthiness test would treat index 0 as "no index"
        if (node.d === entry && (index === undefined || index === position)) {
          this.removeNode(key, node)
          return
        }
        ++position
        key = node.n
      }
      // got to the end without result
      throw new Error('no entry removed')
    }
381

×
382
  /**
   * Inserts an entry so that it ends up at the given position.
   *
   * Index 0 is equivalent to unshift, index = entries.length is equivalent to
   * push, and any index in between inserts the entry before the node currently
   * at that position (so index = entries.length - 1 inserts before the last
   * node). When no index is supplied nothing is inserted.
   *
   * @param entry    value to insert
   * @param index    zero-based target position, 0..entries.length
   * @param callback optional write acknowledgement callback
   * @throws Error when the index is not a number or outside 0..entries.length
   */
    public insertEntry (entry: string, index: number, callback?: WriteAckCallback) {
      const hasIndex = this.hasIndex(index)

      if (index === 0) {
        this.unshift(entry, callback)
        return
      }
      if (!hasIndex) {
        return
      }
      // hasIndex accepts index === length; the walk below can never reach that
      // position, so appending has to be handled explicitly
      if (index === this.getEntries().length) {
        this.push(entry, callback)
        return
      }

      const head = this.record.get(this.headPath) as Head || this.emptyHead
      let next = head.n
      let position = 0
      while (next) {
        const node = this.record.get(next) as Node
        if (position === index) {
          const key = this.getNodeKey()
          const patches: Array<{ path: string, data: any }> = [
            // new node sits between node.p and the node currently at this position
            { path: key, data: { d: entry, n: next, p: node.p } },
            { path: `${next}.p`, data: key }
          ]
          if (node.p) {
            patches.push({ path: `${node.p}.n`, data: key })
          }
          this.beforeChange()
          this.record.setMulti({ patches, callback })
          this.afterChange()
          return
        }
        ++position
        next = node.n
      }

      // got to the end without inserting
      throw new Error('Index out of range')
    }
×
426

×
427
  /**
   * Subscribes to the underlying record on behalf of a list callback.
   *
   * Without a position the callback receives getEntries() on every
   * notification. With position 'first' or 'last' the subscription is placed
   * on the head's n / p pointer and the callback receives getFirst() /
   * getLast(). Any other position throws.
   */
    public subscribe (callback: ListSubscriptionCallback): void
    public subscribe (position: string, callback: ListSubscriptionCallback): void
    public subscribe (positionOrCallback?: ListSubscriptionCallback | string, callback?: ListSubscriptionCallback): void {
        const parameters = utils.normalizeArguments(arguments)

        // Translate the public position names into record paths
        if (parameters.path) {
          switch (parameters.path) {
            case 'first':
              parameters.path = `${this.headPath}.n`
              break
            case 'last':
              parameters.path = `${this.headPath}.p`
              break
            default:
              throw new Error('Position must be "first" or "last" element')
          }
        }

        // The wrapper ignores the data the record passes in and hands list-level
        // values to the user's callback instead
        const userCallback: Function = parameters.callback
        const listCallback = () => {
          const path = parameters.path
          if (!path) {
            userCallback(this.getEntries())
            return
          }
          if (path === `${this.headPath}.p`) {
            userCallback(this.getLast())
          }
          if (path === `${this.headPath}.n`) {
            userCallback(this.getFirst())
          }
        }

        // Remember which wrapper belongs to which user callback, so that
        // unsubscribe can hand the record the function it actually subscribed
        this.wrappedFunctions.set(userCallback, listCallback)
        parameters.callback = listCallback

        this.subscriptions.push(parameters)
        this.record.subscribe(parameters, this)
    }
×
475

476
    /**
     * Removes a subscription made through subscribe().
     *
     * The position ('first' / 'last') is translated to the same record path
     * subscribe() used, the user's callback is swapped for the wrapper that was
     * registered on the record, the bookkeeping entries are dropped, and the
     * call is forwarded to the record's unsubscribe. Without a callback every
     * tracked subscription on the given path is dropped from the bookkeeping.
     *
     * @throws Error when a position other than 'first' or 'last' is given
     */
    public unsubscribe (callback: ListSubscriptionCallback): void
    public unsubscribe (position: string, callback: ListSubscriptionCallback): void
    public unsubscribe (positionOrCallback?: ListSubscriptionCallback | string, callback?: ListSubscriptionCallback): void {
      const parameters = utils.normalizeArguments(arguments)
      if (parameters.path) {
        if (parameters.path === 'first') {
          parameters.path = `${this.headPath}.n`
        } else if (parameters.path === 'last') {
          parameters.path = `${this.headPath}.p`
        } else {
          throw new Error('Position must be "first" or "last" element')
        }
      }

      // The map is keyed by the user's callback, so look up and delete with that
      // key before replacing it by the wrapper
      const userCallback = parameters.callback
      const listenCallback = this.wrappedFunctions.get(userCallback)
      this.wrappedFunctions.delete(userCallback)
      parameters.callback = listenCallback as (data: any) => void

      this.subscriptions = this.subscriptions.filter((subscription: any) => {
        if (subscription.path !== parameters.path) return true
        // same path: drop everything when no callback was given, else only the matching wrapper
        return Boolean(parameters.callback) && subscription.callback !== parameters.callback
      })

      this.record.unsubscribe(parameters, this)
    }
×
506

507
    /**
     * Replacement for the record's applyUpdate (installed in the constructor):
     * snapshots the list, lets the record's original handler process the
     * message, then notifies entry listeners about the difference.
     */
    private applyUpdate (message: RecordMessage) {
      this.beforeChange()
      this.originalApplyUpdate(message)
      this.afterChange()
    }
×
515

×
516
    /**
     * Checks an optional index against the current entries.
     *
     * Returns false when no index is given and true when the index lies in
     * 0..entries.length (the length itself is accepted). Throws when the index
     * is NaN or outside that range.
     */
    private hasIndex (index?: number) {
      const entries = this.getEntries()
      if (index === undefined) {
        return false
      }
      if (isNaN(index)) {
        throw new Error('Index must be a number')
      }
      const withinEntries = index >= 0 && index < entries.length
      if (!(index === entries.length || withinEntries)) {
        throw new Error('Index must be within current entries')
      }
      return true
    }
533

534
    /**
     * Records which entry listeners (add / remove / move) are attached and, if
     * there is at least one, snapshots the current structure of the list;
     * otherwise the snapshot is set to null.
     *
     * Called before any change to the list, regardless of whether the change
     * was triggered by an incoming message from the server or by the client.
     */
    private beforeChange (): void {
      this.hasAddListener = this.hasListeners(EVENT.ENTRY_ADDED_EVENT)
      this.hasRemoveListener = this.hasListeners(EVENT.ENTRY_REMOVED_EVENT)
      this.hasMoveListener = this.hasListeners(EVENT.ENTRY_MOVED_EVENT)

      const anyoneListening = this.hasAddListener || this.hasRemoveListener || this.hasMoveListener
      this.beforeStructure = anyoneListening ? this.getStructure() : null
    }
552

553
    /**
     * Compares the structure of the list after a change with the snapshot taken
     * by beforeChange() and emits entry removed / added / moved events. Does
     * nothing when the snapshot is null, i.e. no listeners were attached.
     */
    private afterChange (): void {
      const before = this.beforeStructure
      if (before === null) {
        return
      }
      const after = this.getStructure()

      if (this.hasRemoveListener) {
        for (const entry in before) {
          const oldPositions = before[entry]
          const newPositions = after[entry]
          for (let i = 0; i < oldPositions.length; i++) {
            // an occurrence is gone when the entry vanished or has fewer occurrences now
            if (newPositions === undefined || newPositions[i] === undefined) {
              this.emit(EVENT.ENTRY_REMOVED_EVENT, entry, oldPositions[i])
            }
          }
        }
      }

      if (this.hasAddListener || this.hasMoveListener) {
        for (const entry in after) {
          const oldPositions = before[entry]
          const newPositions = after[entry]
          for (let i = 0; i < newPositions.length; i++) {
            if (oldPositions === undefined || oldPositions[i] === undefined) {
              // no counterpart in the snapshot: a new occurrence
              this.emit(EVENT.ENTRY_ADDED_EVENT, entry, newPositions[i])
            } else if (oldPositions[i] !== newPositions[i]) {
              // same occurrence, different position
              this.emit(EVENT.ENTRY_MOVED_EVENT, entry, newPositions[i])
            }
          }
        }
      }
    }
×
597

×
598
    /**
     * Iterates through the list and creates a map with the entry as a key
     * and an array of its position(s) within the list as a value, e.g.
     *
     * {
     *   'recordA': [ 0, 3 ],
     *   'recordB': [ 1 ],
     *   'recordC': [ 2 ]
     * }
     *
     * The map has no prototype, so entries named like Object.prototype members
     * ('constructor', 'toString', ...) are treated as ordinary keys instead of
     * resolving to inherited functions.
     */
    private getStructure (): any {
      const structure: Record<string, number[]> = Object.create(null)

      this.getEntries().forEach((entry, position) => {
        const positions = structure[entry]
        if (positions === undefined) {
          structure[entry] = [position]
        } else {
          positions.push(position)
        }
      })

      return structure
    }
×
623

×
624
    /**
     * Unsubscribes every tracked subscription from the record, forgets all
     * callback wrappers and removes this dequeue as a context from the record.
     */
    private destroy () {
      for (const subscription of this.subscriptions) {
        this.record.unsubscribe(subscription, this)
      }
      this.wrappedFunctions.clear()
      this.record.removeContext(this)
    }
631

×
632
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc