• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

teableio / teable / 19889415988

03 Dec 2025 09:45AM UTC coverage: 71.655% (-0.02%) from 71.673%
19889415988

Pull #2205

github

web-flow
Merge 843701797 into 239642229
Pull Request #2205: fix: import csv chunk waiting too long

21953 of 24635 branches covered (89.11%)

10 of 13 new or added lines in 2 files covered. (76.92%)

14 existing lines in 3 files now uncovered.

55490 of 77441 relevant lines covered (71.65%)

4333.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.29
/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts
1
/* eslint-disable @typescript-eslint/naming-convention */
7✔
2
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
3
import { Injectable, Logger } from '@nestjs/common';
4
import type { IAttachmentCellValue } from '@teable/core';
5
import { FieldType, generateAttachmentId } from '@teable/core';
6
import { PrismaService } from '@teable/db-main-prisma';
7
import type { IBaseJson } from '@teable/openapi';
8
import { UploadType } from '@teable/openapi';
9
import { Queue, Job } from 'bullmq';
10
import * as csvParser from 'csv-parser';
11
import { Knex } from 'knex';
12
import { InjectModel } from 'nest-knexjs';
13
import * as unzipper from 'unzipper';
14
import { InjectDbProvider } from '../../../db-provider/db.provider';
15
import { IDbProvider } from '../../../db-provider/db.provider.interface';
16
import StorageAdapter from '../../attachments/plugins/adapter';
17
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
18
import { BatchProcessor } from '../BatchProcessor.class';
19
import { EXCLUDE_SYSTEM_FIELDS } from '../constant';
20
import { BaseImportJunctionCsvQueueProcessor } from './base-import-junction.processor';
21
/**
 * Payload of a job on the base-import CSV queue.
 */
interface IBaseImportCsvJob {
  /** Object path handed to the storage adapter to download the import archive. */
  path: string;

  /** Written to `__created_by` on every inserted row and to `createdBy` on attachment rows. */
  userId: string;

  /** Indexed by the table id parsed from a `tables/<id>.*` archive entry to get the target table id. */
  tableIdMap: Record<string, string>;

  /** Indexed by the id suffix of `__fk_<id>` headers/columns and by attachment field ids. */
  fieldIdMap: Record<string, string>;

  /** Indexed by the id suffix of `__row_<id>` headers to rewrite view order columns. */
  viewIdMap: Record<string, string>;

  /** Fallback lookup, keyed by the full `__fk_*` name, used when `fieldIdMap` has no entry. */
  fkMap: Record<string, string>;

  /** Exported base description; its `tables[].fields` are scanned for attachment and button fields. */
  structure: IBaseJson;
}
30

31
/** Queue name shared by the `@Processor` and `@InjectQueue` decorators of the CSV import processor. */
export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue';
7✔
32

33
@Injectable()
34
@Processor(BASE_IMPORT_CSV_QUEUE)
35
export class BaseImportCsvQueueProcessor extends WorkerHost {
7✔
36
  /** Logger tagged with this processor's class name. */
  private logger = new Logger(BaseImportCsvQueueProcessor.name);

  /** Ids of the jobs this instance has already started; `process` skips any id found here. */
  private processedJobs = new Set<string>();

  /**
   * @param prismaService table metadata lookups, transactions and raw SQL execution
   * @param baseImportJunctionCsvQueueProcessor owner of the queue that receives the follow-up junction job
   * @param knex query builder used only to render SQL strings
   * @param storageAdapter source of the uploaded import archive
   * @param queue the queue this processor consumes, exposed publicly
   * @param dbProvider renders the foreign-key and column introspection queries
   */
  constructor(
    private readonly prismaService: PrismaService,
    private readonly baseImportJunctionCsvQueueProcessor: BaseImportJunctionCsvQueueProcessor,
    @InjectModel('CUSTOM_KNEX') private readonly knex: Knex,
    @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
    @InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue<IBaseImportCsvJob>,
    @InjectDbProvider() private readonly dbProvider: IDbProvider
  ) {
    super();
  }
110✔
50

51
  /**
   * Worker entry point: runs the CSV import for a job at most once per processor instance.
   *
   * A job whose id is already in `processedJobs` is logged and skipped. Import failures are
   * logged and NOT rethrown, so this method always settles successfully.
   *
   * NOTE(review): ids are never removed from `processedJobs`, and because errors are swallowed
   * the queue presumably treats a failed import as completed — confirm both are intended.
   */
  public async process(job: Job<IBaseImportCsvJob>) {
    const id = String(job.id);
    const seenBefore = this.processedJobs.has(id);

    if (seenBefore) {
      this.logger.log(`Job ${id} already processed, skipping`);
      return;
    }

    // Mark the job before any await so a concurrent delivery of the same id is skipped.
    this.processedJobs.add(id);

    try {
      await this.handleBaseImportCsv(job);
      this.logger.log('import csv parser job completed');
    } catch (error) {
      const failure = error as Error;
      this.logger.error(`Process base import csv failed: ${failure?.message}`, failure?.stack);
    }
  }
4✔
70

71
  /**
   * Downloads the import archive, unzips it as a stream and feeds every non-junction
   * `tables/<tableId>.*` entry through csv-parser into a `BatchProcessor` that calls
   * `handleChunk`. All other entries are drained and ignored.
   *
   * Header rewriting done while parsing:
   * - `__row_<viewId>` becomes `__row_<viewIdMap[viewId]>` when the view id is mapped;
   * - `__fk_<fieldId>` becomes `__fk_<fieldIdMap[fieldId]>` when mapped, otherwise
   *   `fkMap[header]` if present, otherwise the header is kept unchanged.
   *
   * The returned promise resolves only after the zip parser has closed AND every table
   * pipeline started for this archive has finished, so a pending final batch is not left
   * running when the job is reported as done. It rejects on the first error raised by the
   * download stream, the zip parser, a zip entry, the csv parser or a batch processor.
   *
   * @param job queue job whose `data` carries the archive path and the id maps
   * @returns `{ success: true }` once all table entries have been processed
   */
  private async handleBaseImportCsv(job: Job<IBaseImportCsvJob>) {
    const { path, userId, tableIdMap, fieldIdMap, viewIdMap, structure, fkMap } = job.data;
    const csvStream = await this.storageAdapter.downloadFile(
      StorageAdapter.getBucket(UploadType.Import),
      path
    );

    const parser = unzipper.Parse();
    csvStream.pipe(parser);

    return new Promise<{ success: boolean }>((resolve, reject) => {
      // One promise per table entry, resolved when that entry's batch processor is done.
      const tablePipelines: Promise<void>[] = [];

      // pipe() does not forward errors, so each stage needs its own listener; without one an
      // 'error' event would be thrown as an uncaught exception.
      csvStream.on('error', (error: Error) => {
        this.logger.error(`import csv download error: ${error.message}`, error.stack);
        reject(error);
      });

      parser.on('entry', (entry) => {
        const filePath = entry.path;
        const isTable = filePath.startsWith('tables/') && entry.type !== 'Directory';
        const isJunction = filePath.includes('junction_');

        if (!isTable || isJunction) {
          // The entry must still be consumed, otherwise the zip parser cannot move on.
          entry.autodrain();
          return;
        }

        const tableId = filePath.replace('tables/', '').split('.')[0];
        const fields = structure.tables.find(({ id }) => id === tableId)?.fields ?? [];

        const attachmentsFields = fields
          .filter(({ type }) => type === FieldType.Attachment)
          .map(({ dbFieldName, id }) => ({ dbFieldName, id }));
        const buttonDbFieldNames = fields
          .filter(({ type }) => type === FieldType.Button)
          .map(({ dbFieldName }) => dbFieldName);

        // Columns that must never be copied from the CSV into the target table.
        const excludeDbFieldNames = [...EXCLUDE_SYSTEM_FIELDS, ...buttonDbFieldNames];

        const batchProcessor = new BatchProcessor<Record<string, unknown>>((chunk) =>
          this.handleChunk(
            chunk,
            {
              tableId: tableIdMap[tableId],
              userId,
              fieldIdMap,
              viewIdMap,
              fkMap,
              attachmentsFields,
            },
            excludeDbFieldNames
          )
        );

        const rows = csvParser.default({
          // strict: true,
          mapValues: ({ value }) => value,
          mapHeaders: ({ header }) => {
            if (header.startsWith('__row_') && viewIdMap[header.slice(6)]) {
              return `__row_${viewIdMap[header.slice(6)]}`;
            }

            // special case for cross base link fields, there is no map causing the old error link config
            if (header.startsWith('__fk_')) {
              return fieldIdMap[header.slice(5)]
                ? `__fk_${fieldIdMap[header.slice(5)]}`
                : fkMap[header] || header;
            }

            return header;
          },
        });

        const onPipelineError = (error: Error) => {
          this.logger.error(`import csv import error: ${error.message}`, error.stack);
          reject(error);
        };

        tablePipelines.push(
          new Promise<void>((done) => {
            let settled = false;
            const onPipelineDone = () => {
              if (settled) {
                return;
              }
              settled = true;
              this.logger.log(`csv ${tableId} finished`);
              done();
            };

            entry.on('error', onPipelineError);
            rows.on('error', onPipelineError);

            // 'finish' marks the end of the writable side of the batch processor. 'end' is
            // kept for the case where its readable side is consumed as well.
            // NOTE(review): assumes the final batch is flushed before 'finish' is emitted,
            // which holds for Transform streams on current Node.js releases — confirm
            // against BatchProcessor.
            entry
              .pipe(rows)
              .pipe(batchProcessor)
              .on('error', onPipelineError)
              .on('finish', onPipelineDone)
              .on('end', onPipelineDone);
          })
        );
      });

      parser.on('close', () => {
        this.logger.log('import csv parser completed');
        // The archive is fully read; wait for the inserts that are still in flight.
        void Promise.all(tablePipelines).then(() => resolve({ success: true }));
      });

      parser.on('error', (error) => {
        this.logger.error(`ZIP parser error: ${error.message}`, error.stack);
        reject(error);
      });
    });
  }
4✔
171

172
  /**
   * Inserts one batch of parsed CSV rows into the physical table behind `config.tableId`.
   *
   * Inside a single transaction it:
   * 1. reads the table's foreign keys and drops them;
   * 2. reads the table's column list;
   * 3. normalises each row: removes `excludeDbFieldNames`, turns `''` into `null`, removes
   *    `__fk_*` keys that have no matching column, and collects one attachments-table row
   *    per attachment found in an attachment field;
   * 4. adds a double column for every `__row_*` key of the first row that the table lacks;
   * 5. inserts the rows and then the collected attachment rows.
   *
   * After the transaction the dropped foreign keys are re-created.
   *
   * @param results rows keyed by (already remapped) column name; an empty batch is a no-op
   * @param config target table id, acting user id, id maps and the table's attachment fields
   * @param excludeDbFieldNames column names stripped from every row before insertion
   * @throws when the table metadata is missing or any SQL statement / JSON parse fails
   */
  private async handleChunk(
    results: Record<string, unknown>[],
    config: {
      tableId: string;
      userId: string;
      fieldIdMap: Record<string, string>;
      viewIdMap: Record<string, string>;
      fkMap: Record<string, string>;
      attachmentsFields: { dbFieldName: string; id: string }[];
    },
    excludeDbFieldNames: string[]
  ) {
    // Nothing to insert: skip the queries instead of rendering an insert from zero rows.
    if (!results.length) {
      return;
    }

    const { tableId, userId, fieldIdMap, attachmentsFields, fkMap } = config;
    const { dbTableName } = await this.prismaService.tableMeta.findUniqueOrThrow({
      where: { id: tableId },
      select: { dbTableName: true },
    });

    type IForeignKeyInfo = {
      constraint_name: string;
      column_name: string;
      referenced_table_schema: string;
      referenced_table_name: string;
      referenced_column_name: string;
    };

    // Filled inside the transaction, consumed after it to restore the constraints.
    let foreignKeys: IForeignKeyInfo[] = [];

    // dbFieldName -> source field id; the first occurrence wins, like Array.prototype.find.
    const attachmentFieldIdByDbName = new Map<string, string>();
    for (const { dbFieldName, id } of attachmentsFields) {
      if (!attachmentFieldIdByDbName.has(dbFieldName)) {
        attachmentFieldIdByDbName.set(dbFieldName, id);
      }
    }

    await this.prismaService.$tx(async (prisma) => {
      // delete foreign keys if(exist) then duplicate table data
      foreignKeys = await prisma.$queryRawUnsafe<IForeignKeyInfo[]>(
        this.dbProvider.getForeignKeysInfo(dbTableName)
      );

      for (const { constraint_name: constraintName, column_name: columnName } of foreignKeys) {
        const dropForeignKeyQuery = this.knex.schema
          .alterTable(dbTableName, (table) => {
            table.dropForeign(columnName, constraintName);
          })
          .toQuery();

        await prisma.$executeRawUnsafe(dropForeignKeyQuery);
      }

      const columnInfo = await prisma.$queryRawUnsafe<{ name: string }[]>(
        this.dbProvider.columnInfo(dbTableName)
      );
      const existingColumns = new Set(columnInfo.map(({ name }) => name));

      // `__fk_*` column names after applying the same remapping used for CSV headers.
      const fkColumns = new Set(
        columnInfo
          .filter(({ name }) => name.startsWith('__fk_'))
          .map(({ name }) =>
            fieldIdMap[name.slice(5)] ? `__fk_${fieldIdMap[name.slice(5)]}` : fkMap[name] || name
          )
      );

      const attachmentsTableData: {
        attachmentId: string;
        name: string;
        token: string;
        tableId: string;
        recordId: string;
        fieldId: string;
      }[] = [];

      const recordsToInsert = results.map((row) => {
        const record: Record<string, unknown> = { ...row };

        for (const excluded of excludeDbFieldNames) {
          delete record[excluded];
        }

        for (const [key, value] of Object.entries(record)) {
          if (value === '') {
            record[key] = null;
          }

          // filter unnecessary columns
          if (key.startsWith('__fk_') && !fkColumns.has(key)) {
            delete record[key];
          }

          // attachment field should add info to attachments table
          const sourceFieldId = attachmentFieldIdByDbName.get(key);
          if (sourceFieldId === undefined || !value) {
            continue;
          }

          const attachments = JSON.parse(value as string) as IAttachmentCellValue;
          for (const { name, token } of attachments) {
            attachmentsTableData.push({
              attachmentId: generateAttachmentId(),
              name,
              token,
              tableId,
              recordId: record['__id'] as string,
              fieldId: fieldIdMap[sourceFieldId],
            });
          }
        }

        // default value set
        record['__created_by'] = userId;
        record['__version'] = 1;
        return record;
      });

      // add lacking view order field
      const lackingColumns = Object.keys(recordsToInsert[0]).filter(
        (column) => column.startsWith('__row_') && !existingColumns.has(column)
      );

      for (const name of lackingColumns) {
        const addColumnQuery = this.knex.schema
          .alterTable(dbTableName, (table) => {
            table.double(name);
          })
          .toQuery();
        await prisma.$executeRawUnsafe(addColumnQuery);
      }

      const insertQuery = this.knex.table(dbTableName).insert(recordsToInsert).toQuery();
      await prisma.$executeRawUnsafe(insertQuery);
      await this.updateAttachmentTable(userId, attachmentsTableData);
    });

    // add foreign keys, do not in one transaction with deleting foreign keys
    for (const {
      constraint_name: constraintName,
      column_name: columnName,
      referenced_table_schema: referencedTableSchema,
      referenced_table_name: referencedTableName,
      referenced_column_name: referencedColumnName,
    } of foreignKeys) {
      const addForeignKeyQuery = this.knex.schema
        .alterTable(dbTableName, (table) => {
          table
            .foreign(columnName, constraintName)
            .references(referencedColumnName)
            .inTable(`${referencedTableSchema}.${referencedTableName}`);
        })
        .toQuery();
      await this.prismaService.$executeRawUnsafe(addForeignKeyQuery);
    }
  }
7✔
339

340
  /**
   * Records the attachments referenced by freshly inserted rows in the attachments table.
   *
   * Uses `prismaService.txClient()`, and is called from inside the `$tx` callback of
   * `handleChunk`.
   *
   * @param userId stored as `createdBy` on every created row
   * @param attachmentsTableData one entry per attachment; may be empty
   */
  private async updateAttachmentTable(
    userId: string,
    attachmentsTableData: {
      attachmentId: string;
      name: string;
      token: string;
      tableId: string;
      recordId: string;
      fieldId: string;
    }[]
  ) {
    const rows = attachmentsTableData.map((attachment) => ({
      ...attachment,
      createdBy: userId,
    }));

    await this.prismaService.txClient().attachmentsTable.createMany({ data: rows });
  }
7✔
359

360
  /**
   * Worker `completed` hook: schedules the junction-table import for the same archive.
   *
   * The follow-up job gets a deterministic id built from the archive path and user id and
   * is delayed by 2000 ms.
   *
   * NOTE(review): `process` never rethrows, so this presumably also runs after a failed
   * import — confirm that is intended.
   */
  @OnWorkerEvent('completed')
  async onCompleted(job: Job) {
    const { fieldIdMap, path, structure, userId } = job.data;

    const payload = { fieldIdMap, path, structure };
    const options = {
      jobId: `import_base_junction_csv_${path}_${userId}`,
      delay: 2000,
    };

    await this.baseImportJunctionCsvQueueProcessor.queue.add(
      'import_base_junction_csv',
      payload,
      options
    );
  }
×
376
}
110✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc