• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bpatrik / pigallery2 / 6488973310

11 Oct 2023 11:05PM UTC coverage: 65.19% (+1.3%) from 63.854%
6488973310

push

github

bpatrik
tweak tests

1360 of 2403 branches covered (56.6%)

Branch coverage included in aggregate %.

4056 of 5905 relevant lines covered (68.69%)

14577.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.11
/src/backend/model/threading/ThreadPool.ts
1
import * as cluster from 'cluster';
1✔
2
import {Worker} from 'cluster';
3
import {Logger} from '../../Logger';
1✔
4
import {DiskManagerTask, WorkerMessage, WorkerTask, WorkerTaskTypes,} from './Worker';
1✔
5
import {ParentDirectoryDTO} from '../../../common/entities/DirectoryDTO';
6
import {TaskQue, TaskQueEntry} from './TaskQue';
1✔
7
import {ITaskExecuter} from './TaskExecuter';
8
import {DirectoryScanSettings} from './DiskMangerWorker';
9

10
/**
 * Book-keeping record the pool holds for each forked worker: the worker
 * handle together with the queue entry it is currently processing.
 */
interface WorkerWrapper<O> {
  /** Queue entry assigned to this worker; the pool treats `null` as "free". */
  poolTask: TaskQueEntry<WorkerTask, O>;
  /** Handle of the forked `cluster` worker. */
  worker: Worker;
}

/** Tag passed as the first argument of every Logger call in this file. */
const LOG_TAG = '[ThreadPool]';
1✔
16

17
/**
 * Pool of forked `cluster` workers that processes queued {@link WorkerTask}s.
 *
 * Tasks are added to an internal {@link TaskQue}; whenever a worker is free
 * the next task is sent to it, and the worker's reply message settles the
 * promise that was handed to the caller. A worker that exits is replaced by a
 * freshly forked one.
 *
 * @typeParam O - type of the result a worker sends back for a task
 */
export class ThreadPool<O> {
  /**
   * Number of workers that are currently online. Static, so it is shared by
   * every ThreadPool instance in the process.
   */
  public static WorkerCount = 0;
  private workers: WorkerWrapper<O>[] = [];
  private taskQue = new TaskQue<WorkerTask, O>();

  /**
   * Forks `size` workers immediately.
   *
   * @param size - number of workers to start
   */
  constructor(private size: number) {
    Logger.silly(LOG_TAG, 'Creating thread pool with', size, 'workers');
    for (let i = 0; i < size; i++) {
      this.startWorker();
    }
  }

  /**
   * Queues a task and tries to dispatch it right away.
   *
   * @param task - payload that will be sent to a worker
   * @returns promise settled with the worker's result, or rejected with the
   *          error the worker reported (or with an Error if the worker exits
   *          before answering)
   */
  protected executeTask(task: WorkerTask): Promise<O> {
    const promise = this.taskQue.add(task).promise.obj;
    this.run();
    return promise;
  }

  /**
   * Hands the next queued task to a free worker. Does nothing when the queue
   * is empty or every worker is busy; it is called again each time a worker
   * finishes or is replaced.
   */
  private run = (): void => {
    if (this.taskQue.isEmpty()) {
      return;
    }
    const worker = this.getFreeWorker();
    if (worker == null) {
      return;
    }

    const poolTask = this.taskQue.get();
    worker.poolTask = poolTask;
    worker.worker.send(poolTask.data);
  };

  /**
   * @returns the first worker without an assigned task, or `null` if all
   *          workers are busy
   */
  private getFreeWorker(): null | WorkerWrapper<O> {
    for (const worker of this.workers) {
      if (worker.poolTask == null) {
        return worker;
      }
    }
    return null;
  }

  /**
   * Forks one worker, registers it in the pool and wires up its lifecycle
   * (`online`, `exit`) and result (`message`) handlers.
   */
  private startWorker(): void {
    const worker = {
      poolTask: null,
      worker: (cluster as any).fork(),
    } as WorkerWrapper<O>;
    // Tracks whether this worker was counted in WorkerCount, so that a worker
    // which dies before coming online does not drive the counter negative.
    let online = false;
    // Set once 'exit' fired; late messages from the dead worker are ignored.
    let exited = false;
    this.workers.push(worker);

    worker.worker.on('online', (): void => {
      online = true;
      ThreadPool.WorkerCount++;
      Logger.debug(
          LOG_TAG,
          'Worker ' + worker.worker.process.pid + ' is online, worker count:',
          ThreadPool.WorkerCount
      );
    });

    worker.worker.on('exit', (code, signal): void => {
      exited = true;
      if (online) {
        online = false;
        ThreadPool.WorkerCount--;
      }
      Logger.warn(
          LOG_TAG,
          'Worker ' +
          worker.worker.process.pid +
          ' died with code: ' +
          code +
          ', and signal: ' +
          signal +
          ', worker count:',
          ThreadPool.WorkerCount
      );

      // Drop the dead worker so getFreeWorker() can never select it again.
      const index = this.workers.indexOf(worker);
      if (index !== -1) {
        this.workers.splice(index, 1);
      }

      // A task that was in flight on this worker will never get an answer.
      // Fail it instead of leaving the caller's promise pending forever. It is
      // deliberately not re-queued: a task that kills its worker would
      // otherwise keep killing every replacement.
      const lostTask = worker.poolTask;
      if (lostTask != null) {
        worker.poolTask = null;
        lostTask.promise.reject(
            new Error(
                'Worker ' +
                worker.worker.process.pid +
                ' exited before completing its task (code: ' +
                code +
                ', signal: ' +
                signal +
                ')'
            )
        );
        this.taskQue.ready(lostTask);
      }

      Logger.debug(LOG_TAG, 'Starting a new worker');
      this.startWorker();
      // Give tasks that were waiting for a free worker to the replacement.
      this.run();
    });

    worker.worker.on('message', (msg: WorkerMessage<O>): void => {
      if (exited) {
        // The task of an exited worker has already been rejected above.
        return;
      }
      if (worker.poolTask == null) {
        throw new Error('No worker task after worker task is completed');
      }
      if (msg.error) {
        worker.poolTask.promise.reject(msg.error);
      } else {
        worker.poolTask.promise.resolve(msg.result);
      }
      this.taskQue.ready(worker.poolTask);
      worker.poolTask = null;
      this.run();
    });
  }
}
104

105
/**
 * Thread pool whose tasks are directory scans: each call to
 * {@link DiskManagerTH.execute} is wrapped into a {@link DiskManagerTask} and
 * dispatched through the base pool.
 */
export class DiskManagerTH
    extends ThreadPool<ParentDirectoryDTO>
    implements ITaskExecuter<string, ParentDirectoryDTO> {
  /**
   * Queues a disk-manager task for the given directory.
   *
   * @param relativeDirectoryName - directory name forwarded to the worker
   * @param settings - scan settings forwarded to the worker; defaults to `{}`
   * @returns promise of the result delivered by the worker
   */
  execute(
      relativeDirectoryName: string,
      settings: DirectoryScanSettings = {}
  ): Promise<ParentDirectoryDTO> {
    const scanTask = {
      type: WorkerTaskTypes.diskManager,
      relativeDirectoryName: relativeDirectoryName,
      settings: settings,
    } as DiskManagerTask;

    return super.executeTask(scanTask);
  }
}
119

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc