• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lad-tech / nsc-toolkit / 12828032748

17 Jan 2025 11:17AM UTC coverage: 80.576%. Remained the same
12828032748

push

github

web-flow
fix: Remove inbox from config old consumer (#140)

Co-authored-by: gleip <kuchinsn@lad24.ru>

357 of 474 branches covered (75.32%)

Branch coverage included in aggregate %.

0 of 1 new or added line in 1 file covered. (0.0%)

875 of 1055 relevant lines covered (82.94%)

4.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.83
/src/StreamManager.ts
1
import { StreamAction, StreamManagerParam, GetListenerOptions, GetBatchListenerOptions } from '.';
2
import { JetStreamManager, RetentionPolicy, StorageType, DiscardPolicy, Nanos, consumerOpts, Subscription } from 'nats';
4✔
3
import { Root } from './Root';
4✔
4
import { StreamBatchMsgFetcher } from './StreamBatchMsgFetcher';
4✔
5
import { isConsumerOptsBuilder } from 'nats/lib/jetstream/types';
4✔
6
import { StreamSingleMsgFetcher } from './StreamSingleMsgFetcher';
4✔
7

8
/**
 * Creates/updates JetStream streams described by the service configuration and
 * builds durable consumers (single-message or batch fetchers) on top of them.
 */
export class StreamManager extends Root {
  private readonly STAR_WILDCARD = '*';
  private readonly GREATER_WILDCARD = '>';
  /** 14 days expressed in seconds (14 * 86400). */
  private readonly TWO_WEEKS_IN_SECOND = 1209600;
  /** 24 hours expressed in seconds. */
  private readonly ONE_DAY_IN_SECOND = 86400;
  /** Error message used to recognise a missing consumer in `createConsumer`. */
  private readonly CONSUMER_NOT_FOUND = 'consumer not found';

  /**
   * Fallback values applied in `createStreams` when an action does not specify
   * the corresponding option. `maxBytes` has no default and stays optional.
   */
  private readonly defaultStreamOption: Omit<Required<StreamAction>, 'action' | 'maxBytes'> &
    Pick<StreamAction, 'maxBytes'> = {
    storage: 'file',
    retentionPolicy: 'limits',
    discardPolicy: 'old',
    messageTTL: this.TWO_WEEKS_IN_SECOND,
    duplicateTrackingTime: this.ONE_DAY_IN_SECOND,
    replication: 1,
    rollUps: true,
  };

  /** Lazily created on first use by `createStreams` / `createConsumer`. */
  private jsm?: JetStreamManager;

  constructor(private param: StreamManagerParam) {
    super(param.broker, param.outputFormatter);
  }

  /**
   * Type guard: treats the settings as batch (pull) listener options when a
   * truthy `batch` property is present.
   */
  static isPullConsumerOptions(
    setting?: GetListenerOptions | GetBatchListenerOptions,
  ): setting is GetBatchListenerOptions {
    return !!(setting as GetBatchListenerOptions)?.batch;
  }

  /**
   * Type guard: treats the value as a stream fetcher when it exposes a truthy
   * `fetch` member.
   */
  static isStreamFetcher(
    consumer?: StreamSingleMsgFetcher | StreamBatchMsgFetcher | Subscription,
  ): consumer is StreamBatchMsgFetcher {
    return !!(consumer as StreamBatchMsgFetcher)?.fetch;
  }

  /**
   * Ensures that a stream exists for every configured action.
   *
   * For each action the stream is looked up by name; a missing stream is added,
   * an existing one is updated with the freshly built config merged over its
   * current config.
   *
   * @throws Re-throws any lookup error that is not a "stream not found" error,
   *   as well as errors raised by the add/update calls.
   */
  public async createStreams(): Promise<void> {
    if (!this.jsm) {
      this.jsm = await this.param.broker.jetstreamManager();
    }
    const jsm = this.jsm;
    const defaults = this.defaultStreamOption;

    // `actions` is iterated with a plain `for...of`; each iteration still runs
    // sequentially because the body awaits the broker calls.
    for (const { action, ...options } of this.param.options.actions) {
      const streamName = this.getStreamName(action);

      const config = {
        name: streamName,
        subjects: [`${this.param.serviceName}.${this.param.options.prefix}.${action}`],
        retention: (options.retentionPolicy || defaults.retentionPolicy) as RetentionPolicy,
        storage: (options.storage || defaults.storage) as StorageType,
        num_replicas: options.replication || defaults.replication,
        discard: (options.discardPolicy || defaults.discardPolicy) as DiscardPolicy,
        max_age: this.convertSecondsToNanoseconds(options.messageTTL || defaults.messageTTL),
        max_bytes: options.maxBytes,
        duplicate_window: this.convertSecondsToNanoseconds(
          options.duplicateTrackingTime || defaults.duplicateTrackingTime,
        ),
        // `??` instead of `||`: the default is `true`, so with `||` an explicit
        // `rollUps: false` could never take effect.
        allow_rollup_hdrs: options.rollUps ?? defaults.rollUps,
      };

      const existingStream = await jsm.streams.info(streamName).catch(error => {
        if (this.isNotFoundStreamError(error)) {
          return null;
        }
        throw error;
      });

      if (existingStream) {
        await jsm.streams.update(streamName, { ...existingStream.config, ...config });
      } else {
        await jsm.streams.add(config);
      }
    }
  }

  /**
   * Creates (or updates) a durable consumer and wraps it in a fetcher.
   *
   * The durable name is built from the capitalized `serviceNameFrom` and
   * `eventName`; the filter subject is
   * `<param.serviceName>.<options.prefix>.<eventName>.*`.
   *
   * @param serviceNameFrom Used only to build the durable consumer name.
   * @param eventName Event part of the filter subject and of the consumer name.
   * @param setting Optional listener options; batch options (truthy `batch`)
   *   yield a `StreamBatchMsgFetcher`, anything else a `StreamSingleMsgFetcher`.
   * @throws Error when no stream is found for the filter subject, and re-throws
   *   consumer lookup errors other than "consumer not found".
   */
  public async createConsumer(
    serviceNameFrom: string,
    eventName: string,
    setting?: GetListenerOptions,
  ): Promise<StreamSingleMsgFetcher>;
  public async createConsumer(
    serviceNameFrom: string,
    eventName: string,
    setting?: GetBatchListenerOptions,
  ): Promise<StreamBatchMsgFetcher>;
  public async createConsumer(
    serviceNameFrom: string,
    eventName: string,
    setting?: GetListenerOptions | GetBatchListenerOptions,
  ): Promise<StreamSingleMsgFetcher | StreamBatchMsgFetcher> {
    const consumerName = this.capitalizeFirstLetter(serviceNameFrom) + this.capitalizeFirstLetter(eventName);
    const prefix = this.param.options.prefix;
    const subject = `${this.param.serviceName}.${prefix}.${eventName}.*`;

    if (!this.jsm) {
      this.jsm = await this.param.broker.jetstreamManager();
    }
    const jsm = this.jsm;

    const options = consumerOpts();
    const isPullConsumer = StreamManager.isPullConsumerOptions(setting);

    options
      .durable(consumerName)
      .manualAck()
      .ackExplicit()
      .filterSubject(subject)
      // A missing (or zero) `maxPending` falls back to 10.
      .maxAckPending(setting?.maxPending || 10);

    if (isPullConsumer) {
      if (setting.maxPullRequestExpires) {
        options.maxPullRequestExpires(setting.maxPullRequestExpires);
      }

      if (setting.maxPullRequestBatch) {
        options.maxPullBatch(setting.maxPullRequestBatch);
      }
    }

    if (setting?.maxAckWaiting) {
      options.ackWait(setting.maxAckWaiting);
    }

    if (setting?.queue) {
      options.queue(setting.queue);
    }

    // Any other `deliver` value leaves the builder's delivery policy untouched.
    if (setting?.deliver === 'new') {
      options.deliverNew();
    } else if (setting?.deliver === 'all') {
      options.deliverAll();
    }

    const streamName = await jsm.streams.find(subject);
    if (!streamName) {
      throw new Error(`Error creating consumer ${consumerName}. Stream for subject ${subject} not found`);
    }

    if (isConsumerOptsBuilder(options)) {
      const isConsumerExist = await jsm.consumers.info(streamName, consumerName).catch(async error => {
        if (error.message === this.CONSUMER_NOT_FOUND) {
          return false;
        }
        throw error;
      });

      if (!isConsumerExist) {
        await jsm.consumers.add(streamName, { ...options.config, filter_subject: subject });
      } else {
        // `deliver_subject` is explicitly unset in the update payload.
        // NOTE(review): presumably this drops the delivery inbox from the
        // config of consumers created by an older version — confirm.
        await jsm.consumers.update(streamName, consumerName, {
          ...options.config,
          filter_subject: subject,
          deliver_subject: undefined,
        });
      }
    }

    const consumer = await this.broker.jetstream().consumers.get(streamName, consumerName);

    return isPullConsumer
      ? new StreamBatchMsgFetcher(consumer, {
          batchSize: setting.maxPullRequestBatch,
          batchTimeout: setting.maxPullRequestExpires,
        })
      : new StreamSingleMsgFetcher(consumer);
  }

  /**
   * Builds the stream name: capitalized service name + transformed prefix,
   * followed by the capitalized event name unless the event is a bare
   * wildcard (`*` or `>`).
   */
  private getStreamName(eventName: string) {
    const serviceName = this.capitalizeFirstLetter(this.param.serviceName);
    const prefix = this.buildPrefixForStreamName(this.param.options.prefix);
    const isWildcard = eventName === this.STAR_WILDCARD || eventName === this.GREATER_WILDCARD;

    return isWildcard ? `${serviceName}${prefix}` : `${serviceName}${prefix}${this.capitalizeFirstLetter(eventName)}`;
  }

  /**
   * Returns true only for `Error` instances named `NatsError` whose message is
   * exactly `stream not found`.
   */
  private isNotFoundStreamError(error: unknown) {
    const ERROR_TYPE = 'NatsError';
    const ERROR_NOT_FOUND_STREAM = 'stream not found';
    if (error instanceof Error) {
      return error.name === ERROR_TYPE && error.message === ERROR_NOT_FOUND_STREAM;
    }
    return false;
  }

  /**
   * Splits the prefix on `SUBJECT_DELIMITER` (not defined in this class;
   * presumably inherited from `Root`) and capitalizes every segment.
   *
   * NOTE(review): `join()` without an argument uses "," as the separator, so a
   * multi-segment prefix yields e.g. "Foo,Bar" rather than "FooBar". This looks
   * unintended, but it is kept as-is because changing it would alter the names
   * of streams that already exist on the server — confirm before switching to
   * `join('')`.
   */
  private buildPrefixForStreamName(prefix: string) {
    return prefix.split(this.SUBJECT_DELIMITER).map(this.capitalizeFirstLetter).join();
  }

  /** Upper-cases the first character; an empty string is returned unchanged. */
  private capitalizeFirstLetter(word: string) {
    return word.charAt(0).toUpperCase() + word.slice(1);
  }

  /** Converts whole seconds to nanoseconds (the unit of `Nanos`). */
  private convertSecondsToNanoseconds(seconds: number): Nanos {
    return seconds * 1_000_000_000;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc