• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Rotorsoft / act-root / 15946344504

28 Jun 2025 05:05PM UTC coverage: 95.791% (-0.9%) from 96.662%
15946344504

push

github

rotorsoft
chore: up lock

233 of 254 branches covered (91.73%)

Branch coverage included in aggregate %.

427 of 435 relevant lines covered (98.16%)

18.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.52
/libs/act/src/adapters/InMemoryStore.ts
1
/**
2
 * In-memory event store adapter for the Act Framework.
3
 *
4
 * This adapter implements the Store interface and is suitable for development, testing, and demonstration purposes.
5
 * All data is stored in memory and lost on process exit.
6
 *
7
 * @category Adapters
8
 */
9
import { SNAP_EVENT } from "../ports.js";
10
import { ConcurrencyError } from "../types/errors.js";
11
import type {
12
  Committed,
13
  EventMeta,
14
  Lease,
15
  Message,
16
  Query,
17
  Schemas,
18
  Store,
19
} from "../types/index.js";
20
import { sleep } from "../utils.js";
21

22
class InMemoryStream {
23
  _at = -1;
10✔
24
  _retry = -1;
10✔
25
  _lease: Lease | undefined;
26
  _blocked = false;
10✔
27

28
  constructor(public readonly stream: string) {}
10✔
29

30
  /**
31
   * Attempt to lease this stream for processing.
32
   * @param lease - Lease request.
33
   * @returns The granted lease or undefined if blocked.
34
   */
35
  lease(lease: Lease): Lease | undefined {
36
    if (!this._blocked && lease.at > this._at) {
13!
37
      this._lease = { ...lease, retry: this._retry + 1 };
13✔
38
      return this._lease;
13✔
39
    }
40
  }
41

42
  /**
43
   * Acknowledge completion of processing for this stream.
44
   * @param lease - Lease to acknowledge.
45
   */
46
  ack(lease: Lease) {
47
    if (this._lease && lease.at >= this._at) {
11✔
48
      this._retry = lease.retry;
10✔
49
      this._blocked = lease.block;
10✔
50
      if (!this._blocked && !lease.error) {
10✔
51
        this._at = lease.at;
3✔
52
        this._retry = 0;
3✔
53
      }
54
      this._lease = undefined;
10✔
55
    }
56
  }
57
}
58

59
/**
60
 * In-memory implementation of the Store interface.
61
 *
62
 * Suitable for development, testing, and demonstration. Not for production use.
63
 * All events and streams are stored in memory and lost on process exit.
64
 *
65
 * @example
66
 *   const store = new InMemoryStore();
67
 *   await store.commit('streamA', [{ name: 'event', data: {} }], meta);
68
 */
69
export class InMemoryStore implements Store {
70
  // stored events
71
  private _events: Committed<Schemas, keyof Schemas>[] = [];
46✔
72
  // stored stream positions and other metadata
73
  private _streams: Map<string, InMemoryStream> = new Map();
46✔
74

75
  /**
76
   * Dispose of the store and clear all events.
77
   */
78
  async dispose() {
79
    await sleep();
27✔
80
    this._events.length = 0;
27✔
81
  }
82

83
  /**
84
   * Seed the store with initial data (no-op for in-memory).
85
   */
86
  async seed() {
87
    await sleep();
5✔
88
  }
89

90
  /**
91
   * Drop all data from the store.
92
   */
93
  async drop() {
94
    await sleep();
5✔
95
    this._events.length = 0;
5✔
96
  }
97

98
  /**
99
   * Query events in the store, optionally filtered by query options.
100
   * @param callback - Function to call for each event.
101
   * @param query - Optional query options.
102
   * @returns The number of events processed.
103
   */
104
  async query<E extends Schemas>(
105
    callback: (event: Committed<E, keyof E>) => void,
106
    query?: Query
107
  ) {
108
    await sleep();
51✔
109
    const {
110
      stream,
111
      names,
112
      before,
113
      after = -1,
37✔
114
      limit,
115
      created_before,
116
      created_after,
117
      correlation,
118
    } = query || {};
51✔
119
    let i = after + 1,
51✔
120
      count = 0;
51✔
121
    while (i < this._events.length) {
51✔
122
      const e = this._events[i++];
51✔
123
      if (stream && e.stream !== stream) continue;
51✔
124
      if (names && !names.includes(e.name)) continue;
49✔
125
      if (correlation && e.meta?.correlation !== correlation) continue;
46✔
126
      if (created_after && e.created <= created_after) continue;
44!
127
      if (before && e.id >= before) break;
44✔
128
      if (created_before && e.created >= created_before) break;
43!
129
      callback(e as Committed<E, keyof E>);
43✔
130
      count++;
43✔
131
      if (limit && count >= limit) break;
43✔
132
    }
133
    return count;
51✔
134
  }
135

136
  /**
137
   * Commit one or more events to a stream.
138
   * @param stream - The stream name.
139
   * @param msgs - The events/messages to commit.
140
   * @param meta - Event metadata.
141
   * @param expectedVersion - Optional optimistic concurrency check.
142
   * @returns The committed events with metadata.
143
   * @throws ConcurrencyError if expectedVersion does not match.
144
   */
145
  async commit<E extends Schemas>(
146
    stream: string,
147
    msgs: Message<E, keyof E>[],
148
    meta: EventMeta,
149
    expectedVersion?: number
150
  ) {
151
    await sleep();
34✔
152
    const instance = this._events.filter((e) => e.stream === stream); // ignore state events, this is a production optimization
34✔
153
    if (expectedVersion && instance.length - 1 !== expectedVersion)
34✔
154
      throw new ConcurrencyError(
1✔
155
        instance.length - 1,
156
        msgs as Message<Schemas, keyof Schemas>[],
157
        expectedVersion
158
      );
159

160
    let version = instance.length;
33✔
161
    return msgs.map(({ name, data }) => {
33✔
162
      const committed: Committed<E, keyof E> = {
37✔
163
        id: this._events.length,
164
        stream,
165
        version,
166
        created: new Date(),
167
        name,
168
        data,
169
        meta,
170
      };
171
      this._events.push(committed as Committed<Schemas, keyof Schemas>);
37✔
172
      version++;
37✔
173
      return committed;
37✔
174
    });
175
  }
176

177
  /**
178
   * Fetches new events from stream watermarks for processing.
179
   * @param limit - Maximum number of streams to fetch.
180
   * @returns Fetched streams and events.
181
   */
182
  async fetch<E extends Schemas>(limit: number) {
183
    const streams = [...this._streams.values()]
14✔
184
      .filter((s) => !s._blocked)
2✔
185
      .sort((a, b) => a._at - b._at)
×
186
      .slice(0, limit);
187

188
    const after = streams.length
14✔
189
      ? streams.reduce(
190
          (min, s) => Math.min(min, s._at),
2✔
191
          Number.MAX_SAFE_INTEGER
192
        )
193
      : -1;
194

195
    const events: Committed<E, keyof E>[] = [];
14✔
196
    await this.query<E>((e) => e.name !== SNAP_EVENT && events.push(e), {
14✔
197
      after,
198
      limit,
199
    });
200
    return { streams: streams.map(({ stream }) => stream), events };
14✔
201
  }
202

203
  /**
204
   * Lease streams for processing (e.g., for distributed consumers).
205
   * @param leases - Lease requests.
206
   * @returns Granted leases.
207
   */
208
  async lease(leases: Lease[]) {
209
    await sleep();
14✔
210
    return leases
14✔
211
      .map((lease) => {
212
        const stream =
213
          this._streams.get(lease.stream) ||
13✔
214
          // store new correlations
215
          this._streams
216
            .set(lease.stream, new InMemoryStream(lease.stream))
217
            .get(lease.stream)!;
218
        return stream.lease(lease);
13✔
219
      })
220
      .filter((l): l is Lease => !!l);
13✔
221
  }
222

223
  /**
224
   * Acknowledge completion of processing for leased streams.
225
   * @param leases - Leases to acknowledge.
226
   */
227
  async ack(leases: Lease[]) {
228
    await sleep();
12✔
229
    leases.forEach((lease) => this._streams.get(lease.stream)?.ack(lease));
12✔
230
  }
231
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc