
teableio / teable · build 20537761653
27 Dec 2025 10:17AM UTC · coverage: 71.689% (+0.02% from 71.666%)

Pull Request #2350 (github / web-flow): fix: handle autonumber fallback for non-generated columns
Merge e62fcb45b into 66ae6e924

24382 of 27248 branches covered (89.48%)
20 of 20 new or added lines in 3 files covered (100.0%)
3 existing lines in 1 file now uncovered
59771 of 83375 relevant lines covered (71.69%)
4560.08 hits per line

Source file (70.87% covered):
/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts
/* eslint-disable @typescript-eslint/naming-convention */
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import type { IAttachmentCellValue } from '@teable/core';
import { FieldType, generateAttachmentId } from '@teable/core';
import { PrismaService } from '@teable/db-main-prisma';
import type { IBaseJson, ImportBaseRo } from '@teable/openapi';
import { CreateRecordAction, UploadType } from '@teable/openapi';
import { Queue, Job } from 'bullmq';
import * as csvParser from 'csv-parser';
import { Knex } from 'knex';
import { InjectModel } from 'nest-knexjs';
import { ClsService } from 'nestjs-cls';
import * as unzipper from 'unzipper';
import { InjectDbProvider } from '../../../db-provider/db.provider';
import { IDbProvider } from '../../../db-provider/db.provider.interface';
import { EventEmitterService } from '../../../event-emitter/event-emitter.service';
import { Events } from '../../../event-emitter/events';
import type { IClsStore } from '../../../types/cls';
import StorageAdapter from '../../attachments/plugins/adapter';
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
import { BatchProcessor } from '../BatchProcessor.class';
import { EXCLUDE_SYSTEM_FIELDS } from '../constant';
import { BaseImportJunctionCsvQueueProcessor } from './base-import-junction.processor';

interface IBaseImportCsvJob {
  path: string;
  userId: string;
  baseId: string;
  origin?: {
    ip: string;
    byApi: boolean;
    userAgent: string;
    referer: string;
  };
  tableIdMap: Record<string, string>;
  fieldIdMap: Record<string, string>;
  viewIdMap: Record<string, string>;
  fkMap: Record<string, string>;
  structure: IBaseJson;
  importBaseRo: ImportBaseRo;
  logId: string;
}

export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue';

@Injectable()
@Processor(BASE_IMPORT_CSV_QUEUE)
export class BaseImportCsvQueueProcessor extends WorkerHost {
  private logger = new Logger(BaseImportCsvQueueProcessor.name);

  private processedJobs = new Set<string>();

  constructor(
    private readonly prismaService: PrismaService,
    private readonly baseImportJunctionCsvQueueProcessor: BaseImportJunctionCsvQueueProcessor,
    @InjectModel('CUSTOM_KNEX') private readonly knex: Knex,
    @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
    @InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue<IBaseImportCsvJob>,
    @InjectDbProvider() private readonly dbProvider: IDbProvider,
    private readonly cls: ClsService<IClsStore>,
    private readonly eventEmitterService: EventEmitterService
  ) {
    super();
  }

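  // The processedJobs Set above gives per-worker-instance idempotency: a job id
  // that has already been handled is skipped instead of being imported again.
  // (The guard lives in memory, so it does not survive a worker restart.)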
  public async process(job: Job<IBaseImportCsvJob>) {
    const jobId = String(job.id);
    if (this.processedJobs.has(jobId)) {
      this.logger.log(`Job ${jobId} already processed, skipping`);
      return;
    }

    this.processedJobs.add(jobId);

    try {
      await this.handleBaseImportCsv(job);
      this.logger.log('import csv parser job completed');
    } catch (error) {
      this.logger.error(
        `Process base import csv failed: ${(error as Error)?.message}`,
        (error as Error)?.stack
      );
    }
  }

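  // The import archive is streamed straight from object storage through
  // unzipper: each `tables/<tableId>.csv` entry is parsed and written in
  // batches, while junction CSVs are skipped here and handled by a separate
  // queue (see onCompleted below).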
  private async handleBaseImportCsv(job: Job<IBaseImportCsvJob>): Promise<void> {
    const { path, userId, tableIdMap, fieldIdMap, viewIdMap, structure, fkMap } = job.data;
    const csvStream = await this.storageAdapter.downloadFile(
      StorageAdapter.getBucket(UploadType.Import),
      path
    );

    const parser = unzipper.Parse();
    csvStream.pipe(parser);
    let totalRecordsCount = 0;

    return new Promise<void>((resolve, reject) => {
      parser.on('entry', (entry) => {
        const filePath = entry.path;
        const isTable = filePath.startsWith('tables/') && entry.type !== 'Directory';
        const isJunction = filePath.includes('junction_');

        if (isTable && !isJunction) {
          const tableId = filePath.replace('tables/', '').split('.')[0];
          const table = structure.tables.find((table) => table.id === tableId);
          const attachmentsFields =
            table?.fields
              ?.filter(({ type }) => type === FieldType.Attachment)
              .map(({ dbFieldName, id }) => ({
                dbFieldName,
                id,
              })) || [];

          const buttonFields =
            table?.fields
              ?.filter(({ type }) => type === FieldType.Button)
              .map(({ dbFieldName, id }) => ({
                dbFieldName,
                id,
              })) || [];

          const computedFields =
            table?.fields
              ?.filter(({ type }) =>
                [
                  FieldType.Formula,
                  FieldType.Rollup,
                  // FieldType.ConditionalRollup,
                  FieldType.CreatedTime,
                  FieldType.LastModifiedTime,
                  FieldType.CreatedBy,
                  FieldType.LastModifiedBy,
                  FieldType.AutoNumber,
                ].includes(type)
              )
              .map(({ dbFieldName, id }) => ({
                dbFieldName,
                id,
              })) || [];

          const buttonDbFieldNames = buttonFields.map(({ dbFieldName }) => dbFieldName);
          const computedDbFieldNames = computedFields.map(({ dbFieldName }) => dbFieldName);
          const excludeDbFieldNames = [
            ...EXCLUDE_SYSTEM_FIELDS,
            ...buttonDbFieldNames,
            ...computedDbFieldNames,
          ];

          const batchProcessor = new BatchProcessor<Record<string, unknown>>(async (chunk) => {
            totalRecordsCount += chunk.length;
            await this.handleChunk(
              chunk,
              {
                tableId: tableIdMap[tableId],
                userId,
                fieldIdMap,
                viewIdMap,
                fkMap,
                attachmentsFields,
              },
              excludeDbFieldNames
            );
            // Update the audit log after each chunk is written to the database
            await this.emitBaseImportAuditLog(job, totalRecordsCount);
          });

          entry
            .pipe(
              csvParser.default({
                // strict: true,
                mapValues: ({ value }) => {
                  return value;
                },
                mapHeaders: ({ header }) => {
                  if (header.startsWith('__row_') && viewIdMap[header.slice(6)]) {
                    return `__row_${viewIdMap[header.slice(6)]}`;
                  }

                  // special case for cross-base link fields: they have no fieldIdMap
                  // entry, which previously let the old, broken link config through
                  if (header.startsWith('__fk_')) {
                    return fieldIdMap[header.slice(5)]
                      ? `__fk_${fieldIdMap[header.slice(5)]}`
                      : fkMap[header] || header;
                  }

                  return header;
                },
              })
            )
            .pipe(batchProcessor)
            .on('error', (error: Error) => {
              this.logger.error(`import csv import error: ${error.message}`, error.stack);
              reject(error);
            })
            .on('end', () => {
              this.logger.log(
                `csv ${tableId} finished, total records so far: ${totalRecordsCount}`
              );
            });
        } else {
          entry.autodrain();
        }
      });

      parser.on('close', () => {
        this.logger.log(`import csv parser completed, total records: ${totalRecordsCount}`);
        resolve();
      });

      parser.on('error', (error) => {
        this.logger.error(`ZIP parser error: ${error.message}`, error.stack);
        reject(error);
      });
    });
  }

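  // handleChunk runs once per CSV batch. Its main work happens inside a single
  // transaction ($tx below); foreign keys are re-added afterwards, outside it.
  // Button and computed columns (formula, rollup, created/last-modified time
  // and by, auto number) were stripped upstream, presumably so the destination
  // database derives their values instead of importing the exported ones.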
  private async handleChunk(
    results: Record<string, unknown>[],
    config: {
      tableId: string;
      userId: string;
      fieldIdMap: Record<string, string>;
      viewIdMap: Record<string, string>;
      fkMap: Record<string, string>;
      attachmentsFields: { dbFieldName: string; id: string }[];
    },
    excludeDbFieldNames: string[]
  ) {
    const { tableId, userId, fieldIdMap, attachmentsFields, fkMap } = config;
    const { dbTableName } = await this.prismaService.tableMeta.findUniqueOrThrow({
      where: { id: tableId },
      select: {
        dbTableName: true,
      },
    });

    const allForeignKeyInfos = [] as {
      constraint_name: string;
      column_name: string;
      referenced_table_schema: string;
      referenced_table_name: string;
      referenced_column_name: string;
      dbTableName: string;
    }[];

    await this.prismaService.$tx(async (prisma) => {
      // drop foreign keys (if any exist), then duplicate the table data
      const foreignKeysInfoSql = this.dbProvider.getForeignKeysInfo(dbTableName);
      const foreignKeysInfo = await prisma.$queryRawUnsafe<
        {
          constraint_name: string;
          column_name: string;
          referenced_table_schema: string;
          referenced_table_name: string;
          referenced_column_name: string;
        }[]
      >(foreignKeysInfoSql);
      const newForeignKeyInfos = foreignKeysInfo.map((info) => ({
        ...info,
        dbTableName,
      }));
      allForeignKeyInfos.push(...newForeignKeyInfos);

      for (const { constraint_name, column_name, dbTableName } of allForeignKeyInfos) {
        const dropForeignKeyQuery = this.knex.schema
          .alterTable(dbTableName, (table) => {
            table.dropForeign(column_name, constraint_name);
          })
          .toQuery();

        await prisma.$executeRawUnsafe(dropForeignKeyQuery);
      }

      const columnInfoQuery = this.dbProvider.columnInfo(dbTableName);
      const columnInfo = await prisma.$queryRawUnsafe<{ name: string }[]>(columnInfoQuery);

      const attachmentsTableData = [] as {
        attachmentId: string;
        name: string;
        token: string;
        tableId: string;
        recordId: string;
        fieldId: string;
      }[];

      const newResult = [...results].map((res) => {
        const newRes = { ...res };

        excludeDbFieldNames.forEach((header) => {
          delete newRes[header];
        });

        return newRes;
      });

      const attachmentsDbFieldNames = attachmentsFields.map(({ dbFieldName }) => dbFieldName);

      const fkColumns = columnInfo
        .filter(({ name }) => name.startsWith('__fk_'))
        .map(({ name }) => {
          return fieldIdMap[name.slice(5)]
            ? `__fk_${fieldIdMap[name.slice(5)]}`
            : fkMap[name] || name;
        });

      const recordsToInsert = newResult.map((result) => {
        const res = { ...result };
        Object.entries(res).forEach(([key, value]) => {
          if (res[key] === '') {
            res[key] = null;
          }

          // filter out unnecessary columns
          if (key.startsWith('__fk_') && !fkColumns.includes(key)) {
            delete res[key];
          }

          // attachment fields also need their info added to the attachments table
          if (attachmentsDbFieldNames.includes(key) && value) {
            const attValues = JSON.parse(value as string) as IAttachmentCellValue;
            const fieldId = attachmentsFields.find(({ dbFieldName }) => dbFieldName === key)?.id;
            attValues.forEach((att) => {
              const attachmentId = generateAttachmentId();
              attachmentsTableData.push({
                attachmentId,
                name: att.name,
                token: att.token,
                tableId: tableId,
                recordId: res['__id'] as string,
                fieldId: fieldIdMap[fieldId!],
              });
            });
          }
        });

        // set default values
        res['__created_by'] = userId;
        res['__version'] = 1;
        return res;
      });

      // add any missing view order columns
      if (recordsToInsert.length) {
        const sourceColumns = Object.keys(recordsToInsert[0]);
        const lackingColumns = sourceColumns
          .filter((column) => !columnInfo.map(({ name }) => name).includes(column))
          .filter((name) => name.startsWith('__row_'));

        for (const name of lackingColumns) {
          const sql = this.knex.schema
            .alterTable(dbTableName, (table) => {
              table.double(name);
            })
            .toQuery();
          await prisma.$executeRawUnsafe(sql);
        }
      }

      const sql = this.knex.table(dbTableName).insert(recordsToInsert).toQuery();
      await prisma.$executeRawUnsafe(sql);
      await this.updateAttachmentTable(userId, attachmentsTableData);
    });

    // re-add the foreign keys; this must not run in the same transaction as the drop
    for (const {
      constraint_name,
      column_name,
      dbTableName,
      referenced_table_schema: referencedTableSchema,
      referenced_table_name: referencedTableName,
      referenced_column_name: referencedColumnName,
    } of allForeignKeyInfos) {
      const addForeignKeyQuery = this.knex.schema
        .alterTable(dbTableName, (table) => {
          table
            .foreign(column_name, constraint_name)
            .references(referencedColumnName)
            .inTable(`${referencedTableSchema}.${referencedTableName}`);
        })
        .toQuery();
      await this.prismaService.$executeRawUnsafe(addForeignKeyQuery);
    }
  }

  // when inserting table data that references attachments, we also need to update the attachments table
  private async updateAttachmentTable(
    userId: string,
    attachmentsTableData: {
      attachmentId: string;
      name: string;
      token: string;
      tableId: string;
      recordId: string;
      fieldId: string;
    }[]
  ) {
    await this.prismaService.txClient().attachmentsTable.createMany({
      data: attachmentsTableData.map((a) => ({
        ...a,
        createdBy: userId,
      })),
    });
  }

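  // After the worker finishes all table CSVs, a follow-up job imports the
  // junction (link) CSVs on a separate queue. The deterministic jobId below
  // lets BullMQ deduplicate repeat enqueues for the same path and user.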
  @OnWorkerEvent('completed')
  async onCompleted(job: Job) {
    const { fieldIdMap, path, structure, userId } = job.data;
    await this.baseImportJunctionCsvQueueProcessor.queue.add(
      'import_base_junction_csv',
      {
        fieldIdMap,
        path,
        structure,
      },
      {
        jobId: `import_base_junction_csv_${path}_${userId}`,
        delay: 2000,
      }
    );
  }

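  // Audit events are emitted inside a fresh CLS context so that the request
  // origin and acting user id survive the hop from HTTP request to worker.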
  private async emitBaseImportAuditLog(job: Job<IBaseImportCsvJob>, recordsLength: number) {
    const { origin, userId, baseId, logId } = job.data;

    await this.cls.run(async () => {
      this.cls.set('origin', origin!);
      this.cls.set('user.id', userId);
      await this.eventEmitterService.emitAsync(Events.TABLE_RECORD_CREATE_RELATIVE, {
        action: CreateRecordAction.BaseImport,
        resourceId: baseId,
        recordCount: recordsLength,
        logId,
      });
    });
  }
}
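
For context, a minimal sketch of how a job for this processor might be enqueued with BullMQ. The job name, ids, connection settings, and placeholder payload values are illustrative assumptions, not taken from the application code; inside the app the queue is injected via @InjectQueue(BASE_IMPORT_CSV_QUEUE).

import { Queue } from 'bullmq';
import type { IBaseJson, ImportBaseRo } from '@teable/openapi';

// Hypothetical standalone enqueue; the real app injects this queue via NestJS.
const queue = new Queue('base-import-csv-queue', {
  connection: { host: 'localhost', port: 6379 }, // assumed local Redis
});

async function enqueueExample() {
  await queue.add('import_base_csv', {
    path: 'imports/example-base.zip', // ZIP previously uploaded to the Import bucket
    userId: 'usrExample', // placeholder ids
    baseId: 'bseExample',
    tableIdMap: {}, // old-id -> new-id maps produced by the structure import step
    fieldIdMap: {},
    viewIdMap: {},
    fkMap: {},
    structure: { tables: [] } as unknown as IBaseJson, // IBaseJson from the export
    importBaseRo: {} as unknown as ImportBaseRo,
    logId: 'logExample',
  });
}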