• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

teableio / teable / 14295824870

06 Apr 2025 07:21PM UTC coverage: 81.147% (+0.4%) from 80.787%
14295824870

push

github

web-flow
feat: support template (#1413)

* feat: support base export and import

* feat: support space template feture

* feat: add link junction table relative import and export

* fix: duplicate table lookup losing link field

* fix: duplicate lookup field error

* chore: add template db-migration

* fix: package error

* feat: use html tag for export notification

* fix: template relative

* fix: lint error

* fix: template e2e

* feat: base export auth

* fix: delete previous snapshot base when create new snapshot

* feat: optimise template category creating process

* fix: import base zip field error

* fix: import base error

* fix: date field type record insert

* fix: view options duplicate

* fix: view share enable duplicate

* fix: view duplicate

* refactor: simplify duplicate logic

* fix: some duplicate base bugs and e2e

* fix: duplicate view with share relative

* feat: update template relative api auth setting

* fix: attachment job judgement when import base

* fix: base import task

* fix: import attachment losing mimetype when import base

* feat: add shareid unique constraint

* fix: get mimetype from file suffix when import base

7453 of 7915 branches covered (94.16%)

2377 of 3218 new or added lines in 28 files covered. (73.87%)

41 existing lines in 6 files now uncovered.

35880 of 44216 relevant lines covered (81.15%)

1764.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.65
/apps/nestjs-backend/src/features/base/base-import-csv.processor.ts
1
import type { TransformCallback } from 'stream';
4✔
2
import { Transform } from 'stream';
3
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
4
import { Injectable, Logger } from '@nestjs/common';
5
import {
6
  PrismaClientKnownRequestError,
7
  PrismaClientUnknownRequestError,
8
} from '@prisma/client/runtime/library';
9
import type { IAttachmentCellValue, ILinkFieldOptions } from '@teable/core';
10
import { FieldType } from '@teable/core';
11
import { PrismaService } from '@teable/db-main-prisma';
12
import type { IBaseJson } from '@teable/openapi';
13
import { UploadType } from '@teable/openapi';
14
import { Queue, QueueEvents, Job } from 'bullmq';
15
import * as csvParser from 'csv-parser';
16
import { Knex } from 'knex';
17
import { InjectModel } from 'nest-knexjs';
18
import * as unzipper from 'unzipper';
19
import StorageAdapter from '../attachments/plugins/adapter';
20
import { InjectStorageAdapter } from '../attachments/plugins/storage';
21
import { createFieldInstanceByRaw } from '../field/model/factory';
22
import { RecordOpenApiService } from '../record/open-api/record-open-api.service';
23
import { EXCLUDE_SYSTEM_FIELDS } from './constant';
24

25
interface IBaseImportCsvJob {
26
  path: string;
27
  userId: string;
28
  tableIdMap: Record<string, string>;
29
  fieldIdMap: Record<string, string>;
30
  viewIdMap: Record<string, string>;
31
  structure: IBaseJson;
32
}
33

34
const chunkSize = 1000;
4✔
35

36
export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue';
4✔
37

38
@Injectable()
39
@Processor(BASE_IMPORT_CSV_QUEUE)
40
export class BaseImportCsvQueueProcessor extends WorkerHost {
4✔
41
  private logger = new Logger(BaseImportCsvQueueProcessor.name);
125✔
42
  readonly queueEvents = new QueueEvents(BASE_IMPORT_CSV_QUEUE);
125✔
43

44
  private processedJobs = new Set<string>();
125✔
45

46
  constructor(
125✔
47
    private readonly prismaService: PrismaService,
125✔
48
    private readonly recordOpenApiService: RecordOpenApiService,
125✔
49
    @InjectModel('CUSTOM_KNEX') private readonly knex: Knex,
125✔
50
    @InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
125✔
51
    @InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue<IBaseImportCsvJob>
125✔
52
  ) {
125✔
53
    super();
125✔
54
  }
125✔
55

56
  public async process(job: Job<IBaseImportCsvJob>) {
125✔
57
    const jobId = String(job.id);
6✔
58
    if (this.processedJobs.has(jobId)) {
6✔
59
      this.logger.log(`Job ${jobId} already processed, skipping`);
4✔
60
      return;
4✔
61
    }
4✔
62

63
    this.processedJobs.add(jobId);
2✔
64

65
    try {
2✔
66
      await this.handleBaseImportCsv(job);
2✔
67
    } catch (error) {
6✔
NEW
68
      this.logger.error(
×
NEW
69
        `Process base import csv failed: ${(error as Error)?.message}`,
×
NEW
70
        (error as Error)?.stack
×
71
      );
NEW
72
    }
×
73
  }
6✔
74

75
  private async handleBaseImportCsv(job: Job<IBaseImportCsvJob>) {
125✔
76
    const { path, userId, tableIdMap, fieldIdMap, viewIdMap, structure } = job.data;
2✔
77
    const csvStream = await this.storageAdapter.downloadFile(
2✔
78
      StorageAdapter.getBucket(UploadType.Import),
2✔
79
      path
2✔
80
    );
81
    const processedFiles = new Set<string>();
2✔
82

83
    const parser = unzipper.Parse();
2✔
84
    csvStream.pipe(parser);
2✔
85

86
    return new Promise<{ success: boolean }>((resolve, reject) => {
2✔
87
      parser.on('entry', (entry) => {
2✔
88
        const filePath = entry.path;
10✔
89

90
        if (processedFiles.has(filePath)) {
10✔
91
          this.logger.warn(`warning: duplicate process file: ${filePath}`);
2✔
92
        }
2✔
93
        processedFiles.add(filePath);
10✔
94

95
        if (
10✔
96
          filePath.startsWith('tables/') &&
10✔
97
          entry.type !== 'Directory' &&
8✔
98
          // exclude junction table
8✔
99
          !filePath.includes('junction_')
8✔
100
        ) {
10✔
101
          const tableId = filePath.replace('tables/', '').split('.')[0];
4✔
102
          const table = structure.tables.find((table) => table.id === tableId);
4✔
103
          const attachmentsFields =
4✔
104
            table?.fields
4✔
105
              ?.filter(({ type }) => type === FieldType.Attachment)
4✔
106
              .map(({ dbFieldName, id }) => ({
4✔
NEW
107
                dbFieldName,
×
NEW
108
                id,
×
109
              })) || [];
4✔
110

111
          const batchProcessor = new BatchProcessor(
4✔
112
            chunkSize,
4✔
113
            this.handleChunk.bind(this),
4✔
114
            tableIdMap[tableId],
4✔
115
            userId,
4✔
116
            fieldIdMap,
4✔
117
            viewIdMap,
4✔
118
            attachmentsFields
4✔
119
          );
120

121
          entry
4✔
122
            .pipe(
4✔
123
              csvParser.default({
4✔
124
                // strict: true,
4✔
125
                mapValues: ({ value }) => {
4✔
126
                  return value;
1,052✔
127
                },
1,052✔
128
                mapHeaders: ({ header }) => {
4✔
129
                  if (header.startsWith('__row_')) {
48✔
NEW
130
                    return `__row_${viewIdMap[header.slice(5)]}`;
×
NEW
131
                  }
×
132
                  if (header.startsWith('__fk_')) {
48✔
NEW
133
                    return `__fk_${fieldIdMap[header.slice(5)]}`;
×
NEW
134
                  }
×
135
                  return header;
48✔
136
                },
48✔
137
              })
4✔
138
            )
139
            .pipe(batchProcessor)
4✔
140
            .on('error', (error: Error) => {
4✔
NEW
141
              this.logger.error(`process csv import error: ${error.message}`, error.stack);
×
NEW
142
              reject(error);
×
NEW
143
            })
×
144
            .on('end', () => {
4✔
NEW
145
              this.logger.log(`csv ${tableId} finished`);
×
NEW
146
              resolve({ success: true });
×
NEW
147
            });
×
148
        } else {
10✔
149
          entry.autodrain();
6✔
150
        }
6✔
151
      });
10✔
152

153
      parser.on('close', () => {
2✔
154
        this.logger.log('import csv completed');
2✔
155
        resolve({ success: true });
2✔
156
      });
2✔
157

158
      parser.on('error', (error) => {
2✔
NEW
159
        this.logger.error(`ZIP parser error: ${error.message}`, error.stack);
×
NEW
160
        reject(error);
×
NEW
161
      });
×
162
    });
2✔
163
  }
2✔
164

165
  private async handleChunk(
125✔
166
    results: Record<string, unknown>[],
4✔
167
    tableId: string,
4✔
168
    userId: string,
4✔
169
    fieldIdMap: Record<string, string>,
4✔
170
    viewIdMap: Record<string, string>,
4✔
171
    attachmentsFields: { dbFieldName: string; id: string }[]
4✔
172
  ) {
4✔
173
    const { dbTableName } = await this.prismaService.tableMeta.findUniqueOrThrow({
4✔
174
      where: { id: tableId },
4✔
175
      select: {
4✔
176
        dbTableName: true,
4✔
177
      },
4✔
178
    });
4✔
179

180
    const attachmentsTableData = [] as {
4✔
181
      attachmentId: string;
182
      name: string;
183
      token: string;
184
      tableId: string;
185
      recordId: string;
186
      fieldId: string;
187
    }[];
188

189
    const newResult = [...results].map((res) => {
4✔
190
      const newRes = { ...res };
88✔
191
      const keys = Object.keys(newRes);
88✔
192
      const rawKeys = keys.filter((key) => key.startsWith('__row_'));
88✔
193
      const fkKeys = keys.filter((key) => key.startsWith('__fk_fld'));
88✔
194

195
      rawKeys.forEach((key) => {
88✔
NEW
196
        const value = res[key];
×
NEW
197
        const fieldId = key.slice(5);
×
NEW
198
        const newKey = fieldIdMap[fieldId] ? `__fk_${viewIdMap[fieldId]}` : key;
×
NEW
199
        newRes[newKey] = value;
×
NEW
200
        delete newRes[key];
×
NEW
201
      });
×
202

203
      fkKeys.forEach((key) => {
88✔
NEW
204
        const value = res[key];
×
NEW
205
        const viewId = key.slice(6);
×
NEW
206
        const newKey = viewIdMap[viewId] ? `__row_${viewIdMap[viewId]}` : key;
×
NEW
207
        newRes[newKey] = value;
×
NEW
208
        delete newRes[key];
×
NEW
209
      });
×
210

211
      EXCLUDE_SYSTEM_FIELDS.forEach((header) => {
88✔
212
        delete newRes[header];
528✔
213
      });
528✔
214

215
      return newRes;
88✔
216
    });
88✔
217

218
    const attachmentsDbFieldNames = attachmentsFields.map(({ dbFieldName }) => dbFieldName);
4✔
219

220
    const recordsToInsert = newResult.map((result) => {
4✔
221
      const res = { ...result };
88✔
222
      Object.entries(res).forEach(([key, value]) => {
88✔
223
        if (res[key] === '') {
1,052✔
224
          res[key] = null;
402✔
225
        }
402✔
226

227
        // attachment field should add info to attachments table
1,052✔
228
        if (attachmentsDbFieldNames.includes(key) && value) {
1,052✔
NEW
229
          const attValues = JSON.parse(value as string) as IAttachmentCellValue;
×
NEW
230
          const fieldId = attachmentsFields.find(({ dbFieldName }) => dbFieldName === key)?.id;
×
NEW
231
          attValues.forEach((att) => {
×
NEW
232
            attachmentsTableData.push({
×
NEW
233
              attachmentId: att.id,
×
NEW
234
              name: att.name,
×
NEW
235
              token: att.token,
×
NEW
236
              tableId: tableId,
×
NEW
237
              recordId: res['__id'] as string,
×
NEW
238
              fieldId: fieldIdMap[fieldId!],
×
NEW
239
            });
×
NEW
240
          });
×
NEW
241
        }
×
242
      });
1,052✔
243

244
      // default value set
88✔
245
      res['__created_by'] = userId;
88✔
246
      res['__version'] = 1;
88✔
247
      return res;
88✔
248
    });
88✔
249

250
    const sql = this.knex.table(dbTableName).insert(recordsToInsert).toQuery();
4✔
251
    await this.prismaService.txClient().$executeRawUnsafe(sql);
4✔
252
    await this.updateAttachmentTable(userId, attachmentsTableData);
4✔
253
  }
4✔
254

255
  // when insert table data relative to attachment, we need to update the attachment table
125✔
256
  private async updateAttachmentTable(
125✔
257
    userId: string,
4✔
258
    attachmentsTableData: {
4✔
259
      attachmentId: string;
260
      name: string;
261
      token: string;
262
      tableId: string;
263
      recordId: string;
264
      fieldId: string;
265
    }[]
4✔
266
  ) {
4✔
267
    await this.prismaService.txClient().attachmentsTable.createMany({
4✔
268
      data: attachmentsTableData.map((a) => ({ ...a, createdBy: userId })),
4✔
269
    });
4✔
270
  }
4✔
271

272
  private async importJunctionChunk(
125✔
NEW
273
    path: string,
×
NEW
274
    fieldIdMap: Record<string, string>,
×
NEW
275
    structure: IBaseJson
×
NEW
276
  ) {
×
NEW
277
    const csvStream = await this.storageAdapter.downloadFile(
×
NEW
278
      StorageAdapter.getBucket(UploadType.Import),
×
NEW
279
      path
×
280
    );
281

NEW
282
    const sourceLinkFields = structure.tables
×
NEW
283
      .map(({ fields }) => fields)
×
NEW
284
      .flat()
×
NEW
285
      .filter((f) => f.type === FieldType.Link && !f.isLookup);
×
286

NEW
287
    const linkFieldRaws = await this.prismaService.field.findMany({
×
NEW
288
      where: {
×
NEW
289
        id: {
×
NEW
290
          in: Object.values(fieldIdMap),
×
NEW
291
        },
×
NEW
292
        type: FieldType.Link,
×
NEW
293
        isLookup: null,
×
NEW
294
      },
×
NEW
295
    });
×
296

NEW
297
    const junctionDbTableNameMap = {} as Record<
×
298
      string,
299
      {
300
        sourceSelfKeyName: string;
301
        sourceForeignKeyName: string;
302
        targetSelfKeyName: string;
303
        targetForeignKeyName: string;
304
        targetFkHostTableName: string;
305
      }
306
    >;
307

NEW
308
    const linkFieldInstances = linkFieldRaws.map((f) => createFieldInstanceByRaw(f));
×
309

NEW
310
    for (const sourceField of sourceLinkFields) {
×
NEW
311
      const { options: sourceOptions } = sourceField;
×
NEW
312
      const {
×
NEW
313
        fkHostTableName: sourceFkHostTableName,
×
NEW
314
        selfKeyName: sourceSelfKeyName,
×
NEW
315
        foreignKeyName: sourceForeignKeyName,
×
NEW
316
      } = sourceOptions as ILinkFieldOptions;
×
NEW
317
      const targetField = linkFieldInstances.find((f) => f.id === fieldIdMap[sourceField.id])!;
×
NEW
318
      const { options: targetOptions } = targetField;
×
NEW
319
      const {
×
NEW
320
        fkHostTableName: targetFkHostTableName,
×
NEW
321
        selfKeyName: targetSelfKeyName,
×
NEW
322
        foreignKeyName: targetForeignKeyName,
×
NEW
323
      } = targetOptions as ILinkFieldOptions;
×
NEW
324
      if (sourceFkHostTableName.includes('junction_')) {
×
NEW
325
        junctionDbTableNameMap[sourceFkHostTableName] = {
×
NEW
326
          sourceSelfKeyName,
×
NEW
327
          sourceForeignKeyName,
×
NEW
328
          targetSelfKeyName,
×
NEW
329
          targetForeignKeyName,
×
NEW
330
          targetFkHostTableName,
×
NEW
331
        };
×
NEW
332
      }
×
NEW
333
    }
×
334

NEW
335
    const parser = unzipper.Parse();
×
NEW
336
    csvStream.pipe(parser);
×
337

NEW
338
    const processedFiles = new Set<string>();
×
339

NEW
340
    return new Promise<{ success: boolean }>((resolve, reject) => {
×
NEW
341
      parser.on('entry', (entry) => {
×
NEW
342
        const filePath = entry.path;
×
343

NEW
344
        if (processedFiles.has(filePath)) {
×
NEW
345
          entry.autodrain();
×
NEW
346
          return;
×
NEW
347
        }
×
NEW
348
        processedFiles.add(filePath);
×
349

NEW
350
        if (
×
NEW
351
          filePath.startsWith('tables/') &&
×
NEW
352
          entry.type !== 'Directory' &&
×
NEW
353
          filePath.includes('junction_')
×
NEW
354
        ) {
×
NEW
355
          const name = filePath.replace('tables/', '').split('.');
×
NEW
356
          name.pop();
×
NEW
357
          const junctionTableName = name.join('.');
×
NEW
358
          const junctionInfo = junctionDbTableNameMap[junctionTableName];
×
359

NEW
360
          const {
×
NEW
361
            sourceForeignKeyName,
×
NEW
362
            targetForeignKeyName,
×
NEW
363
            sourceSelfKeyName,
×
NEW
364
            targetSelfKeyName,
×
NEW
365
            targetFkHostTableName,
×
NEW
366
          } = junctionInfo;
×
367

NEW
368
          const batchProcessor = new JunctionBatchProcessor(
×
NEW
369
            chunkSize,
×
NEW
370
            this.handleJunctionChunk.bind(this),
×
NEW
371
            targetFkHostTableName
×
372
          );
373

NEW
374
          entry
×
NEW
375
            .pipe(
×
NEW
376
              csvParser.default({
×
NEW
377
                // strict: true,
×
NEW
378
                mapValues: ({ value }) => {
×
NEW
379
                  return value;
×
NEW
380
                },
×
NEW
381
                mapHeaders: ({ header }) => {
×
NEW
382
                  return header
×
NEW
383
                    .replaceAll(sourceForeignKeyName, targetForeignKeyName)
×
NEW
384
                    .replaceAll(sourceSelfKeyName, targetSelfKeyName);
×
NEW
385
                },
×
NEW
386
              })
×
387
            )
NEW
388
            .pipe(batchProcessor)
×
NEW
389
            .on('error', (error: Error) => {
×
NEW
390
              this.logger.error(`process csv import error: ${error.message}`, error.stack);
×
NEW
391
              reject(error);
×
NEW
392
            })
×
NEW
393
            .on('end', () => {
×
NEW
394
              this.logger.log(`csv ${junctionTableName} finished`);
×
NEW
395
              resolve({ success: true });
×
NEW
396
            });
×
NEW
397
        } else {
×
NEW
398
          entry.autodrain();
×
NEW
399
        }
×
NEW
400
      });
×
401

NEW
402
      parser.on('close', () => {
×
NEW
403
        this.logger.log('import csv junction completed');
×
NEW
404
        resolve({ success: true });
×
NEW
405
      });
×
406

NEW
407
      parser.on('error', (error) => {
×
NEW
408
        this.logger.error(`import csv junction parser error: ${error.message}`, error.stack);
×
NEW
409
        reject(error);
×
NEW
410
      });
×
NEW
411
    });
×
NEW
412
  }
×
413

414
  private async handleJunctionChunk(
125✔
NEW
415
    results: Record<string, unknown>[],
×
NEW
416
    targetFkHostTableName: string
×
NEW
417
  ) {
×
NEW
418
    const sql = this.knex.table(targetFkHostTableName).insert(results).toQuery();
×
NEW
419
    try {
×
NEW
420
      await this.prismaService.txClient().$executeRawUnsafe(sql);
×
NEW
421
    } catch (error) {
×
NEW
422
      if (error instanceof PrismaClientKnownRequestError) {
×
NEW
423
        this.logger.error(
×
NEW
424
          `exc junction import task known error: (${error.code}): ${error.message}`,
×
NEW
425
          error.stack
×
426
        );
NEW
427
      } else if (error instanceof PrismaClientUnknownRequestError) {
×
NEW
428
        this.logger.error(`exc junction import task unknown error: ${error.message}`, error.stack);
×
NEW
429
      } else {
×
NEW
430
        this.logger.error(
×
NEW
431
          `exc junction import task error: ${(error as Error)?.message}`,
×
NEW
432
          (error as Error)?.stack
×
433
        );
NEW
434
      }
×
NEW
435
    }
×
NEW
436
  }
×
437

438
  @OnWorkerEvent('completed')
125✔
NEW
439
  async onCompleted(job: Job) {
×
NEW
440
    const { fieldIdMap, path, structure } = job.data;
×
NEW
441
    await this.importJunctionChunk(path, fieldIdMap, structure);
×
NEW
442
  }
×
443
}
125✔
444

445
class BatchProcessor extends Transform {
4✔
446
  private buffer: Record<string, unknown>[] = [];
4✔
447
  private totalProcessed = 0;
4✔
448

449
  constructor(
4✔
450
    private readonly batchSize: number,
4✔
451
    private readonly processBatch: (
4✔
452
      batch: Record<string, unknown>[],
453
      tableId: string,
454
      userId: string,
455
      fieldIdMap: Record<string, string>,
456
      viewIdMap: Record<string, string>,
457
      attachmentsFields: { dbFieldName: string; id: string }[]
458
    ) => Promise<void>,
4✔
459
    private tableId: string,
4✔
460
    private userId: string,
4✔
461
    private fieldIdMap: Record<string, string>,
4✔
462
    private viewIdMap: Record<string, string>,
4✔
463
    private attachmentsFields: { dbFieldName: string; id: string }[]
4✔
464
  ) {
4✔
465
    super({ objectMode: true });
4✔
466
  }
4✔
467

468
  // eslint-disable-next-line @typescript-eslint/naming-convention
4✔
469
  _transform(
4✔
470
    chunk: Record<string, unknown>,
88✔
471
    encoding: BufferEncoding,
88✔
472
    callback: TransformCallback
88✔
473
  ): void {
88✔
474
    this.buffer.push(chunk);
88✔
475
    this.totalProcessed++;
88✔
476

477
    if (this.buffer.length >= this.batchSize) {
88!
NEW
478
      const currentBatch = [...this.buffer];
×
NEW
479
      this.buffer = [];
×
480

NEW
481
      this.processBatch(
×
NEW
482
        currentBatch,
×
NEW
483
        this.tableId,
×
NEW
484
        this.userId,
×
NEW
485
        this.fieldIdMap,
×
NEW
486
        this.viewIdMap,
×
NEW
487
        this.attachmentsFields
×
488
      )
NEW
489
        .then(() => {
×
NEW
490
          this.emit('progress', { processed: this.totalProcessed });
×
NEW
491
          callback();
×
NEW
492
        })
×
NEW
493
        .catch((err: Error) => callback(err));
×
494
    } else {
88✔
495
      callback();
88✔
496
    }
88✔
497
  }
88✔
498

499
  // eslint-disable-next-line @typescript-eslint/naming-convention
4✔
500
  _flush(callback: TransformCallback): void {
4✔
501
    if (this.buffer.length > 0) {
4✔
502
      this.processBatch(
4✔
503
        this.buffer,
4✔
504
        this.tableId,
4✔
505
        this.userId,
4✔
506
        this.fieldIdMap,
4✔
507
        this.viewIdMap,
4✔
508
        this.attachmentsFields
4✔
509
      )
510
        .then(() => {
4✔
511
          this.emit('progress', { processed: this.totalProcessed });
4✔
512
          callback();
4✔
513
        })
4✔
514
        .catch((err: Error) => callback(err));
4✔
515
    } else {
4!
NEW
516
      callback();
×
NEW
517
    }
×
518
  }
4✔
519
}
4✔
520

NEW
521
class JunctionBatchProcessor extends Transform {
×
NEW
522
  private buffer: Record<string, unknown>[] = [];
×
NEW
523
  private totalProcessed = 0;
×
524

NEW
525
  constructor(
×
NEW
526
    private readonly batchSize: number,
×
NEW
527
    private readonly processBatch: (
×
528
      batch: Record<string, unknown>[],
529
      targetFkHostTableName: string
NEW
530
    ) => Promise<void>,
×
NEW
531
    private targetFkHostTableName: string
×
NEW
532
  ) {
×
NEW
533
    super({ objectMode: true });
×
NEW
534
  }
×
535

NEW
536
  // eslint-disable-next-line @typescript-eslint/naming-convention
×
NEW
537
  _transform(
×
NEW
538
    chunk: Record<string, unknown>,
×
NEW
539
    encoding: BufferEncoding,
×
NEW
540
    callback: TransformCallback
×
NEW
541
  ): void {
×
NEW
542
    this.buffer.push(chunk);
×
NEW
543
    this.totalProcessed++;
×
544

NEW
545
    if (this.buffer.length >= this.batchSize) {
×
NEW
546
      const currentBatch = [...this.buffer];
×
NEW
547
      this.buffer = [];
×
548

NEW
549
      this.processBatch(currentBatch, this.targetFkHostTableName)
×
NEW
550
        .then(() => {
×
NEW
551
          this.emit('progress', { processed: this.totalProcessed });
×
NEW
552
          callback();
×
NEW
553
        })
×
NEW
554
        .catch((err: Error) => callback(err));
×
NEW
555
    } else {
×
NEW
556
      callback();
×
NEW
557
    }
×
NEW
558
  }
×
559

NEW
560
  // eslint-disable-next-line @typescript-eslint/naming-convention
×
NEW
561
  _flush(callback: TransformCallback): void {
×
NEW
562
    if (this.buffer.length > 0) {
×
NEW
563
      this.processBatch(this.buffer, this.targetFkHostTableName)
×
NEW
564
        .then(() => {
×
NEW
565
          this.emit('progress', { processed: this.totalProcessed });
×
NEW
566
          callback();
×
NEW
567
        })
×
NEW
568
        .catch((err: Error) => callback(err));
×
NEW
569
    } else {
×
NEW
570
      callback();
×
NEW
571
    }
×
NEW
572
  }
×
NEW
573
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc