• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

teableio / teable / 14704036560

28 Apr 2025 08:53AM UTC coverage: 80.628% (+0.2%) from 80.389%
14704036560

Pull #1484

github

web-flow
Merge dcc1b5309 into e44acad78
Pull Request #1484: perf: import table using bull queue instead of worker

7719 of 8189 branches covered (94.26%)

200 of 278 new or added lines in 5 files covered. (71.94%)

23 existing lines in 2 files now uncovered.

36909 of 45777 relevant lines covered (80.63%)

1763.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.05
/apps/nestjs-backend/src/features/import/open-api/import-csv.processor.ts
1
/* eslint-disable @typescript-eslint/naming-convention */
2✔
2
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
3
import { Injectable, Logger } from '@nestjs/common';
4
import {
5
  FieldKeyType,
6
  FieldType,
7
  getActionTriggerChannel,
8
  getTableImportChannel,
9
} from '@teable/core';
10
import type { IImportColumn } from '@teable/openapi';
11
import { Job, Queue } from 'bullmq';
12
import { toString } from 'lodash';
13
import { ClsService } from 'nestjs-cls';
14
import type { CreateOp } from 'sharedb';
15
import type { LocalPresence } from 'sharedb/lib/client';
16
import { EventEmitterService } from '../../../event-emitter/event-emitter.service';
17
import { Events } from '../../../event-emitter/events';
18
import { ShareDbService } from '../../../share-db/share-db.service';
19
import type { IClsStore } from '../../../types/cls';
20
import { NotificationService } from '../../notification/notification.service';
21
import { RecordOpenApiService } from '../../record/open-api/record-open-api.service';
22
import { parseBoolean } from './import.class';
23

24
/**
 * Payload of a single job on the table-import queue: one chunk of rows
 * that should be written into one table.
 */
interface ITableImportCsvJob {
  /** Base the table belongs to; forwarded to notifications and the import-complete event. */
  baseId: string;
  /** User the import runs as (stored as `user.id` in CLS) and the notification recipient. */
  userId: string;
  /** Parsed rows keyed by sheet; only the entry under `sheetKey` is read. */
  chunk: Record<string, unknown[][]>;
  /** Key selecting the sheet inside `chunk`. */
  sheetKey: string;
  /** Column descriptions for a new-table import; entry `i` is written into `fields[i]`. */
  columnInfo?: IImportColumn[];
  /** Target fields of the table (id and type). */
  fields: Array<{ id: string; type: FieldType }>;
  /** In-place import mapping: field id -> source column index; `null` entries are skipped. */
  sourceColumnMap?: Record<string, number | null>;
  /** Destination table. */
  table: { id: string; name: string };
  /** Row range covered by this chunk; only used in log and notification messages. */
  range: [number, number];
  /** When truthy, a result notification is sent to `userId`. */
  notification?: boolean;
  /** Marks the final chunk; completion side effects only run for it. */
  lastChunk?: boolean;
}
37

38
/** Name of the BullMQ queue that ImportTableCsvQueueProcessor is registered on and injects. */
export const TABLE_IMPORT_CSV_QUEUE = 'import-table-csv-queue';
2✔
39

40
/**
 * Queue worker that writes imported rows into a table, one chunk per job.
 *
 * Each job either fills a freshly created table (`columnInfo` present) or
 * imports into existing fields (`sourceColumnMap` present). While jobs are
 * being processed, a periodic timer publishes row-count updates for every
 * table that still has waiting jobs.
 */
@Injectable()
@Processor(TABLE_IMPORT_CSV_QUEUE)
export class ImportTableCsvQueueProcessor extends WorkerHost {
  public static readonly JOB_ID_PREFIX = 'import-table-csv';

  private logger = new Logger(ImportTableCsvQueueProcessor.name);
  /** Handle of the row-count refresh interval; `null` while no timer is running. */
  private timer: NodeJS.Timeout | null = null;
  /** Refresh period in milliseconds. */
  private readonly TIMER_INTERVAL = 5000;

  constructor(
    private readonly recordOpenApiService: RecordOpenApiService,
    private readonly shareDbService: ShareDbService,
    private readonly notificationService: NotificationService,
    private readonly eventEmitterService: EventEmitterService,
    private readonly cls: ClsService<IClsStore>,
    @InjectQueue(TABLE_IMPORT_CSV_QUEUE) public readonly queue: Queue<ITableImportCsvJob>
  ) {
    super();
  }

  /**
   * Starts the periodic row-count refresh unless it is already running.
   */
  private startQueueTimer() {
    if (this.timer) {
      return;
    }
    this.logger.log(`Starting import table queue timer with interval: ${this.TIMER_INTERVAL}ms`);
    this.timer = setInterval(() => {
      // The interval ignores the returned promise, so a rejection must be
      // handled here or it surfaces as an unhandled rejection.
      this.refreshTableRowCount().catch((error) => this.logger.error(error));
    }, this.TIMER_INTERVAL);
  }

  /**
   * Publishes a row-count update for every table that still has import jobs
   * waiting in the queue.
   */
  private async refreshTableRowCount() {
    // Tables with waiting jobs are still being imported, so their row count
    // keeps changing and needs to be refreshed.
    const waitingJobs = await this.queue.getJobs('waiting');
    const tableIds = new Set<string>();
    for (const job of waitingJobs) {
      if (!job.id?.startsWith(ImportTableCsvQueueProcessor.JOB_ID_PREFIX)) {
        continue;
      }
      const tableId = job.data.table.id;
      if (tableId) {
        tableIds.add(tableId);
      }
    }

    tableIds.forEach((tableId) => {
      this.updateRowCount(tableId);
    });
  }

  /**
   * Imports the chunk carried by `job`.
   *
   * After the last chunk it optionally notifies the user, emits
   * `Events.IMPORT_TABLE_COMPLETE`, clears the import loading flag and
   * publishes a row-count update. On failure it optionally notifies the user,
   * clears the loading flag, removes the remaining jobs of the same batch and
   * rethrows so the queue records the job as failed.
   *
   * @param job queue job holding the chunk to import
   * @throws the error raised while importing the chunk
   */
  public async process(job: Job<ITableImportCsvJob>) {
    const jobId = String(job.id);

    const { table, notification, baseId, userId, lastChunk, sourceColumnMap, range } = job.data;
    const localPresence = this.createImportPresence(table.id);

    try {
      await this.handleImportChunkCsv(job);
      if (lastChunk) {
        if (notification) {
          this.notificationService.sendImportResultNotify({
            baseId,
            tableId: table.id,
            toUserId: userId,
            message: `🎉 ${table.name} ${sourceColumnMap ? 'inplace' : ''} imported successfully`,
          });
        }

        this.eventEmitterService.emitAsync(Events.IMPORT_TABLE_COMPLETE, {
          baseId,
          tableId: table.id,
        });
        this.setImportStatus(localPresence, false);
        this.updateRowCount(table.id);
      }
    } catch (error) {
      // Anything can be thrown; normalise so `message` is always available.
      const err = error instanceof Error ? error : new Error(String(error));
      if (notification) {
        this.notificationService.sendImportResultNotify({
          baseId,
          tableId: table.id,
          toUserId: userId,
          message: `❌ ${table.name} import aborted: ${err.message} fail row range: [${range}]. Please check the data for this range and retry.`,
        });
      }

      // The batch is aborted and no later chunk will reach the success path,
      // so the loading flag has to be cleared here as well.
      this.setImportStatus(localPresence, false);

      await this.cleanRelativeTask(jobId);

      throw err;
    }
  }

  /**
   * Best-effort removal of the other waiting/active jobs that belong to the
   * same batch as `jobId`.
   *
   * NOTE(review): the batch is identified by the part of the job id before the
   * first `_`; confirm this matches how the producer builds job ids.
   *
   * @param jobId id of the job that failed
   */
  private async cleanRelativeTask(jobId: string) {
    const [sameBatchJobPrefix] = jobId.split('_');
    try {
      const pendingJobs = await this.queue.getJobs(['waiting', 'active']);
      // The failing job is itself still active; it is skipped because the
      // queue settles it once `process` rethrows.
      const siblings = pendingJobs.filter(
        (job) => job.id !== jobId && job.id?.startsWith(sameBatchJobPrefix)
      );
      // allSettled: a job that cannot be removed must not stop the others
      // from being removed, nor replace the original import error.
      const results = await Promise.allSettled(siblings.map((job) => job.remove()));
      results.forEach((result, index) => {
        if (result.status === 'rejected') {
          this.logger.warn(
            `failed to remove import job ${siblings[index]?.id}: ${String(result.reason)}`
          );
        }
      });
    } catch (cleanupError) {
      // Cleanup is secondary; the caller still has to rethrow the import error.
      this.logger.error(cleanupError);
    }
  }

  /**
   * Converts the job's chunk rows into records and creates them in the table,
   * running inside a CLS context whose `user.id` is the importing user.
   *
   * @param job queue job holding the chunk to import
   * @throws whatever the record service throws (logged before rethrowing)
   */
  private async handleImportChunkCsv(job: Job<ITableImportCsvJob>) {
    await this.cls.run(async () => {
      this.cls.set('user.id', job.data.userId);
      const { chunk, sheetKey, columnInfo, fields, sourceColumnMap, table } = job.data;
      const currentResult = chunk[sheetKey];
      // fill data
      const records = currentResult.map((row) => {
        const res: { fields: Record<string, unknown> } = {
          fields: {},
        };
        // import new table
        if (columnInfo) {
          columnInfo.forEach((col, index) => {
            const { sourceColumnIndex, type } = col;
            // an empty row may arrive as a non-array value
            const value = Array.isArray(row) ? row[sourceColumnIndex] : null;
            res.fields[fields[index].id] =
              type === FieldType.Checkbox ? parseBoolean(value) : value?.toString();
          });
        }
        // inplace records
        if (sourceColumnMap) {
          for (const [key, value] of Object.entries(sourceColumnMap)) {
            if (value !== null) {
              const { type } = fields.find((f) => f.id === key) || {};
              // same empty-row guard as the new-table branch above
              const cell = Array.isArray(row) ? row[value] : null;
              // link value should be string
              res.fields[key] = type === FieldType.Link ? toString(cell) : cell;
            }
          }
        }
        return res;
      });
      if (records.length === 0) {
        return;
      }
      try {
        // New-table imports use the SQL-only creation path; in-place imports
        // go through the regular multi-record creation.
        const createFn = columnInfo
          ? this.recordOpenApiService.createRecordsOnlySql.bind(this.recordOpenApiService)
          : this.recordOpenApiService.multipleCreateRecords.bind(this.recordOpenApiService);
        await createFn(table.id, {
          fieldKeyType: FieldKeyType.Id,
          typecast: true,
          records,
        });
      } catch (e: unknown) {
        this.logger.error(e);
        throw e;
      }
    });
  }

  /**
   * Publishes an `addRecord` action on the table's action-trigger channel and
   * an empty create op on its record channel.
   * Presumably this makes connected clients refresh the row count — confirm
   * against the consumers of these channels.
   *
   * @param tableId table whose listeners should be notified
   */
  private updateRowCount(tableId: string) {
    const channel = getActionTriggerChannel(tableId);
    const presence = this.shareDbService.connect().getPresence(channel);
    const localPresence = presence.create(tableId);
    localPresence.submit([{ actionKey: 'addRecord' }], (error) => {
      error && this.logger.error(error);
    });

    const updateEmptyOps = {
      src: 'unknown',
      seq: 1,
      m: {
        ts: Date.now(),
      },
      create: {
        type: 'json0',
        data: undefined,
      },
      v: 0,
    } as CreateOp;
    this.shareDbService.publishRecordChannel(tableId, updateEmptyOps);
  }

  /**
   * Submits the import loading flag on the given presence; submit errors are
   * only logged.
   *
   * @param presence local presence created by `createImportPresence`
   * @param loading value of the `loading` flag to publish
   */
  private setImportStatus(presence: LocalPresence<unknown>, loading: boolean) {
    presence.submit(
      {
        loading,
      },
      (error) => {
        error && this.logger.error(error);
      }
    );
  }

  /**
   * Creates a local presence on the table's import channel.
   *
   * @param tableId table being imported into
   */
  private createImportPresence(tableId: string) {
    const channel = getTableImportChannel(tableId);
    const presence = this.shareDbService.connect().getPresence(channel);
    return presence.create(channel);
  }

  /**
   * Logs the start of a job and makes sure the row-count timer is running.
   */
  @OnWorkerEvent('active')
  onWorkerEvent(job: Job) {
    const { table, range } = job.data;
    this.logger.log(`import data to ${table.id} job started, range: [${range}]`);
    this.startQueueTimer();
  }

  /**
   * Logs worker-level errors.
   *
   * The worker `error` event delivers an `Error`, not a job, so reading
   * `data` from the argument would throw inside the error listener. A `Job`
   * argument is still accepted for callers that pass one directly.
   */
  @OnWorkerEvent('error')
  onError(error: Error | Job) {
    if (error instanceof Error) {
      this.logger.error(`import table worker error: ${error.message}`, error.stack);
      return;
    }
    const { table, range } = error.data ?? {};
    this.logger.error(`import data to ${table?.id} job failed, range: [${range}]`);
  }

  /**
   * Logs job completion and stops the row-count timer once no import job is
   * waiting or active any more.
   */
  @OnWorkerEvent('completed')
  async onCompleted(job: Job) {
    const { table, range } = job.data;
    this.logger.log(`import data to ${table.id} job completed, range: [${range}]`);

    const remainingJobs = (await this.queue.getJobs(['waiting', 'active'])).filter((pending) =>
      pending.id?.startsWith(ImportTableCsvQueueProcessor.JOB_ID_PREFIX)
    );

    if (!remainingJobs.length && this.timer) {
      this.logger.log('No more import tasks, clearing timer...');
      // last task, clear timer
      clearInterval(this.timer);
      // Reset the handle; otherwise the guard in startQueueTimer would treat
      // the cleared interval as running and never start a new one.
      this.timer = null;
    }
  }
}
117✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc