• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mnasyrov / rx-effects / 4178364161

pending completion
4178364161

Pull #14

github

GitHub
Merge de0a35cae into e0abe6014
Pull Request #14: feat: `compute()` function

272 of 275 branches covered (98.91%)

Branch coverage included in aggregate %.

180 of 180 new or added lines in 7 files covered. (100.0%)

518 of 527 relevant lines covered (98.29%)

92.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.47
/packages/rx-effects/src/compute.ts
1
import { Observable, Observer } from 'rxjs';
1✔
2
import { Query } from './query';
3
import { DEFAULT_COMPARATOR } from './utils';
1✔
4

5
type Comparator<T> = (a: T, b: T) => boolean;
6

7
/// PUBLIC API
8

9
/**
10
 * This function returns a current value of a provided Query and registers it as a dependency for computation.
11
 */
12
export type ComputationResolver = {
13
  <T>(query: Query<T>): T;
14
  <T, R>(query: Query<T>, selector: (value: T) => R): R;
15
};
16

17
/**
18
 * This function calculates a value from external sources or resolved sources by a provided "resolver".
19
 */
20
export type Computation<T> = (resolver: ComputationResolver) => T;
21

22
/**
23
 * Options for "compute()" function
24
 */
25
export type ComputationOptions<T> = {
26
  /** A custom comparator to differ complex values */
27
  comparator?: Comparator<T>;
28

29
  /** Explicitly dependencies for refreshing calculations */
30
  dependencies?: Query<unknown>[];
31
};
32

33
/**
34
 * Creates a computable query which calculates its values by provided "computation" function and dependencies.
35
 *
36
 * Rules of "Computation" function:
37
 * - it must have no side effects
38
 * - it recalculates only when specified dependencies are updated
39
 * - its "formula" may have other sources of values, however they don't trigger updates
40
 *
41
 * "Computation" function provides a resolver for using a dependency withing a calculation expression.
42
 *
43
 * Dependency can be declared explicitly as an array by the second argument. It has the following advantages:
44
 * - Faster dependency subscription
45
 * - Ability to specify extra queries or observables as dependencies
46
 *
47
 * A custom value comparator can be specified by "options" object as the second argument.
48
 * It helps to decide if a new value differs from a previous one in complex cases.
49
 *
50
 * @example
51
 * ```ts
52
 * const greeting = createStore('Hello');
53
 * const username = createStore('World');
54
 *
55
 * // Dependency are implicitly resolved
56
 * const message = compute((get) => get(greeting) + ' ' + get(username) + '!');
57
 *
58
 * // Dependency declared explicitly
59
 * const messageUppercase = compute(() => message.get().toUpperCase(), [message]);
60
 *
61
 * expect(message.get()).toBe('Hello World!');
62
 * expect(messageUppercase.get()).toBe('HELLO WORLD!');
63
 * ```
64
 */
65
export const compute: {
1✔
66
  <T>(computation: Computation<T>, dependencies?: Query<unknown>[]): Query<T>;
67
  <T>(computation: Computation<T>, options?: ComputationOptions<T>): Query<T>;
68
} = <T>(
1✔
69
  computation: Computation<T>,
70
  dependenciesOrOptions?: Query<unknown>[] | ComputationOptions<T>,
71
) => {
72
  const options: ComputationOptions<T> | undefined = dependenciesOrOptions
54✔
73
    ? Array.isArray(dependenciesOrOptions)
24✔
74
      ? { dependencies: dependenciesOrOptions }
75
      : dependenciesOrOptions
76
    : undefined;
77

78
  const node = createComputationNode(computation, options);
54✔
79
  return createComputationQuery(node);
54✔
80
};
81

82
/// INTERNAL
83

84
type ComputationQuery<T> = Query<T> & Readonly<{ _node: Node<T> }>;
85

86
export function isComputationQuery<T>(
1✔
87
  value: Query<T>,
88
): value is ComputationQuery<T> {
89
  return '_node' in value;
79✔
90
}
91

92
const FAST_QUERY_GETTER: ComputationResolver = (query: Query<unknown>) =>
1✔
93
  query.get();
12,041✔
94

95
type ValueRef<T> = { value: T; params?: Array<unknown> };
96

97
/** @internal */
98
export type Node<T> = {
99
  computation: Computation<T>;
100
  comparator?: (a: T, b: T) => boolean;
101
  hot: boolean;
102
  version?: number;
103
  valueRef?: ValueRef<T>;
104
  dependencies?: Set<Query<unknown>>;
105
  depsSubscriptions?: (() => void)[];
106
  observers?: Set<Observer<T>>;
107
  treeObserverCount: number;
108
  parents?: Set<Node<any>>;
109
  children?: Set<Node<any>>;
110
};
111

112
export function createComputationNode<T>(
1✔
113
  computation: Computation<T>,
114
  options?: ComputationOptions<T>,
115
): Node<T> {
116
  const dependencies = options?.dependencies;
71✔
117

118
  return {
71✔
119
    computation,
120
    comparator: options?.comparator,
213✔
121
    hot: false,
122
    dependencies: dependencies ? new Set(dependencies) : undefined,
71✔
123
    treeObserverCount: 0,
124
  };
125
}
126

127
export function createComputationQuery<T>(node: Node<T>): ComputationQuery<T> {
1✔
128
  return {
56✔
129
    _node: node,
130

131
    get: () => getQueryValue(node),
9,211✔
132

133
    value$: new Observable<T>((observer) => {
134
      addValueObserver(node, observer);
20✔
135

136
      return () => removeValueObserver(node, observer);
17✔
137
    }),
138
  };
139
}
140

141
/// COMPUTATION ENGINE
142

143
let GLOBAL_VERSION = 0;
1✔
144

145
/** @internal */
146
export function setGlobalVersion(value: number) {
1✔
147
  GLOBAL_VERSION = value;
1✔
148
}
149

150
/** @internal */
151
export function nextVersion(): number {
1✔
152
  return (GLOBAL_VERSION =
49✔
153
    GLOBAL_VERSION === Number.MAX_SAFE_INTEGER ? 0 : GLOBAL_VERSION + 1);
49✔
154
}
155

156
export function getQueryValue<T>(node: Node<T>): T {
1✔
157
  return node.computation(FAST_QUERY_GETTER);
9,214✔
158
}
159

160
export function addValueObserver<T>(node: Node<T>, observer: Observer<T>) {
1✔
161
  if (!node.observers) {
35✔
162
    node.observers = new Set();
30✔
163
  }
164
  node.observers.add(observer);
35✔
165

166
  makeHotNode(node, observer);
35✔
167

168
  updateTreeObserverCount(node);
32✔
169
}
170

171
export function removeValueObserver<T>(node: Node<T>, observer: Observer<T>) {
1✔
172
  node.observers?.delete(observer);
18✔
173

174
  updateTreeObserverCount(node);
18✔
175
}
176

177
function getTreeObserverCount(node: Node<any>): number {
178
  let subtreeCount = 0;
178✔
179

180
  if (node.children) {
178✔
181
    node.children.forEach((childNode) => {
130✔
182
      subtreeCount += childNode.treeObserverCount;
197✔
183
    });
184
  }
185

186
  return subtreeCount + (node.observers?.size ?? 0);
178✔
187
}
188

189
function updateTreeObserverCount(node: Node<any>) {
190
  node.treeObserverCount = getTreeObserverCount(node);
178✔
191

192
  node.parents?.forEach(updateTreeObserverCount);
178✔
193

194
  if (node.treeObserverCount === 0) {
178✔
195
    makeColdNode(node);
39✔
196
  }
197
}
198

199
function makeHotNode<T>(node: Node<T>, observer?: Observer<T>) {
200
  let dependencies = node.dependencies;
76✔
201
  let valueRef = node.valueRef;
76✔
202

203
  if (dependencies) {
76✔
204
    if (observer && !valueRef) {
42✔
205
      valueRef = node.valueRef = calculate(node.computation);
15✔
206
    }
207
  } else {
208
    const visitedDeps: Set<Query<unknown>> = new Set();
34✔
209

210
    const next = calculate(node.computation, (query) => visitedDeps.add(query));
34✔
211

212
    dependencies = visitedDeps;
33✔
213
    node.dependencies = dependencies;
33✔
214

215
    if (observer && !valueRef) {
33✔
216
      valueRef = node.valueRef = next;
16✔
217
    }
218
  }
219

220
  if (dependencies.size > 0 && !node.hot) {
74✔
221
    let depObserver;
222

223
    const depsSubscriptions = (node.depsSubscriptions =
48✔
224
      node.depsSubscriptions ?? []);
144✔
225

226
    for (const parent of dependencies.values()) {
48✔
227
      if (!parent) {
60✔
228
        throw new TypeError('Incorrect dependency');
1✔
229
      }
230

231
      if (isComputationQuery(parent)) {
59✔
232
        addChildNode(parent._node, node);
39✔
233
      } else {
234
        if (!depObserver) {
20✔
235
          depObserver = {
20✔
236
            next: () => onSourceChanged(node),
38✔
237
            error: (error: any) => onSourceError(node, error),
1✔
238
            complete: () => onSourceComplete(node),
2✔
239
          };
240
        }
241

242
        const subscription = parent.value$
20✔
243
          // .pipe(observeOn(asapScheduler))
244
          .subscribe(depObserver);
245

246
        depsSubscriptions.push(() => subscription.unsubscribe());
20✔
247
      }
248
    }
249
  }
250

251
  node.hot = true;
73✔
252

253
  if (observer && valueRef) {
73✔
254
    observer.next(valueRef.value);
32✔
255
  }
256
}
257

258
export function addChildNode(parent: Node<any>, child: Node<any>) {
1✔
259
  if (!parent.children) parent.children = new Set();
41✔
260
  parent.children.add(child);
41✔
261

262
  if (!child.parents) child.parents = new Set();
41✔
263
  child.parents.add(parent);
41✔
264

265
  makeHotNode(parent);
41✔
266
}
267

268
export function makeColdNode<T>(node: Node<T>) {
1✔
269
  node.hot = false;
41✔
270

271
  node.treeObserverCount = 0;
41✔
272
  if (node.depsSubscriptions) {
41✔
273
    for (const unsubscribe of node.depsSubscriptions) {
36✔
274
      unsubscribe();
13✔
275
    }
276
    node.depsSubscriptions.length = 0;
36✔
277
  }
278

279
  node.parents?.forEach((parent) => parent.children?.delete(node));
41✔
280

281
  node.valueRef = undefined;
41✔
282
  node.observers?.clear();
41✔
283
  node.parents?.clear();
41✔
284
  node.children?.clear();
41✔
285
}
286

287
let IS_CHANGING = false;
1✔
288
const PENDING_NODES: Node<any>[] = [];
1✔
289

290
function onSourceChanged<T>(node: Node<T>) {
291
  if (IS_CHANGING) {
38!
292
    PENDING_NODES.push(node);
×
293
    return;
×
294
  }
295

296
  IS_CHANGING = true;
38✔
297

298
  nextVersion();
38✔
299
  recompute(node);
38✔
300

301
  IS_CHANGING = false;
38✔
302

303
  if (PENDING_NODES.length > 0) {
38!
304
    schedulePendingNodes();
×
305
  }
306
}
307

308
function schedulePendingNodes() {
309
  if (PENDING_NODES.length === 0) {
×
310
    return;
×
311
  }
312

313
  Promise.resolve().then(() => {
×
314
    const nodes = [...PENDING_NODES];
×
315
    PENDING_NODES.length = 0;
×
316
    nodes.forEach(onSourceChanged);
×
317
  });
318
}
319

320
export function onSourceError<T>(node: Node<T>, error: any) {
1✔
321
  if (node.observers) {
10✔
322
    for (const observer of node.observers) {
10✔
323
      observer.error(error);
11✔
324
    }
325
  }
326

327
  if (node.children) {
10✔
328
    for (const child of node.children) {
5✔
329
      onSourceError(child, error);
5✔
330
    }
331
  }
332
}
333

334
export function onSourceComplete<T>(node: Node<T>) {
1✔
335
  if (node.observers) {
12✔
336
    for (const observer of node.observers) {
12✔
337
      observer.complete();
12✔
338
    }
339
  }
340

341
  if (node.children) {
12✔
342
    for (const child of node.children) {
6✔
343
      onSourceComplete(child);
6✔
344
    }
345
  }
346
}
347

348
export function recompute<T>(node: Node<T>) {
1✔
349
  if (node.version === GLOBAL_VERSION) {
106✔
350
    return;
19✔
351
  }
352
  node.version = GLOBAL_VERSION;
87✔
353

354
  if (node.observers && node.observers.size > 0) {
87✔
355
    calculateValue(node);
33✔
356
  }
357

358
  if (node.treeObserverCount > 0 && node.children) {
87✔
359
    for (const child of node.children) {
42✔
360
      recompute(child);
60✔
361
    }
362
  }
363
}
364

365
export function calculateValue<T>(node: Node<T>) {
1✔
366
  const comparator = node.comparator ?? DEFAULT_COMPARATOR;
36✔
367

368
  const next = calculate(node.computation);
36✔
369

370
  const isChanged = node.valueRef
36✔
371
    ? isCalculationChanged(comparator, node.valueRef, next)
372
    : true;
373

374
  if (isChanged) {
36✔
375
    node.valueRef = next;
16✔
376

377
    if (node.observers) {
16✔
378
      for (const observer of node.observers) {
16✔
379
        observer.next(next.value);
16✔
380
      }
381
    }
382
  }
383
}
384

385
function calculate<T>(
386
  computation: Computation<T>,
387
  visitor?: (query: Query<unknown>) => void,
388
): ValueRef<T> {
389
  let params: Array<unknown> | undefined;
390

391
  const value = computation(
85✔
392
    (query: Query<unknown>, selector?: (value: unknown) => unknown) => {
393
      let param = query.get();
64✔
394

395
      if (selector) param = selector(param);
63✔
396

397
      if (!params) params = [];
63✔
398
      params.push(param);
63✔
399

400
      visitor?.(query);
63✔
401

402
      return param;
63✔
403
    },
404
  );
405

406
  return { value, params };
83✔
407
}
408

409
function isCalculationChanged<T>(
410
  comparator: Comparator<T>,
411
  a: ValueRef<T>,
412
  b: ValueRef<T>,
413
): boolean {
414
  if (comparator(a.value, b.value)) {
34✔
415
    return false;
17✔
416
  }
417

418
  return !(a.params && b.params && isArrayEqual(a.params, b.params));
17✔
419
}
420

421
function isArrayEqual(
422
  a: ReadonlyArray<unknown>,
423
  b: ReadonlyArray<unknown>,
424
): boolean {
425
  return a.length === b.length && a.every((value, index) => b[index] === value);
15✔
426
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc