• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

teableio / teable / 14828572373

05 May 2025 03:40AM CUT coverage: 80.511% (-0.001%) from 80.512%
14828572373

Pull #1504

github

web-flow
Merge cc5349b00 into f701856cc
Pull Request #1504: feat: multi-line field names & batch collapse by group

7698 of 8166 branches covered (94.27%)

22 of 23 new or added lines in 1 file covered. (95.65%)

5 existing lines in 2 files now uncovered.

36829 of 45744 relevant lines covered (80.51%)

1759.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.35
/apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.processor.ts
1
/* eslint-disable @typescript-eslint/naming-convention */
4✔
2
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
3
import { Injectable, Logger } from '@nestjs/common';
4
import type { IAttachmentCellValue } from '@teable/core';
5
import { FieldType, generateAttachmentId } from '@teable/core';
6
import { PrismaService } from '@teable/db-main-prisma';
7
import type { IBaseJson } from '@teable/openapi';
8
import { UploadType } from '@teable/openapi';
9
import { Queue, Job } from 'bullmq';
10
import * as csvParser from 'csv-parser';
11
import { Knex } from 'knex';
12
import { InjectModel } from 'nest-knexjs';
13
import * as unzipper from 'unzipper';
14
import { InjectDbProvider } from '../../../db-provider/db.provider';
15
import { IDbProvider } from '../../../db-provider/db.provider.interface';
16
import StorageAdapter from '../../attachments/plugins/adapter';
17
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
18
import { BatchProcessor } from '../BatchProcessor.class';
19
import { EXCLUDE_SYSTEM_FIELDS } from '../constant';
20
import { BaseImportJunctionCsvQueueProcessor } from './base-import-junction.processor';
21
/**
 * Payload carried by every job on the base-import CSV queue.
 */
interface IBaseImportCsvJob {
  /** Object path of the exported archive inside the `UploadType.Import` bucket. */
  path: string;
  /** User recorded as `__created_by` on inserted rows and as `createdBy` on attachment rows. */
  userId: string;
  /** Maps a table id parsed from an archive entry name to the table id rows are imported into. */
  tableIdMap: Record<string, string>;
  /** Maps field ids found in the archive (e.g. the suffix of `__fk_` columns) to imported field ids. */
  fieldIdMap: Record<string, string>;
  /** Maps the view id suffix of `__row_` CSV headers to imported view ids. */
  viewIdMap: Record<string, string>;
  /** Fallback rename table for `__fk_` columns that have no entry in `fieldIdMap`. */
  fkMap: Record<string, string>;
  /** Exported base description; `structure.tables` is searched by id to find each table's fields. */
  structure: IBaseJson;
}
30

31
/** Name of the queue that the CSV import processor in this file is registered on and injects. */
export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue';
4✔
32

33
/**
 * Queue worker that imports the per-table CSV files of an exported base archive.
 *
 * For every job it downloads the zip archive, streams each `tables/*` entry that is
 * not a junction file through a CSV parser, and bulk-inserts the rows in chunks.
 * When a job completes it enqueues the follow-up junction import job.
 */
@Injectable()
@Processor(BASE_IMPORT_CSV_QUEUE)
export class BaseImportCsvQueueProcessor extends WorkerHost {
  private logger = new Logger(BaseImportCsvQueueProcessor.name);

  /** Ids of the jobs this instance has already started; used to skip repeated deliveries. */
  private processedJobs = new Set<string>();

  constructor(
    private readonly prismaService: PrismaService,
    private readonly baseImportJunctionCsvQueueProcessor: BaseImportJunctionCsvQueueProcessor,
    @InjectModel('CUSTOM_KNEX') private readonly knex: Knex,
    @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
    @InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue<IBaseImportCsvJob>,
    @InjectDbProvider() private readonly dbProvider: IDbProvider
  ) {
    super();
  }

  /**
   * Worker entry point: runs the CSV import for `job` at most once per instance.
   *
   * Errors raised by the import are logged and NOT rethrown, so this method
   * resolves normally even when the import failed.
   * NOTE(review): that presumably means the queue treats a failed import as completed
   * and `onCompleted` still runs — confirm this is intended.
   *
   * @param job queue job carrying the import payload
   */
  public async process(job: Job<IBaseImportCsvJob>) {
    const jobId = String(job.id);
    if (this.processedJobs.has(jobId)) {
      this.logger.log(`Job ${jobId} already processed, skipping`);
      return;
    }

    // Recorded before the work starts, so a repeated delivery is skipped even if this run fails.
    this.processedJobs.add(jobId);

    try {
      await this.handleBaseImportCsv(job);
      this.logger.log('import csv parser job completed');
    } catch (error) {
      this.logger.error(
        `Process base import csv failed: ${(error as Error)?.message}`,
        (error as Error)?.stack
      );
    }
  }

  /**
   * Downloads the archive referenced by the job and streams every table CSV in it
   * into the database through `handleChunk`.
   *
   * Entries under `tables/` that are not directories and whose path does not contain
   * `junction_` are imported; every other entry is drained and ignored.
   *
   * @param job queue job carrying the import payload
   * @returns a promise resolved with `{ success: true }` when the zip parser closes
   *   (or a table stream emits `end`), rejected on the first stream error
   */
  private async handleBaseImportCsv(job: Job<IBaseImportCsvJob>) {
    const { path, userId, tableIdMap, fieldIdMap, viewIdMap, structure, fkMap } = job.data;
    const csvStream = await this.storageAdapter.downloadFile(
      StorageAdapter.getBucket(UploadType.Import),
      path
    );

    const parser = unzipper.Parse();

    return new Promise<{ success: boolean }>((resolve, reject) => {
      // `.pipe()` does not forward errors, so each stage of a table pipeline
      // needs its own listener; otherwise the error is unhandled and the promise hangs.
      const failTable = (error: Error) => {
        this.logger.error(`import csv import error: ${error.message}`, error.stack);
        reject(error);
      };

      parser.on('entry', (entry) => {
        const filePath = entry.path;
        const isTable = filePath.startsWith('tables/') && entry.type !== 'Directory';
        const isJunction = filePath.includes('junction_');

        if (!isTable || isJunction) {
          // The entry must still be consumed so the zip parser can move on.
          entry.autodrain();
          return;
        }

        // Entry name is `tables/<tableId>.<ext>`.
        const tableId = filePath.replace('tables/', '').split('.')[0];
        const table = structure.tables.find((table) => table.id === tableId);
        const attachmentsFields =
          table?.fields
            ?.filter(({ type }) => type === FieldType.Attachment)
            .map(({ dbFieldName, id }) => ({
              dbFieldName,
              id,
            })) || [];

        const batchProcessor = new BatchProcessor<Record<string, unknown>>((chunk) =>
          this.handleChunk(chunk, {
            tableId: tableIdMap[tableId],
            userId,
            fieldIdMap,
            viewIdMap,
            fkMap,
            attachmentsFields,
          })
        );

        const rows = entry.pipe(
          csvParser.default({
            // strict: true,
            mapValues: ({ value }) => {
              return value;
            },
            mapHeaders: ({ header }) => {
              // view order columns are named after the view id, which changes on import
              if (header.startsWith('__row_') && viewIdMap[header.slice(6)]) {
                return `__row_${viewIdMap[header.slice(6)]}`;
              }

              // special case for cross base link fields, there is no map causing the old error link config
              if (header.startsWith('__fk_')) {
                return this.remapForeignKeyColumn(header, fieldIdMap, fkMap);
              }

              return header;
            },
          })
        );

        entry.on('error', failTable);
        rows.on('error', failTable);
        rows
          .pipe(batchProcessor)
          .on('error', failTable)
          // NOTE(review): this settles the whole import as soon as one table stream ends;
          // whether `end` is ever emitted depends on BatchProcessor — confirm.
          .on('end', () => {
            this.logger.log(`csv ${tableId} finished`);
            resolve({ success: true });
          });
      });

      // NOTE(review): whether all in-flight batches have been written when the parser
      // closes depends on BatchProcessor, which is not visible here — confirm.
      parser.on('close', () => {
        this.logger.log('import csv parser completed');
        resolve({ success: true });
      });

      parser.on('error', (error) => {
        this.logger.error(`ZIP parser error: ${error.message}`, error.stack);
        reject(error);
      });

      // A failing download would otherwise be an unhandled stream error.
      csvStream.on('error', (error: Error) => {
        this.logger.error(`Download stream error: ${error.message}`, error.stack);
        reject(error);
      });
      csvStream.pipe(parser);
    });
  }

  /**
   * Translates a `__fk_<fieldId>` column name from the archive to its imported name.
   *
   * @param name column or header name starting with `__fk_`
   * @param fieldIdMap archive field id to imported field id
   * @param fkMap fallback rename table keyed by the full column name
   * @returns `__fk_<mapped id>` when the field id is mapped, else the `fkMap` entry,
   *   else `name` unchanged
   */
  private remapForeignKeyColumn(
    name: string,
    fieldIdMap: Record<string, string>,
    fkMap: Record<string, string>
  ): string {
    const mappedFieldId = fieldIdMap[name.slice(5)];
    return mappedFieldId ? `__fk_${mappedFieldId}` : fkMap[name] || name;
  }

  /**
   * Inserts one chunk of parsed CSV rows into the physical table behind `config.tableId`.
   *
   * Inside one transaction it drops the table's foreign keys, adds missing `__row_*`
   * columns, inserts the rows and registers attachment rows. The dropped foreign keys
   * are re-created afterwards, outside that transaction.
   *
   * @param results parsed CSV rows keyed by (already remapped) header
   * @param config target table id, acting user, id maps and the table's attachment fields
   * @throws when the table metadata cannot be found or any SQL statement fails
   */
  private async handleChunk(
    results: Record<string, unknown>[],
    config: {
      tableId: string;
      userId: string;
      fieldIdMap: Record<string, string>;
      viewIdMap: Record<string, string>;
      fkMap: Record<string, string>;
      attachmentsFields: { dbFieldName: string; id: string }[];
    }
  ) {
    const { tableId, userId, fieldIdMap, attachmentsFields, fkMap } = config;

    // Nothing to insert: skip the foreign key drop/re-add and the empty INSERT.
    if (!results.length) {
      return;
    }

    const { dbTableName } = await this.prismaService.tableMeta.findUniqueOrThrow({
      where: { id: tableId },
      select: {
        dbTableName: true,
      },
    });

    type ForeignKeyInfo = {
      constraint_name: string;
      column_name: string;
      referenced_table_schema: string;
      referenced_table_name: string;
      referenced_column_name: string;
    };

    // Filled inside the transaction, read afterwards to restore the constraints.
    let droppedForeignKeys: ForeignKeyInfo[] = [];

    await this.prismaService.$tx(async (prisma) => {
      // delete foreign keys if(exist) then duplicate table data
      const foreignKeysInfoSql = this.dbProvider.getForeignKeysInfo(dbTableName);
      droppedForeignKeys = await prisma.$queryRawUnsafe<ForeignKeyInfo[]>(foreignKeysInfoSql);

      for (const { constraint_name, column_name } of droppedForeignKeys) {
        const dropForeignKeyQuery = this.knex.schema
          .alterTable(dbTableName, (table) => {
            table.dropForeign(column_name, constraint_name);
          })
          .toQuery();

        await prisma.$executeRawUnsafe(dropForeignKeyQuery);
      }

      const columnInfoQuery = this.dbProvider.columnInfo(dbTableName);
      const columnInfo = await prisma.$queryRawUnsafe<{ name: string }[]>(columnInfoQuery);
      const existingColumns = new Set(columnInfo.map(({ name }) => name));

      // `__fk_` columns of the target table, under the names the CSV headers were remapped to.
      const fkColumns = new Set(
        columnInfo
          .filter(({ name }) => name.startsWith('__fk_'))
          .map(({ name }) => this.remapForeignKeyColumn(name, fieldIdMap, fkMap))
      );

      // dbFieldName -> archive field id; the first occurrence wins.
      const attachmentFieldIds = new Map<string, string>();
      for (const { dbFieldName, id } of attachmentsFields) {
        if (!attachmentFieldIds.has(dbFieldName)) {
          attachmentFieldIds.set(dbFieldName, id);
        }
      }

      const attachmentsTableData = [] as {
        attachmentId: string;
        name: string;
        token: string;
        tableId: string;
        recordId: string;
        fieldId: string;
      }[];

      const recordsToInsert = results.map((row) => {
        const res: Record<string, unknown> = { ...row };

        EXCLUDE_SYSTEM_FIELDS.forEach((header) => {
          delete res[header];
        });

        // Object.entries takes a snapshot, so deleting keys while looping is safe.
        for (const [key, value] of Object.entries(res)) {
          // the CSV encodes empty cells as '', the database expects NULL
          if (value === '') {
            res[key] = null;
          }

          // filter unnecessary columns
          if (key.startsWith('__fk_') && !fkColumns.has(key)) {
            delete res[key];
          }

          // attachment field should add info to attachments table
          const sourceFieldId = attachmentFieldIds.get(key);
          if (sourceFieldId !== undefined && value) {
            const attValues = JSON.parse(value as string) as IAttachmentCellValue;
            attValues.forEach((att) => {
              attachmentsTableData.push({
                attachmentId: generateAttachmentId(),
                name: att.name,
                token: att.token,
                tableId: tableId,
                recordId: res['__id'] as string,
                fieldId: fieldIdMap[sourceFieldId],
              });
            });
          }
        }

        // default value set
        res['__created_by'] = userId;
        res['__version'] = 1;
        return res;
      });

      // add lacking view order field
      const [firstRecord] = recordsToInsert;
      if (firstRecord) {
        const lackingColumns = Object.keys(firstRecord).filter(
          (column) => column.startsWith('__row_') && !existingColumns.has(column)
        );

        for (const name of lackingColumns) {
          const sql = this.knex.schema
            .alterTable(dbTableName, (table) => {
              table.double(name);
            })
            .toQuery();
          await prisma.$executeRawUnsafe(sql);
        }
      }

      const sql = this.knex.table(dbTableName).insert(recordsToInsert).toQuery();
      await prisma.$executeRawUnsafe(sql);
      await this.updateAttachmentTable(userId, attachmentsTableData);
    });

    // add foreign keys, do not in one transaction with deleting foreign keys
    for (const {
      constraint_name,
      column_name,
      referenced_table_schema: referencedTableSchema,
      referenced_table_name: referencedTableName,
      referenced_column_name: referencedColumnName,
    } of droppedForeignKeys) {
      const addForeignKeyQuery = this.knex.schema
        .alterTable(dbTableName, (table) => {
          table
            .foreign(column_name, constraint_name)
            .references(referencedColumnName)
            .inTable(`${referencedTableSchema}.${referencedTableName}`);
        })
        .toQuery();
      await this.prismaService.$executeRawUnsafe(addForeignKeyQuery);
    }
  }

  /**
   * Registers the attachments referenced by freshly inserted rows in the attachments table.
   * Uses `prismaService.txClient()`; it is called from inside the `handleChunk` transaction.
   *
   * @param userId stored as `createdBy` on every attachment row
   * @param attachmentsTableData one entry per attachment found in the inserted rows
   */
  private async updateAttachmentTable(
    userId: string,
    attachmentsTableData: {
      attachmentId: string;
      name: string;
      token: string;
      tableId: string;
      recordId: string;
      fieldId: string;
    }[]
  ) {
    await this.prismaService.txClient().attachmentsTable.createMany({
      data: attachmentsTableData.map((a) => ({
        ...a,
        createdBy: userId,
      })),
    });
  }

  /**
   * Runs on the worker's `completed` event: enqueues the junction-table import for the
   * same archive, delayed by 2000 ms, under a job id built from the archive path and user.
   *
   * @param job the job that just completed
   */
  @OnWorkerEvent('completed')
  async onCompleted(job: Job<IBaseImportCsvJob>) {
    const { fieldIdMap, path, structure, userId } = job.data;
    await this.baseImportJunctionCsvQueueProcessor.queue.add(
      'import_base_junction_csv',
      {
        fieldIdMap,
        path,
        structure,
      },
      {
        jobId: `import_base_junction_csv_${path}_${userId}`,
        delay: 2000,
      }
    );
  }
}
125✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc