• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

teableio / teable / 14401627835

11 Apr 2025 10:59AM UTC coverage: 80.478% (-0.2%) from 80.639%
14401627835

push

github

web-flow
fix: duplicate primary dependent field base (#1432)

* fix: import base foreign key records insert

* fix: duplicate error when primary field is formula

* chore: update i18n template relative

* fix: export base email format

* perf: adjust export base notification css

* fix: template table css error

7631 of 8095 branches covered (94.27%)

259 of 392 new or added lines in 7 files covered. (66.07%)

67 existing lines in 4 files now uncovered.

36302 of 45108 relevant lines covered (80.48%)

1755.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.22
/apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction.processor.ts
1
/* eslint-disable @typescript-eslint/naming-convention */
4✔
2
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
3
import { Injectable, Logger } from '@nestjs/common';
4
import {
5
  PrismaClientKnownRequestError,
6
  PrismaClientUnknownRequestError,
7
} from '@prisma/client/runtime/library';
8
import type { ILinkFieldOptions } from '@teable/core';
9
import { FieldType } from '@teable/core';
10
import { PrismaService } from '@teable/db-main-prisma';
11
import type { IBaseJson } from '@teable/openapi';
12
import { UploadType } from '@teable/openapi';
13
import type { Job } from 'bullmq';
14
import { Queue } from 'bullmq';
15
import * as csvParser from 'csv-parser';
16
import { Knex } from 'knex';
17
import { InjectModel } from 'nest-knexjs';
18
import * as unzipper from 'unzipper';
19
import { InjectDbProvider } from '../../../db-provider/db.provider';
20
import { IDbProvider } from '../../../db-provider/db.provider.interface';
21
import StorageAdapter from '../../attachments/plugins/adapter';
22
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
23
import { createFieldInstanceByRaw } from '../../field/model/factory';
24
import { BatchProcessor } from '../BatchProcessor.class';
25

26
interface IBaseImportJunctionCsvJob {
27
  path: string;
28
  fieldIdMap: Record<string, string>;
29
  structure: IBaseJson;
30
}
31

32
export const BASE_IMPORT_JUNCTION_CSV_QUEUE = 'base-import-junction-csv-queue';
4✔
33

34
@Injectable()
35
@Processor(BASE_IMPORT_JUNCTION_CSV_QUEUE)
36
export class BaseImportJunctionCsvQueueProcessor extends WorkerHost {
4✔
37
  private logger = new Logger(BaseImportJunctionCsvQueueProcessor.name);
125✔
38
  private processedJobs = new Set<string>();
125✔
39

40
  constructor(
125✔
41
    private readonly prismaService: PrismaService,
125✔
42
    @InjectModel('CUSTOM_KNEX') private readonly knex: Knex,
125✔
43
    @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
125✔
44
    @InjectQueue(BASE_IMPORT_JUNCTION_CSV_QUEUE)
125✔
45
    public readonly queue: Queue<IBaseImportJunctionCsvJob>,
125✔
46
    @InjectDbProvider() private readonly dbProvider: IDbProvider
125✔
47
  ) {
125✔
48
    super();
125✔
49
  }
125✔
50

51
  public async process(job: Job<IBaseImportJunctionCsvJob>) {
125✔
52
    const jobId = String(job.id);
×
53
    if (this.processedJobs.has(jobId)) {
×
54
      this.logger.log(`Job ${jobId} already processed, skipping`);
×
55
      return;
×
56
    }
×
57

58
    this.processedJobs.add(jobId);
×
59

60
    const { path, fieldIdMap, structure } = job.data;
×
61

62
    try {
×
63
      await this.importJunctionChunk(path, fieldIdMap, structure);
×
64
    } catch (error) {
×
65
      this.logger.error(
×
66
        `Process base import junction csv failed: ${(error as Error)?.message}`,
×
67
        (error as Error)?.stack
×
68
      );
69
    }
×
70
  }
×
71

72
  private async importJunctionChunk(
125✔
73
    path: string,
×
74
    fieldIdMap: Record<string, string>,
×
75
    structure: IBaseJson
×
76
  ) {
×
77
    const csvStream = await this.storageAdapter.downloadFile(
×
78
      StorageAdapter.getBucket(UploadType.Import),
×
79
      path
×
80
    );
81

82
    const sourceLinkFields = structure.tables
×
83
      .map(({ fields }) => fields)
×
84
      .flat()
×
85
      .filter((f) => f.type === FieldType.Link && !f.isLookup);
×
86

87
    const linkFieldRaws = await this.prismaService.field.findMany({
×
88
      where: {
×
89
        id: {
×
90
          in: Object.values(fieldIdMap),
×
91
        },
×
92
        type: FieldType.Link,
×
93
        isLookup: null,
×
94
      },
×
95
    });
×
96

97
    const junctionDbTableNameMap = {} as Record<
×
98
      string,
99
      {
100
        sourceSelfKeyName: string;
101
        sourceForeignKeyName: string;
102
        targetSelfKeyName: string;
103
        targetForeignKeyName: string;
104
        targetFkHostTableName: string;
105
      }
106
    >;
107

108
    const linkFieldInstances = linkFieldRaws.map((f) => createFieldInstanceByRaw(f));
×
109

110
    for (const sourceField of sourceLinkFields) {
×
111
      const { options: sourceOptions } = sourceField;
×
112
      const {
×
113
        fkHostTableName: sourceFkHostTableName,
×
114
        selfKeyName: sourceSelfKeyName,
×
115
        foreignKeyName: sourceForeignKeyName,
×
116
      } = sourceOptions as ILinkFieldOptions;
×
117
      const targetField = linkFieldInstances.find((f) => f.id === fieldIdMap[sourceField.id])!;
×
118
      const { options: targetOptions } = targetField;
×
119
      const {
×
120
        fkHostTableName: targetFkHostTableName,
×
121
        selfKeyName: targetSelfKeyName,
×
122
        foreignKeyName: targetForeignKeyName,
×
123
      } = targetOptions as ILinkFieldOptions;
×
124
      if (sourceFkHostTableName.includes('junction_')) {
×
125
        junctionDbTableNameMap[sourceFkHostTableName] = {
×
126
          sourceSelfKeyName,
×
127
          sourceForeignKeyName,
×
128
          targetSelfKeyName,
×
129
          targetForeignKeyName,
×
130
          targetFkHostTableName,
×
131
        };
×
132
      }
×
133
    }
×
134

135
    const parser = unzipper.Parse();
×
136
    csvStream.pipe(parser);
×
137

138
    const processedFiles = new Set<string>();
×
139

140
    return new Promise<{ success: boolean }>((resolve, reject) => {
×
141
      parser.on('entry', (entry) => {
×
142
        const filePath = entry.path;
×
143

144
        if (processedFiles.has(filePath)) {
×
145
          entry.autodrain();
×
146
          return;
×
147
        }
×
148
        processedFiles.add(filePath);
×
149

150
        if (
×
151
          filePath.startsWith('tables/') &&
×
152
          entry.type !== 'Directory' &&
×
153
          filePath.includes('junction_')
×
154
        ) {
×
155
          const name = filePath.replace('tables/', '').split('.');
×
156
          name.pop();
×
157
          const junctionTableName = name.join('.');
×
158
          const junctionInfo = junctionDbTableNameMap[junctionTableName];
×
159

160
          const {
×
161
            sourceForeignKeyName,
×
162
            targetForeignKeyName,
×
163
            sourceSelfKeyName,
×
164
            targetSelfKeyName,
×
165
            targetFkHostTableName,
×
166
          } = junctionInfo;
×
167

168
          const batchProcessor = new BatchProcessor<Record<string, unknown>>((chunk) =>
×
169
            this.handleJunctionChunk(chunk, targetFkHostTableName)
×
170
          );
171

172
          entry
×
173
            .pipe(
×
174
              csvParser.default({
×
175
                // strict: true,
×
176
                mapValues: ({ value }) => {
×
177
                  return value;
×
178
                },
×
179
                mapHeaders: ({ header }) => {
×
180
                  return header
×
181
                    .replaceAll(sourceForeignKeyName, targetForeignKeyName)
×
182
                    .replaceAll(sourceSelfKeyName, targetSelfKeyName);
×
183
                },
×
184
              })
×
185
            )
186
            .pipe(batchProcessor)
×
187
            .on('error', (error: Error) => {
×
188
              this.logger.error(`process csv import error: ${error.message}`, error.stack);
×
189
              reject(error);
×
190
            })
×
191
            .on('end', () => {
×
192
              this.logger.log(`csv ${junctionTableName} finished`);
×
193
              resolve({ success: true });
×
194
            });
×
195
        } else {
×
196
          entry.autodrain();
×
197
        }
×
198
      });
×
199

200
      parser.on('close', () => {
×
201
        this.logger.log('import csv junction completed');
×
202
        resolve({ success: true });
×
203
      });
×
204

205
      parser.on('error', (error) => {
×
206
        this.logger.error(`import csv junction parser error: ${error.message}`, error.stack);
×
207
        reject(error);
×
208
      });
×
209
    });
×
210
  }
×
211

212
  private async handleJunctionChunk(
125✔
213
    results: Record<string, unknown>[],
×
214
    targetFkHostTableName: string
×
215
  ) {
×
216
    const allForeignKeyInfos = [] as {
×
217
      constraint_name: string;
218
      column_name: string;
219
      referenced_table_schema: string;
220
      referenced_table_name: string;
221
      referenced_column_name: string;
222
      dbTableName: string;
223
    }[];
224

NEW
225
    await this.prismaService.$tx(async (prisma) => {
×
NEW
226
      // delete foreign keys if(exist) then duplicate table data
×
NEW
227
      const foreignKeysInfoSql = this.dbProvider.getForeignKeysInfo(targetFkHostTableName);
×
NEW
228
      const foreignKeysInfo = await prisma.$queryRawUnsafe<
×
229
        {
230
          constraint_name: string;
231
          column_name: string;
232
          referenced_table_schema: string;
233
          referenced_table_name: string;
234
          referenced_column_name: string;
235
        }[]
NEW
236
      >(foreignKeysInfoSql);
×
NEW
237
      const newForeignKeyInfos = foreignKeysInfo.map((info) => ({
×
NEW
238
        ...info,
×
NEW
239
        dbTableName: targetFkHostTableName,
×
NEW
240
      }));
×
NEW
241
      allForeignKeyInfos.push(...newForeignKeyInfos);
×
242

NEW
243
      for (const { constraint_name, column_name, dbTableName } of allForeignKeyInfos) {
×
NEW
244
        const dropForeignKeyQuery = this.knex.schema
×
NEW
245
          .alterTable(dbTableName, (table) => {
×
NEW
246
            table.dropForeign(column_name, constraint_name);
×
NEW
247
          })
×
NEW
248
          .toQuery();
×
249

NEW
250
        await prisma.$executeRawUnsafe(dropForeignKeyQuery);
×
NEW
251
      }
×
252

NEW
253
      const sql = this.knex.table(targetFkHostTableName).insert(results).toQuery();
×
NEW
254
      try {
×
NEW
255
        await prisma.$executeRawUnsafe(sql);
×
NEW
256
      } catch (error) {
×
NEW
257
        if (error instanceof PrismaClientKnownRequestError) {
×
NEW
258
          this.logger.error(
×
NEW
259
            `exc junction import task known error: (${error.code}): ${error.message}`,
×
NEW
260
            error.stack
×
261
          );
NEW
262
        } else if (error instanceof PrismaClientUnknownRequestError) {
×
NEW
263
          this.logger.error(
×
NEW
264
            `exc junction import task unknown error: ${error.message}`,
×
NEW
265
            error.stack
×
266
          );
NEW
267
        } else {
×
NEW
268
          this.logger.error(
×
NEW
269
            `exc junction import task error: ${(error as Error)?.message}`,
×
NEW
270
            (error as Error)?.stack
×
271
          );
NEW
272
        }
×
273
      }
×
274

NEW
275
      // add foreign keys
×
NEW
276
      for (const {
×
NEW
277
        constraint_name,
×
NEW
278
        column_name,
×
NEW
279
        dbTableName,
×
NEW
280
        referenced_table_schema: referencedTableSchema,
×
NEW
281
        referenced_table_name: referencedTableName,
×
NEW
282
        referenced_column_name: referencedColumnName,
×
NEW
283
      } of allForeignKeyInfos) {
×
NEW
284
        const addForeignKeyQuery = this.knex.schema
×
NEW
285
          .alterTable(dbTableName, (table) => {
×
NEW
286
            table
×
NEW
287
              .foreign(column_name, constraint_name)
×
NEW
288
              .references(referencedColumnName)
×
NEW
289
              .inTable(`${referencedTableSchema}.${referencedTableName}`);
×
NEW
290
          })
×
NEW
291
          .toQuery();
×
NEW
292
        await prisma.$executeRawUnsafe(addForeignKeyQuery);
×
NEW
293
      }
×
NEW
294
    });
×
UNCOV
295
  }
×
296
}
125✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc