• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

yoursunny / NDNts / 20717763796

05 Jan 2026 02:02PM UTC coverage: 87.738% (+0.1%) from 87.623%
20717763796

push

github

yoursunny
mk: upgrade to xo-config 0.1002.0

2454 of 2992 branches covered (82.02%)

6683 of 7617 relevant lines covered (87.74%)

7062.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.77
/pkg/svs/src/sync.ts
1
import { Forwarder, type FwFace, FwPacket } from "@ndn/fw";
2
import { Data, Interest, Name, type NameLike, noopSigning, nullSigner, type Signer, TT as l3TT, type Verifier } from "@ndn/packet";
3
import { type SyncNode, type SyncProtocol, SyncUpdate } from "@ndn/sync-api";
4
import { Decoder, Encoder } from "@ndn/tlv";
5
import { assert, pushable, randomJitter, trackEventListener } from "@ndn/util";
6
import { consume, tap } from "streaming-iterables";
7
import type { Promisable } from "type-fest";
8
import { TypedEventTarget } from "typescript-event-target";
9

10
import { TT, Version2, Version3 } from "./an";
11
import { IDImpl, StateVector } from "./state-vector";
12

13
interface DebugEntry {
14
  action: string;
15
  own: Record<string, number>;
16
  recv?: Record<string, number>;
17
  state: string;
18
  nextState?: string;
19
  ourOlder?: number;
20
  ourNewer?: number;
21
}
22

23
type EventMap = SyncProtocol.EventMap<SvSync.ID> & {
24
  debug: CustomEvent<DebugEntry>;
25
  rxerror: CustomEvent<[interest: Interest, e: unknown]>;
26
};
27

28
/** StateVectorSync participant. */
29
export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<SvSync.ID> {
30
  public static async create({
31
    syncPrefix,
32
    fw = Forwarder.getDefault(),
21✔
33
    describe = `SvSync(${syncPrefix})`,
21✔
34
    initialStateVector = new StateVector(),
21✔
35
    initialize,
36
    syncInterestLifetime = 1000,
21✔
37
    periodicTimeout = [30000, 0.1],
21✔
38
    suppressionPeriod = 2200,
21✔
39
    suppressionTimeout = SvSync.suppressionExpDelay(suppressionPeriod),
21✔
40
    signer = nullSigner,
21✔
41
    verifier = noopSigning,
21✔
42
    svs3 = false,
21✔
43
  }: SvSync.Options): Promise<SvSync> {
44
    if (typeof periodicTimeout === "number") {
21!
45
      periodicTimeout = [periodicTimeout, 0.1];
×
46
    }
47

48
    const sync = new SvSync(
21✔
49
      svs3,
50
      syncPrefix,
51
      describe,
52
      initialStateVector,
53
      syncInterestLifetime,
54
      randomJitter(periodicTimeout[1], periodicTimeout[0]),
55
      suppressionTimeout,
56
      suppressionPeriod,
57
      signer,
58
      verifier,
59
    );
60
    await initialize?.(sync);
21✔
61
    sync.makeFace(fw);
21✔
62
    return sync;
21✔
63
  }
64

65
  private constructor(
66
      private readonly svs3: boolean,
21✔
67
      public readonly syncPrefix: Name,
21✔
68
      public readonly describe: string,
21✔
69
      private readonly own: StateVector,
21✔
70
      private readonly syncInterestLifetime: number,
21✔
71
      private readonly steadyTimer: () => number,
21✔
72
      private readonly suppressionTimer: () => number,
21✔
73
      private readonly suppressionPeriod: number,
21✔
74
      private readonly signer: Signer,
21✔
75
      private readonly verifier: Verifier,
21✔
76
  ) {
77
    super();
21✔
78
    this.syncInterestName = syncPrefix.append(svs3 ? Version3 : Version2);
21✔
79
  }
80

81
  private makeFace(fw: Forwarder): void {
82
    this.face = fw.addFace({
21✔
83
      rx: this.txStream,
84
      tx: (iterable) => consume(tap(this.handleRxPacket, iterable)),
21✔
85
    }, {
86
      describe: this.describe,
87
      routeCapture: false,
88
    });
89
    this.face.addRoute(this.syncInterestName, this.syncPrefix);
21✔
90
  }
91

92
  private readonly maybeHaveEventListener = trackEventListener(this);
21✔
93
  private face?: FwFace;
94
  private readonly syncInterestName: Name;
95
  private txStream = pushable<FwPacket>();
21✔
96

97
  /**
98
   * In steady state, undefined.
99
   * In suppression state, aggregated state vector of incoming sync Interests.
100
   */
101
  private aggregated?: StateVector;
102

103
  /** Sync Interest timer. */
104
  private timer!: NodeJS.Timeout | number;
105

106
  private debug(action: string, entry: Partial<DebugEntry> = {}, recv?: StateVector): void {
239✔
107
    if (!this.maybeHaveEventListener.debug) {
239✔
108
      return;
182✔
109
    }
110
    /* v8 ignore next */
111
    this.dispatchTypedEvent("debug", new CustomEvent<DebugEntry>("debug", {
57✔
112
      detail: {
113
        action,
114
        own: this.own.toJSON(),
115
        recv: recv?.toJSON(),
116
        state: this.aggregated ? "suppression" : "steady",
57✔
117
        ...entry,
118
      },
119
    }));
120
  }
121

122
  /** Cease operations. */
123
  public close(): void {
124
    clearTimeout(this.timer);
19✔
125
    this.face?.close();
19✔
126
  }
127

128
  /**
129
   * Retrieve or create sync node by name.
130
   *
131
   * For SVS v2, retrieve or create sync node with specified name.
132
   *
133
   * For SVS v3, retrieve sync node with specified name and most recent bootstrap time.
134
   * If no sync node with this name exists, create sync node with current time as bootstrap time.
135
   */
136
  public get(name: NameLike): SyncNode<SvSync.ID>;
137

138
  /** Retrieve or create sync node by name and bootstrap time (SVS v3). */
139
  public get(id: { name: NameLike; boot: number }): SyncNode<SvSync.ID>;
140

141
  /** Retrieve or create sync node by name and bootstrap time (SVS v3). */
142
  public get(name: NameLike, boot: number): SyncNode<SvSync.ID>;
143

144
  public get(arg1: NameLike | { name: NameLike; boot: number }, boot = -1) {
137✔
145
    let id: IDImpl;
146
    if (arg1 instanceof IDImpl) {
137✔
147
      id = arg1;
55✔
148
    } else {
149
      let name: NameLike;
150
      if (Name.isNameLike(arg1)) {
82✔
151
        name = arg1;
80✔
152
      } else {
153
        ({ name, boot } = arg1);
2✔
154
      }
155
      name = Name.from(name);
82✔
156

157
      if (!this.svs3) {
82✔
158
        id = new IDImpl(name);
40✔
159
      } else if (boot === -1) {
42✔
160
        id = this.findByName(name) as IDImpl | undefined ?? new IDImpl(name, SvSync.makeBootstrapTime());
20✔
161
      } else {
162
        id = new IDImpl(name, boot);
22✔
163
      }
164
    }
165
    return new SvSyncNode(id, this.nodeOp);
137✔
166
  }
167

168
  /**
169
   * Retrieve or create sync node by name.
170
   *
171
   * For SVS v2, same as `get(name)`.
172
   *
173
   * For SVS v3, create sync node with specified name and current bootstrap time.
174
   * Note the different between `get(name)` and `add(name)`:
175
   * - `get(name)` searches for existing sync nodes with specified name first.
176
   * - `add(name)` almost always creates a new sync node.
177
   */
178
  public add(name: NameLike): SyncNode<SvSync.ID>;
179

180
  /** Same as `get(id)` (SVS v3). */
181
  public add(id: { name: NameLike; boot: number }): SyncNode<SvSync.ID>;
182

183
  /** Same as `get(name, boot)` (SVS v3). */
184
  public add(name: NameLike, boot: number): SyncNode<SvSync.ID>;
185

186
  public add(arg1: any, boot = SvSync.makeBootstrapTime()): SyncNode<SvSync.ID> {
17✔
187
    return this.get(arg1, boot);
17✔
188
  }
189

190
  private findByName(name: Name): StateVector.ID | undefined {
191
    let best: StateVector.ID | undefined;
192
    for (const [id] of this.own) {
20✔
193
      if (!id.name.equals(name)) {
60✔
194
        continue;
40✔
195
      }
196
      if (!best || id.boot > best.boot) {
20!
197
        best = id;
20✔
198
      }
199
    }
200
    return best;
20✔
201
  }
202

203
  /**
204
   * Obtain a copy of own state vector.
205
   *
206
   * @remarks
207
   * This may be used as {@link SvSync.Options.initialStateVector} to re-create an SvSync instance.
208
   */
209
  public get currentStateVector(): StateVector {
210
    return new StateVector(this.own);
1✔
211
  }
212

213
  /**
214
   * Multi-purpose callback passed to {@link SvSyncNode} constructor.
215
   *
216
   * @remarks
217
   * - `nodeOp(id)`: get seqNum
218
   * - `nodeOp(id, n)`: set seqNum, return new seqNum
219
   * - `nodeOp(id, 0)`: delete node during initialization
220
   */
221
  private readonly nodeOp = (id: Name, n: number | undefined): number => {
124✔
222
    if (n !== undefined) { // setSeqNum requested
124✔
223
      if (!this.face) { // decrement/remove permitted during initialization
39✔
224
        this.own.set(id, n);
4✔
225
      } else if (n > this.own.get(id)) { // increment only after initialization
35✔
226
        this.own.set(id, n);
33✔
227
        this.debug("publish");
33✔
228
        this.resetTimer(true);
33✔
229
      }
230
    }
231
    return this.own.get(id);
124✔
232
  };
233

234
  private readonly handleRxPacket = async (pkt: FwPacket): Promise<void> => {
150✔
235
    if (!(FwPacket.isEncodable(pkt) && pkt.l3 instanceof Interest)) {
150✔
236
      return;
50✔
237
    }
238

239
    const interest = pkt.l3;
100✔
240
    let recv: StateVector;
241
    try {
100✔
242
      [recv] = await this.parseSyncInterest(interest);
100✔
243
    } catch (err: unknown) {
244
      this.dispatchTypedEvent("rxerror", new CustomEvent<[interest: Interest, e: unknown]>("rxerror", {
×
245
        detail: [interest, err],
246
      }));
247
      return;
×
248
    }
249

250
    const { maxBoot } = recv;
100✔
251
    if (maxBoot > SvSync.makeBootstrapTime() + 86400) {
100✔
252
      this.debug("rx-future", {}, recv);
2✔
253
      return;
2✔
254
    }
255

256
    await this.handleRecv(recv);
98✔
257
  };
258

259
  /**
260
   * Parse and verify incoming sync Interest.
261
   * @param interest - Received Interest.
262
   */
263
  private async parseSyncInterest(interest: Interest): Promise<[recv: StateVector, piggyback: Uint8Array]> {
264
    assert(interest.appParameters);
100✔
265
    const d0 = new Decoder(interest.appParameters);
100✔
266
    const { type, decoder: d1, after } = d0.read();
100✔
267
    let recv: StateVector;
268
    switch (type) {
100!
269
      case TT.StateVector: { // SVS v2
270
        await this.verifier.verify(interest);
78✔
271
        recv = d1.decode(StateVector);
78✔
272
        break;
78✔
273
      }
274
      case l3TT.Data: { // SVS v3
275
        const data = d1.decode(Data);
22✔
276
        assert(data.name.equals(this.syncInterestName));
22✔
277
        await this.verifier.verify(data);
22✔
278
        recv = Decoder.decode(data.content, StateVector);
22✔
279
        break;
22✔
280
      }
281
      default: {
282
        throw new Error("cannot find StateVector in Interest");
×
283
      }
284
    }
285
    return [recv, after];
100✔
286
  }
287

288
  /**
289
   * Handle incoming state vector.
290
   * @param recv - Received StateVector.
291
   */
292
  private async handleRecv(recv: StateVector): Promise<void> {
293
    const ourOlder = this.own.listOlderThan(recv);
98✔
294
    const ourNewer = recv.listOlderThan(this.own);
98✔
295
    this.debug("recv", {
98✔
296
      nextState: (!this.aggregated && ourNewer.length > 0) ? "suppression" : undefined,
294✔
297
      ourOlder: ourOlder.length,
298
      ourNewer: ourNewer.length,
299
    }, recv);
300
    this.own.mergeFrom(recv);
98✔
301

302
    for (const { id, loSeqNum, hiSeqNum } of ourOlder) {
98✔
303
      this.dispatchTypedEvent("update", new SyncUpdate(this.get(id), loSeqNum, hiSeqNum));
54✔
304
    }
305

306
    if (this.aggregated) { // in suppression state
98!
307
      this.aggregated.mergeFrom(recv);
×
308
      return;
×
309
    }
310

311
    // in steady state
312
    if (this.shouldEnterSuppression(ourNewer)) {
98✔
313
      // entering suppression state
314
      this.aggregated = recv;
4✔
315
    }
316
    this.resetTimer();
98✔
317
  }
318

319
  private shouldEnterSuppression(ourNewer: readonly StateVector.DiffEntry[]): boolean {
320
    const ignoreUpdatedAfter = performance.now() - this.suppressionPeriod;
98✔
321
    return ourNewer.some(({ id }) => this.own.getEntry(id).lastUpdate <= ignoreUpdatedAfter);
98✔
322
  }
323

324
  private resetTimer(immediate = false): void {
184✔
325
    clearTimeout(this.timer);
184✔
326
    const delay = immediate ? 0 : this.aggregated ? this.suppressionTimer() : this.steadyTimer();
184✔
327
    this.timer = setTimeout(this.handleTimer, delay);
184✔
328
  }
329

330
  private readonly handleTimer = () => {
53✔
331
    if (this.aggregated) { // in suppression state, exiting to steady state
53✔
332
      const ourNewer = this.aggregated.listOlderThan(this.own);
4✔
333
      this.debug("timer", {
4✔
334
        nextState: "steady",
335
        ourNewer: ourNewer.length,
336
      });
337
      if (ourNewer.length > 0) {
4!
338
        void this.sendSyncInterest();
4✔
339
      }
340
      this.aggregated = undefined;
4✔
341
    } else { // in steady state
342
      this.debug("timer");
49✔
343
      void this.sendSyncInterest();
49✔
344
    }
345

346
    this.resetTimer();
53✔
347
  };
348

349
  /** Transmit a sync Interest. */
350
  private async sendSyncInterest(): Promise<void> {
351
    const interest = new Interest();
53✔
352
    interest.name = this.syncInterestName;
53✔
353
    interest.canBePrefix = true;
53✔
354
    interest.mustBeFresh = true;
53✔
355
    interest.lifetime = this.syncInterestLifetime;
53✔
356

357
    if (this.svs3) {
53✔
358
      const encoder = new Encoder();
14✔
359
      this.own.encodeTo(encoder, 3);
14✔
360

361
      const data = new Data();
14✔
362
      data.name = this.syncInterestName;
14✔
363
      data.content = encoder.output;
14✔
364
      await this.signer.sign(data);
14✔
365

366
      interest.appParameters = Encoder.encode(data);
14✔
367
      await interest.updateParamsDigest();
14✔
368
    } else {
369
      interest.appParameters = Encoder.encode(this.own);
39✔
370
      await this.signer.sign(interest);
39✔
371
    }
372

373
    this.debug("send");
53✔
374
    this.txStream.push(FwPacket.create(interest));
53✔
375
  }
376
}
3✔
377

378
export namespace SvSync {
3!
379
  /** {@link SvSync.create} options. */
380
  export interface Options {
381
    /** Sync group prefix. */
382
    syncPrefix: Name;
383

384
    /**
385
     * Use the specified logical forwarder.
386
     * @defaultValue `Forwarder.getDefault()`
387
     */
388
    fw?: Forwarder;
389

390
    /** Description for debugging purpose. */
391
    describe?: string;
392

393
    /**
394
     * Initial state vector.
395
     * @defaultValue empty state vector
396
     */
397
    initialStateVector?: StateVector;
398

399
    /**
400
     * Application initialization function.
401
     *
402
     * @remarks
403
     * During initialization, it's possible to remove SyncNode or decrease seqNum.
404
     * Calling `sync.close()` has no effect.
405
     *
406
     * Sync protocol starts running after the returned Promise is resolved.
407
     */
408
    initialize?: (sync: SvSync) => Promisable<void>;
409

410
    /**
411
     * Sync Interest lifetime in milliseconds.
412
     * @defaultValue 1000
413
     */
414
    syncInterestLifetime?: number;
415

416
    /**
417
     * Sync Interest timer in steady state.
418
     * @defaultValue `[30000ms, ±10%]`
419
     * @remarks
420
     * If specified as tuple,
421
     * - median: median interval in milliseconds.
422
     * - jitter: ± percentage, in [0.0, 1.0) range.
423
     *
424
     * If specified as number, it's interpreted as median.
425
     */
426
    periodicTimeout?: number | [median: number, jitter: number];
427

428
    /**
429
     * Sync Interest timer in suppression state, maximum value.
430
     * @defaultValue `200ms`
431
     */
432
    suppressionPeriod?: number;
433

434
    /**
435
     * Sync Interest timer in suppression state, value generator.
436
     * @defaultValue `SvSync.suppressionExpDelay(suppressionPeriod)`
437
     * @remarks
438
     * The maximum value returned by the generator function should be `suppressionPeriod`.
439
     */
440
    suppressionTimeout?: () => number;
441

442
    /**
443
     * Sync Interest signer (SVS v2).
444
     * State Vector Data signer (SVS v3).
445
     * @defaultValue nullSigner
446
     */
447
    signer?: Signer;
448

449
    /**
450
     * Sync Interest verifier (SVS v2).
451
     * State Vector Data verifier (SVS v3).
452
     * @defaultValue no verification
453
     */
454
    verifier?: Verifier;
455

456
    /**
457
     * Enable SVS v3 protocol.
458
     * @defaultValue false
459
     */
460
    svs3?: boolean;
461
  }
462

463
  /**
464
   * SVS v2 suppression timeout exponential decay function.
465
   * @param c - Constant factor.
466
   * @param f - Decay factor.
467
   * @returns Function to generate suppression timeout values.
468
   */
469
  export function suppressionExpDelay(c: number, f = 10): () => number {
3✔
470
    const cf = c / f;
21✔
471
    return () => {
21✔
472
      const v = Math.random() * c;
4✔
473
      return -c * Math.expm1((v - c) / cf);
4✔
474
    };
475
  }
476

477
  /** Make SVS v3 bootstrap time based on current timestamp. */
478
  export function makeBootstrapTime(now = Date.now()): number {
3✔
479
    return Math.trunc(now / 1000);
118✔
480
  }
481

482
  /**
483
   * Sync node ID.
484
   *
485
   * For SVS v2, this should be accessed as `Name`.
486
   * Accessing the object fields would give `{ name, boot: -1 }`.
487
   *
488
   * For SVS v3, this should be accessed as `{ name, boot }` object.
489
   * Accessing as `Name` would return the name only.
490
   *
491
   * Note: the `Name` variant will be deleted when SVS v2 support is dropped.
492
   */
493
  export type ID = Name & StateVector.ID;
494
}
495

496
class SvSyncNode implements SyncNode<SvSync.ID> {
497
  constructor(
498
      public readonly id: SvSync.ID,
137✔
499
      private readonly op: (id: Name, n: number | undefined) => number,
137✔
500
  ) {}
501

502
  public get seqNum(): number {
503
    return this.op(this.id, undefined);
85✔
504
  }
505

506
  public set seqNum(n: number) {
507
    this.op(this.id, n);
37✔
508
  }
509

510
  public remove(): void {
511
    this.op(this.id, 0);
2✔
512
  }
513
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc