• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 23378774633

21 Mar 2026 11:33AM UTC coverage: 98.358%. First build
23378774633

Pull #38

github

web-flow
Merge 699ec9686 into 37f832b35
Pull Request #38: New: Redis-backed projection views with distributed locking (experimental)

600 of 653 branches covered (91.88%)

176 of 206 new or added lines in 9 files covered. (85.44%)

1797 of 1827 relevant lines covered (98.36%)

39.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.49
/src/redis/RedisObjectStorage.ts
1
import type { Redis } from 'ioredis';
2
import type { IContainer } from 'node-cqrs';
3
import type { IObjectStorage } from '../interfaces/index.ts';
4
import { assertFunction, assertString } from '../utils/assert.ts';
3✔
5
import { AbstractRedisAccessor } from './AbstractRedisAccessor.ts';
3✔
6

7
/**
8
 * Lua script for atomic version-checked update.
9
 *
10
 * KEYS[1] = record key
11
 * ARGV[1] = expected version (as string)
12
 * ARGV[2] = new envelope JSON to store
13
 *
14
 * Returns:
15
 *   1  = success (updated)
16
 *   0  = version mismatch (retry)
17
 *  -1  = key does not exist
18
 */
19
const SCRIPT_UPDATE_IF_VERSION = `
3✔
20
local current = redis.call("GET", KEYS[1])
21
if not current then return -1 end
22
local envelope = cjson.decode(current)
23
if tostring(envelope["v"]) ~= ARGV[1] then return 0 end
24
redis.call("SET", KEYS[1], ARGV[2])
25
return 1
26
`;
27

28
type RecordEnvelope<TRecord> = { d: TRecord; v: number };
29

30
/**
31
 * Redis-backed implementation of IObjectStorage.
32
 *
33
 * Each record is stored as a JSON string at key `{tableName}:{id}` with the shape
34
 * `{ "d": <data>, "v": <version> }`.  The version field enables optimistic
35
 * concurrency control: `update` and `updateEnforcingNew` re-read the record after
36
 * the user callback runs and atomically commit only when the version still matches.
37
 * On mismatch the operation retries up to `maxRetries` times.
38
 */
39
export class RedisObjectStorage<TRecord> extends AbstractRedisAccessor implements IObjectStorage<TRecord> {
3✔
40

41
        #tableName: string;
42
        #maxRetries: number;
43

44
        constructor(o: Partial<Pick<IContainer, 'viewModelRedis' | 'viewModelRedisFactory'>> & {
45
                tableName: string;
46
                maxRetries?: number;
47
        }) {
48
                super(o);
27✔
49

50
                assertString(o.tableName, 'o.tableName');
26✔
51

52
                this.#tableName = o.tableName;
25✔
53
                this.#maxRetries = o.maxRetries ?? 100;
25✔
54
        }
55

56
        // eslint-disable-next-line class-methods-use-this
57
        protected initialize(_redis: Redis): void {
58
                // No Redis-level setup required for object storage
59
        }
60

61
        #key(id: string): string {
62
                return `${this.#tableName}:${id}`;
2,587✔
63
        }
64

65
        async get(id: string): Promise<TRecord | undefined> {
66
                assertString(id, 'id');
9✔
67

68
                await this.assertConnection();
8✔
69

70
                const raw = await this.redis!.get(this.#key(id));
8✔
71
                if (!raw)
8✔
72
                        return undefined;
2✔
73

74
                const envelope: RecordEnvelope<TRecord> = JSON.parse(raw);
6✔
75
                return envelope.d;
6✔
76
        }
77

78
        async create(id: string, data: TRecord): Promise<void> {
79
                assertString(id, 'id');
11✔
80

81
                await this.assertConnection();
10✔
82

83
                const envelope: RecordEnvelope<TRecord> = { d: data, v: 1 };
10✔
84
                const result = await this.redis!.set(this.#key(id), JSON.stringify(envelope), 'NX');
10✔
85
                if (!result)
10✔
86
                        throw new Error(`Record '${id}' could not be created`);
1✔
87
        }
88

89
        async update(id: string, update: (r: TRecord) => TRecord): Promise<void> {
90
                assertString(id, 'id');
6✔
91
                assertFunction(update, 'update');
5✔
92

93
                await this.assertConnection();
4✔
94

95
                for (let attempt = 0; attempt <= this.#maxRetries; attempt++) {
4✔
96
                        const raw = await this.redis!.get(this.#key(id));
5✔
97
                        if (!raw)
5✔
98
                                throw new Error(`Record '${id}' does not exist`);
1✔
99

100
                        const envelope: RecordEnvelope<TRecord> = JSON.parse(raw);
4✔
101
                        const updated: RecordEnvelope<TRecord> = {
4✔
102
                                d: update(envelope.d),
103
                                v: envelope.v + 1
104
                        };
105

106
                        const result = await this.redis!.eval(
4✔
107
                                SCRIPT_UPDATE_IF_VERSION,
108
                                1,
109
                                this.#key(id),
110
                                String(envelope.v),
111
                                JSON.stringify(updated)
112
                        ) as number;
113

114
                        if (result === 1)
4✔
115
                                return;
2✔
116

117
                        if (result === -1)
2!
NEW
118
                                throw new Error(`Record '${id}' does not exist`);
×
119

120
                        // result === 0: version mismatch — retry
121
                }
122

123
                throw new Error(`Record '${id}' could not be updated after ${this.#maxRetries} retries`);
1✔
124
        }
125

126
        async updateEnforcingNew(id: string, update: (r?: TRecord) => TRecord): Promise<void> {
127
                assertString(id, 'id');
54✔
128
                assertFunction(update, 'update');
54✔
129

130
                await this.assertConnection();
54✔
131

132
                for (let attempt = 0; attempt <= this.#maxRetries; attempt++) {
54✔
133
                        const raw = await this.redis!.get(this.#key(id));
1,279✔
134

135
                        if (raw) {
1,279✔
136
                                const envelope: RecordEnvelope<TRecord> = JSON.parse(raw);
1,227✔
137
                                const updated: RecordEnvelope<TRecord> = {
1,227✔
138
                                        d: update(envelope.d),
139
                                        v: envelope.v + 1
140
                                };
141

142
                                const result = await this.redis!.eval(
1,227✔
143
                                        SCRIPT_UPDATE_IF_VERSION,
144
                                        1,
145
                                        this.#key(id),
146
                                        String(envelope.v),
147
                                        JSON.stringify(updated)
148
                                ) as number;
149

150
                                if (result === 1)
1,227✔
151
                                        return;
51✔
152

153
                                // result === 0 or -1: concurrent modification — retry
154
                        }
155
                        else {
156
                                const envelope: RecordEnvelope<TRecord> = { d: update(undefined), v: 1 };
52✔
157
                                const result = await this.redis!.set(this.#key(id), JSON.stringify(envelope), 'NX');
52✔
158
                                if (result)
52✔
159
                                        return;
3✔
160

161
                                // Another process created the key between our GET and SET — retry
162
                        }
163
                }
164

NEW
165
                throw new Error(`Record '${id}' could not be upserted after ${this.#maxRetries} retries`);
×
166
        }
167

168
        async delete(id: string): Promise<boolean> {
169
                assertString(id, 'id');
3✔
170

171
                await this.assertConnection();
2✔
172

173
                const count = await this.redis!.del(this.#key(id));
2✔
174
                return count === 1;
2✔
175
        }
176
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc