• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taskforcesh / bullmq / 10173777761

31 Jul 2024 03:54AM CUT coverage: 89.166%. Remained the same
10173777761

push

github

web-flow
test(clean): fix flaky test (#2680)

1157 of 1392 branches covered (83.12%)

Branch coverage included in aggregate %.

2234 of 2411 relevant lines covered (92.66%)

2934.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.38
/src/classes/child-processor.ts
1
import { ParentCommand } from '../enums';
1✔
2
import { SandboxedJob } from '../interfaces';
3
import { JobJsonSandbox } from '../types';
4
import { errorToJSON } from '../utils';
1✔
5

6
enum ChildStatus {
1✔
7
  Idle,
1✔
8
  Started,
1✔
9
  Terminating,
1✔
10
  Errored,
1✔
11
}
12

13
/**
14
 * ChildProcessor
15
 *
16
 * This class acts as the interface between a child process and it parent process
17
 * so that jobs can be processed in different processes.
18
 *
19
 */
20
export class ChildProcessor {
1✔
21
  public status?: ChildStatus;
22
  public processor: any;
23
  public currentJobPromise: Promise<unknown> | undefined;
24

25
  constructor(private send: (msg: any) => Promise<void>) {}
×
26

27
  public async init(processorFile: string): Promise<void> {
28
    let processor;
29
    try {
×
30
      const { default: processorFn } = await import(processorFile);
×
31
      processor = processorFn;
×
32

33
      if (processor.default) {
×
34
        // support es2015 module.
35
        processor = processor.default;
×
36
      }
37

38
      if (typeof processor !== 'function') {
×
39
        throw new Error('No function is exported in processor file');
×
40
      }
41
    } catch (err) {
42
      this.status = ChildStatus.Errored;
×
43
      return this.send({
×
44
        cmd: ParentCommand.InitFailed,
45
        err: errorToJSON(err),
46
      });
47
    }
48

49
    const origProcessor = processor;
×
50
    processor = function (job: SandboxedJob, token?: string) {
×
51
      try {
×
52
        return Promise.resolve(origProcessor(job, token));
×
53
      } catch (err) {
54
        return Promise.reject(err);
×
55
      }
56
    };
57

58
    this.processor = processor;
×
59
    this.status = ChildStatus.Idle;
×
60
    await this.send({
×
61
      cmd: ParentCommand.InitCompleted,
62
    });
63
  }
64

65
  public async start(jobJson: JobJsonSandbox, token?: string): Promise<void> {
66
    if (this.status !== ChildStatus.Idle) {
×
67
      return this.send({
×
68
        cmd: ParentCommand.Error,
69
        err: errorToJSON(new Error('cannot start a not idling child process')),
70
      });
71
    }
72
    this.status = ChildStatus.Started;
×
73
    this.currentJobPromise = (async () => {
×
74
      try {
×
75
        const job = this.wrapJob(jobJson, this.send);
×
76
        const result = await this.processor(job, token);
×
77
        await this.send({
×
78
          cmd: ParentCommand.Completed,
79
          value: typeof result === 'undefined' ? null : result,
×
80
        });
81
      } catch (err) {
82
        await this.send({
×
83
          cmd: ParentCommand.Failed,
84
          value: errorToJSON(!(<Error>err).message ? new Error(<any>err) : err),
×
85
        });
86
      } finally {
87
        this.status = ChildStatus.Idle;
×
88
        this.currentJobPromise = undefined;
×
89
      }
90
    })();
91
  }
92

93
  public async stop(): Promise<void> {}
94

95
  async waitForCurrentJobAndExit(): Promise<void> {
96
    this.status = ChildStatus.Terminating;
×
97
    try {
×
98
      await this.currentJobPromise;
×
99
    } finally {
100
      process.exit(process.exitCode || 0);
×
101
    }
102
  }
103

104
  /**
105
   * Enhance the given job argument with some functions
106
   * that can be called from the sandboxed job processor.
107
   *
108
   * Note, the `job` argument is a JSON deserialized message
109
   * from the main node process to this forked child process,
110
   * the functions on the original job object are not in tact.
111
   * The wrapped job adds back some of those original functions.
112
   */
113
  protected wrapJob(
114
    job: JobJsonSandbox,
115
    send: (msg: any) => Promise<void>,
116
  ): SandboxedJob {
117
    return {
×
118
      ...job,
119
      data: JSON.parse(job.data || '{}'),
×
120
      opts: job.opts,
121
      returnValue: JSON.parse(job.returnvalue || '{}'),
×
122
      /*
123
       * Emulate the real job `updateProgress` function, should works as `progress` function.
124
       */
125
      async updateProgress(progress: number | object) {
126
        // Locally store reference to new progress value
127
        // so that we can return it from this process synchronously.
128
        this.progress = progress;
×
129
        // Send message to update job progress.
130
        await send({
×
131
          cmd: ParentCommand.Progress,
132
          value: progress,
133
        });
134
      },
135
      /*
136
       * Emulate the real job `log` function.
137
       */
138
      log: async (row: any) => {
139
        send({
×
140
          cmd: ParentCommand.Log,
141
          value: row,
142
        });
143
      },
144
      /*
145
       * Emulate the real job `moveToDelayed` function.
146
       */
147
      moveToDelayed: async (timestamp: number, token?: string) => {
148
        send({
×
149
          cmd: ParentCommand.MoveToDelayed,
150
          value: { timestamp, token },
151
        });
152
      },
153
      /*
154
       * Emulate the real job `updateData` function.
155
       */
156
      updateData: async (data: any) => {
157
        send({
×
158
          cmd: ParentCommand.Update,
159
          value: data,
160
        });
161
      },
162
    };
163
  }
164
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc