• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

teableio / teable / 14704021331

28 Apr 2025 08:53AM UTC coverage: 80.605% (+0.2%) from 80.389%
14704021331

Pull #1484

github

web-flow
Merge 6a386e08b into e44acad78
Pull Request #1484: perf: import table using bull queue instead of worker

7695 of 8165 branches covered (94.24%)

199 of 278 new or added lines in 5 files covered. (71.58%)

30 existing lines in 5 files now uncovered.

36902 of 45781 relevant lines covered (80.61%)

1764.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.68
/apps/nestjs-backend/src/features/import/open-api/import-csv.processor.ts
1
/* eslint-disable @typescript-eslint/naming-convention */
2✔
2
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
3
import { Injectable, Logger } from '@nestjs/common';
4
import {
5
  FieldKeyType,
6
  FieldType,
7
  getActionTriggerChannel,
8
  getTableImportChannel,
9
} from '@teable/core';
10
import type { IImportColumn } from '@teable/openapi';
11
import { Job, Queue } from 'bullmq';
12
import { toString } from 'lodash';
13
import { ClsService } from 'nestjs-cls';
14
import type { CreateOp } from 'sharedb';
15
import type { LocalPresence } from 'sharedb/lib/client';
16
import { EventEmitterService } from '../../../event-emitter/event-emitter.service';
17
import { Events } from '../../../event-emitter/events';
18
import { ShareDbService } from '../../../share-db/share-db.service';
19
import type { IClsStore } from '../../../types/cls';
20
import { NotificationService } from '../../notification/notification.service';
21
import { RecordOpenApiService } from '../../record/open-api/record-open-api.service';
22
import { parseBoolean } from './import.class';
23

24
interface ITableImportCsvJob {
25
  baseId: string;
26
  userId: string;
27
  chunk: Record<string, unknown[][]>;
28
  sheetKey: string;
29
  columnInfo?: IImportColumn[];
30
  fields: { id: string; type: FieldType }[];
31
  sourceColumnMap?: Record<string, number | null>;
32
  table: { id: string; name: string };
33
  range: [number, number];
34
  notification?: boolean;
35
  lastChunk?: boolean;
36
}
37

38
export const TABLE_IMPORT_CSV_QUEUE = 'import-table-csv-queue';
2✔
39

40
@Injectable()
41
@Processor(TABLE_IMPORT_CSV_QUEUE)
42
export class ImportTableCsvQueueProcessor extends WorkerHost {
2✔
43
  public static readonly JOB_ID_PREFIX = 'import-table-csv';
117✔
44

45
  private logger = new Logger(ImportTableCsvQueueProcessor.name);
117✔
46
  private timer: NodeJS.Timeout | null = null;
117✔
47
  private readonly TIMER_INTERVAL = 5000;
117✔
48

49
  constructor(
117✔
50
    private readonly recordOpenApiService: RecordOpenApiService,
117✔
51
    private readonly shareDbService: ShareDbService,
117✔
52
    private readonly notificationService: NotificationService,
117✔
53
    private readonly eventEmitterService: EventEmitterService,
117✔
54
    private readonly cls: ClsService<IClsStore>,
117✔
55
    @InjectQueue(TABLE_IMPORT_CSV_QUEUE) public readonly queue: Queue<ITableImportCsvJob>
117✔
56
  ) {
117✔
57
    super();
117✔
58
  }
117✔
59

60
  private startQueueTimer() {
117✔
NEW
61
    if (this.timer) {
×
NEW
62
      return;
×
NEW
63
    }
×
NEW
64
    this.logger.log(`Starting import table queue timer with interval: ${this.TIMER_INTERVAL}ms`);
×
NEW
65
    this.timer = setInterval(async () => await this.refreshTableRowCount(), this.TIMER_INTERVAL);
×
NEW
66
  }
×
67

68
  private async refreshTableRowCount() {
117✔
NEW
69
    // it means there still processing, so they need update rowCount
×
NEW
70
    const waitList = [
×
NEW
71
      ...new Set(
×
NEW
72
        (await this.queue.getJobs('waiting'))
×
NEW
73
          .filter((job) => job.id?.startsWith(ImportTableCsvQueueProcessor.JOB_ID_PREFIX))
×
NEW
74
          .map((job) => job.data.table.id)
×
NEW
75
          .filter((id) => id)
×
76
      ),
NEW
77
    ];
×
NEW
78
    console.log('为啥没执行 1', waitList);
×
79

NEW
80
    if (waitList.length) {
×
NEW
81
      console.log('为啥没执行', waitList);
×
NEW
82
      waitList.forEach((tableId) => {
×
NEW
83
        this.updateRowCount(tableId);
×
NEW
84
      });
×
NEW
85
    }
×
NEW
86
  }
×
87

88
  public async process(job: Job<ITableImportCsvJob>) {
117✔
89
    const jobId = String(job.id);
8✔
90

91
    const { table, notification, baseId, userId, lastChunk, sourceColumnMap, range } = job.data;
8✔
92
    const localPresence = this.createImportPresence(table.id);
8✔
93

94
    try {
8✔
95
      await this.handleImportChunkCsv(job);
8✔
96
      if (lastChunk) {
8✔
97
        notification &&
8!
NEW
98
          this.notificationService.sendImportResultNotify({
×
NEW
99
            baseId,
×
NEW
100
            tableId: table.id,
×
NEW
101
            toUserId: userId,
×
NEW
102
            message: `🎉 ${table.name} ${sourceColumnMap ? 'inplace' : ''} imported successfully`,
×
NEW
103
          });
×
104

105
        this.eventEmitterService.emitAsync(Events.IMPORT_TABLE_COMPLETE, {
8✔
106
          baseId,
8✔
107
          tableId: table.id,
8✔
108
        });
8✔
109
        this.setImportStatus(localPresence, false);
8✔
110
        this.updateRowCount(table.id);
8✔
111
      }
8✔
112
    } catch (error) {
8!
NEW
113
      const err = error as Error;
×
NEW
114
      notification &&
×
NEW
115
        this.notificationService.sendImportResultNotify({
×
NEW
116
          baseId,
×
NEW
117
          tableId: table.id,
×
NEW
118
          toUserId: userId,
×
NEW
119
          message: `❌ ${table.name} import aborted: ${err.message} fail row range: [${range}]. Please check the data for this range and retry.`,
×
NEW
120
        });
×
121

NEW
122
      await this.cleanRelativeTask(jobId);
×
123

NEW
124
      throw err;
×
NEW
125
    }
×
126
  }
8✔
127

128
  private async cleanRelativeTask(jobId: string) {
117✔
NEW
129
    const [sameBatchJobPrefix] = jobId.split('_');
×
NEW
130
    const waitingJobs = await this.queue.getJobs(['waiting', 'active']);
×
NEW
131
    await Promise.all(
×
NEW
132
      waitingJobs.filter((job) => job.id?.startsWith(sameBatchJobPrefix)).map((job) => job.remove())
×
133
    );
NEW
134
  }
×
135

136
  private async handleImportChunkCsv(job: Job<ITableImportCsvJob>) {
117✔
137
    await this.cls.run(async () => {
8✔
138
      this.cls.set('user.id', job.data.userId);
8✔
139
      const { chunk, sheetKey, columnInfo, fields, sourceColumnMap, table } = job.data;
8✔
140
      const currentResult = chunk[sheetKey];
8✔
141
      // fill data
8✔
142
      const records = currentResult.map((row) => {
8✔
143
        const res: { fields: Record<string, unknown> } = {
16✔
144
          fields: {},
16✔
145
        };
16✔
146
        // import new table
16✔
147
        if (columnInfo) {
16✔
148
          columnInfo.forEach((col, index) => {
12✔
149
            const { sourceColumnIndex, type } = col;
72✔
150
            // empty row will be return void row value
72✔
151
            const value = Array.isArray(row) ? row[sourceColumnIndex] : null;
72!
152
            res.fields[fields[index].id] =
72✔
153
              type === FieldType.Checkbox ? parseBoolean(value) : value?.toString();
72✔
154
          });
72✔
155
        }
12✔
156
        // inplace records
16✔
157
        if (sourceColumnMap) {
16✔
158
          for (const [key, value] of Object.entries(sourceColumnMap)) {
4✔
159
            if (value !== null) {
24✔
160
              const { type } = fields.find((f) => f.id === key) || {};
24✔
161
              // link value should be string
24✔
162
              res.fields[key] = type === FieldType.Link ? toString(row[value]) : row[value];
24!
163
            }
24✔
164
          }
24✔
165
        }
4✔
166
        return res;
16✔
167
      });
16✔
168
      if (records.length === 0) {
8!
NEW
169
        return;
×
NEW
170
      }
×
171
      try {
8✔
172
        const createFn = columnInfo
8✔
173
          ? this.recordOpenApiService.createRecordsOnlySql.bind(this.recordOpenApiService)
6✔
174
          : this.recordOpenApiService.multipleCreateRecords.bind(this.recordOpenApiService);
2✔
175
        await createFn(table.id, {
8✔
176
          fieldKeyType: FieldKeyType.Id,
8✔
177
          typecast: true,
8✔
178
          records,
8✔
179
        });
8✔
180
      } catch (e: unknown) {
8!
NEW
181
        this.logger.error(e);
×
NEW
182
        throw e;
×
NEW
183
      }
×
184
    });
8✔
185
  }
8✔
186

187
  private updateRowCount(tableId: string) {
117✔
188
    const channel = getActionTriggerChannel(tableId);
8✔
189
    const presence = this.shareDbService.connect().getPresence(channel);
8✔
190
    const localPresence = presence.create(tableId);
8✔
191
    localPresence.submit([{ actionKey: 'addRecord' }], (error) => {
8✔
192
      error && this.logger.error(error);
8!
193
    });
8✔
194

195
    const updateEmptyOps = {
8✔
196
      src: 'unknown',
8✔
197
      seq: 1,
8✔
198
      m: {
8✔
199
        ts: Date.now(),
8✔
200
      },
8✔
201
      create: {
8✔
202
        type: 'json0',
8✔
203
        data: undefined,
8✔
204
      },
8✔
205
      v: 0,
8✔
206
    } as CreateOp;
8✔
207
    this.shareDbService.publishRecordChannel(tableId, updateEmptyOps);
8✔
208
  }
8✔
209

210
  private setImportStatus(presence: LocalPresence<unknown>, loading: boolean) {
117✔
211
    presence.submit(
8✔
212
      {
8✔
213
        loading,
8✔
214
      },
8✔
215
      (error) => {
8✔
216
        error && this.logger.error(error);
8!
217
      }
8✔
218
    );
219
  }
8✔
220

221
  private createImportPresence(tableId: string) {
117✔
222
    const channel = getTableImportChannel(tableId);
8✔
223
    const presence = this.shareDbService.connect().getPresence(channel);
8✔
224
    return presence.create(channel);
8✔
225
  }
8✔
226

227
  @OnWorkerEvent('active')
228
  onWorkerEvent(job: Job) {
117✔
NEW
229
    const { table, range } = job.data;
×
NEW
230
    this.logger.log(`import data to ${table.id} job started, range: [${range}]`);
×
NEW
231
    this.startQueueTimer();
×
NEW
232
  }
×
233

234
  @OnWorkerEvent('error')
235
  onError(job: Job) {
117✔
NEW
236
    const { table, range } = job.data;
×
NEW
237
    this.logger.error(`import data to ${table.id} job failed, range: [${range}]`);
×
NEW
238
  }
×
239

240
  @OnWorkerEvent('completed')
117✔
NEW
241
  async onCompleted(job: Job) {
×
NEW
242
    const { table, range } = job.data;
×
NEW
243
    this.logger.log(`import data to ${table.id} job completed, range: [${range}]`);
×
244

NEW
245
    const allJobs = (await this.queue.getJobs(['waiting', 'active'])).filter((job) =>
×
NEW
246
      job.id?.startsWith(ImportTableCsvQueueProcessor.JOB_ID_PREFIX)
×
247
    );
248

NEW
249
    if (!allJobs.length && this.timer) {
×
NEW
250
      console.log('什么东西', await this.queue.getJobCounts());
×
NEW
251
      console.log('为啥劲来了', allJobs.length);
×
NEW
252
      this.logger.log('No more import tasks, clearing timer...');
×
NEW
253
      // last task, clear timer
×
254
      clearInterval(this.timer);
×
255
    }
×
256
  }
×
257
}
117✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc