• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

teableio / teable / 15668620488

15 Jun 2025 11:43PM UTC coverage: 80.847%. First build
15668620488

push

github

web-flow
fix: import e2e error (#1599)

8050 of 8570 branches covered (93.93%)

0 of 4 new or added lines in 1 file covered. (0.0%)

38370 of 47460 relevant lines covered (80.85%)

1763.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.02
/apps/nestjs-backend/src/features/import/open-api/import-csv.processor.ts
1
/* eslint-disable @typescript-eslint/naming-convention */
2✔
2
import { join } from 'path';
3
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
4
import { Injectable, Logger } from '@nestjs/common';
5
import {
6
  FieldKeyType,
7
  FieldType,
8
  getActionTriggerChannel,
9
  getRandomString,
10
  getTableImportChannel,
11
} from '@teable/core';
12
import { UploadType, type IImportColumn } from '@teable/openapi';
13
import { Job, Queue } from 'bullmq';
14
import { toString } from 'lodash';
15
import { ClsService } from 'nestjs-cls';
16
import Papa from 'papaparse';
17
import type { CreateOp } from 'sharedb';
18
import type { LocalPresence } from 'sharedb/lib/client';
19
import { EventEmitterService } from '../../../event-emitter/event-emitter.service';
20
import { Events } from '../../../event-emitter/events';
21
import { ShareDbService } from '../../../share-db/share-db.service';
22
import type { IClsStore } from '../../../types/cls';
23
import StorageAdapter from '../../attachments/plugins/adapter';
24
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
25
import { NotificationService } from '../../notification/notification.service';
26
import { RecordOpenApiService } from '../../record/open-api/record-open-api.service';
27
import { parseBoolean } from './import.class';
28

29
/**
 * Payload of a single CSV-import chunk job placed on the import queue.
 *
 * One import is split into several chunk jobs; each job carries the storage
 * path of its own chunk plus the metadata needed to turn rows into records.
 */
interface ITableImportCsvJob {
  /** Base the target table belongs to; forwarded to notifications and the completion event. */
  baseId: string;
  /** User the import runs as (stored in CLS as `user.id`) and the recipient of result notifications. */
  userId: string;
  /** Path of this chunk's CSV file inside the import bucket. */
  path: string;
  /**
   * Column descriptors for a "new table" import. When present, the value at
   * `columnInfo[i].sourceColumnIndex` of each row is written to `fields[i]`.
   */
  columnInfo?: IImportColumn[];
  /** Target fields (id and type) of the destination table. */
  fields: { id: string; type: FieldType }[];
  /**
   * Mapping for an "in-place" import: field id -> source column index.
   * Entries whose value is `null` are skipped.
   */
  sourceColumnMap?: Record<string, number | null>;
  /** Destination table; `name` is only used in user-facing messages. */
  table: { id: string; name: string };
  /** Row range covered by this chunk; used in log and notification messages. */
  range: [number, number];
  /** When truthy, the user is notified about success or failure of the import. */
  notification?: boolean;
  /** Marks the chunk that triggers completion handling (notification, event, storage cleanup). */
  lastChunk?: boolean;
  /**
   * Id shared by all chunk jobs of one import. It names the storage directory
   * removed on completion and is the id prefix used to find sibling jobs.
   */
  parentJobId: string;
}
42

43
/** Name of the BullMQ queue that carries CSV-import chunk jobs. */
export const TABLE_IMPORT_CSV_QUEUE = 'import-table-csv-queue';
44

45
/**
 * Queue worker that imports one CSV chunk per job into a table.
 *
 * For every job it downloads the chunk file, converts the rows into record
 * payloads and creates the records. While a job runs, a ShareDB presence on
 * the table's import channel reports `{ loading: true }`; the job flagged as
 * `lastChunk` resets it, emits the completion event and removes the uploaded
 * files of the whole import.
 */
@Injectable()
@Processor(TABLE_IMPORT_CSV_QUEUE)
export class ImportTableCsvQueueProcessor extends WorkerHost {
  public static readonly JOB_ID_PREFIX = 'import-table-csv';

  private readonly logger = new Logger(ImportTableCsvQueueProcessor.name);
  /** Local presences created by this processor, reused per channel. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private presences: LocalPresence<any>[] = [];

  constructor(
    private readonly recordOpenApiService: RecordOpenApiService,
    private readonly shareDbService: ShareDbService,
    private readonly notificationService: NotificationService,
    private readonly eventEmitterService: EventEmitterService,
    private readonly cls: ClsService<IClsStore>,
    @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
    @InjectQueue(TABLE_IMPORT_CSV_QUEUE) public readonly queue: Queue<ITableImportCsvJob>
  ) {
    super();
  }

  /**
   * Imports the chunk described by `job` and, for the last chunk, finalizes
   * the import (success notification, completion event, presence teardown and
   * removal of the uploaded chunk directory).
   *
   * @param job - the chunk job to process
   * @throws rethrows whatever the import raised, after notifying the user
   *   (when `notification` is set) so the queue can mark the job as failed
   */
  public async process(job: Job<ITableImportCsvJob>) {
    const { table, notification, baseId, userId, lastChunk, sourceColumnMap, range } = job.data;
    const localPresence = this.createImportPresence(table.id, 'status');
    this.setImportStatus(localPresence, true);
    try {
      await this.handleImportChunkCsv(job);
      if (lastChunk) {
        if (notification) {
          this.notificationService.sendImportResultNotify({
            baseId,
            tableId: table.id,
            toUserId: userId,
            message: `🎉 ${table.name} ${sourceColumnMap ? 'inplace' : ''} imported successfully`,
          });
        }

        this.eventEmitterService.emitAsync(Events.IMPORT_TABLE_COMPLETE, {
          baseId,
          tableId: table.id,
        });

        this.setImportStatus(localPresence, false);
        localPresence.destroy();
        this.presences = this.presences.filter(
          (presence) => presence.presenceId !== localPresence.presenceId
        );

        // All chunk files of one import live under a directory named after the parent job.
        const dir = StorageAdapter.getDir(UploadType.Import);
        const fullPath = join(dir, job.data.parentJobId);
        await this.storageAdapter.deleteDir(
          StorageAdapter.getBucket(UploadType.Import),
          fullPath,
          false
        );
      }
    } catch (error: unknown) {
      // Anything can be thrown; narrow before reading `.message` so the user
      // never sees "undefined" in the notification.
      const reason = error instanceof Error ? error.message : String(error);
      if (notification) {
        this.notificationService.sendImportResultNotify({
          baseId,
          tableId: table.id,
          toUserId: userId,
          message: `❌ ${table.name} import aborted: ${reason} fail row range: [${range}]. Please check the data for this range and retry.`,
        });
      }

      throw error;
    }
  }

  /**
   * Removes every waiting or active job whose id starts with `parentJobId`,
   * i.e. the remaining chunks of the same import.
   *
   * Removals are awaited; a job that cannot be removed is logged and skipped
   * so one failure does not prevent the other removals.
   */
  private async cleanRelativeTask(parentJobId: string) {
    const pendingJobs = await this.queue.getJobs(['waiting', 'active']);
    const relatedJobs = pendingJobs.filter((job) => job?.id?.startsWith(parentJobId));

    await Promise.all(
      relatedJobs.map(async (relatedJob) => {
        try {
          await relatedJob.remove();
        } catch (error: unknown) {
          const reason = error instanceof Error ? error.message : String(error);
          this.logger.warn(`failed to remove import job ${relatedJob.id}: ${reason}`);
        }
      })
    );
  }

  /**
   * Downloads the chunk, maps every row to a record payload and creates the
   * records, running inside a CLS context whose `user.id` is the job's user.
   *
   * - With `columnInfo` (new table): column `i` of `columnInfo` feeds
   *   `fields[i]`; checkbox values go through `parseBoolean`, everything else
   *   is stringified.
   * - With `sourceColumnMap` (in-place): each non-null entry copies the mapped
   *   source column into the field; link values are stringified.
   */
  private async handleImportChunkCsv(job: Job<ITableImportCsvJob>) {
    await this.cls.run(async () => {
      this.cls.set('user.id', job.data.userId);
      const { columnInfo, fields, sourceColumnMap, table } = job.data;
      const rows = await this.getChunkData(job);

      // Built once instead of searching `fields` for every cell; the first
      // occurrence of an id wins, matching a linear `find`.
      const fieldTypeById = new Map<string, FieldType>();
      for (const { id, type } of fields) {
        if (!fieldTypeById.has(id)) {
          fieldTypeById.set(id, type);
        }
      }

      // fill data
      const records = rows.map((row) => {
        const res: { fields: Record<string, unknown> } = {
          fields: {},
        };
        // The parser may yield a non-array value for an empty row; treat it as "no cells".
        const cells = Array.isArray(row) ? row : undefined;

        // import new table
        if (columnInfo) {
          columnInfo.forEach(({ sourceColumnIndex, type }, index) => {
            const value = cells ? cells[sourceColumnIndex] : null;
            res.fields[fields[index].id] =
              type === FieldType.Checkbox ? parseBoolean(value) : value?.toString();
          });
        }

        // inplace records
        if (sourceColumnMap) {
          for (const [fieldId, columnIndex] of Object.entries(sourceColumnMap)) {
            if (columnIndex === null) {
              continue;
            }
            const cell = cells?.[columnIndex];
            // link value should be string
            res.fields[fieldId] =
              fieldTypeById.get(fieldId) === FieldType.Link ? toString(cell) : cell;
          }
        }
        return res;
      });

      if (records.length === 0) {
        return;
      }

      try {
        // New-table imports use the SQL-only creation path, in-place imports the regular one.
        const createFn = columnInfo
          ? this.recordOpenApiService.createRecordsOnlySql.bind(this.recordOpenApiService)
          : this.recordOpenApiService.multipleCreateRecords.bind(this.recordOpenApiService);
        await createFn(table.id, {
          fieldKeyType: FieldKeyType.Id,
          typecast: true,
          records,
        });
      } catch (e: unknown) {
        this.logger.error(e);
        throw e;
      }
    });
  }

  /**
   * Downloads the job's chunk file from the import bucket and parses it as CSV.
   *
   * @returns the parsed rows; `dynamicTyping` is enabled, so cells are not
   *   necessarily strings
   */
  private async getChunkData(job: Job<ITableImportCsvJob>): Promise<unknown[][]> {
    const { path } = job.data;
    const stream = await this.storageAdapter.downloadFile(
      StorageAdapter.getBucket(UploadType.Import),
      path
    );
    return new Promise((resolve, reject) => {
      Papa.parse(stream, {
        download: false,
        dynamicTyping: true,
        complete: (result) => {
          resolve(result.data as unknown[][]);
        },
        error: (err) => {
          reject(err);
        },
      });
    });
  }

  /**
   * Announces that records were added to `tableId`: submits an `addRecord`
   * action on the table's action-trigger channel and publishes an empty
   * create op on the record channel.
   */
  private updateRowCount(tableId: string) {
    const localPresence = this.createImportPresence(tableId, 'rowCount');
    localPresence.submit([{ actionKey: 'addRecord' }], (error) => {
      error && this.logger.error(error);
    });

    const updateEmptyOps = {
      src: 'unknown',
      seq: 1,
      m: {
        ts: Date.now(),
      },
      create: {
        type: 'json0',
        data: undefined,
      },
      v: 0,
    } as CreateOp;
    this.shareDbService.publishRecordChannel(tableId, updateEmptyOps);
  }

  /**
   * Submits `{ loading }` on the given presence; submit errors are logged, not thrown.
   */
  setImportStatus(presence: LocalPresence<unknown>, loading: boolean) {
    presence.submit(
      {
        loading,
      },
      (error) => {
        error && this.logger.error(error);
      }
    );
  }

  /**
   * Returns the local presence for the table's channel, creating and caching
   * it on first use.
   *
   * @param tableId - table the presence belongs to
   * @param type - `'rowCount'` selects the action-trigger channel, `'status'`
   *   (default) the table-import channel
   */
  createImportPresence(tableId: string, type: 'rowCount' | 'status' = 'status') {
    const channel =
      type === 'rowCount' ? getActionTriggerChannel(tableId) : getTableImportChannel(tableId);
    const existPresence = this.presences.find(({ presence }) => {
      return presence.channel === channel;
    });
    if (existPresence) {
      return existPresence;
    }
    const presence = this.shareDbService.connect().getPresence(channel);
    // The channel name doubles as the presence id.
    const localPresence = presence.create(channel);
    this.presences.push(localPresence);
    return localPresence;
  }

  /** Builds `<parentId>_import_<6 random chars>`, the id prefix for chunk jobs. */
  public getChunkImportJobIdPrefix(parentId: string) {
    return `${parentId}_import_${getRandomString(6)}`;
  }

  /** Builds a chunk job id: a fresh prefix for `jobId` followed by `_[start,end]`. */
  public getChunkImportJobId(jobId: string, range: [number, number]) {
    const prefix = this.getChunkImportJobIdPrefix(jobId);
    return `${prefix}_[${range[0]},${range[1]}]`;
  }

  /** Logs that a chunk job became active. */
  @OnWorkerEvent('active')
  onWorkerEvent(job: Job) {
    const { table, range } = job.data;
    this.logger.log(`import data to ${table.id} job started, range: [${range}]`);
  }

  /**
   * Handles the worker's `error` event.
   *
   * BullMQ emits `error` with an `Error` (worker-level problems), not with a
   * job, so that case is only logged. A job-shaped payload is still accepted
   * and handled like a failed job, for backward compatibility.
   */
  @OnWorkerEvent('error')
  async onError(job: Job | Error) {
    if (job instanceof Error) {
      this.logger.error(`import csv worker error: ${job.message}`, job.stack);
      return;
    }
    if (!job?.data) {
      this.logger.error('import csv job data is undefined');
      return;
    }
    await this.abortImport(job);
  }

  /**
   * Handles the worker's `failed` event, which is where BullMQ reports a job
   * whose processor threw. The job may be undefined if it no longer exists.
   */
  @OnWorkerEvent('failed')
  async onFailed(job: Job | undefined) {
    if (!job?.data) {
      this.logger.error('import csv job data is undefined');
      return;
    }
    await this.abortImport(job);
  }

  /** Logs completion of a chunk job and announces the new rows. */
  @OnWorkerEvent('completed')
  async onCompleted(job: Job) {
    const { table, range } = job.data;
    this.logger.log(`import data to ${table.id} job completed, range: [${range}]`);
    this.updateRowCount(table.id);
  }

  /**
   * Shared failure handling: logs the failure, clears the table's "loading"
   * status and removes the remaining chunk jobs of the same import.
   * Never throws, because it runs inside event listeners.
   */
  private async abortImport(job: Job<ITableImportCsvJob>) {
    const { table, range, parentJobId } = job.data;
    this.logger.error(`import data to ${table.id} job failed, range: [${range}]`);

    // Reset the status first so a cleanup problem cannot leave the table stuck on "loading".
    const localPresence = this.createImportPresence(table.id, 'status');
    this.setImportStatus(localPresence, false);

    try {
      await this.cleanRelativeTask(parentJobId);
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`failed to clean related import jobs of ${parentJobId}: ${reason}`);
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc