• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

teableio / teable / 16872696657

11 Aug 2025 06:35AM UTC coverage: 81.178% (+0.07%) from 81.107%
16872696657

Pull #1687

github

web-flow
Merge 24494a019 into 09ee7f5b9
Pull Request #1687: feat/button-field

8622 of 9154 branches covered (94.19%)

293 of 311 new or added lines in 18 files covered. (94.21%)

2 existing lines in 1 file now uncovered.

39637 of 48827 relevant lines covered (81.18%)

1730.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.29
/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts
1
/* eslint-disable @typescript-eslint/naming-convention */
4✔
2
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
3
import { Injectable, Logger } from '@nestjs/common';
4
import type { IAttachmentCellValue } from '@teable/core';
5
import { FieldType, generateAttachmentId } from '@teable/core';
6
import { PrismaService } from '@teable/db-main-prisma';
7
import type { IBaseJson } from '@teable/openapi';
8
import { UploadType } from '@teable/openapi';
9
import { Queue, Job } from 'bullmq';
10
import * as csvParser from 'csv-parser';
11
import { Knex } from 'knex';
12
import { InjectModel } from 'nest-knexjs';
13
import * as unzipper from 'unzipper';
14
import { InjectDbProvider } from '../../../db-provider/db.provider';
15
import { IDbProvider } from '../../../db-provider/db.provider.interface';
16
import StorageAdapter from '../../attachments/plugins/adapter';
17
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
18
import { BatchProcessor } from '../BatchProcessor.class';
19
import { EXCLUDE_SYSTEM_FIELDS } from '../constant';
20
import { BaseImportJunctionCsvQueueProcessor } from './base-import-junction.processor';
21
/** Payload carried by every job on the base-import CSV queue. */
interface IBaseImportCsvJob {
  /** Storage path of the uploaded import archive; passed to the storage adapter's `downloadFile`. */
  path: string;
  /** Id written to `__created_by` on imported rows and to `createdBy` on attachment rows. */
  userId: string;
  /** Looked up with the table id taken from each `tables/<id>.*` archive entry to get the target table id. */
  tableIdMap: Record<string, string>;
  /** Looked up with the id part of `__fk_<id>` columns and with attachment field ids. */
  fieldIdMap: Record<string, string>;
  /** Looked up with the id part of `__row_<id>` CSV headers. */
  viewIdMap: Record<string, string>;
  /** Fallback rename table keyed by the full `__fk_…` column name, used when `fieldIdMap` has no entry. */
  fkMap: Record<string, string>;
  /** Exported base description; its `tables[].fields` are used to find attachment and button fields. */
  structure: IBaseJson;
}
30

31
/** Name of the BullMQ queue used by the `@Processor` and `@InjectQueue` decorators in this file. */
export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue';
4✔
32

33
@Injectable()
34
@Processor(BASE_IMPORT_CSV_QUEUE)
35
export class BaseImportCsvQueueProcessor extends WorkerHost {
4✔
36
  /** Nest logger labelled with this processor's class name. */
  private readonly logger = new Logger(BaseImportCsvQueueProcessor.name);

  /** Ids of jobs this instance has already started; `process` consults it to skip repeats. */
  private readonly processedJobs = new Set<string>();

  /**
   * All collaborators are supplied through Nest dependency injection.
   * `queue` is public; the other injected members are private to this class.
   */
  constructor(
    private readonly prismaService: PrismaService,
    private readonly baseImportJunctionCsvQueueProcessor: BaseImportJunctionCsvQueueProcessor,
    @InjectModel('CUSTOM_KNEX') private readonly knex: Knex,
    @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
    @InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue<IBaseImportCsvJob>,
    @InjectDbProvider() private readonly dbProvider: IDbProvider
  ) {
    super();
  }
133✔
50

51
  /**
   * Worker entry point for a base-import CSV job.
   *
   * A job whose id has already been seen by this instance is logged and skipped.
   * Otherwise the id is recorded and the archive is imported. Any error raised by
   * the import is logged and NOT rethrown, so this method always resolves.
   *
   * NOTE(review): because failures are swallowed here, the worker presumably still
   * reports the job as completed — confirm that this is intended.
   */
  public async process(job: Job<IBaseImportCsvJob>) {
    const id = String(job.id);

    if (!this.processedJobs.has(id)) {
      this.processedJobs.add(id);

      try {
        await this.handleBaseImportCsv(job);
        this.logger.log('import csv parser job completed');
      } catch (err) {
        const failure = err as Error;
        this.logger.error(`Process base import csv failed: ${failure?.message}`, failure?.stack);
      }
      return;
    }

    this.logger.log(`Job ${id} already processed, skipping`);
  }
2✔
70

71
  /**
   * Downloads the import archive named by `job.data.path`, unzips it as a stream and
   * loads every table CSV it contains.
   *
   * Entries under `tables/` that are not directories and whose path does not contain
   * `junction_` are parsed with csv-parser and fed in batches to `handleChunk`; every
   * other entry is drained. CSV headers are rewritten on the fly: `__row_<viewId>` and
   * `__fk_<fieldId>` columns are renamed through `viewIdMap` / `fieldIdMap`, with
   * `fkMap` as the fallback for foreign-key columns that have no field mapping.
   * Button field columns are excluded together with `EXCLUDE_SYSTEM_FIELDS`.
   *
   * The returned promise resolves only after the ZIP parser has closed AND every table
   * pipeline has finished writing, so callers do not observe completion while batches
   * are still being inserted. It rejects on the first error from the download stream,
   * the ZIP parser or any stage of a table pipeline.
   *
   * @param job queue job carrying the archive path and the id maps
   * @returns `{ success: true }` once all table CSVs have been handed to `handleChunk`
   */
  private async handleBaseImportCsv(job: Job<IBaseImportCsvJob>) {
    const { path, userId, tableIdMap, fieldIdMap, viewIdMap, structure, fkMap } = job.data;
    const csvStream = await this.storageAdapter.downloadFile(
      StorageAdapter.getBucket(UploadType.Import),
      path
    );

    const parser = unzipper.Parse();

    return new Promise<{ success: boolean }>((resolve, reject) => {
      // One promise per table pipeline; each settles without rejecting (see below).
      const tableImports: Promise<void>[] = [];

      // `pipe` does not forward source errors, so a broken download must reject explicitly.
      csvStream.on('error', (error: Error) => {
        this.logger.error(`import csv download error: ${error.message}`, error.stack);
        reject(error);
      });

      parser.on('entry', (entry) => {
        const filePath = entry.path;
        const isTable = filePath.startsWith('tables/') && entry.type !== 'Directory';
        const isJunction = filePath.includes('junction_');

        if (!isTable || isJunction) {
          // Unconsumed entries must be drained so the ZIP parser can move on.
          entry.autodrain();
          return;
        }

        const tableId = filePath.replace('tables/', '').split('.')[0];
        const fields = structure.tables.find(({ id }) => id === tableId)?.fields ?? [];

        const attachmentsFields = fields
          .filter(({ type }) => type === FieldType.Attachment)
          .map(({ dbFieldName, id }) => ({ dbFieldName, id }));
        const buttonDbFieldNames = fields
          .filter(({ type }) => type === FieldType.Button)
          .map(({ dbFieldName }) => dbFieldName);

        const excludeDbFieldNames = [...EXCLUDE_SYSTEM_FIELDS, ...buttonDbFieldNames];
        const batchProcessor = new BatchProcessor<Record<string, unknown>>((chunk) =>
          this.handleChunk(
            chunk,
            {
              tableId: tableIdMap[tableId],
              userId,
              fieldIdMap,
              viewIdMap,
              fkMap,
              attachmentsFields,
            },
            excludeDbFieldNames
          )
        );

        const csvRows = csvParser.default({
          // strict: true,
          mapValues: ({ value }) => value,
          mapHeaders: ({ header }) => {
            if (header.startsWith('__row_')) {
              const mappedViewId = viewIdMap[header.slice(6)];
              if (mappedViewId) {
                return `__row_${mappedViewId}`;
              }
            }

            // special case for cross base link fields, there is no map causing the old error link config
            if (header.startsWith('__fk_')) {
              const mappedFieldId = fieldIdMap[header.slice(5)];
              return mappedFieldId ? `__fk_${mappedFieldId}` : fkMap[header] || header;
            }

            return header;
          },
        });

        const tableImport = new Promise<void>((done, fail) => {
          // Errors are not propagated along a pipe chain; watch every stage.
          entry.on('error', fail);
          csvRows.on('error', fail);
          batchProcessor.on('error', fail);
          // 'finish' fires once the writable side has flushed; 'end' is kept for the case
          // where BatchProcessor also exposes a consumed readable side.
          // NOTE(review): assumes BatchProcessor acknowledges a write only after its chunk
          // handler has settled — confirm against BatchProcessor.class.
          batchProcessor.on('finish', () => done());
          batchProcessor.on('end', () => done());
          entry.pipe(csvRows).pipe(batchProcessor);
        });

        tableImports.push(
          tableImport.then(
            () => this.logger.log(`csv ${tableId} finished`),
            (error: Error) => {
              this.logger.error(`import csv import error: ${error.message}`, error.stack);
              // Reject the overall import right away; later resolve calls become no-ops.
              reject(error);
            }
          )
        );
      });

      parser.on('close', () => {
        this.logger.log('import csv parser completed');
        // The archive is fully read, but the last batches may still be in flight.
        Promise.all(tableImports).then(() => resolve({ success: true }), reject);
      });

      parser.on('error', (error) => {
        this.logger.error(`ZIP parser error: ${error.message}`, error.stack);
        reject(error);
      });

      // Start the flow only after every listener is attached.
      csvStream.pipe(parser);
    });
  }
2✔
171

172
  /**
   * Inserts one batch of parsed CSV rows into the physical table behind `config.tableId`.
   *
   * Inside a single transaction it:
   *  1. drops the table's foreign keys,
   *  2. strips the excluded columns and any `__fk_` column the target table does not have,
   *     turns empty strings into `null`, and stamps `__created_by` / `__version`,
   *  3. adds any `__row_*` column present in the rows but missing from the table,
   *  4. inserts the rows and the attachment bookkeeping rows derived from attachment cells.
   * After the transaction the dropped foreign keys are created again.
   *
   * An empty batch is a no-op.
   *
   * @param results parsed CSV rows keyed by (already remapped) column name
   * @param config target table id, acting user and the id maps used for remapping
   * @param excludeDbFieldNames columns that must never be copied from the CSV
   * @throws if the table meta row is missing, an attachment cell is not valid JSON,
   *         or any SQL statement fails
   */
  private async handleChunk(
    results: Record<string, unknown>[],
    config: {
      tableId: string;
      userId: string;
      fieldIdMap: Record<string, string>;
      viewIdMap: Record<string, string>;
      fkMap: Record<string, string>;
      attachmentsFields: { dbFieldName: string; id: string }[];
    },
    excludeDbFieldNames: string[]
  ) {
    const { tableId, userId, fieldIdMap, attachmentsFields, fkMap } = config;

    // Nothing to insert: avoid dropping/re-adding constraints and building an empty INSERT.
    if (results.length === 0) {
      return;
    }

    const { dbTableName } = await this.prismaService.tableMeta.findUniqueOrThrow({
      where: { id: tableId },
      select: {
        dbTableName: true,
      },
    });

    type ForeignKeyInfo = {
      constraint_name: string;
      column_name: string;
      referenced_table_schema: string;
      referenced_table_name: string;
      referenced_column_name: string;
    };

    // Filled inside the transaction, consumed after it to restore the constraints.
    const droppedForeignKeys: (ForeignKeyInfo & { dbTableName: string })[] = [];

    // First matching field wins, mirroring a linear search over `attachmentsFields`.
    const attachmentFieldIdByColumn = new Map<string, string>();
    for (const { dbFieldName, id } of attachmentsFields) {
      if (!attachmentFieldIdByColumn.has(dbFieldName)) {
        attachmentFieldIdByColumn.set(dbFieldName, id);
      }
    }

    await this.prismaService.$tx(async (prisma) => {
      // delete foreign keys if(exist) then duplicate table data
      const foreignKeysInfo = await prisma.$queryRawUnsafe<ForeignKeyInfo[]>(
        this.dbProvider.getForeignKeysInfo(dbTableName)
      );
      droppedForeignKeys.push(...foreignKeysInfo.map((info) => ({ ...info, dbTableName })));

      for (const {
        constraint_name: constraintName,
        column_name: columnName,
        dbTableName: fkTableName,
      } of droppedForeignKeys) {
        const dropForeignKeyQuery = this.knex.schema
          .alterTable(fkTableName, (table) => {
            table.dropForeign(columnName, constraintName);
          })
          .toQuery();

        await prisma.$executeRawUnsafe(dropForeignKeyQuery);
      }

      const columnInfo = await prisma.$queryRawUnsafe<{ name: string }[]>(
        this.dbProvider.columnInfo(dbTableName)
      );
      const existingColumns = new Set(columnInfo.map(({ name }) => name));

      // Foreign-key columns of the target table, expressed in the remapped naming of the rows.
      const fkColumns = new Set(
        columnInfo
          .filter(({ name }) => name.startsWith('__fk_'))
          .map(({ name }) => {
            const mappedFieldId = fieldIdMap[name.slice(5)];
            return mappedFieldId ? `__fk_${mappedFieldId}` : fkMap[name] || name;
          })
      );

      const attachmentsTableData: {
        attachmentId: string;
        name: string;
        token: string;
        tableId: string;
        recordId: string;
        fieldId: string;
      }[] = [];

      const recordsToInsert = results.map((row) => {
        const record: Record<string, unknown> = { ...row };
        for (const excluded of excludeDbFieldNames) {
          delete record[excluded];
        }

        // Object.entries snapshots the pairs, so deleting keys while looping is safe.
        for (const [key, value] of Object.entries(record)) {
          if (value === '') {
            record[key] = null;
          }

          // filter unnecessary columns
          if (key.startsWith('__fk_') && !fkColumns.has(key)) {
            delete record[key];
          }

          // attachment field should add info to attachments table
          const sourceFieldId = attachmentFieldIdByColumn.get(key);
          if (sourceFieldId !== undefined && value) {
            const attValues = JSON.parse(value as string) as IAttachmentCellValue;
            for (const att of attValues) {
              attachmentsTableData.push({
                attachmentId: generateAttachmentId(),
                name: att.name,
                token: att.token,
                tableId,
                recordId: record['__id'] as string,
                fieldId: fieldIdMap[sourceFieldId],
              });
            }
          }
        }

        // default value set
        record['__created_by'] = userId;
        record['__version'] = 1;
        return record;
      });

      // add lacking view order field (only the first row's columns are inspected)
      const lackingColumns = Object.keys(recordsToInsert[0]).filter(
        (column) => !existingColumns.has(column) && column.startsWith('__row_')
      );

      for (const name of lackingColumns) {
        const addColumnQuery = this.knex.schema
          .alterTable(dbTableName, (table) => {
            table.double(name);
          })
          .toQuery();
        await prisma.$executeRawUnsafe(addColumnQuery);
      }

      const insertQuery = this.knex.table(dbTableName).insert(recordsToInsert).toQuery();
      await prisma.$executeRawUnsafe(insertQuery);
      await this.updateAttachmentTable(userId, attachmentsTableData);
    });

    // add foreign keys, do not in one transaction with deleting foreign keys
    for (const {
      constraint_name: constraintName,
      column_name: columnName,
      dbTableName: fkTableName,
      referenced_table_schema: referencedTableSchema,
      referenced_table_name: referencedTableName,
      referenced_column_name: referencedColumnName,
    } of droppedForeignKeys) {
      const addForeignKeyQuery = this.knex.schema
        .alterTable(fkTableName, (table) => {
          table
            .foreign(columnName, constraintName)
            .references(referencedColumnName)
            .inTable(`${referencedTableSchema}.${referencedTableName}`);
        })
        .toQuery();
      await this.prismaService.$executeRawUnsafe(addForeignKeyQuery);
    }
  }
4✔
339

340
  /**
   * Records the attachments referenced by freshly inserted rows in the attachments table.
   *
   * Uses the Prisma client returned by `txClient()` and writes every entry with
   * `createdBy` set to `userId` in one `createMany` call.
   *
   * @param userId id stored as `createdBy` on every created row
   * @param attachmentsTableData one entry per attachment found in the inserted rows
   */
  private async updateAttachmentTable(
    userId: string,
    attachmentsTableData: {
      attachmentId: string;
      name: string;
      token: string;
      tableId: string;
      recordId: string;
      fieldId: string;
    }[]
  ) {
    const client = this.prismaService.txClient();
    const rows = attachmentsTableData.map((attachment) => ({
      ...attachment,
      createdBy: userId,
    }));

    await client.attachmentsTable.createMany({ data: rows });
  }
359

360
  /**
   * Worker `completed` hook: schedules the follow-up junction-table import for the
   * same archive on the junction processor's queue, delayed by 2000 ms and with a
   * job id derived from the archive path and the user id.
   *
   * The listener's returned promise is not awaited by the event emitter, so a failed
   * enqueue is caught and logged here instead of surfacing as an unhandled rejection.
   *
   * @param job the CSV import job that has just completed
   */
  @OnWorkerEvent('completed')
  async onCompleted(job: Job) {
    const { fieldIdMap, path, structure, userId } = job.data;

    try {
      await this.baseImportJunctionCsvQueueProcessor.queue.add(
        'import_base_junction_csv',
        {
          fieldIdMap,
          path,
          structure,
        },
        {
          jobId: `import_base_junction_csv_${path}_${userId}`,
          delay: 2000,
        }
      );
    } catch (error) {
      const failure = error as Error;
      this.logger.error(
        `Queue base import junction csv failed: ${failure?.message}`,
        failure?.stack
      );
    }
  }
×
376
}
133✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc