• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tabkram / execution-engine / 13502989675

24 Feb 2025 04:34PM UTC coverage: 93.223%. First build
13502989675

Pull #54

github

web-flow
Merge 3b209471e into e8a22c0de
Pull Request #54: feat: introduce standalone `execute()` and `executionTrace()` functions and `@trace()` decorator

212 of 239 branches covered (88.7%)

Branch coverage included in aggregate %.

183 of 187 new or added lines in 10 files covered. (97.86%)

297 of 307 relevant lines covered (96.74%)

104.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.22
/src/engine/traceableEngine.ts
1
/* eslint-disable @typescript-eslint/no-explicit-any */
2
import { AsyncLocalStorage } from 'async_hooks';
3✔
3

4
import { v4 as uuidv4 } from 'uuid';
3✔
5

6
import { isNodeTrace, NodeData, NodeTrace } from '../common/models/engineNodeTrace.model';
3✔
7
import { Edge, Node, Trace } from '../common/models/engineTrace.model';
8
import { DEFAULT_TRACE_CONFIG, TraceOptions } from '../common/models/engineTraceOptions.model';
3✔
9
import { ExecutionTrace, ExecutionTraceExtractor, isExecutionTrace } from '../common/models/executionTrace.model';
3✔
10
import { Awaited } from '../common/utils/awaited';
11
import { extract } from '../common/utils/jsonQuery';
3✔
12
import { ExecutionTimer } from '../timer/executionTimer';
3✔
13
import { executionTrace } from '../trace/trace';
3✔
14

15
/**
16
 * Represents a class for traceable execution of functions.
17
 */
18
export class TraceableEngine {
3✔
19
  private nodes: Array<Node>;
20
  private edges: Array<Edge>;
21
  private asyncLocalStorage = new AsyncLocalStorage<string>();
17✔
22

23
  /**
24
   * A temporary storage for narratives associated with non-found nodes.
25
   * Narratives are stored in memory until the corresponding node is created.
26
   * @private
27
   */
28
  private narrativesForNonFoundNodes: {
29
    [key: string]: Array<string>;
30
  };
31

32
  /**
   * Initializes a new instance of the TraceableEngine class.
   * Delegates to `initTrace`, which splits the given trace into nodes and edges.
   * @param initialTrace - The initial trace to be used (optional).
   */
  constructor(initialTrace?: Trace) {
    this.initTrace(initialTrace);
  }
39

40
  /**
   * Applies an extraction config to one part of an execution trace (inputs, outputs or errors).
   *
   * - function config: the part is passed to the function and its result is returned;
   * - array config: the part is passed to `extract` together with the array;
   * - `true`: the part is returned unchanged;
   * - anything else: nothing is returned (`undefined`).
   *
   * @param ioeExecTrace - The inputs, outputs or errors of an execution trace.
   * @param extractionConfig - How the part should be mapped/extracted.
   * @throws Error wrapping any failure raised while mapping/extracting.
   */
  private static extractIOExecutionTraceWithConfig<I, O>(
    ioeExecTrace: ExecutionTrace<I, O>['inputs'] | ExecutionTrace<I, O>['outputs'] | ExecutionTrace<I, O>['errors'],
    extractionConfig: boolean | Array<string> | ((i: unknown) => unknown)
  ) {
    try {
      // Custom mapper supplied by the caller.
      if (typeof extractionConfig === 'function') {
        return extractionConfig(ioeExecTrace);
      }
      // Array config is forwarded to the JSON query helper.
      if (Array.isArray(extractionConfig)) {
        return extract(ioeExecTrace, extractionConfig);
      }
      // Only an explicit `true` keeps the value as-is; other values fall through to `undefined`.
      if (extractionConfig === true) {
        return ioeExecTrace;
      }
    } catch (e) {
      throw new Error(`error when mapping/extracting ExecutionTrace with config: "${extractionConfig}", ${e?.message}`);
    }
  }
56

57
  /**
   * Collects the narratives of a node according to the narrative config.
   *
   * Narratives that were stored for this node id before the node existed are placed first,
   * followed by the node's own narratives; the stored entry is then removed.
   *
   * - function config: its result for `nodeData` is appended;
   * - array config: the array is appended;
   * - `true`: the collected narratives are returned as they are;
   * - anything else: nothing is returned (`undefined`).
   *
   * @param nodeData - The node whose narratives are collected.
   * @param narrativeConfig - How additional narratives are produced.
   * @throws Error wrapping any failure raised while collecting narratives.
   */
  private extractNarrativeWithConfig<I, O>(
    nodeData: NodeData<I, O>,
    narrativeConfig: ExecutionTraceExtractor<I, O>['narratives']
  ): Array<string> {
    try {
      const pendingNarratives = this.narrativesForNonFoundNodes[nodeData.id] ?? [];
      const collected = pendingNarratives.concat(nodeData.narratives ?? []);
      // The pending narratives are consumed now, so drop them from the temporary storage.
      delete this.narrativesForNonFoundNodes[nodeData.id];

      if (typeof narrativeConfig === 'function') {
        return collected.concat(narrativeConfig(nodeData));
      }
      if (Array.isArray(narrativeConfig)) {
        return collected.concat(narrativeConfig);
      }
      if (narrativeConfig === true) {
        return collected;
      }
    } catch (e) {
      throw new Error(`error when mapping/extracting Narrative with config: "${narrativeConfig}", ${e?.message}`);
    }
  }
75

76
  /**
   * Initializes (or resets) the trace from the given initial trace.
   *
   * Elements whose `group` is `'nodes'` become the engine's nodes, elements whose `group` is
   * `'edges'` become its edges. Narratives waiting for not-yet-created nodes are cleared.
   *
   * @param initialTrace - Optional trace to start from; when omitted the engine starts empty.
   * @returns This engine instance, to allow chaining.
   */
  initTrace(initialTrace?: Trace): TraceableEngine {
    // The constructor forwards an optional trace, so a missing one is treated as empty.
    const elements = initialTrace ?? [];
    this.nodes = elements.filter((element) => element.group === 'nodes') as Array<Node>;
    this.edges = elements.filter((element) => element.group === 'edges') as Array<Edge>;
    this.narrativesForNonFoundNodes = {};
    return this;
  }
88

89
  /**
   * Returns the whole execution trace.
   * @returns A new array holding all nodes first, followed by all edges.
   */
  getTrace(): Trace {
    const { nodes, edges } = this;
    return [...nodes, ...edges];
  }
96

97
  /**
   * Returns the nodes of the execution trace.
   * @returns The engine's own nodes array (not a copy).
   */
  getTraceNodes() {
    const { nodes } = this;
    return nodes;
  }
104

105
  /**
   * Overloads for block functions that return a promise.
   *
   * ATTENTION: TS chooses the first fitting overload in top-down order, so overloads are sorted from most specific to most broad.
   * ATTENTION: an arrow function given as blockFunction could be caught by these overloads, even if it doesn't return a promise;
   * prefer using real functions declared with the `function` keyword!
   * ATTENTION: functions that return a promise will fit here BUT should be declared with the `async` keyword to work correctly.
   * Prefer functions with the `async` keyword, even if rechecking is available, so that functions returning promises are executed safely.
   */
  run<O>(
    blockFunction: (...params) => Promise<O>,
    inputs: Array<unknown>,
    options?: TraceOptions<Array<any>, O>
  ): Promise<ExecutionTrace<Array<unknown>, Awaited<O>>>;

  run<O>(
    blockFunction: (...params) => Promise<O>,
    inputs: Array<unknown>,
    options?: TraceOptions<Array<any>, O>['trace']
  ): Promise<ExecutionTrace<Array<unknown>, Awaited<O>>>;

  /**
   * Overloads for synchronous block functions.
   *
   * ATTENTION: arrow functions as blockFunction ARE NOT RECOMMENDED: overload inference will not select this signature for them.
   * They will be caught by the Promise signature declared before!
   * Especially if you do:
   * ```
   *  () => {  throw new Error("error example");}
   * ```
   * Prefer using real functions declared with the `function` keyword!
   */
  run<O>(blockFunction: (...params) => O, inputs: Array<unknown>, options?: TraceOptions<Array<any>, O>): ExecutionTrace<Array<unknown>, O>;

  run<O>(
    blockFunction: (...params) => O,
    inputs: Array<unknown>,
    options?: TraceOptions<Array<any>, O>['trace']
  ): ExecutionTrace<Array<unknown>, O>;

  /**
   * Executes `blockFunction` with `inputs` through `executionTrace` and records the execution as a node of this engine's trace.
   *
   * The function is invoked bound to this engine and receives the generated node trace as an extra, last argument.
   * While it runs, the node id is kept in the engine's AsyncLocalStorage, so that nested `run` calls
   * that do not specify a `parent` use this node as their parent.
   *
   * @param blockFunction - The function to execute and trace.
   * @param inputs - Array of arguments given as input to `blockFunction`.
   * @param options - Execution options: either `{ trace, config }` or simply a node trace.
   *   When only a node trace is given, `DEFAULT_TRACE_CONFIG` is used as config.
   *   The given options object is not modified.
   * @returns The result of `executionTrace` for this execution.
   * @throws Error when one of the inputs is a TraceableEngine instance (it would create a circular dependency in the trace).
   *
   * ATTENTION: arrow functions as blockFunction ARE NOT RECOMMENDED: they may not be matched to the expected overload, especially if you do:
   * ```
   *  () => {  throw new Error("error example");}
   * ```
   */
  run<O>(
    blockFunction: (...params) => O | Promise<O>,
    inputs: Array<unknown> = [],
    options: TraceOptions<Array<any>, O> | TraceOptions<Array<any>, O>['trace'] = {
      trace: {
        id: [blockFunction.name ? blockFunction.name.replace('bound ', '') : 'function', new Date()?.getTime(), uuidv4()]?.join('_'),
        label: blockFunction.name ? blockFunction.name.replace('bound ', '') : 'function'
      },
      config: DEFAULT_TRACE_CONFIG
    }
  ): Promise<ExecutionTrace<Array<unknown>, Awaited<O>>> | ExecutionTrace<Array<unknown>, O> {
    if (inputs.some((input) => input instanceof TraceableEngine)) {
      throw Error(
        `${blockFunction.name} could not have an instance of TraceableExecution as input, this will create circular dependency on trace`
      );
    }

    // A bare node trace carries no config: fall back to the default one, so that reading
    // `errors` below cannot fail on an undefined config.
    const traceConfig = isNodeTrace(options) ? DEFAULT_TRACE_CONFIG : (options.config ?? DEFAULT_TRACE_CONFIG);
    const nodeTraceFromOptions = (isNodeTrace(options) ? options : options.trace) ?? {};
    // Resolve the parent without writing it back into the caller's options object.
    const parent = nodeTraceFromOptions?.parent ?? this.asyncLocalStorage.getStore();

    // Bound functions are named "bound xxx": strip the prefix to keep readable ids/labels.
    const functionName = blockFunction.name ? blockFunction.name.replace('bound ', '') : 'function';

    const executionTimer = new ExecutionTimer();
    executionTimer?.start();
    const nodeTrace: NodeData = {
      id: [functionName, executionTimer?.getStartDate()?.getTime(), uuidv4()]?.join('_'),
      label: [(this.nodes?.length ?? 0) + 1, nodeTraceFromOptions?.id ?? functionName]?.join(' - '),
      // Values given by the caller (id, label, ...) take precedence over the generated ones.
      ...nodeTraceFromOptions,
      parent
    };

    return executionTrace<O>(
      () => this.asyncLocalStorage.run(nodeTrace.id, () => blockFunction.bind(this)(...inputs, nodeTrace)),
      inputs,
      (traceContext) => {
        // TODO: metadata are not handled in the engine ExecutionTrace for now, to add it later.
        delete traceContext.metadata;
        return this.buildTrace(nodeTrace, traceContext, traceConfig);
      },
      { errorStrategy: traceConfig.errors }
    );
  }
199

200
  /**
   * Appends one or several narratives to a trace node.
   *
   * When the node already exists its narratives are extended; otherwise the narratives are kept
   * in a temporary storage, keyed by node id, until the node is created.
   *
   * @param nodeId - The ID of the node.
   * @param narratives - The narrative or array of narratives to add.
   * @returns This engine instance, to allow chaining.
   */
  pushNarratives(nodeId: NodeTrace['id'], narratives: string | string[]) {
    const incoming = Array.isArray(narratives) ? narratives : [narratives];
    const targetNode = this.nodes?.find((node) => node.data.id === nodeId);

    if (targetNode) {
      // Node already exists: replace its data with a copy carrying the extended narratives.
      targetNode.data = {
        ...targetNode.data,
        narratives: [...(targetNode.data?.narratives ?? []), ...incoming]
      };
      return this;
    }

    // Node doesn't exist yet: keep the narratives aside for later use.
    const pending = this.narrativesForNonFoundNodes[nodeId] ?? [];
    this.narrativesForNonFoundNodes[nodeId] = [...pending, ...incoming];
    return this;
  }
225

226
  /**
   * Retrieves the narratives of all nodes, in node order.
   *
   * @returns {Array<string>} The narratives of every node, with empty values removed. If no narratives are found, an empty array is returned.
   */
  getNarratives(): Array<string> {
    const narratives = this.nodes?.flatMap?.((node) => node.data?.narratives)?.filter((narrative) => !!narrative);
    // The optional chain yields undefined when nodes or flatMap are unavailable: honour the documented contract.
    return narratives ?? [];
  }
234

235
  /**
   * Records one execution in the trace: creates the edges leading to the node, then creates or updates the node itself.
   *
   * Steps:
   * 1. If the node has a parent that is not in the trace yet, a placeholder parent node is created first (recursive call with `isAutoCreated = true`).
   * 2. Unless the node is auto-created, edges targeting the node are added:
   *    - when a matching parallel edge exists, a single edge from that edge's source;
   *    - otherwise one edge from each eligible previous node (see the filter below).
   * 3. The node data is filtered according to `options.traceExecution`, its narratives are pushed,
   *    and the node is either merged into the existing node with the same id or appended.
   *
   * @param nodeTrace - Identity of the node (id, label, parent, ...).
   * @param executionTrace - The execution details (inputs, outputs, errors, timings, ...) to attach to the node.
   * @param options - Trace config; defaults to `DEFAULT_TRACE_CONFIG`.
   * @param isAutoCreated - True when the node is a placeholder created for a missing parent; stored as `abstract` on the node.
   */
  private buildTrace<O>(
    nodeTrace: NodeData,
    executionTrace?: ExecutionTrace<Array<unknown>, O>,
    options: TraceOptions<Array<unknown>, O>['config'] = DEFAULT_TRACE_CONFIG,
    isAutoCreated = false
  ) {
    if (nodeTrace.parent && !this.nodes?.some((n) => n.data.id === nodeTrace.parent)) {
      // The parent is unknown: create a placeholder for it, carrying the child's errors.
      this.buildTrace<O>(
        {
          id: nodeTrace.parent,
          label: nodeTrace.parent
        },
        // `executionTrace` is optional, so guard the access.
        { id: nodeTrace.id, errors: executionTrace?.errors },
        DEFAULT_TRACE_CONFIG,
        true
      );
    }

    if (!isAutoCreated) {
      let parallelEdge = undefined;
      if (options.parallel === true) {
        // Unnamed parallel group: reuse any parallel edge under the same parent.
        parallelEdge = this.edges?.find((edge) => edge.data.parallel && edge.data.parent === nodeTrace.parent);
      } else if (typeof options.parallel === 'string') {
        // Named parallel group: reuse the edge carrying the same parallel name.
        parallelEdge = this.edges?.find((edge) => edge.data.parallel === options.parallel);
      }

      // Previous nodes are only looked up when no parallel edge was found.
      const previousNodes = !parallelEdge
        ? this.nodes?.filter(
          (node) =>
            !node.data.abstract &&
              node.data.parent === nodeTrace.parent &&
              (!options?.parallel || !node.data.parallel || !node.data.parent || !nodeTrace.parent) &&
              node.data.id !== nodeTrace.id &&
              node.data.parent !== nodeTrace.id &&
              node.data.id !== nodeTrace.parent &&
              // keep only nodes that are not yet the source of an edge
              !this.edges.some((e) => e.data.source === node.data.id)
        )
        : [];
      this.edges = [
        ...(this.edges ?? []),
        ...(previousNodes?.map((previousNode) => ({
          data: {
            id: `${previousNode.data.id}->${nodeTrace.id}`,
            source: previousNode.data.id,
            target: nodeTrace.id,
            parent: nodeTrace.parent,
            parallel: options?.parallel
          },
          group: 'edges' as const
        })) ?? []),
        ...(parallelEdge
          ? [
            {
              data: {
                id: `${parallelEdge.data.source}->${nodeTrace.id}`,
                source: parallelEdge.data.source,
                target: nodeTrace.id,
                parent: nodeTrace.parent,
                parallel: options?.parallel
              },
              group: 'edges' as const
            }
          ]
          : [])
      ];
    }

    const filteredNodeData: NodeData = {
      ...this.filterNodeTrace(nodeTrace),
      ...this.filterNodeExecutionTrace({ ...executionTrace, ...nodeTrace }, options?.traceExecution)
    };
    if (filteredNodeData?.narratives?.length) {
      this.pushNarratives(nodeTrace.id, filteredNodeData?.narratives);
    }
    // If the node already exists (an auto-created parent): merge into it instead of adding a new one.
    const existingNodeIndex = this.nodes?.findIndex((n) => n.data.id === nodeTrace?.id);
    if (existingNodeIndex >= 0) {
      this.nodes[existingNodeIndex] = {
        data: {
          ...this.nodes[existingNodeIndex]?.data,
          ...filteredNodeData,
          parallel: options?.parallel,
          abstract: isAutoCreated,
          updateTime: new Date()
        },
        group: 'nodes'
      };
    } else {
      this.nodes?.push({
        data: {
          ...filteredNodeData,
          parallel: options?.parallel,
          abstract: isAutoCreated,
          createTime: new Date()
        },
        group: 'nodes'
      });
    }
  }
334

335
  /**
   * Keeps only the node-trace properties of the given node data.
   *
   * `id` and `label` are always present in the result (possibly undefined); `parent`, `parallel`,
   * `abstract`, `createTime` and `updateTime` are copied only when they hold a truthy value.
   *
   * @param nodeData - The node data to reduce.
   * @returns A new object limited to the node-trace properties.
   */
  private filterNodeTrace(nodeData?: NodeData): NodeTrace {
    const nodeTrace: NodeTrace = {
      id: nodeData?.id,
      label: nodeData?.label
    };
    if (nodeData?.parent) {
      nodeTrace.parent = nodeData?.parent;
    }
    if (nodeData?.parallel) {
      nodeTrace.parallel = nodeData?.parallel;
    }
    if (nodeData?.abstract) {
      nodeTrace.abstract = nodeData?.abstract;
    }
    if (nodeData?.createTime) {
      nodeTrace.createTime = nodeData?.createTime;
    }
    if (nodeData?.updateTime) {
      nodeTrace.updateTime = nodeData?.updateTime;
    }
    return nodeTrace;
  }
346

347
  /**
   * Reduces node data to the execution details selected by the `traceExecution` config.
   *
   * - falsy config: an empty object is returned;
   * - `true`: the node data itself is returned, with its narratives recomputed;
   * - array config: only `id` plus the listed properties present on the node data are kept;
   * - extractor config (as recognized by `isExecutionTrace`): inputs, outputs, errors and narratives
   *   are extracted with their own configs; `startTime` / `endTime` are kept when set to `true`,
   *   and `duration` / `elapsedTime` are kept only when both are `true`;
   * - any other config: nothing is returned (`undefined`).
   *
   * @param nodeData - The node data (node trace merged with the execution trace).
   * @param doTraceExecution - The `traceExecution` part of the trace config.
   */
  private filterNodeExecutionTrace<I, O>(nodeData?: NodeData<I, O>, doTraceExecution?: TraceOptions<I, O>['config']['traceExecution']) {
    if (!doTraceExecution) {
      return {};
    }

    if (doTraceExecution === true) {
      nodeData.narratives = this.extractNarrativeWithConfig<I, O>(nodeData, true);
      return nodeData;
    }

    if (Array.isArray(doTraceExecution)) {
      const selected: ExecutionTrace<unknown, unknown> = { id: nodeData.id };
      for (const key of Object.keys(nodeData)) {
        if (doTraceExecution.includes(key as keyof ExecutionTrace<I, O>)) {
          selected[key] = nodeData[key];
        }
      }
      return selected;
    }

    if (isExecutionTrace(doTraceExecution)) {
      const extracted: ExecutionTrace<unknown, unknown> = { id: nodeData.id };
      extracted.inputs = TraceableEngine.extractIOExecutionTraceWithConfig<I, O>(nodeData.inputs, doTraceExecution.inputs);
      extracted.outputs = TraceableEngine.extractIOExecutionTraceWithConfig<I, O>(nodeData.outputs, doTraceExecution.outputs);
      extracted.errors = TraceableEngine.extractIOExecutionTraceWithConfig<I, O>(nodeData.errors, doTraceExecution.errors);

      extracted.narratives = this.extractNarrativeWithConfig<I, O>(nodeData, doTraceExecution.narratives);

      const keepStartTime = doTraceExecution.startTime === true;
      const keepEndTime = doTraceExecution.endTime === true;
      if (keepStartTime) {
        extracted.startTime = nodeData.startTime;
      }
      if (keepEndTime) {
        extracted.endTime = nodeData.endTime;
      }
      // Duration values are only kept when both boundaries are kept.
      if (keepStartTime && keepEndTime) {
        extracted.duration = nodeData.duration;
        extracted.elapsedTime = nodeData.elapsedTime;
      }
      return extracted;
    }
  }
385
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc