• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

teableio / teable / 14401627835

11 Apr 2025 10:59AM UTC coverage: 80.478% (-0.2%) from 80.639%
14401627835

push

github

web-flow
fix: duplicate primary dependent field base (#1432)

* fix: import base foreign key records insert

* fix: duplicate error when primary field is formula

* chore: update i18n template relative

* fix: export base email format

* perf: adjust export base notification css

* fix: template table css error

7631 of 8095 branches covered (94.27%)

259 of 392 new or added lines in 7 files covered. (66.07%)

67 existing lines in 4 files now uncovered.

36302 of 45108 relevant lines covered (80.48%)

1755.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts
1
/* eslint-disable @typescript-eslint/naming-convention */
4✔
2
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
3
import { Injectable, Logger } from '@nestjs/common';
4
import type { IAttachmentCellValue } from '@teable/core';
5
import { FieldType, generateAttachmentId } from '@teable/core';
6
import { PrismaService } from '@teable/db-main-prisma';
7
import type { IBaseJson } from '@teable/openapi';
8
import { UploadType } from '@teable/openapi';
9
import { Queue, Job } from 'bullmq';
10
import * as csvParser from 'csv-parser';
11
import { Knex } from 'knex';
12
import { InjectModel } from 'nest-knexjs';
13
import * as unzipper from 'unzipper';
14
import { InjectDbProvider } from '../../../db-provider/db.provider';
15
import { IDbProvider } from '../../../db-provider/db.provider.interface';
16
import StorageAdapter from '../../attachments/plugins/adapter';
17
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
18
import { BatchProcessor } from '../BatchProcessor.class';
19
import { EXCLUDE_SYSTEM_FIELDS } from '../constant';
20
import { BaseImportJunctionCsvQueueProcessor } from './base-import-junction.processor';
21
/**
 * Payload of a job on the base-import CSV queue.
 */
interface IBaseImportCsvJob {
  /** Path of the zip archive, passed to the storage adapter for download from the import bucket. */
  path: string;
  /** User id written to `__created_by` on inserted rows and to `createdBy` on attachment rows. */
  userId: string;
  /** Maps a table id taken from an archive file name to the table id used for the database lookup. */
  tableIdMap: Record<string, string>;
  /** Maps field ids; used to rewrite `__fk_<fieldId>` CSV headers and attachment field ids. */
  fieldIdMap: Record<string, string>;
  /** Maps view ids; used to rewrite `__row_<viewId>` CSV headers. */
  viewIdMap: Record<string, string>;
  /** Base structure; its `tables` list is searched by id to find each table's attachment fields. */
  structure: IBaseJson;
}
29

30
/**
 * Name of the BullMQ queue served by `BaseImportCsvQueueProcessor`; used both for the
 * `@Processor` registration and for injecting the queue.
 */
export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue';
4✔
31

32
/**
 * BullMQ worker for the `BASE_IMPORT_CSV_QUEUE` queue.
 *
 * Each job points at a zip archive in the import bucket. Every non-junction CSV file under
 * `tables/` is streamed, parsed and bulk-inserted into the matching database table. When the
 * worker reports a job as completed, a follow-up job is added to the junction CSV queue.
 */
@Injectable()
@Processor(BASE_IMPORT_CSV_QUEUE)
export class BaseImportCsvQueueProcessor extends WorkerHost {
  private logger = new Logger(BaseImportCsvQueueProcessor.name);

  /** Ids of jobs this instance has already started; used to skip a job seen a second time. */
  private processedJobs = new Set<string>();

  constructor(
    private readonly prismaService: PrismaService,
    private readonly baseImportJunctionCsvQueueProcessor: BaseImportJunctionCsvQueueProcessor,
    @InjectModel('CUSTOM_KNEX') private readonly knex: Knex,
    @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
    @InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue<IBaseImportCsvJob>,
    @InjectDbProvider() private readonly dbProvider: IDbProvider
  ) {
    super();
  }

  /**
   * Worker entry point: imports the archive referenced by `job`.
   *
   * A job id that was already seen by this instance is logged and skipped. Import errors are
   * logged and NOT rethrown, so the returned promise always resolves.
   *
   * NOTE(review): because the error is swallowed, the queue presumably still treats a failed
   * import as completed and `onCompleted` still enqueues the junction import — confirm this
   * best-effort behaviour is intended.
   *
   * @param job - the queued import job
   */
  public async process(job: Job<IBaseImportCsvJob>) {
    const jobId = String(job.id);
    if (this.processedJobs.has(jobId)) {
      this.logger.log(`Job ${jobId} already processed, skipping`);
      return;
    }

    this.processedJobs.add(jobId);

    try {
      await this.handleBaseImportCsv(job);
      this.logger.log('import csv parser job completed');
    } catch (error) {
      this.logger.error(
        `Process base import csv failed: ${(error as Error)?.message}`,
        (error as Error)?.stack
      );
    }
  }

  /**
   * Downloads the archive from the import bucket and streams every table CSV into the database.
   *
   * Entries whose path starts with `tables/`, are not directories and do not contain
   * `junction_` are parsed as CSV and handed to `handleChunk` in batches; all other entries
   * are drained.
   *
   * The promise resolves on the first of: a table pipeline emitting `end`, or the zip parser
   * emitting `close`. It rejects on a pipeline or zip parser error.
   *
   * NOTE(review): resolving on the parser's `close` may happen before every table pipeline has
   * finished its last batch — this depends on `BatchProcessor`, which is not visible here.
   *
   * @param job - the queued import job
   * @returns `{ success: true }` once the archive has been consumed
   */
  private async handleBaseImportCsv(job: Job<IBaseImportCsvJob>) {
    const { path, userId, tableIdMap, fieldIdMap, viewIdMap, structure } = job.data;
    const csvStream = await this.storageAdapter.downloadFile(
      StorageAdapter.getBucket(UploadType.Import),
      path
    );

    const parser = unzipper.Parse();
    csvStream.pipe(parser);

    return new Promise<{ success: boolean }>((resolve, reject) => {
      parser.on('entry', (entry) => {
        const filePath = entry.path;
        const isTable = filePath.startsWith('tables/') && entry.type !== 'Directory';
        const isJunction = filePath.includes('junction_');

        if (!isTable || isJunction) {
          // Not a table CSV: discard the entry so the zip stream keeps flowing.
          entry.autodrain();
          return;
        }

        // The file name (without extension) is the table id used in the exported structure.
        const tableId = filePath.replace('tables/', '').split('.')[0];
        const sourceTable = structure.tables.find(({ id }) => id === tableId);
        const attachmentsFields =
          sourceTable?.fields
            ?.filter(({ type }) => type === FieldType.Attachment)
            .map(({ dbFieldName, id }) => ({ dbFieldName, id })) ?? [];

        const batchProcessor = new BatchProcessor<Record<string, unknown>>((chunk) =>
          this.handleChunk(chunk, {
            tableId: tableIdMap[tableId],
            userId,
            fieldIdMap,
            viewIdMap,
            attachmentsFields,
          })
        );

        entry
          .pipe(
            csvParser.default({
              // strict: true,
              mapValues: ({ value }) => value,
              mapHeaders: ({ header }) => this.remapHeader(header, fieldIdMap, viewIdMap),
            })
          )
          .pipe(batchProcessor)
          .on('error', (error: Error) => {
            this.logger.error(`import csv import error: ${error.message}`, error.stack);
            reject(error);
          })
          .on('end', () => {
            this.logger.log(`csv ${tableId} finished`);
            resolve({ success: true });
          });
      });

      parser.on('close', () => {
        this.logger.log('import csv parser completed');
        resolve({ success: true });
      });

      parser.on('error', (error) => {
        this.logger.error(`ZIP parser error: ${error.message}`, error.stack);
        reject(error);
      });
    });
  }

  /**
   * Rewrites a CSV header that embeds a view or field id so that it carries the mapped id.
   *
   * `__row_<viewId>` and `__fk_<fieldId>` headers are rewritten when the id has an entry in the
   * corresponding map. Headers without a mapping (the original code comment names cross-base
   * link fields as such a case) and all other headers are returned unchanged.
   *
   * @param header - original CSV header
   * @param fieldIdMap - field id mapping
   * @param viewIdMap - view id mapping
   * @returns the header to use for the parsed row
   */
  private remapHeader(
    header: string,
    fieldIdMap: Record<string, string>,
    viewIdMap: Record<string, string>
  ): string {
    if (header.startsWith('__row_')) {
      const mappedViewId = viewIdMap[header.slice(6)];
      if (mappedViewId) {
        return `__row_${mappedViewId}`;
      }
    }

    if (header.startsWith('__fk_')) {
      const mappedFieldId = fieldIdMap[header.slice(5)];
      if (mappedFieldId) {
        return `__fk_${mappedFieldId}`;
      }
    }

    return header;
  }

  /**
   * Inserts one batch of parsed CSV rows into the table's database table.
   *
   * Inside a single transaction this:
   * 1. drops the table's foreign keys,
   * 2. adds any `__row_*` column present in the rows but missing from the table,
   * 3. inserts the rows (system fields removed, `''` turned into `null`, `__created_by` and
   *    `__version` set),
   * 4. records attachment cells in the attachments table,
   * 5. re-creates the foreign keys dropped in step 1.
   *
   * @param results - parsed CSV rows keyed by (remapped) header
   * @param config - target table id, acting user, id maps and the table's attachment fields
   * @throws if the table cannot be found, or if an attachment cell is not valid JSON
   */
  private async handleChunk(
    results: Record<string, unknown>[],
    config: {
      tableId: string;
      userId: string;
      fieldIdMap: Record<string, string>;
      viewIdMap: Record<string, string>;
      attachmentsFields: { dbFieldName: string; id: string }[];
    }
  ) {
    const { tableId, userId, fieldIdMap, attachmentsFields } = config;
    const { dbTableName } = await this.prismaService.tableMeta.findUniqueOrThrow({
      where: { id: tableId },
      select: {
        dbTableName: true,
      },
    });

    // Nothing to insert: skip the schema changes and the INSERT entirely.
    if (results.length === 0) {
      return;
    }

    // dbFieldName -> source field id; the first occurrence wins, like Array.prototype.find.
    const attachmentFieldIds = new Map<string, string>();
    for (const { dbFieldName, id } of attachmentsFields) {
      if (!attachmentFieldIds.has(dbFieldName)) {
        attachmentFieldIds.set(dbFieldName, id);
      }
    }

    await this.prismaService.$tx(async (prisma) => {
      // delete foreign keys (if they exist) before inserting the table data
      const foreignKeysInfoSql = this.dbProvider.getForeignKeysInfo(dbTableName);
      const foreignKeysInfo = await prisma.$queryRawUnsafe<
        {
          constraint_name: string;
          column_name: string;
          referenced_table_schema: string;
          referenced_table_name: string;
          referenced_column_name: string;
        }[]
      >(foreignKeysInfoSql);

      for (const { constraint_name, column_name } of foreignKeysInfo) {
        const dropForeignKeyQuery = this.knex.schema
          .alterTable(dbTableName, (table) => {
            table.dropForeign(column_name, constraint_name);
          })
          .toQuery();

        await prisma.$executeRawUnsafe(dropForeignKeyQuery);
      }

      const columnInfoQuery = this.dbProvider.columnInfo(dbTableName);
      const columnInfo = await prisma.$queryRawUnsafe<{ name: string }[]>(columnInfoQuery);
      // Built once so the missing-column check below is a constant-time lookup per column.
      const existingColumns = new Set(columnInfo.map(({ name }) => name));

      const attachmentsTableData = [] as {
        attachmentId: string;
        name: string;
        token: string;
        tableId: string;
        recordId: string;
        fieldId: string;
      }[];

      const recordsToInsert = results.map((row) => {
        const record: Record<string, unknown> = { ...row };

        EXCLUDE_SYSTEM_FIELDS.forEach((header) => {
          delete record[header];
        });

        for (const [key, value] of Object.entries(record)) {
          if (value === '') {
            record[key] = null;
          }

          // attachment field should add info to attachments table
          const sourceFieldId = attachmentFieldIds.get(key);
          if (sourceFieldId === undefined || !value) {
            continue;
          }

          const attValues = JSON.parse(value as string) as IAttachmentCellValue;
          for (const att of attValues) {
            attachmentsTableData.push({
              attachmentId: generateAttachmentId(),
              name: att.name,
              token: att.token,
              tableId,
              recordId: record['__id'] as string,
              fieldId: fieldIdMap[sourceFieldId],
            });
          }
        }

        // default value set
        record['__created_by'] = userId;
        record['__version'] = 1;
        return record;
      });

      // add lacking view order field
      const lackingColumns = Object.keys(recordsToInsert[0]).filter(
        (name) => name.startsWith('__row_') && !existingColumns.has(name)
      );

      for (const name of lackingColumns) {
        const addColumnQuery = this.knex.schema
          .alterTable(dbTableName, (table) => {
            table.double(name);
          })
          .toQuery();
        await prisma.$executeRawUnsafe(addColumnQuery);
      }

      const insertQuery = this.knex.table(dbTableName).insert(recordsToInsert).toQuery();
      await prisma.$executeRawUnsafe(insertQuery);
      await this.updateAttachmentTable(userId, attachmentsTableData);

      // add foreign keys
      for (const {
        constraint_name,
        column_name,
        referenced_table_schema: referencedTableSchema,
        referenced_table_name: referencedTableName,
        referenced_column_name: referencedColumnName,
      } of foreignKeysInfo) {
        const addForeignKeyQuery = this.knex.schema
          .alterTable(dbTableName, (table) => {
            table
              .foreign(column_name, constraint_name)
              .references(referencedColumnName)
              .inTable(`${referencedTableSchema}.${referencedTableName}`);
          })
          .toQuery();
        await prisma.$executeRawUnsafe(addForeignKeyQuery);
      }
    });
  }

  /**
   * Records the attachments referenced by freshly inserted rows in the attachments table.
   *
   * Does nothing when there are no attachments to record.
   *
   * @param userId - stored as `createdBy` on every attachment row
   * @param attachmentsTableData - one entry per attachment found in the inserted rows
   */
  private async updateAttachmentTable(
    userId: string,
    attachmentsTableData: {
      attachmentId: string;
      name: string;
      token: string;
      tableId: string;
      recordId: string;
      fieldId: string;
    }[]
  ) {
    if (attachmentsTableData.length === 0) {
      return;
    }

    await this.prismaService.txClient().attachmentsTable.createMany({
      data: attachmentsTableData.map((attachment) => ({
        ...attachment,
        createdBy: userId,
      })),
    });
  }

  /**
   * After a CSV import job completes, queues the junction-table import for the same archive.
   *
   * The follow-up job is delayed by 2000 ms and uses a job id derived from the archive path
   * and the user id.
   *
   * @param job - the completed import job
   */
  @OnWorkerEvent('completed')
  async onCompleted(job: Job<IBaseImportCsvJob>) {
    const { fieldIdMap, path, structure, userId } = job.data;
    await this.baseImportJunctionCsvQueueProcessor.queue.add(
      'import_base_junction_csv',
      {
        fieldIdMap,
        path,
        structure,
      },
      {
        jobId: `import_base_junction_csv_${path}_${userId}`,
        delay: 2000,
      }
    );
  }
}
125✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc