• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

yoursunny / NDNts / 12866783525

20 Jan 2025 11:13AM UTC coverage: 94.605% (-0.006%) from 94.611%
12866783525

push

github

yoursunny
nfdmgmt: Prefix Announcement example

4441 of 4749 branches covered (93.51%)

15449 of 16330 relevant lines covered (94.61%)

6629.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.86
/pkg/psync/src/syncps/pubsub.ts
1
import { consume, ConsumerOptions, produce, type Producer, ProducerOptions } from "@ndn/endpoint";
1✔
2
import { Timestamp } from "@ndn/naming-convention2";
3
import { type Component, Data, digestSigning, Interest, lpm, type Name, type Signer, type Verifier } from "@ndn/packet";
4
import { type Subscriber, type Subscription, SubscriptionTable } from "@ndn/sync-api";
5
import { KeyMap, toHex, trackEventListener } from "@ndn/util";
6
import DefaultWeakMap from "mnemonist/default-weak-map.js";
7
import filter from "obliterator/filter.js";
8
import take from "obliterator/take.js";
9
import { TypedEventTarget } from "typescript-event-target";
10

11
import { IBLT } from "../iblt";
12
import { SyncpsCodec } from "./codec";
13

14
interface PublicationEntry {
15
  key: number;
16
  pub: Data;
17
  cb?: SyncpsPubsub.PublishCallback;
18

19
  own: boolean;
20
  expired: boolean;
21

22
  timers: Array<NodeJS.Timeout | number>;
23
}
24

25
interface SyncInterestInfo {
26
  interest: Interest;
27
  recvIblt: IBLT;
28
}
29

30
interface PendingInterest extends SyncInterestInfo {
31
  expire: NodeJS.Timeout | number;
32
  defer: PromiseWithResolvers<Data | undefined>;
33
}
34

35
interface DebugEntry {
36
  action: string;
37
  key?: number;
38
  name?: Name;
39
  ownIblt: IBLT;
40
  recvIblt?: IBLT;
41
  content?: Name[];
42
}
43

44
type EventMap = {
45
  debug: CustomEvent<DebugEntry>;
46
};
47

48
function defaultModifyPublication(pub: Data) {
5✔
49
  pub.name = pub.name.append(Timestamp, Date.now());
5✔
50
}
5✔
51

52
function safeExtractTimestamp(pub: Data): number {
18✔
53
  try {
18✔
54
    return pub.name.at(-1).as(Timestamp);
18✔
55
  } catch {
18!
56
    return 0;
×
57
  }
×
58
}
18✔
59

60
function defaultIsExpired(pub: Data) {
10✔
61
  return safeExtractTimestamp(pub);
10✔
62
}
10✔
63

64
function defaultFilterPubs(items: SyncpsPubsub.FilterPubItem[]) {
37✔
65
  if (!items.some((item) => item.own)) {
37✔
66
    return [];
26✔
67
  }
26✔
68

69
  const timestampMap = new DefaultWeakMap<Data, number>((pub) => safeExtractTimestamp(pub));
11✔
70
  items.sort((a, b) => {
11✔
71
    if (a.own !== b.own) {
4!
72
      return a.own ? -1 : 1;
×
73
    }
×
74
    return timestampMap.get(b.pub) - timestampMap.get(a.pub);
4✔
75
  });
11✔
76
  return items;
11✔
77
}
11✔
78

79
/**
80
 * syncps - pubsub service.
81
 * @deprecated Deprecated in favor of SVS-PS protocol.
82
 */
83
export class SyncpsPubsub extends TypedEventTarget<EventMap> implements Subscriber<Name, CustomEvent<Data>> {
1✔
84
  constructor({
3✔
85
    p,
3✔
86
    syncPrefix,
3✔
87
    describe = `SyncpsPubsub(${syncPrefix})`,
3✔
88
    cpOpts,
3✔
89
    syncInterestLifetime = 4000,
3✔
90
    syncDataPubSize = 1300,
3✔
91
    syncSigner = digestSigning,
3✔
92
    syncVerifier,
3✔
93
    maxPubLifetime = 1000,
3✔
94
    maxClockSkew = 1000,
3✔
95
    modifyPublication = defaultModifyPublication,
3✔
96
    isExpired = defaultIsExpired,
3✔
97
    filterPubs = defaultFilterPubs,
3✔
98
    pubSigner = digestSigning,
3✔
99
    pubVerifier,
3✔
100
  }: SyncpsPubsub.Options) {
3✔
101
    super();
3✔
102
    this.describe = describe;
3✔
103
    this.syncPrefix = syncPrefix;
3✔
104
    const ibltParams = IBLT.PreparedParameters.prepare(p.iblt);
3✔
105
    this.codec = new SyncpsCodec(p, ibltParams);
3✔
106

107
    this.iblt = new IBLT(ibltParams);
3✔
108
    this.maxPubLifetime = maxPubLifetime;
3✔
109
    this.maxClockSkew = maxClockSkew;
3✔
110
    this.dModify = modifyPublication;
3✔
111
    this.dIsExpired = isExpired;
3✔
112
    this.dSigner = pubSigner;
3✔
113
    this.dVerifier = pubVerifier;
3✔
114
    this.dConfirmIblt = new IBLT(ibltParams);
3✔
115

116
    this.pProducer = produce(syncPrefix, this.handleSyncInterest, {
3✔
117
      describe: `${this.describe}[p]`,
3✔
118
      routeCapture: false,
3✔
119
      concurrency: Infinity,
3✔
120
      ...ProducerOptions.exact(cpOpts),
3✔
121
      dataSigner: syncSigner,
3✔
122
    });
3✔
123
    this.pFilter = filterPubs;
3✔
124
    this.pPubSize = syncDataPubSize;
3✔
125

126
    this.cOpts = {
3✔
127
      describe: `${this.describe}[c]`,
3✔
128
      ...ConsumerOptions.exact(cpOpts),
3✔
129
      verifier: syncVerifier,
3✔
130
    };
3✔
131
    this.cLifetime = syncInterestLifetime;
3✔
132

133
    this.scheduleSyncInterest(0);
3✔
134
  }
3✔
135

136
  private readonly maybeHaveEventListener = trackEventListener(this);
3✔
137
  public readonly describe: string;
3✔
138
  private readonly syncPrefix: Name;
3✔
139
  private readonly codec: SyncpsCodec;
3✔
140
  private closed = false;
3✔
141

142
  private readonly iblt: IBLT;
3✔
143
  private readonly pubs = new Map<number, PublicationEntry>();
3✔
144
  private readonly maxPubLifetime: number;
3✔
145
  private readonly maxClockSkew: number;
3✔
146
  private readonly subs = new SubscriptionTable<CustomEvent<Data>>();
3✔
147

148
  private readonly dModify: SyncpsPubsub.ModifyPublicationCallback;
3✔
149
  private readonly dIsExpired: SyncpsPubsub.IsExpiredCallback;
3✔
150
  private readonly dSigner: Signer;
3✔
151
  private readonly dVerifier?: Verifier;
3✔
152
  private nOwnPubs = 0;
3✔
153
  /** IBLT of own publications with callback. */
154
  private readonly dConfirmIblt: IBLT;
3✔
155

156
  private readonly pProducer: Producer;
3✔
157
  private readonly pFilter: SyncpsPubsub.FilterPubsCallback;
3✔
158
  private readonly pPubSize: number;
3✔
159
  private readonly pPendings = new KeyMap<Component, PendingInterest, string>((c) => toHex(c.value));
3✔
160

161
  private readonly cOpts: ConsumerOptions;
3✔
162
  private readonly cLifetime: number;
3✔
163
  private cAbort?: AbortController;
3✔
164
  private cTimer!: NodeJS.Timeout | number;
3✔
165
  private cCurrentInterestNonce?: number;
3✔
166
  private cDelivering = false;
3✔
167

168
  private debug(action: string, key?: number, pub?: Data): void;
169
  private debug(action: string, recvIblt?: IBLT, content?: readonly Data[], contentFirst?: number): void;
170
  private debug(action: string, arg2?: number | IBLT, arg3?: Data | readonly Data[], contentFirst = 0): void {
3✔
171
    if (!this.maybeHaveEventListener.debug) {
106✔
172
      return;
106✔
173
    }
106!
174
    /* c8 ignore next */
175
    this.dispatchTypedEvent("debug", new CustomEvent<DebugEntry>("debug", {
176
      detail: {
×
177
        action,
×
178
        key: typeof arg2 === "number" ? arg2 : undefined,
106!
179
        name: arg3 instanceof Data ? arg3.name : undefined,
106!
180
        ownIblt: this.iblt.clone(),
106✔
181
        recvIblt: typeof arg2 === "object" ? arg2.clone() : undefined,
106!
182
        content: Array.isArray(arg3) ? (arg3 as readonly Data[]).slice(0, contentFirst).map(({ name }) => name) : undefined,
106!
183
      },
106✔
184
    }));
106✔
185
  }
106✔
186

187
  /** Stop the protocol operation. */
188
  public close(): void {
3✔
189
    if (this.closed) {
3!
190
      return;
×
191
    }
×
192
    this.closed = true;
3✔
193

194
    for (const pub of this.pubs.values()) {
3✔
195
      for (const timer of pub.timers) {
15✔
196
        clearTimeout(timer);
45✔
197
      }
45✔
198
    }
15✔
199

200
    this.pProducer.close();
3✔
201

202
    this.cAbort?.abort();
3✔
203
    this.cAbort = undefined;
3✔
204
    clearTimeout(this.cTimer);
3✔
205
  }
3✔
206

207
  /**
208
   * Publish a packet.
209
   * @param pub - Data packet. This does not need to be signed.
210
   * @param cb - Callback to get notified whether publication is confirmed,
211
   *             i.e. its hash appears in a sync Interest from another participant.
212
   * @returns - Promise that resolves when the publication is recorded.
213
   *            It does not mean the publication has reached other participants.
214
   */
215
  public async publish(pub: Data, cb?: SyncpsPubsub.PublishCallback): Promise<void> {
3✔
216
    if (this.closed) {
5!
217
      throw new Error("closed");
×
218
    }
×
219
    this.dModify(pub);
5✔
220
    await this.dSigner.sign(pub);
5✔
221

222
    const key = this.codec.hashPub(pub);
5✔
223
    if (this.pubs.has(key)) {
5!
224
      this.debug("d-dup", key, pub);
×
225
      return;
×
226
    }
×
227
    this.addToActive(key, pub, true, cb);
5✔
228
    this.debug("d-pub", key, pub);
5✔
229

230
    if (!this.cDelivering) {
5✔
231
      this.scheduleSyncInterest(0);
5✔
232
      this.processPendingInterests();
5✔
233
    }
5✔
234
  }
5✔
235

236
  /** Subscribe to a topic. */
237
  public subscribe(topic: Name): Subscription<Name, CustomEvent<Data>> {
3✔
238
    const { sub } = this.subs.subscribe(topic);
6✔
239
    return sub;
6✔
240
  }
6✔
241

242
  private handleSyncInterest = async (interest: Interest): Promise<Data | undefined> => {
3✔
243
    if (interest.name.length !== this.syncPrefix.length + 1) {
157!
244
      return undefined;
×
245
    }
×
246
    if (interest.nonce === this.cCurrentInterestNonce) {
157✔
247
      return undefined;
21✔
248
    }
21✔
249

250
    const ibltComp = interest.name.at(this.syncPrefix.length);
136✔
251
    if (this.pPendings.has(ibltComp)) {
157✔
252
      // same as a pending Interest; if it could be answered, it would have been answered
253
      return undefined;
101✔
254
    }
101✔
255

256
    const si: SyncInterestInfo = {
35✔
257
      interest,
35✔
258
      recvIblt: this.codec.comp2iblt(ibltComp),
35✔
259
    };
35✔
260
    const data = this.processSyncInterest(si);
35✔
261
    if (data) {
157✔
262
      return data;
9✔
263
    }
9✔
264

265
    this.debug("p-save", si.recvIblt);
26✔
266
    const pending = {
26✔
267
      ...si,
26✔
268
      expire: setTimeout(() => {
26✔
269
        if (this.pPendings.delete(ibltComp)) {
21✔
270
          pending.defer.resolve(undefined);
21✔
271
        }
21✔
272
      }, interest.lifetime),
26✔
273
      defer: Promise.withResolvers<Data | undefined>(),
26✔
274
    };
26✔
275
    this.pPendings.set(ibltComp, pending);
26✔
276
    return pending.defer.promise;
26✔
277
  };
157✔
278

279
  private processSyncInterest({ interest, recvIblt }: SyncInterestInfo): Data | undefined {
3✔
280
    {
37✔
281
      const { negative } = this.iblt.diff(this.dConfirmIblt, recvIblt);
37✔
282
      for (const key of negative) {
37✔
283
        const entry = this.pubs.get(key);
19✔
284
        if (entry?.own && !entry.expired) {
19!
285
          this.invokePublishCb(entry, true);
×
286
        }
×
287
      }
19✔
288
    }
37✔
289

290
    const { positive } = this.iblt.diff(recvIblt);
37✔
291
    const items: SyncpsPubsub.FilterPubItem[] = [];
37✔
292
    for (const key of positive) {
37✔
293
      const entry = this.pubs.get(key);
16✔
294
      if (entry && !entry.expired) {
16✔
295
        items.push(entry);
16✔
296
      }
16✔
297
    }
16✔
298

299
    const filtered = this.pFilter(items).map(({ pub }) => pub);
37✔
300
    if (filtered.length === 0) {
37✔
301
      return undefined;
26✔
302
    }
26✔
303

304
    const [content, includedCount] = this.codec.encodeContent(filtered, this.pPubSize);
11✔
305
    this.debug("p-satisfy", recvIblt, filtered, includedCount);
11✔
306
    return new Data(interest.name, Data.FreshnessPeriod(this.maxPubLifetime / 2), content);
11✔
307
  }
37✔
308

309
  private processPendingInterests(): void {
3✔
310
    for (const [ibltComp, pending] of this.pPendings) {
5✔
311
      const data = this.processSyncInterest(pending);
2✔
312
      if (!data) {
2!
313
        continue;
×
314
      }
×
315
      if (this.pPendings.delete(ibltComp)) {
2✔
316
        clearTimeout(pending.expire);
2✔
317
        pending.defer.resolve(data);
2✔
318
      }
2✔
319
    }
2✔
320
  }
5✔
321

322
  private scheduleSyncInterest(after = this.cLifetime - 20): void {
3✔
323
    clearTimeout(this.cTimer);
70✔
324
    this.cTimer = setTimeout(this.sendSyncInterest, after);
70✔
325
  }
70✔
326

327
  private sendSyncInterest = async (): Promise<void> => {
3✔
328
    if (this.closed) {
53!
329
      return;
×
330
    }
×
331
    this.cAbort?.abort();
53✔
332
    this.scheduleSyncInterest();
53✔
333

334
    const abort = new AbortController();
53✔
335
    this.cAbort = abort;
53✔
336
    const ibltComp = this.codec.iblt2comp(this.iblt);
53✔
337
    const name = this.syncPrefix.append(ibltComp);
53✔
338
    this.debug("c-request");
53✔
339
    this.cCurrentInterestNonce = Interest.generateNonce();
53✔
340
    const interest = new Interest(name, Interest.CanBePrefix, Interest.MustBeFresh,
53✔
341
      Interest.Nonce(this.cCurrentInterestNonce), Interest.Lifetime(this.cLifetime));
53✔
342

343
    let content: Data[];
53✔
344
    try {
53✔
345
      const data = await consume(interest, {
53✔
346
        ...this.cOpts,
53✔
347
        signal: abort.signal,
53✔
348
      });
53✔
349
      content = this.codec.decodeContent(data.content);
9✔
350
    } catch {
53✔
351
      if (this.cAbort !== abort) { // aborted
41✔
352
        return;
41✔
353
      }
41!
354
      this.debug("c-error");
×
355
      return;
×
356
    }
✔
357

358
    const prevOwnPubs = this.nOwnPubs;
9✔
359
    this.cDelivering = true;
9✔
360
    try {
9✔
361
      const verifyResults = await Promise.all(content.map(async (pub) => {
9✔
362
        try {
11✔
363
          await this.dVerifier?.verify(pub);
11!
364
        } catch {
11!
365
          return false;
×
366
        }
×
367
        return true;
11✔
368
      }));
9✔
369

370
      for (const [i, pub] of content.entries()) {
53✔
371
        if (!verifyResults[i]) {
11!
372
          this.debug("c-reject", undefined, pub);
×
373
          continue;
×
374
        }
×
375

376
        const key = this.codec.hashPub(pub);
11✔
377
        if (this.pubs.has(key) || this.isExpired(pub)) {
11✔
378
          this.debug("c-ignore", key, pub);
1✔
379
          continue;
1✔
380
        }
1✔
381

382
        this.addToActive(key, pub, false);
10✔
383
        const [sub] = take(filter(
10✔
384
          lpm(pub.name, (prefixHex) => this.subs.list(prefixHex)),
10✔
385
          (s) => s.size > 0), 1);
10✔
386
        if (sub) {
11✔
387
          this.debug("c-deliver", key, pub);
7✔
388
          this.subs.update(sub, new CustomEvent<Data>("update", { detail: pub }));
7✔
389
        } else {
11✔
390
          this.debug("c-nosub", key, pub);
3✔
391
        }
3✔
392
      }
11✔
393
    } finally {
9✔
394
      this.cDelivering = false;
9✔
395
    }
9✔
396

397
    if (this.cAbort === abort) { // this is the current Interest
9✔
398
      this.scheduleSyncInterest(0);
9✔
399
    }
9✔
400
    if (this.nOwnPubs !== prevOwnPubs) { // new publications during delivering
53!
401
      this.processPendingInterests();
×
402
    }
×
403
  };
53✔
404

405
  private isExpired(pub: Data): boolean {
3✔
406
    const res = this.dIsExpired(pub);
10✔
407
    if (typeof res === "boolean") {
10!
408
      return res;
×
409
    }
×
410
    const diff = Date.now() - res;
10✔
411
    return diff >= this.maxPubLifetime + this.maxClockSkew || diff <= -this.maxClockSkew;
10✔
412
  }
10✔
413

414
  private addToActive(key: number, pub: Data, own: boolean, cb?: SyncpsPubsub.PublishCallback) {
3✔
415
    if (this.closed) {
15!
416
      throw new Error("unexpected addToActive");
×
417
    }
×
418
    if (own) {
15✔
419
      ++this.nOwnPubs;
5✔
420
      if (cb) {
5!
421
        this.dConfirmIblt.insert(key);
×
422
      }
×
423
    }
5✔
424
    this.iblt.insert(key);
15✔
425

426
    const entry: PublicationEntry = {
15✔
427
      key,
15✔
428
      pub,
15✔
429
      cb,
15✔
430
      own,
15✔
431
      expired: false,
15✔
432
      timers: [
15✔
433
        setTimeout(() => {
15✔
434
          this.debug("d-expire", entry.key, entry.pub);
×
435
          entry.expired = true;
×
436
          this.invokePublishCb(entry, false);
×
437
        }, this.maxPubLifetime),
15✔
438
        setTimeout(() => {
15✔
439
          this.debug("d-unpublish", entry.key, entry.pub);
×
440
          this.iblt.erase(entry.key);
×
441
          this.scheduleSyncInterest(0);
×
442
        }, this.maxPubLifetime + this.maxClockSkew),
15✔
443
        setTimeout(() => {
15✔
444
          this.debug("d-forget", entry.key, entry.pub);
×
445
          this.pubs.delete(entry.key);
×
446
        }, this.maxPubLifetime * 2),
15✔
447
      ],
15✔
448
    };
15✔
449
    this.pubs.set(key, entry);
15✔
450
  }
15✔
451

452
  private invokePublishCb(entry: PublicationEntry, confirmed: boolean): void {
3✔
453
    if (!entry.cb) {
×
454
      return;
×
455
    }
×
456
    this.debug(confirmed ? "d-confirm" : "d-unconfirm", entry.key, entry.pub);
×
457
    entry.cb(entry.pub, confirmed);
×
458
    entry.cb = undefined;
×
459
    this.dConfirmIblt.erase(entry.key);
×
460
  }
×
461
}
3✔
462

463
export namespace SyncpsPubsub {
464
  export interface Parameters extends SyncpsCodec.Parameters {
465
    iblt: IBLT.Parameters;
466
  }
467

468
  export type ModifyPublicationCallback = (pub: Data) => void;
469

470
  /**
471
   * Callback to determine if a publication is expired.
472
   *
473
   * @remarks
474
   * The callback can return either:
475
   * - boolean to indicate whether the publication is expired.
476
   * - number, interpreted as Unix timestamp (milliseconds) of publication creation time.
477
   *   The publication is considered expired if this timestamp is before
478
   *   `NOW - (maxPubLifetime+maxClockSkew)` or after `NOW + maxClockSkew`.
479
   */
480
  export type IsExpiredCallback = (pub: Data) => boolean | number;
481

482
  export interface FilterPubItem {
483
    /** A publication, i.e. Data packet. */
484
    readonly pub: Data;
485

486
    /** Whether the publication is owned by the local participant. */
487
    readonly own: boolean;
488
  }
489

490
  /**
491
   * Callback to decide what publications to be included in a response.
492
   * @param items - Unexpired publications.
493
   * @returns A priority list of publications to be included in the response.
494
   */
495
  export type FilterPubsCallback = (items: FilterPubItem[]) => FilterPubItem[];
496

497
  export interface Options {
498
    /**
499
     * Algorithm parameters.
500
     *
501
     * @remarks
502
     * They must be the same on every peer.
503
     */
504
    p: Parameters;
505

506
    /** Sync group prefix. */
507
    syncPrefix: Name;
508

509
    /**
510
     * Description for debugging purpose.
511
     * @defaultValue SyncpsPubsub + syncPrefix
512
     */
513
    describe?: string;
514

515
    /**
516
     * Consumer and producer options.
517
     *
518
     * @remarks
519
     * - `.fw` may be specified.
520
     * - Most other fields are overridden.
521
     */
522
    cpOpts?: ConsumerOptions & ProducerOptions;
523

524
    /**
525
     * Sync Interest lifetime in milliseconds.
526
     * @defaultValue 4000
527
     */
528
    syncInterestLifetime?: number;
529

530
    /**
531
     * Advisory maximum size for publications included in a sync reply Data packet.
532
     * @defaultValue 1300
533
     */
534
    syncDataPubSize?: number;
535

536
    /**
537
     * Signer of sync reply Data packets.
538
     * @defaultValue digestSigning
539
     */
540
    syncSigner?: Signer;
541

542
    /**
543
     * Verifier of sync reply Data packets.
544
     * @defaultValue no verification
545
     */
546
    syncVerifier?: Verifier;
547

548
    /**
549
     * Publication lifetime.
550
     * @defaultValue 1000
551
     */
552
    maxPubLifetime?: number;
553

554
    /**
555
     * Maximum clock skew, for calculating timers.
556
     * @defaultValue 1000
557
     */
558
    maxClockSkew?: number;
559

560
    /**
561
     * Callback to modify publication before it's signed.
562
     * @defaultValue appending a TimestampNameComponent to the name
563
     */
564
    modifyPublication?: ModifyPublicationCallback;
565

566
    /**
567
     * Callback to determine if a publication is expired.
568
     *
569
     * @defaultValue
570
     * The last component is interpreted as TimestampNameComponent.
571
     * If it is not a TimestampNameComponent, the publication is seen as expired.
572
     */
573
    isExpired?: IsExpiredCallback;
574

575
    /**
576
     * Callback to decide what publications to be included in a response.
577
     *
578
     * @defaultValue
579
     * - Respond nothing if there's no own publication.
580
     * - Otherwise, prioritize own publications over others, and prioritize later timestamp.
581
     */
582
    filterPubs?: FilterPubsCallback;
583

584
    /**
585
     * Signer of publications.
586
     * @defaultValue digestSigning
587
     */
588
    pubSigner?: Signer;
589

590
    /**
591
     * Verifier of publications.
592
     * @defaultValue no verification
593
     */
594
    pubVerifier?: Verifier;
595
  }
596

597
  export type PublishCallback = (pub: Data, confirmed: boolean) => void;
598
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc