• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

yoursunny / NDNts / 20211874349

14 Dec 2025 05:53PM UTC coverage: 87.623% (-7.9%) from 95.518%
20211874349

push

github

yoursunny
mk: upgrade to Vitest 4.x

2454 of 2992 branches covered (82.02%)

6683 of 7627 relevant lines covered (87.62%)

7034.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.39
/pkg/psync/src/partial-subscriber.ts
1
import type { ConsumerOptions } from "@ndn/endpoint";
2
import type { Component, Name, Verifier } from "@ndn/packet";
3
import { type Subscriber, type Subscription, SubscriptionTable, SyncUpdate } from "@ndn/sync-api";
4
import { randomJitter } from "@ndn/util";
5
import { BloomFilter, type Parameters as BloomParameters } from "@yoursunny/psync-bloom";
6
import { TypedEventTarget } from "typescript-event-target";
7

8
import { PSyncCodec } from "./codec";
9
import type { PSyncCore } from "./core";
10
import { IBLT } from "./iblt";
11
import { StateFetcher } from "./state-fetcher";
12

13
/** Sync update about a topic identified by its name prefix. */
type Update = SyncUpdate<Name>;

/** Subscription handle returned by {@link PartialSubscriber.subscribe}. */
type Sub = Subscription<Name, Update>;

/** Payload carried by `debug` events. */
interface DebugEntry {
  /** Short label of the protocol step, such as `h-request` or `s-response`. */
  action: string;
}

/** Events dispatched by {@link PartialSubscriber}. */
type EventMap = {
  /** Emitted for debugging. */
  debug: CustomEvent<DebugEntry>;

  /** Emitted when a hello reply has been received and processed. */
  state: PartialSubscriber.StateEvent;
};
25

26
/**
 * PSync - PartialSync subscriber.
 *
 * @remarks
 * The subscriber periodically sends either a hello Interest (while no IBLT name component is
 * known) or a sync Interest (carrying the subscription Bloom filter and the last IBLT component).
 * Received state is compared against the last seen sequence number of each subscribed topic,
 * and updates are delivered through the subscription table.
 *
 * The Bloom filter is created asynchronously. {@link subscribe} and unsubscribing are safe to
 * call before it is ready: the subscriptions are recorded right away and inserted into the
 * Bloom filter as soon as it has been created.
 */
export class PartialSubscriber extends TypedEventTarget<EventMap>
  implements Subscriber<Name, Update, PartialSubscriber.TopicInfo> {
  constructor({
    p,
    syncPrefix,
    describe = `PartialSubscriber(${syncPrefix})`,
    cOpts,
    syncInterestLifetime = 1000,
    syncInterestInterval = [syncInterestLifetime / 2 + 100, syncInterestLifetime / 2 + 500],
    verifier,
  }: PartialSubscriber.Options) {
    super();
    this.describe = describe;
    this.helloPrefix = syncPrefix.append("hello");
    this.syncPrefix = syncPrefix.append("sync");
    this.codec = new PSyncCodec(p, IBLT.PreparedParameters.prepare(p.iblt));
    this.encodeBloom = p.encodeBloom;

    this.subs.handleRemoveTopic = this.handleRemoveTopic;

    this.cFetcher = new StateFetcher(this.describe, this.codec, syncInterestLifetime, {
      ...cOpts,
      verifier,
    });
    this.cInterval = randomJitter.between(...syncInterestInterval);

    void (async () => {
      const bloom = await BloomFilter.create(p.bloom);
      this.bloom = bloom;
      // Topics subscribed while the Bloom filter was still being created were only recorded
      // in the subscription table; insert them now.
      this.insertSubscribedTopics(bloom);
      // scheduleInterest() is a no-op if close() was called in the meantime.
      this.scheduleInterest(0);
    })();
  }

  /** Description for debugging purpose. */
  public readonly describe: string;
  /** Name prefix of hello Interests: syncPrefix + "hello". */
  private readonly helloPrefix: Name;
  /** Name prefix of sync Interests: syncPrefix + "sync". */
  private readonly syncPrefix: Name;
  private readonly codec: PSyncCodec;
  private readonly encodeBloom: PartialSubscriber.Parameters["encodeBloom"];
  /** Set by {@link close}; once true, no further Interest is scheduled or sent. */
  private closed = false;

  private readonly subs = new SubscriptionTable<Update>();
  /** Last seen sequence number of each subscribed topic, keyed by the subscription table's object key. */
  // eslint-disable-next-line @typescript-eslint/no-restricted-types
  private readonly prevSeqNums = new WeakMap<object, number>();
  /** Bloom filter of subscribed prefixes; undefined until asynchronous creation completes. */
  private bloom?: BloomFilter;
  /** IBLT name component from the latest reply; undefined means the next Interest is a hello. */
  private ibltComp?: Component;

  private readonly cFetcher: StateFetcher;
  /** Generates the randomized delay until the next Interest, in milliseconds. */
  private readonly cInterval: () => number;
  private cTimer!: NodeJS.Timeout | number;
  /** Abort controller of the pending fetch, if any. */
  private cAbort?: AbortController;

  /** Dispatch a `debug` event with the given action label. */
  private debug(action: string): void {
    this.dispatchTypedEvent("debug", new CustomEvent<DebugEntry>("debug", {
      detail: { action },
    }));
  }

  /**
   * Stop the protocol operation.
   *
   * @remarks
   * Aborts the pending fetch and cancels the Interest timer. Calling it again has no effect.
   */
  public close(): void {
    if (this.closed) {
      return;
    }
    this.closed = true;

    this.cAbort?.abort();
    this.cAbort = undefined;
    clearTimeout(this.cTimer);
  }

  /**
   * Subscribe to a topic.
   * @param topic - Prefix to subscribe to, and the sequence number after which updates are wanted.
   * @returns Subscription handle from the subscription table.
   */
  public subscribe(topic: PartialSubscriber.TopicInfo): Sub {
    const { sub, objKey } = this.subs.subscribe(topic.prefix);
    if (objKey) {
      this.prevSeqNums.set(objKey, topic.seqNum);
      // The Bloom filter may not exist yet; in that case the constructor inserts this prefix
      // once creation completes.
      this.bloom?.insert(this.codec.toBloomKey(topic.prefix));
    }
    return sub;
  }

  /**
   * Callback installed on the subscription table for when a topic is removed.
   *
   * @remarks
   * Forgets the topic's sequence number, then rebuilds the Bloom filter from the remaining
   * subscribed topics. The filter is cleared and refilled instead of removing a single key.
   */
  // eslint-disable-next-line @typescript-eslint/no-restricted-types
  private readonly handleRemoveTopic = (topic: Name, objKey: object): void => {
    void topic;
    // delete() must run even when the Bloom filter is not ready, so it is evaluated first.
    if (!this.prevSeqNums.delete(objKey) || !this.bloom) {
      return;
    }

    this.bloom.clear();
    this.insertSubscribedTopics(this.bloom);
  };

  /** Insert the prefix of every topic that has a recorded sequence number into `bloom`. */
  private insertSubscribedTopics(bloom: BloomFilter): void {
    for (const [prefix, set] of this.subs.associations()) {
      if (this.prevSeqNums.has(set)) {
        bloom.insert(this.codec.toBloomKey(prefix));
      }
    }
  }

  /**
   * Arm (or re-arm) the timer for the next Interest.
   * @param after - Delay in milliseconds; defaults to a fresh randomized interval.
   */
  private scheduleInterest(after = this.cInterval()): void {
    clearTimeout(this.cTimer);
    if (this.closed) {
      // Never leave a timer armed on a closed subscriber, e.g. when close() was called
      // while the Bloom filter was still being created.
      return;
    }
    this.cTimer = setTimeout(this.sendInterest, after);
  }

  /** Timer callback: abort the previous fetch, schedule the next round, and send one Interest. */
  private readonly sendInterest = async (): Promise<void> => {
    if (this.closed) {
      return;
    }
    this.cAbort?.abort();
    this.scheduleInterest();

    const abort = new AbortController();
    this.cAbort = abort;

    if (this.ibltComp) {
      return this.sendSyncInterest(abort);
    }
    return this.sendHelloInterest(abort);
  };

  /** Fetch the hello reply, remember its IBLT component, and emit the `state` event. */
  private async sendHelloInterest(abort: AbortController): Promise<void> {
    this.debug("h-request");

    let state: PSyncCore.State;
    try {
      const { state: rState, versioned } = await this.cFetcher.fetch(this.helloPrefix, abort, "h");
      state = rState;
      this.ibltComp = versioned.at(-2);
    } catch {
      if (this.cAbort !== abort) { // aborted
        return;
      }
      this.debug("h-error");
      return;
    }

    this.debug("h-response");
    this.handleState(state);
    this.dispatchTypedEvent("state", new PartialSubscriber.StateEvent("state", state));
  }

  /** Fetch the sync reply for the current Bloom filter and IBLT component. */
  private async sendSyncInterest(abort: AbortController): Promise<void> {
    // The timer that leads here is first armed only after the Bloom filter has been created,
    // and sendInterest() only takes this path when ibltComp is set.
    const name = this.syncPrefix.append(...this.encodeBloom(this.bloom!), this.ibltComp!);
    this.debug("s-request");

    let state: PSyncCore.State;
    try {
      const { state: rState, versioned } = await this.cFetcher.fetch(name, abort, "s");
      // TODO test ContentType=Nack explicitly
      if (rState.length === 0) {
        // Empty state: forget the IBLT component and restart with a hello Interest right away.
        this.ibltComp = undefined;
        return this.scheduleInterest(0);
      }
      state = rState;
      this.ibltComp = versioned.at(-2);
    } catch {
      if (this.cAbort !== abort) { // aborted
        return;
      }
      this.debug("s-error");
      return;
    }

    this.debug("s-response");
    this.handleState(state);
  }

  /**
   * Deliver updates for subscribed topics whose sequence number advanced.
   * @param state - Prefix and sequence number pairs from a hello or sync reply.
   */
  private handleState(state: PSyncCore.State): void {
    for (const { prefix, seqNum } of state) {
      const set = this.subs.list(prefix);
      if (set.size === 0) {
        // Nobody is subscribed to this prefix.
        continue;
      }

      const prevSeqNum = this.prevSeqNums.get(set)!;
      if (seqNum <= prevSeqNum) {
        continue;
      }
      this.prevSeqNums.set(set, seqNum);

      this.subs.update(set, new SyncUpdate({
        id: prefix,
        seqNum,
        remove: () => undefined,
      }, prevSeqNum + 1, seqNum));
    }
  }
}
3✔
209

210
export namespace PartialSubscriber {
  /** Algorithm parameters: core and codec parameters plus the Bloom filter parameters. */
  export interface Parameters extends PSyncCore.Parameters, PSyncCodec.Parameters {
    /** Parameters for creating the subscription Bloom filter. */
    bloom: BloomParameters;
  }

  /** {@link PartialSubscriber} constructor options. */
  export interface Options {
    /**
     * Algorithm parameters.
     *
     * @remarks
     * They must match the publisher parameters.
     */
    p: Parameters;

    /** Sync producer prefix. */
    syncPrefix: Name;

    /**
     * Description for debugging purpose.
     * @defaultValue PartialSubscriber + syncPrefix
     */
    describe?: string;

    /**
     * Consumer options.
     *
     * @remarks
     * - `.describe` is overridden as {@link Options.describe}.
     * - `.modifyInterest` is overridden.
     * - `.retx` is overridden.
     * - `.signal` is overridden.
     * - `.verifier` is overridden.
     */
    cOpts?: ConsumerOptions;

    /**
     * Sync Interest lifetime in milliseconds.
     * @defaultValue 1000
     */
    syncInterestLifetime?: number;

    /**
     * Interval between sync Interests, randomized within the range, in milliseconds.
     * @defaultValue `[syncInterestLifetime/2+100,syncInterestLifetime/2+500]`
     */
    syncInterestInterval?: [min: number, max: number];

    /**
     * Verifier of sync reply Data packets.
     * @defaultValue no verification
     */
    verifier?: Verifier;
  }

  /** A topic: name prefix together with a sequence number. */
  export interface TopicInfo extends PSyncCore.PrefixSeqNum {}

  /** Event that carries the list of topics received in a hello reply. */
  export class StateEvent extends Event {
    /** Topics listed in the received state. */
    public readonly topics: readonly TopicInfo[];

    constructor(type: string, topics: readonly TopicInfo[]) {
      super(type);
      this.topics = topics;
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc