teableio / teable · build 20015007456 (push, via GitHub, committer web-flow)
08 Dec 2025 02:44AM UTC · coverage: 71.706% (+0.004% from 71.702%)

Merge pull request #2227 from teableio/feat/last-modified-by-tracking (feat/last modified by tracking)

22065 of 24768 branches covered (89.09%)
83 of 109 new or added lines in 4 files covered (76.15%)
19 existing lines in 6 files now uncovered
55840 of 77874 relevant lines covered (71.71%)
4325.03 hits per line

Source file: /apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts (68.75% covered)

/* eslint-disable @typescript-eslint/naming-convention */
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import type { IAttachmentCellValue } from '@teable/core';
import { FieldType, generateAttachmentId } from '@teable/core';
import { PrismaService } from '@teable/db-main-prisma';
import type { IBaseJson, ImportBaseRo } from '@teable/openapi';
import { CreateRecordAction, UploadType } from '@teable/openapi';
import { Queue, Job } from 'bullmq';
import * as csvParser from 'csv-parser';
import { Knex } from 'knex';
import { InjectModel } from 'nest-knexjs';
import { ClsService } from 'nestjs-cls';
import * as unzipper from 'unzipper';
import { InjectDbProvider } from '../../../db-provider/db.provider';
import { IDbProvider } from '../../../db-provider/db.provider.interface';
import { EventEmitterService } from '../../../event-emitter/event-emitter.service';
import { Events } from '../../../event-emitter/events';
import type { IClsStore } from '../../../types/cls';
import StorageAdapter from '../../attachments/plugins/adapter';
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
import { BatchProcessor } from '../BatchProcessor.class';
import { EXCLUDE_SYSTEM_FIELDS } from '../constant';
import { BaseImportJunctionCsvQueueProcessor } from './base-import-junction.processor';

interface IBaseImportCsvJob {
  path: string;
  userId: string;
  baseId: string;
  origin?: {
    ip: string;
    byApi: boolean;
    userAgent: string;
    referer: string;
  };
  tableIdMap: Record<string, string>;
  fieldIdMap: Record<string, string>;
  viewIdMap: Record<string, string>;
  fkMap: Record<string, string>;
  structure: IBaseJson;
  importBaseRo: ImportBaseRo;
  logId: string;
}
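// The *IdMap / fkMap fields presumably map ids from the exported base to the ids created by
// this import; CSV headers and __fk_ columns are rewritten through them in mapHeaders and
// handleChunk below.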

export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue';

@Injectable()
@Processor(BASE_IMPORT_CSV_QUEUE)
export class BaseImportCsvQueueProcessor extends WorkerHost {
  private logger = new Logger(BaseImportCsvQueueProcessor.name);

  private processedJobs = new Set<string>();

  constructor(
    private readonly prismaService: PrismaService,
    private readonly baseImportJunctionCsvQueueProcessor: BaseImportJunctionCsvQueueProcessor,
    @InjectModel('CUSTOM_KNEX') private readonly knex: Knex,
    @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
    @InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue<IBaseImportCsvJob>,
    @InjectDbProvider() private readonly dbProvider: IDbProvider,
    private readonly cls: ClsService<IClsStore>,
    private readonly eventEmitterService: EventEmitterService
  ) {
    super();
  }

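  // process() is the WorkerHost entry point BullMQ calls for every job on this queue.
  // The in-memory processedJobs set only dedupes within this worker instance, and errors
  // are logged rather than re-thrown, so a failed import is not retried by BullMQ.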
  public async process(job: Job<IBaseImportCsvJob>) {
    const jobId = String(job.id);
    if (this.processedJobs.has(jobId)) {
      this.logger.log(`Job ${jobId} already processed, skipping`);
      return;
    }

    this.processedJobs.add(jobId);

    try {
      await this.handleBaseImportCsv(job);
      this.logger.log('import csv parser job completed');
    } catch (error) {
      this.logger.error(
        `Process base import csv failed: ${(error as Error)?.message}`,
        (error as Error)?.stack
      );
    }
  }

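  // handleBaseImportCsv streams the exported zip from object storage and parses every
  // tables/*.csv entry (junction_* files are skipped here and handled by the junction
  // processor queued in onCompleted). Rows are pushed through a BatchProcessor so each
  // chunk is written to the database and reflected in the audit log as it arrives.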
  private async handleBaseImportCsv(job: Job<IBaseImportCsvJob>): Promise<void> {
    const { path, userId, tableIdMap, fieldIdMap, viewIdMap, structure, fkMap } = job.data;
    const csvStream = await this.storageAdapter.downloadFile(
      StorageAdapter.getBucket(UploadType.Import),
      path
    );

    const parser = unzipper.Parse();
    csvStream.pipe(parser);
    let totalRecordsCount = 0;

    return new Promise<void>((resolve, reject) => {
      parser.on('entry', (entry) => {
        const filePath = entry.path;
        const isTable = filePath.startsWith('tables/') && entry.type !== 'Directory';
        const isJunction = filePath.includes('junction_');

        if (isTable && !isJunction) {
          const tableId = filePath.replace('tables/', '').split('.')[0];
          const table = structure.tables.find((table) => table.id === tableId);
          const attachmentsFields =
            table?.fields
              ?.filter(({ type }) => type === FieldType.Attachment)
              .map(({ dbFieldName, id }) => ({
                dbFieldName,
                id,
              })) || [];

          const buttonFields =
            table?.fields
              ?.filter(({ type }) => type === FieldType.Button)
              .map(({ dbFieldName, id }) => ({
                dbFieldName,
                id,
              })) || [];
          const buttonDbFieldNames = buttonFields.map(({ dbFieldName }) => dbFieldName);

          const excludeDbFieldNames = [...EXCLUDE_SYSTEM_FIELDS, ...buttonDbFieldNames];
          const batchProcessor = new BatchProcessor<Record<string, unknown>>(async (chunk) => {
            totalRecordsCount += chunk.length;
            await this.handleChunk(
              chunk,
              {
                tableId: tableIdMap[tableId],
                userId,
                fieldIdMap,
                viewIdMap,
                fkMap,
                attachmentsFields,
              },
              excludeDbFieldNames
            );
            // Update audit log after each chunk is written to database
            await this.emitBaseImportAuditLog(job, totalRecordsCount);
          });

          entry
            .pipe(
              csvParser.default({
                // strict: true,
                mapValues: ({ value }) => {
                  return value;
                },
                mapHeaders: ({ header }) => {
                  if (header.startsWith('__row_') && viewIdMap[header.slice(6)]) {
                    return `__row_${viewIdMap[header.slice(6)]}`;
                  }

                  // special case for cross-base link fields: they have no entry in fieldIdMap,
                  // so fall back to fkMap rather than keeping the old link config's column name
                  if (header.startsWith('__fk_')) {
                    return fieldIdMap[header.slice(5)]
                      ? `__fk_${fieldIdMap[header.slice(5)]}`
                      : fkMap[header] || header;
                  }

                  return header;
                },
              })
            )
            .pipe(batchProcessor)
            .on('error', (error: Error) => {
              this.logger.error(`import csv import error: ${error.message}`, error.stack);
              reject(error);
            })
            .on('end', () => {
              this.logger.log(
                `csv ${tableId} finished, total records so far: ${totalRecordsCount}`
              );
            });
        } else {
          entry.autodrain();
        }
      });

      parser.on('close', () => {
        this.logger.log(`import csv parser completed, total records: ${totalRecordsCount}`);
        resolve();
      });

      parser.on('error', (error) => {
        this.logger.error(`ZIP parser error: ${error.message}`, error.stack);
        reject(error);
      });
    });
  }

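  // handleChunk persists one batch of parsed rows. Inside a transaction it drops the table's
  // foreign keys, strips excluded system/button columns, rewrites __fk_/__row_ columns to the
  // remapped ids, collects attachment cell values for the attachments table, adds any missing
  // __row_ view-order columns, and bulk-inserts the rows; the foreign keys are re-added
  // afterwards, deliberately outside that transaction.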
  private async handleChunk(
    results: Record<string, unknown>[],
    config: {
      tableId: string;
      userId: string;
      fieldIdMap: Record<string, string>;
      viewIdMap: Record<string, string>;
      fkMap: Record<string, string>;
      attachmentsFields: { dbFieldName: string; id: string }[];
    },
    excludeDbFieldNames: string[]
  ) {
    const { tableId, userId, fieldIdMap, attachmentsFields, fkMap } = config;
    const { dbTableName } = await this.prismaService.tableMeta.findUniqueOrThrow({
      where: { id: tableId },
      select: {
        dbTableName: true,
      },
    });

    const allForeignKeyInfos = [] as {
      constraint_name: string;
      column_name: string;
      referenced_table_schema: string;
      referenced_table_name: string;
      referenced_column_name: string;
      dbTableName: string;
    }[];

    await this.prismaService.$tx(async (prisma) => {
      // drop foreign keys (if any exist) before copying the table data
      const foreignKeysInfoSql = this.dbProvider.getForeignKeysInfo(dbTableName);
      const foreignKeysInfo = await prisma.$queryRawUnsafe<
        {
          constraint_name: string;
          column_name: string;
          referenced_table_schema: string;
          referenced_table_name: string;
          referenced_column_name: string;
        }[]
      >(foreignKeysInfoSql);
      const newForeignKeyInfos = foreignKeysInfo.map((info) => ({
        ...info,
        dbTableName,
      }));
      allForeignKeyInfos.push(...newForeignKeyInfos);

      for (const { constraint_name, column_name, dbTableName } of allForeignKeyInfos) {
        const dropForeignKeyQuery = this.knex.schema
          .alterTable(dbTableName, (table) => {
            table.dropForeign(column_name, constraint_name);
          })
          .toQuery();

        await prisma.$executeRawUnsafe(dropForeignKeyQuery);
      }

      const columnInfoQuery = this.dbProvider.columnInfo(dbTableName);
      const columnInfo = await prisma.$queryRawUnsafe<{ name: string }[]>(columnInfoQuery);

      const attachmentsTableData = [] as {
        attachmentId: string;
        name: string;
        token: string;
        tableId: string;
        recordId: string;
        fieldId: string;
      }[];

      const newResult = [...results].map((res) => {
        const newRes = { ...res };

        excludeDbFieldNames.forEach((header) => {
          delete newRes[header];
        });

        return newRes;
      });

      const attachmentsDbFieldNames = attachmentsFields.map(({ dbFieldName }) => dbFieldName);

      const fkColumns = columnInfo
        .filter(({ name }) => name.startsWith('__fk_'))
        .map(({ name }) => {
          return fieldIdMap[name.slice(5)]
            ? `__fk_${fieldIdMap[name.slice(5)]}`
            : fkMap[name] || name;
        });

      const recordsToInsert = newResult.map((result) => {
        const res = { ...result };
        Object.entries(res).forEach(([key, value]) => {
          if (res[key] === '') {
            res[key] = null;
          }

          // filter unnecessary columns
          if (key.startsWith('__fk_') && !fkColumns.includes(key)) {
            delete res[key];
          }

          // attachment fields also need rows added to the attachments table
          if (attachmentsDbFieldNames.includes(key) && value) {
            const attValues = JSON.parse(value as string) as IAttachmentCellValue;
            const fieldId = attachmentsFields.find(({ dbFieldName }) => dbFieldName === key)?.id;
            attValues.forEach((att) => {
              const attachmentId = generateAttachmentId();
              attachmentsTableData.push({
                attachmentId,
                name: att.name,
                token: att.token,
                tableId: tableId,
                recordId: res['__id'] as string,
                fieldId: fieldIdMap[fieldId!],
              });
            });
          }
        });

        // default values
        res['__created_by'] = userId;
        res['__version'] = 1;
        return res;
      });

      // add any missing view order (__row_) columns before inserting
      if (recordsToInsert.length) {
        const sourceColumns = Object.keys(recordsToInsert[0]);
        const lackingColumns = sourceColumns
          .filter((column) => !columnInfo.map(({ name }) => name).includes(column))
          .filter((name) => name.startsWith('__row_'));

        for (const name of lackingColumns) {
          const sql = this.knex.schema
            .alterTable(dbTableName, (table) => {
              table.double(name);
            })
            .toQuery();
          await prisma.$executeRawUnsafe(sql);
        }
      }

      const sql = this.knex.table(dbTableName).insert(recordsToInsert).toQuery();
      await prisma.$executeRawUnsafe(sql);
      await this.updateAttachmentTable(userId, attachmentsTableData);
    });

    // re-add the foreign keys; deliberately not in the same transaction as dropping them
    for (const {
      constraint_name,
      column_name,
      dbTableName,
      referenced_table_schema: referencedTableSchema,
      referenced_table_name: referencedTableName,
      referenced_column_name: referencedColumnName,
    } of allForeignKeyInfos) {
      const addForeignKeyQuery = this.knex.schema
        .alterTable(dbTableName, (table) => {
          table
            .foreign(column_name, constraint_name)
            .references(referencedColumnName)
            .inTable(`${referencedTableSchema}.${referencedTableName}`);
        })
        .toQuery();
      await this.prismaService.$executeRawUnsafe(addForeignKeyQuery);
    }
  }

  // rows inserted above that reference attachments also need matching rows in the attachments table
  private async updateAttachmentTable(
    userId: string,
    attachmentsTableData: {
      attachmentId: string;
      name: string;
      token: string;
      tableId: string;
      recordId: string;
      fieldId: string;
    }[]
  ) {
    await this.prismaService.txClient().attachmentsTable.createMany({
      data: attachmentsTableData.map((a) => ({
        ...a,
        createdBy: userId,
      })),
    });
  }

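  // When a CSV job completes, a follow-up job for the junction tables is queued. The
  // deterministic jobId dedupes repeated completions; the 2s delay presumably gives the
  // main-table inserts time to land before junction rows reference them.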
  @OnWorkerEvent('completed')
  async onCompleted(job: Job) {
    const { fieldIdMap, path, structure, userId } = job.data;
    await this.baseImportJunctionCsvQueueProcessor.queue.add(
      'import_base_junction_csv',
      {
        fieldIdMap,
        path,
        structure,
      },
      {
        jobId: `import_base_junction_csv_${path}_${userId}`,
        delay: 2000,
      }
    );
  }

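  // emitBaseImportAuditLog runs inside a fresh CLS context so the originating request's origin
  // and user id are available to the audit-log listeners handling TABLE_RECORD_CREATE_RELATIVE.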
  private async emitBaseImportAuditLog(job: Job<IBaseImportCsvJob>, recordsLength: number) {
    const { origin, userId, baseId, importBaseRo, logId } = job.data;

    await this.cls.run(async () => {
      this.cls.set('origin', origin!);
      this.cls.set('user.id', userId);
      await this.eventEmitterService.emitAsync(Events.TABLE_RECORD_CREATE_RELATIVE, {
        action: CreateRecordAction.BaseImport,
        resourceId: baseId,
        recordCount: recordsLength,
        params: importBaseRo,
        logId,
      });
    });
  }
}
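
For orientation, here is a minimal producer-side sketch of how a caller might enqueue work for this processor, mirroring the pattern onCompleted uses for the junction queue. The class name, job name, and jobId format below are illustrative assumptions; only BaseImportCsvQueueProcessor, its public queue, and BullMQ's queue.add come from the file above, and the payload shape follows IBaseImportCsvJob (which is not exported).

// Hypothetical producer sketch (not part of the file above).
import { Injectable } from '@nestjs/common';
import { BaseImportCsvQueueProcessor } from './base-import-csv.processor';

// IBaseImportCsvJob is not exported, so derive the payload type from process().
type BaseImportCsvJobData = Parameters<BaseImportCsvQueueProcessor['process']>[0]['data'];

@Injectable()
export class BaseImportCsvProducerExample {
  constructor(private readonly csvProcessor: BaseImportCsvQueueProcessor) {}

  async enqueue(data: BaseImportCsvJobData) {
    // The job name is arbitrary here: process() handles every job on BASE_IMPORT_CSV_QUEUE.
    // The deterministic jobId mirrors the dedup style used for the junction job.
    await this.csvProcessor.queue.add('import_base_csv', data, {
      jobId: `import_base_csv_${data.path}_${data.userId}`,
    });
  }
}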