• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Rotorsoft / act-root / 15800362180

21 Jun 2025 10:45PM UTC coverage: 95.007% (+3.4%) from 91.629%
15800362180

push

github

rotorsoft
chore: reduce branch coverage

233 of 260 branches covered (89.62%)

Branch coverage included in aggregate %.

414 of 421 relevant lines covered (98.34%)

16.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.43
/libs/act/src/adapters/InMemoryStore.ts
1
import { ConcurrencyError } from "../types/errors.js";
2
import type {
3
  Committed,
4
  EventMeta,
5
  Lease,
6
  Message,
7
  Query,
8
  Schemas,
9
  Store,
10
} from "../types/index.js";
11
import { sleep } from "../utils.js";
12

13
/**
 * Lease bookkeeping for a single stream tracked by the in-memory store.
 *
 * The state fields are underscore-prefixed instead of `private` because
 * InMemoryStore reads `_at` and `_blocked` directly when fetching.
 */
class InMemoryStream {
  /** Watermark: the `at` of the last lease acked with retry 0 and no block; -1 until then. */
  _at = -1;
  /** Retry counter copied from the last acked lease; -1 until the first ack. */
  _retry = -1;
  /** Lease currently handed out, or undefined when none is outstanding. */
  _lease: Lease | undefined;
  /** Set from the `block` flag of the last acked lease; while true no lease is granted. */
  _blocked = false;

  /**
   * @param stream name of the stream this state belongs to
   */
  constructor(public readonly stream: string) {}

  /**
   * Tries to take a lease on this stream.
   *
   * @param lease requested lease; granted only when the stream is not blocked
   *   and `lease.at` is beyond the current watermark
   * @returns a copy of the request with `retry` set to the stored retry
   *   counter plus one, or `undefined` when the request is refused
   */
  lease(lease: Lease): Lease | undefined {
    if (!this._blocked && lease.at > this._at) {
      // the caller-supplied retry is overwritten with the tracked counter + 1
      this._lease = { ...lease, retry: this._retry + 1 };
      return this._lease;
    }
    return undefined;
  }

  /**
   * Acknowledges a lease. Ignored when no lease is outstanding or when
   * `lease.at` is behind the current watermark.
   *
   * @param lease lease being acknowledged; its `retry` and `block` values
   *   become the stream's new retry counter and blocked flag
   */
  ack(lease: Lease) {
    if (this._lease && lease.at >= this._at) {
      this._retry = lease.retry;
      this._blocked = lease.block;
      // the watermark only advances on a zero-retry, non-blocking ack
      if (!this._retry && !this._blocked) {
        this._at = lease.at;
      }
      this._lease = undefined;
    }
  }
}
39

40
/**
 * @category Adapters
 * @remarks In-memory event store
 *
 * Events are kept in a single array in commit order, and an event's `id` is
 * its index in that array. Lease state is kept per stream in a map keyed by
 * stream name. Most methods first await `sleep()` from ../utils.js
 * (presumably to yield like a real async store would; confirm there).
 */
export class InMemoryStore implements Store {
  // stored events
  private _events: Committed<Schemas, keyof Schemas>[] = [];
  // stored stream positions and other metadata
  private _streams: Map<string, InMemoryStream> = new Map();

  /**
   * Releases everything held by the store: events and stream lease state.
   */
  async dispose() {
    await sleep();
    this._events.length = 0;
    // stale watermarks would point past the (now empty) event log
    this._streams.clear();
  }

  /**
   * Nothing to prepare for the in-memory store.
   */
  async seed() {
    await sleep();
  }

  /**
   * Resets the store to empty: events and stream lease state.
   */
  async drop() {
    await sleep();
    this._events.length = 0;
    // ids restart at 0 after a drop, so old watermarks/blocked flags must go too
    this._streams.clear();
  }

  /**
   * Scans events in commit order and hands the matching ones to `callback`.
   *
   * @param callback invoked once per matching event
   * @param query optional filters: `stream`, `names`, `correlation` and
   *   `created_after` skip non-matching events; `before` and `created_before`
   *   stop the scan at the first event that reaches them; `after` (default -1)
   *   is the id to start after; `limit` stops the scan once that many events
   *   were delivered
   * @returns number of events passed to `callback`
   */
  async query<E extends Schemas>(
    callback: (event: Committed<E, keyof E>) => void,
    query?: Query
  ) {
    await sleep();
    const {
      stream,
      names,
      before,
      after = -1,
      limit,
      created_before,
      created_after,
      correlation,
    } = query || {};
    // ids are array indexes, so scanning starts right after `after`
    let i = after + 1,
      count = 0;
    while (i < this._events.length) {
      const e = this._events[i++];
      if (stream && e.stream !== stream) continue;
      if (names && !names.includes(e.name)) continue;
      if (correlation && e.meta?.correlation !== correlation) continue;
      if (created_after && e.created <= created_after) continue;
      // upper bounds end the scan instead of skipping
      if (before && e.id >= before) break;
      if (created_before && e.created >= created_before) break;
      callback(e as Committed<E, keyof E>);
      count++;
      if (limit && count >= limit) break;
    }
    return count;
  }

  /**
   * Appends messages to a stream as committed events.
   *
   * @param stream target stream name
   * @param msgs messages to commit; each becomes one event with consecutive
   *   versions continuing from the stream's current event count
   * @param meta metadata attached to every committed event
   * @param expectedVersion when given (including 0), must equal the stream's
   *   last version (event count minus one)
   * @returns the committed events, in the order of `msgs`
   * @throws ConcurrencyError when `expectedVersion` does not match the
   *   stream's last version
   */
  async commit<E extends Schemas>(
    stream: string,
    msgs: Message<E, keyof E>[],
    meta: EventMeta,
    expectedVersion?: number
  ) {
    await sleep();
    const instance = this._events.filter((e) => e.stream === stream); // ignore state events, this is a production optimization
    const lastVersion = instance.length - 1;
    // typeof check (not truthiness) so that an expected version of 0 is enforced
    if (typeof expectedVersion === "number" && lastVersion !== expectedVersion)
      throw new ConcurrencyError(
        lastVersion,
        msgs as Message<Schemas, keyof Schemas>[],
        expectedVersion
      );

    let version = instance.length;
    return msgs.map(({ name, data }) => {
      const committed: Committed<E, keyof E> = {
        id: this._events.length,
        stream,
        version,
        created: new Date(),
        name,
        data,
        meta,
      };
      this._events.push(committed as Committed<Schemas, keyof Schemas>);
      version++;
      return committed;
    });
  }

  /**
   * Fetches new events from stream watermarks
   *
   * @param limit maximum number of streams to consider and of events to return
   * @returns the names of up to `limit` non-blocked streams with the lowest
   *   watermarks, and up to `limit` events with ids after the lowest of those
   *   watermarks (from the start of the log when no stream is tracked yet)
   */
  async fetch<E extends Schemas>(limit: number) {
    const streams = [...this._streams.values()]
      .filter((s) => !s._blocked)
      .sort((a, b) => a._at - b._at)
      .slice(0, limit);

    const after = streams.length
      ? streams.reduce(
          (min, s) => Math.min(min, s._at),
          Number.MAX_SAFE_INTEGER
        )
      : -1;

    const events: Committed<E, keyof E>[] = [];
    await this.query<E>((e) => events.push(e), { after, limit });
    return { streams: streams.map(({ stream }) => stream), events };
  }

  /**
   * Requests leases, registering any stream seen for the first time.
   *
   * @param leases lease requests, one per stream
   * @returns the leases that were granted; refused requests are dropped
   */
  async lease(leases: Lease[]) {
    await sleep();
    return leases
      .map((lease) => {
        let stream = this._streams.get(lease.stream);
        if (!stream) {
          // store new correlations
          stream = new InMemoryStream(lease.stream);
          this._streams.set(lease.stream, stream);
        }
        return stream.lease(lease);
      })
      .filter((l): l is Lease => !!l);
  }

  /**
   * Acknowledges leases; entries for streams the store does not track are ignored.
   *
   * @param leases leases to acknowledge
   */
  async ack(leases: Lease[]) {
    await sleep();
    leases.forEach((lease) => this._streams.get(lease.stream)?.ack(lease));
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc