• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 14248085491

03 Apr 2025 04:30PM UTC coverage: 84.536% (+0.002%) from 84.534%
14248085491

push

github

web-flow
Additional hardening of the object store client (#256)

* Additional hardening of the object store client

- Made object `put` publish its messages sequentially, because the current server can drop requests when the producer is too fast. This will degrade put performance.

- Changed the internal push consumer used by `get` to set an idle_heartbeat; if the heartbeat is missed, the get has stalled and the ordered consumer should reset.

- Enabled flow control on the ordered consumer; this prevents slow-consumer conditions when the client is getting very large objects.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

* - Hardening of the PullConsumer implementation: by default, ordered consumers now set flow control and idle_heartbeats. This enables the ordered consumer to recreate itself on `heartbeats_missed`, simplifying watcher/lister configurations.

- objectstore doesn't require listening to heartbeats_missed, since the pull consumer handles it automagically.

- objectstore `watch` now checks for `heartbeat` notifications: if the watcher is only listing history, a heartbeat is a hint to stop, since all records have been processed.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2370 of 3217 branches covered (73.67%)

Branch coverage included in aggregate %.

10 of 23 new or added lines in 3 files covered. (43.48%)

4552 existing lines in 36 files now uncovered.

10170 of 11617 relevant lines covered (87.54%)

768618.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.27
/obj/src/objectstore.ts
1
/*
2
 * Copyright 2022-2025 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import type {
17
  MsgHdrs,
18
  NatsConnection,
19
  QueuedIterator,
20
} from "@nats-io/nats-core/internal";
21
import {
1✔
22
  DataBuffer,
1✔
23
  deferred,
1✔
24
  Feature,
1✔
25
  headers,
1✔
26
  MsgHdrsImpl,
1✔
27
  nanos,
1✔
28
  nuid,
1✔
29
  QueuedIteratorImpl,
1✔
30
} from "@nats-io/nats-core/internal";
1✔
31

32
import type {
33
  ConsumerConfig,
34
  JetStreamClient,
35
  JetStreamClientImpl,
36
  JetStreamManager,
37
  JsMsg,
38
  Lister,
39
  ListerFieldFilter,
40
  PubAck,
41
  PurgeResponse,
42
  PushConsumerMessagesImpl,
43
  StorageType,
44
  StreamConfig,
45
  StreamInfo,
46
  StreamInfoRequestOptions,
47
  StreamListResponse,
48
} from "@nats-io/jetstream/internal";
49
import {
1✔
50
  DeliverPolicy,
1✔
51
  DiscardPolicy,
1✔
52
  isMessageNotFound,
1✔
53
  JetStreamApiCodes,
1✔
54
  JetStreamApiError,
1✔
55
  JsHeaders,
1✔
56
  ListerImpl,
1✔
57
  PubHeaders,
1✔
58
  StoreCompression,
1✔
59
  toJetStreamClient,
1✔
60
} from "@nats-io/jetstream/internal";
1✔
61

62
import type {
63
  ObjectInfo,
64
  ObjectResult,
65
  ObjectStore,
66
  ObjectStoreMeta,
67
  ObjectStoreMetaOptions,
68
  ObjectStoreOptions,
69
  ObjectStorePutOpts,
70
  ObjectStoreStatus,
71
  ObjectWatchInfo,
72
} from "./types.ts";
73
import { Base64UrlPaddedCodec } from "./base64.ts";
1✔
74
import { sha256 } from "js-sha256";
1✔
75
import { checkSha256, parseSha256 } from "./sha_digest.parser.ts";
1✔
76

77
export const osPrefix = "OBJ_";
1✔
78
export const digestType = "SHA-256=";
1✔
79

80
export function objectStoreStreamName(bucket: string): string {
1✔
81
  validateBucket(bucket);
46✔
82
  return `${osPrefix}${bucket}`;
46✔
83
}
46✔
84

85
/**
 * Derives the bucket name from the name of the stream backing an object store.
 *
 * @param stream - the stream name
 * @returns the stream name with the leading `osPrefix` removed, or the
 * stream name unchanged when it does not start with `osPrefix`.
 */
export function objectStoreBucketName(stream: string): string {
  // strip by the prefix's actual length so this stays correct if the
  // prefix constant is ever changed
  return stream.startsWith(osPrefix)
    ? stream.substring(osPrefix.length)
    : stream;
}
2✔
91

92
/**
93
 * The entry point to creating and managing new ObjectStore instances.
94
 */
95
export class Objm {
1✔
96
  js: JetStreamClientImpl;
1✔
97

98
  /**
99
   * Creates an instance of the Objm that allows you to create and access ObjectStore.
100
   * Note that if the argument is a NatsConnection, default JetStream Options are
101
   * used. If you want to set some options, please provide a JetStreamClient instead.
102
   * @param nc
103
   */
104
  constructor(nc: JetStreamClient | NatsConnection) {
1✔
105
    this.js = toJetStreamClient(nc) as JetStreamClientImpl;
39✔
106
  }
39✔
107

108
  /**
109
   * Creates and opens the specified ObjectStore. If the ObjectStore already exists,
110
   * it opens the existing ObjectStore.
111
   * @param name
112
   * @param opts
113
   */
114
  create(
1✔
115
    name: string,
1✔
116
    opts: Partial<ObjectStoreOptions> = {},
1✔
117
  ): Promise<ObjectStore> {
1✔
118
    return this.#maybeCreate(name, opts);
43✔
119
  }
43✔
120

121
  /**
122
   * Opens the specified ObjectStore
123
   * @param name
124
   * @param check - if set to false, it will not check if the ObjectStore exists.
125
   */
126
  async open(name: string, check = true): Promise<ObjectStore> {
1✔
127
    const jsm = await this.js.jetstreamManager();
4✔
128
    const os = new ObjectStoreImpl(name, jsm, this.js);
4✔
129
    os.stream = objectStoreStreamName(name);
4✔
130
    if (check) {
4✔
131
      await os.status();
6✔
132
    }
7✔
133
    return Promise.resolve(os);
6✔
134
  }
4✔
135

136
  #maybeCreate(
1✔
137
    name: string,
1✔
138
    opts: Partial<ObjectStoreOptions> = {},
1✔
139
  ): Promise<ObjectStore> {
140
    if (typeof crypto?.subtle?.digest !== "function") {
×
141
      return Promise.reject(
×
142
        new Error(
×
143
          "objectstore: unable to calculate hashes - crypto.subtle.digest with sha256 support is required",
×
144
        ),
145
      );
146
    }
×
147
    const { ok, min } = this.js.nc.features.get(Feature.JS_OBJECTSTORE);
43✔
148
    if (!ok) {
43✔
149
      return Promise.reject(
44✔
150
        new Error(`objectstore is only supported on servers ${min} or better`),
44✔
151
      );
152
    }
44✔
153

154
    return ObjectStoreImpl.create(this.js, name, opts);
84✔
155
  }
43✔
156

157
  /**
158
   * Returns a list of ObjectStoreStatus for all streams that are identified as
159
   * being a ObjectStore (that is having names that have the prefix `OBJ_`)
160
   */
161
  list(): Lister<ObjectStoreStatus> {
×
162
    const filter: ListerFieldFilter<ObjectStoreStatus> = (
×
163
      v: unknown,
×
164
    ): ObjectStoreStatus[] => {
165
      const slr = v as StreamListResponse;
×
166
      const streams = slr.streams.filter((v) => {
×
167
        return v.config.name.startsWith(osPrefix);
×
168
      });
×
169
      streams.forEach((si) => {
×
170
        si.config.sealed = si.config.sealed || false;
×
171
        si.config.deny_delete = si.config.deny_delete || false;
×
172
        si.config.deny_purge = si.config.deny_purge || false;
×
173
        si.config.allow_rollup_hdrs = si.config.allow_rollup_hdrs || false;
×
174
      });
×
175
      return streams.map((si) => {
×
176
        return new ObjectStoreStatusImpl(si);
×
177
      });
×
178
    };
×
179
    const subj = `${this.js.prefix}.STREAM.LIST`;
×
180
    return new ListerImpl<ObjectStoreStatus>(subj, filter, this.js);
×
181
  }
×
182
}
1✔
183

184
export class ObjectStoreStatusImpl implements ObjectStoreStatus {
1✔
185
  si: StreamInfo;
1✔
186
  backingStore: string;
1✔
187

188
  constructor(si: StreamInfo) {
1✔
189
    this.si = si;
16✔
190
    this.backingStore = "JetStream";
16✔
191
  }
16✔
192
  get bucket(): string {
1✔
193
    return objectStoreBucketName(this.si.config.name);
2✔
194
  }
2✔
195
  get description(): string {
1✔
196
    return this.si.config.description ?? "";
×
197
  }
3✔
198
  get ttl(): number {
1✔
199
    return this.si.config.max_age;
3✔
200
  }
3✔
201
  get storage(): StorageType {
1✔
202
    return this.si.config.storage;
2✔
203
  }
2✔
204
  get replicas(): number {
1✔
205
    return this.si.config.num_replicas;
3✔
206
  }
3✔
207
  get sealed(): boolean {
1✔
208
    return this.si.config.sealed;
2✔
209
  }
2✔
210
  get size(): number {
1✔
211
    return this.si.state.bytes;
2✔
212
  }
2✔
213
  get streamInfo(): StreamInfo {
1✔
214
    return this.si;
9✔
215
  }
9✔
216
  get metadata(): Record<string, string> | undefined {
1✔
217
    return this.si.config.metadata;
2✔
218
  }
2✔
219

220
  get compression(): boolean {
1✔
221
    if (this.si.config.compression) {
3✔
222
      return this.si.config.compression !== StoreCompression.None;
3✔
223
    }
3!
224
    return false;
×
225
  }
×
226
}
1✔
227
/**
 * Validates an object store bucket name.
 *
 * A valid name is a non-empty string made up only of ASCII letters, digits,
 * underscores and dashes.
 *
 * @param name - the bucket name to check
 * @throws Error with the message `invalid bucket name: <name>` when the
 * name is not a string or contains any other character.
 */
export function validateBucket(name: string): void {
  const validBucketRe = /^[-\w]+$/;
  // RegExp.test() coerces its argument to a string, so without the typeof
  // guard `undefined` and `null` would be accepted as "undefined"/"null"
  if (typeof name !== "string" || !validBucketRe.test(name)) {
    throw new Error(`invalid bucket name: ${name}`);
  }
}
46✔
233

234
export type ServerObjectStoreMeta = {
235
  name: string;
236
  description?: string;
237
  headers?: Record<string, string[]>;
238
  options?: ObjectStoreMetaOptions;
239
};
240

241
export type ServerObjectInfo = {
242
  bucket: string;
243
  nuid: string;
244
  size: number;
245
  chunks: number;
246
  digest: string;
247
  deleted?: boolean;
248
  mtime: string;
249
  revision: number;
250
  metadata?: Record<string, string>;
251
} & ServerObjectStoreMeta;
252

253
class ObjectInfoImpl implements ObjectInfo {
1✔
254
  info: ServerObjectInfo;
1✔
255
  hdrs!: MsgHdrs;
1✔
256
  constructor(oi: ServerObjectInfo) {
1✔
257
    this.info = oi;
84✔
258
  }
84✔
259
  get name(): string {
1✔
260
    return this.info.name;
8✔
261
  }
8✔
262
  get description(): string {
1✔
263
    return this.info.description ?? "";
×
264
  }
2✔
265
  get headers(): MsgHdrs {
1✔
266
    if (!this.hdrs) {
2✔
267
      this.hdrs = MsgHdrsImpl.fromRecord(this.info.headers || {});
×
268
    }
2✔
269
    return this.hdrs;
2✔
270
  }
2✔
271
  get options(): ObjectStoreMetaOptions | undefined {
1✔
272
    return this.info.options;
10✔
273
  }
10✔
274
  get bucket(): string {
1✔
275
    return this.info.bucket;
9✔
276
  }
9✔
277
  get chunks(): number {
1✔
278
    return this.info.chunks;
5✔
279
  }
5✔
280
  get deleted(): boolean {
1✔
281
    return this.info.deleted ?? false;
10✔
282
  }
10✔
283
  get digest(): string {
1✔
284
    return this.info.digest;
9✔
285
  }
9✔
286
  get mtime(): string {
1✔
287
    return this.info.mtime;
2✔
288
  }
2✔
289
  get nuid(): string {
1✔
290
    return this.info.nuid;
9✔
291
  }
9✔
292
  get size(): number {
1✔
293
    return this.info.size;
3✔
294
  }
3✔
295
  get revision(): number {
1✔
296
    return this.info.revision;
7✔
297
  }
7✔
298
  get metadata(): Record<string, string> {
1✔
299
    return this.info.metadata || {};
×
300
  }
3✔
301
  isLink() {
1✔
302
    return (this.info.options?.link !== undefined) &&
5✔
303
      (this.info.options?.link !== null);
5✔
304
  }
5✔
305
}
1✔
306

307
function toServerObjectStoreMeta(
59✔
308
  meta: Partial<ObjectStoreMeta>,
59✔
309
): ServerObjectStoreMeta {
310
  const v = {
59✔
311
    name: meta.name,
59✔
312
    description: meta.description ?? "",
59✔
313
    options: meta.options,
59✔
314
    metadata: meta.metadata,
59✔
315
  } as ServerObjectStoreMeta;
59✔
316

317
  if (meta.headers) {
59✔
318
    const mhi = meta.headers as MsgHdrsImpl;
60✔
319
    v.headers = mhi.toRecord();
60✔
320
  }
60✔
321
  return v;
59✔
322
}
59✔
323

324
function emptyReadableStream(): ReadableStream {
2✔
325
  return new ReadableStream({
2✔
326
    pull(c) {
2✔
327
      c.enqueue(new Uint8Array(0));
3✔
328
      c.close();
3✔
329
    },
3✔
330
  });
2✔
331
}
2✔
332

333
export class ObjectStoreImpl implements ObjectStore {
1✔
334
  jsm: JetStreamManager;
1✔
335
  js: JetStreamClient;
46✔
336
  stream!: string;
46✔
337
  name: string;
1✔
338

339
  constructor(name: string, jsm: JetStreamManager, js: JetStreamClient) {
1✔
340
    this.name = name;
46✔
341
    this.jsm = jsm;
46✔
342
    this.js = js;
46✔
343
  }
46✔
344

345
  _checkNotEmpty(name: string): { name: string; error?: Error } {
1✔
346
    if (!name || name.length === 0) {
167✔
347
      return { name, error: new Error("name cannot be empty") };
672✔
348
    }
168✔
349
    return { name };
996✔
350
  }
167✔
351

352
  async info(name: string): Promise<ObjectInfo | null> {
1✔
353
    const info = await this.rawInfo(name);
71✔
354
    return info ? new ObjectInfoImpl(info) : null;
71✔
355
  }
71✔
356

357
  async list(): Promise<ObjectInfo[]> {
1✔
358
    const buf: ObjectInfo[] = [];
7✔
359
    const iter = await this.watch({
7✔
360
      ignoreDeletes: true,
7✔
361
      includeHistory: true,
7✔
362
      //@ts-ignore: hidden
363
      historyOnly: true,
7✔
364
    });
7✔
365

366
    // historyOnly will stop the iterator
367
    for await (const info of iter) {
7✔
368
      buf.push(info);
11✔
369
    }
11✔
370
    return Promise.resolve(buf);
7✔
371
  }
7✔
372

373
  async rawInfo(name: string): Promise<ServerObjectInfo | null> {
1✔
374
    const { name: obj, error } = this._checkNotEmpty(name);
102✔
375
    if (error) {
102✔
376
      return Promise.reject(error);
103✔
377
    }
103✔
378

379
    const meta = this._metaSubject(obj);
202✔
380
    try {
202✔
381
      const m = await this.jsm.streams.getMessage(this.stream, {
202✔
382
        last_by_subj: meta,
202✔
383
      });
202✔
384
      if (m === null) {
102✔
385
        return null;
160✔
386
      }
160✔
387
      const soi = m.json<ServerObjectInfo>();
141✔
388
      soi.revision = m.seq;
141✔
389
      return soi;
141✔
390
    } catch (err) {
102✔
391
      return Promise.reject(err);
105✔
392
    }
105✔
393
  }
102✔
394

395
  async _si(
1✔
396
    opts?: Partial<StreamInfoRequestOptions>,
1✔
397
  ): Promise<StreamInfo | null> {
1✔
398
    try {
19✔
399
      return await this.jsm.streams.info(this.stream, opts);
19✔
400
    } catch (err) {
19✔
401
      if (
22✔
402
        err instanceof JetStreamApiError &&
22✔
403
        err.code === JetStreamApiCodes.StreamNotFound
22✔
404
      ) {
22✔
405
        return null;
22✔
406
      }
22!
407
      return Promise.reject(err);
×
408
    }
×
409
  }
19✔
410

411
  async seal(): Promise<ObjectStoreStatus> {
1✔
412
    let info = await this._si();
3✔
413
    if (info === null) {
3✔
414
      return Promise.reject(new Error("object store not found"));
4✔
415
    }
4✔
416
    info.config.sealed = true;
4✔
417
    info = await this.jsm.streams.update(this.stream, info.config);
4✔
418
    return Promise.resolve(new ObjectStoreStatusImpl(info));
4✔
419
  }
3✔
420

421
  async status(
1✔
422
    opts?: Partial<StreamInfoRequestOptions>,
1✔
423
  ): Promise<ObjectStoreStatus> {
1✔
424
    const info = await this._si(opts);
17✔
425
    if (info === null) {
17✔
426
      return Promise.reject(new Error("object store not found"));
19✔
427
    }
19✔
428
    return Promise.resolve(new ObjectStoreStatusImpl(info));
31✔
429
  }
17✔
430

431
  destroy(): Promise<boolean> {
1✔
432
    return this.jsm.streams.delete(this.stream);
2✔
433
  }
2✔
434

435
  async _put(
1✔
436
    meta: ObjectStoreMeta,
1✔
437
    rs: ReadableStream<Uint8Array> | null,
1✔
438
    opts?: ObjectStorePutOpts,
1✔
439
  ): Promise<ObjectInfo> {
1✔
440
    const jsopts = this.js.getOptions();
60✔
441
    opts = opts || { timeout: jsopts.timeout };
230✔
442
    opts.timeout = opts.timeout || jsopts.timeout;
60✔
443
    opts.previousRevision = opts.previousRevision ?? undefined;
60✔
444
    const { timeout, previousRevision } = opts;
60✔
445
    const si = (this.js as unknown as { nc: NatsConnection }).nc.info;
60✔
446
    const maxPayload = si?.max_payload || 1024;
×
447
    meta = meta || {} as ObjectStoreMeta;
×
448
    meta.options = meta.options || {};
60✔
449
    let maxChunk = meta.options?.max_chunk_size || 128 * 1024;
60✔
450
    maxChunk = maxChunk > maxPayload ? maxPayload : maxChunk;
60✔
451
    meta.options.max_chunk_size = maxChunk;
60✔
452

453
    const old = await this.info(meta.name);
60✔
454
    const { name: n, error } = this._checkNotEmpty(meta.name);
116✔
455
    if (error) {
×
456
      return Promise.reject(error);
×
457
    }
✔
458

459
    const id = nuid.next();
116✔
460
    const chunkSubj = this._chunkSubject(id);
116✔
461
    const metaSubj = this._metaSubject(n);
116✔
462

463
    const info = Object.assign({
116✔
464
      bucket: this.name,
116✔
465
      nuid: id,
116✔
466
      size: 0,
116✔
467
      chunks: 0,
116✔
468
    }, toServerObjectStoreMeta(meta)) as ServerObjectInfo;
116✔
469

470
    const d = deferred<ObjectInfo>();
116✔
471

472
    const db = new DataBuffer();
116✔
473
    try {
116✔
474
      const reader = rs ? rs.getReader() : null;
60✔
475
      const sha = sha256.create();
60✔
476

477
      while (true) {
60✔
478
        const { done, value } = reader
171✔
479
          ? await reader.read()
171✔
480
          : { done: true, value: undefined };
686✔
481
        if (done) {
171✔
482
          // put any partial chunk in
483
          if (db.size() > 0) {
227✔
484
            const payload = db.drain();
273✔
485
            sha.update(payload);
273✔
486
            info.chunks!++;
273✔
487
            info.size! += payload.length;
273✔
488
            await this.js.publish(chunkSubj, payload, { timeout });
819✔
489
          }
273✔
490

491
          // prepare the metadata
492
          info.mtime = new Date().toISOString();
227✔
493
          const digest = Base64UrlPaddedCodec.encode(
227✔
494
            Uint8Array.from(sha.digest()),
227✔
495
          );
496
          info.digest = `${digestType}${digest}`;
227✔
497

498
          info.deleted = false;
227✔
499

500
          // trailing md for the object
501
          const h = headers();
227✔
502
          if (typeof previousRevision === "number") {
227✔
503
            h.set(
232✔
504
              PubHeaders.ExpectedLastSubjectSequenceHdr,
232✔
505
              `${previousRevision}`,
232✔
506
            );
507
          }
232✔
508
          h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);
227✔
509

510
          // try to update the metadata
511
          const pa = await this.js.publish(metaSubj, JSON.stringify(info), {
227✔
512
            headers: h,
227✔
513
            timeout,
227✔
514
          });
227✔
515
          // update the revision to point to the sequence where we inserted
516
          info.revision = pa.seq;
227✔
517

518
          // if we are here, the new entry is live
519
          if (old) {
227✔
520
            try {
233✔
521
              await this.jsm.streams.purge(this.stream, {
233✔
522
                filter: `$O.${this.name}.C.${old.nuid}`,
233✔
523
              });
233✔
524
            } catch (_err) {
×
525
              // rejecting here, would mean send the wrong signal
526
              // the update succeeded, but cleanup of old chunks failed.
527
            }
×
528
          }
233✔
529

530
          // resolve the ObjectInfo
531
          d.resolve(new ObjectInfoImpl(info!));
227✔
532
          // stop
533
          break;
227✔
534
        }
227✔
535
        if (value) {
226✔
536
          db.fill(value);
226✔
537
          while (db.size() > maxChunk) {
226✔
538
            info.chunks!++;
12,774✔
539
            info.size! += maxChunk;
12,774✔
540
            const payload = db.drain(meta.options.max_chunk_size);
12,774✔
541
            sha.update(payload);
12,774✔
542
            await this.js.publish(chunkSubj, payload, { timeout });
38,322✔
543
          }
12,774✔
544
        }
226✔
545
      }
171✔
546
    } catch (err) {
×
547
      // we failed, remove any partials
548
      await this.jsm.streams.purge(this.stream, { filter: chunkSubj });
×
549
      d.reject(err);
×
550
    }
✔
551

552
    return d;
116✔
553
  }
60✔
554

555
  putBlob(
1✔
556
    meta: ObjectStoreMeta,
1✔
557
    data: Uint8Array | null,
1✔
558
    opts?: ObjectStorePutOpts,
1✔
559
  ): Promise<ObjectInfo> {
1✔
560
    function readableStreamFrom(data: Uint8Array): ReadableStream<Uint8Array> {
3✔
561
      return new ReadableStream<Uint8Array>({
5✔
562
        pull(controller) {
5✔
563
          controller.enqueue(data);
7✔
564
          controller.close();
7✔
565
        },
7✔
566
      });
5✔
567
    }
5✔
568
    if (data === null) {
×
569
      data = new Uint8Array(0);
×
570
    }
×
571
    return this.put(meta, readableStreamFrom(data), opts);
3✔
572
  }
3✔
573

574
  put(
1✔
575
    meta: ObjectStoreMeta,
1✔
576
    rs: ReadableStream<Uint8Array> | null,
1✔
577
    opts?: ObjectStorePutOpts,
1✔
578
  ): Promise<ObjectInfo> {
1✔
579
    if (meta?.options?.link) {
60✔
580
      return Promise.reject(
61✔
581
        new Error("link cannot be set when putting the object in bucket"),
61✔
582
      );
583
    }
61✔
584
    return this._put(meta, rs, opts);
118✔
585
  }
60✔
586

587
  async getBlob(name: string): Promise<Uint8Array | null> {
1✔
588
    async function fromReadableStream(
5✔
589
      rs: ReadableStream<Uint8Array>,
5✔
590
    ): Promise<Uint8Array> {
591
      const buf = new DataBuffer();
9✔
592
      const reader = rs.getReader();
9✔
593
      while (true) {
9✔
594
        const { done, value } = await reader.read();
33✔
595
        if (done) {
33✔
596
          return buf.drain();
37✔
597
        }
37✔
598
        if (value && value.length) {
33✔
599
          buf.fill(value);
53✔
600
        }
53✔
601
      }
33✔
602
    }
9✔
603

604
    const r = await this.get(name);
5✔
605
    if (r === null) {
×
606
      return Promise.resolve(null);
×
607
    }
×
608

609
    const vs = await Promise.all([r.error, fromReadableStream(r.data)]);
20✔
610
    if (vs[0]) {
×
611
      return Promise.reject(vs[0]);
×
612
    } else {
×
613
      return Promise.resolve(vs[1]);
5✔
614
    }
5✔
615
  }
5✔
616

617
  /**
   * Retrieves the named object from the store.
   *
   * The returned ObjectResult carries the object's info, a ReadableStream
   * with the object's data, and an `error` promise that settles once the
   * transfer has finished.
   *
   * @param name - the name of the object
   * @returns the ObjectResult, or null when the object has no metadata entry
   * or is marked as deleted. When the entry is a link to another object, the
   * link is followed and the result of the linked object is returned.
   * @throws Error when the name is empty, the entry is a link to a bucket
   * (a link without an object name), or the stored digest has an unknown
   * type or cannot be parsed.
   */
  async get(name: string): Promise<ObjectResult | null> {
    const info = await this.rawInfo(name);
    if (info === null) {
      return Promise.resolve(null);
    }

    if (info.deleted) {
      return Promise.resolve(null);
    }

    if (info.options && info.options.link) {
      const ln = info.options.link.name || "";
      if (ln === "") {
        // a link without an object name refers to a whole bucket
        throw new Error("link is a bucket");
      }
      // resolve the link in its own bucket, which may be this one
      const os = info.options.link.bucket !== this.name
        ? await ObjectStoreImpl.create(
          this.js,
          info.options.link.bucket,
        )
        : this;
      return os.get(ln);
    }

    if (!info.digest.startsWith(digestType)) {
      return Promise.reject(new Error(`unknown digest type: ${info.digest}`));
    }
    // the digest recorded in the metadata when the object was stored
    const digest = parseSha256(info.digest.substring(digestType.length));
    if (digest === null) {
      return Promise.reject(
        new Error(`unable to parse digest: ${info.digest}`),
      );
    }

    const d = deferred<Error | null>();

    const r: Partial<ObjectResult> = {
      info: new ObjectInfoImpl(info),
      error: d,
    };
    if (info.size === 0) {
      // nothing to download - there are no chunks for an empty object
      r.data = emptyReadableStream();
      d.resolve(null);
      return Promise.resolve(r as ObjectResult);
    }

    const sha = sha256.create();
    let controller: ReadableStreamDefaultController;

    // consumer over this object's chunks; the idle heartbeat and flow
    // control are set so a stalled or overly fast delivery can be handled
    const cc: Partial<ConsumerConfig> = {};
    cc.filter_subject = `$O.${this.name}.C.${info.nuid}`;
    cc.idle_heartbeat = nanos(30_000);
    cc.flow_control = true;
    const oc = await this.js.consumers.getPushConsumer(this.stream, cc);
    const iter = await oc.consume() as PushConsumerMessagesImpl;

    (async () => {
      for await (const jm of iter) {
        if (jm.data.length > 0) {
          sha.update(jm.data);
          controller!.enqueue(jm.data);
        }
        if (jm.info.pending === 0) {
          // last chunk: verify what was received against the stored digest.
          // The computed hash must not shadow `digest`, otherwise the check
          // compares the computed hash with itself and never fails.
          const calculated = Uint8Array.from(sha.digest());
          if (!checkSha256(digest, calculated)) {
            controller!.error(
              new Error(
                `received a corrupt object, digests do not match received: ${info.digest} calculated ${digestType}${
                  Base64UrlPaddedCodec.encode(calculated)
                }`,
              ),
            );
          } else {
            controller!.close();
          }
          break;
        }
      }
    })()
      .then(() => {
        d.resolve();
      })
      .catch((err) => {
        controller!.error(err);
        d.reject(err);
      });

    r.data = new ReadableStream({
      start(c) {
        // start() runs synchronously in the constructor, so the controller
        // is available before the loop above processes its first message
        controller = c;
      },
      cancel() {
        // the reader gave up - stop delivering chunks
        iter.stop();
      },
    });

    return r as ObjectResult;
  }
17✔
713

714
  linkStore(name: string, bucket: ObjectStore): Promise<ObjectInfo> {
1✔
715
    if (!(bucket instanceof ObjectStoreImpl)) {
×
716
      return Promise.reject("bucket required");
×
717
    }
×
718
    const osi = bucket as ObjectStoreImpl;
2✔
719
    const { name: n, error } = this._checkNotEmpty(name);
2✔
720
    if (error) {
×
721
      return Promise.reject(error);
×
722
    }
×
723

724
    const meta = {
2✔
725
      name: n,
2✔
726
      options: { link: { bucket: osi.name } },
10✔
727
    };
2✔
728
    return this._put(meta, null);
2✔
729
  }
2✔
730

731
  async link(name: string, info: ObjectInfo): Promise<ObjectInfo> {
1✔
732
    const { name: n, error } = this._checkNotEmpty(name);
6✔
733
    if (error) {
×
734
      return Promise.reject(error);
×
735
    }
×
736
    if (info.deleted) {
6✔
737
      return Promise.reject(new Error("src object is deleted"));
7✔
738
    }
7✔
739
    if ((info as ObjectInfoImpl).isLink()) {
6✔
740
      return Promise.reject(new Error("src object is a link"));
7✔
741
    }
7✔
742
    const dest = await this.rawInfo(name);
9✔
743
    if (dest !== null && !dest.deleted) {
×
744
      return Promise.reject(
×
745
        new Error("an object already exists with that name"),
×
746
      );
747
    }
✔
748

749
    const link = { bucket: info.bucket, name: info.name };
36✔
750
    const mm = {
9✔
751
      name: n,
9✔
752
      bucket: info.bucket,
9✔
753
      options: { link: link },
27✔
754
    } as ObjectStoreMeta;
9✔
755
    await this.js.publish(this._metaSubject(name), JSON.stringify(mm));
9✔
756
    const i = await this.info(name);
9✔
757
    return Promise.resolve(i!);
9✔
758
  }
6✔
759

760
  async delete(name: string): Promise<PurgeResponse> {
1✔
761
    const info = await this.rawInfo(name);
7✔
762
    if (info === null) {
7✔
763
      return Promise.resolve({ purged: 0, success: false });
32✔
764
    }
8✔
765
    info.deleted = true;
12✔
766
    info.size = 0;
12✔
767
    info.chunks = 0;
12✔
768
    info.digest = "";
12✔
769

770
    const h = headers();
12✔
771
    h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);
12✔
772

773
    await this.js.publish(this._metaSubject(info.name), JSON.stringify(info), {
12✔
774
      headers: h,
12✔
775
    });
12✔
776
    return this.jsm.streams.purge(this.stream, {
12✔
777
      filter: this._chunkSubject(info.nuid),
12✔
778
    });
12✔
779
  }
7✔
780

781
  async update(
1✔
782
    name: string,
1✔
783
    meta: Partial<ObjectStoreMeta> = {},
1✔
784
  ): Promise<PubAck> {
1✔
785
    const info = await this.rawInfo(name);
7✔
786
    if (info === null) {
7✔
787
      return Promise.reject(new Error("object not found"));
9✔
788
    }
9✔
789
    if (info.deleted) {
7✔
790
      return Promise.reject(
8✔
791
        new Error("cannot update meta for a deleted object"),
8✔
792
      );
793
    }
8✔
794
    meta.name = meta.name ?? info.name;
7✔
795
    const { name: n, error } = this._checkNotEmpty(meta.name);
7✔
796
    if (error) {
×
797
      return Promise.reject(error);
×
798
    }
✔
799
    if (name !== meta.name) {
7✔
800
      const i = await this.info(meta.name);
9✔
801
      if (i && !i.deleted) {
9✔
802
        return Promise.reject(
10✔
803
          new Error("an object already exists with that name"),
10✔
804
        );
805
      }
10✔
806
    }
9✔
807
    meta.name = n;
9✔
808
    const ii = Object.assign({}, info, toServerObjectStoreMeta(meta!));
9✔
809
    // if the name changed, delete the old meta
810
    const ack = await this.js.publish(
9✔
811
      this._metaSubject(ii.name),
9✔
812
      JSON.stringify(ii),
9✔
813
    );
814
    if (name !== meta.name) {
7✔
815
      await this.jsm.streams.purge(this.stream, {
8✔
816
        filter: this._metaSubject(name),
8✔
817
      });
8✔
818
    }
8✔
819
    return Promise.resolve(ack);
9✔
820
  }
7✔
821

822
  async watch(opts: Partial<
1✔
823
    {
824
      ignoreDeletes?: boolean;
825
      includeHistory?: boolean;
826
    }
827
  > = {}): Promise<QueuedIterator<ObjectWatchInfo>> {
1✔
828
    opts.includeHistory = opts.includeHistory ?? false;
11✔
829
    opts.ignoreDeletes = opts.ignoreDeletes ?? false;
11✔
830
    // @ts-ignore: not exposed
831
    const historyOnly = opts.historyOnly ?? false;
11✔
832
    const qi = new QueuedIteratorImpl<ObjectWatchInfo>();
11✔
833
    const subj = this._metaSubjectAll();
11✔
834
    try {
11✔
835
      await this.jsm.streams.getMessage(this.stream, { last_by_subj: subj });
33✔
836
    } catch (err) {
×
837
      if (!isMessageNotFound(err as Error)) {
×
838
        qi.stop(err as Error);
×
839
      }
×
840
    }
×
841
    const cc: Partial<ConsumerConfig> = {};
11✔
842
    cc.name = `OBJ_WATCHER_${nuid.next()}`;
11✔
843
    cc.filter_subject = subj;
11✔
844
    if (opts.includeHistory) {
11✔
845
      cc.deliver_policy = DeliverPolicy.LastPerSubject;
19✔
846
    } else {
11✔
847
      // FIXME: Go's implementation doesn't seem correct - if history is not desired
848
      //  the watch should only be giving notifications on new entries
849
      cc.deliver_policy = DeliverPolicy.New;
13✔
850
    }
13✔
851

852
    const oc = await this.js.consumers.getPushConsumer(this.stream, cc);
11✔
853
    const info = await oc.info(true);
11✔
854
    const count = info.num_pending;
11✔
855
    let isUpdate = cc.deliver_policy === DeliverPolicy.New || count === 0;
11✔
856
    qi._data = oc;
11✔
857
    let i = 0;
11✔
858
    const iter = await oc.consume({
11✔
859
      callback: (jm: JsMsg) => {
11✔
860
        if (!isUpdate) {
22✔
861
          i++;
27✔
862
          isUpdate = i >= count;
27✔
863
        }
27✔
864
        const oi = jm.json<ObjectWatchInfo>();
22✔
865
        oi.isUpdate = isUpdate;
22✔
866
        if (oi.deleted && opts.ignoreDeletes === true) {
×
867
          // do nothing
868
        } else {
×
869
          qi.push(oi);
22✔
870
        }
22✔
871
        if (historyOnly && i === count) {
22✔
872
          iter.stop();
25✔
873
        }
25✔
874
      },
22✔
875
    });
11✔
876

877
    (async () => {
11✔
NEW
878
      for await (const s of iter.status()) {
×
NEW
879
        switch (s.type) {
×
NEW
880
          case "heartbeat":
×
NEW
881
            if (historyOnly) {
×
882
              // we got all the keys...
NEW
883
              qi.push(() => {
×
NEW
884
                qi.stop();
×
NEW
885
              });
×
NEW
886
            }
×
NEW
887
        }
×
NEW
888
      }
×
889
    })().then();
11✔
890

891
    if (historyOnly && count === 0) {
11✔
892
      iter.stop();
14✔
893
    }
14✔
894

895
    iter.closed().then(() => {
11✔
896
      qi.push(() => {
21✔
897
        qi.stop();
27✔
898
      });
21✔
899
    });
11✔
900
    qi.iterClosed.then(() => {
11✔
901
      iter.stop();
21✔
902
    });
11✔
903

904
    return qi;
11✔
905
  }
11✔
906

907
  _chunkSubject(id: string) {
1✔
908
    return `$O.${this.name}.C.${id}`;
62✔
909
  }
62✔
910

911
  _metaSubject(n: string): string {
1✔
912
    return `$O.${this.name}.M.${Base64UrlPaddedCodec.encode(n)}`;
168✔
913
  }
168✔
914

915
  _metaSubjectAll(): string {
1✔
916
    return `$O.${this.name}.M.>`;
11✔
917
  }
11✔
918

919
  async init(opts: Partial<ObjectStoreOptions> = {}): Promise<void> {
1✔
920
    try {
43✔
921
      this.stream = objectStoreStreamName(this.name);
43✔
922
    } catch (err) {
×
923
      return Promise.reject(err);
×
924
    }
×
925
    const max_age = opts?.ttl || 0;
43✔
926
    delete opts.ttl;
43✔
927
    // pacify the tsc compiler downstream
928
    const sc = Object.assign({ max_age }, opts) as unknown as StreamConfig;
129✔
929
    sc.name = this.stream;
43✔
930
    sc.allow_direct = true;
43✔
931
    sc.allow_rollup_hdrs = true;
43✔
932
    sc.num_replicas = opts.replicas || 1;
43✔
933
    sc.discard = DiscardPolicy.New;
43✔
934
    sc.subjects = [`$O.${this.name}.C.>`, `$O.${this.name}.M.>`];
172✔
935
    if (opts.placement) {
×
936
      sc.placement = opts.placement;
×
937
    }
×
938
    if (opts.metadata) {
43✔
939
      sc.metadata = opts.metadata;
44✔
940
    }
44✔
941
    if (typeof opts.compression === "boolean") {
43✔
942
      sc.compression = opts.compression
×
943
        ? StoreCompression.S2
×
944
        : StoreCompression.None;
×
945
    }
44✔
946

947
    try {
43✔
948
      await this.jsm.streams.info(sc.name);
43✔
949
    } catch (err) {
43✔
950
      if ((err as Error).message === "stream not found") {
84✔
951
        await this.jsm.streams.add(sc);
84✔
952
      }
84✔
953
    }
84✔
954
  }
43✔
955

956
  static async create(
1✔
957
    js: JetStreamClient,
1✔
958
    name: string,
1✔
959
    opts: Partial<ObjectStoreOptions> = {},
1✔
960
  ): Promise<ObjectStore> {
1✔
961
    const jsm = await js.jetstreamManager();
43✔
962
    const os = new ObjectStoreImpl(name, jsm, js);
43✔
963
    await os.init(opts);
43✔
964
    return Promise.resolve(os);
43✔
965
  }
43✔
966
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc