• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

visgl / loaders.gl / 24839896359

23 Apr 2026 02:06PM UTC coverage: 59.334% (-0.3%) from 59.627%
24839896359

push

github

web-flow
fix(json) Only emit batches when we have complete elements (#3400)

11234 of 20699 branches covered (54.27%)

Branch coverage included in aggregate %.

24 of 25 new or added lines in 1 file covered. (96.0%)

123 existing lines in 8 files now uncovered.

23043 of 37071 relevant lines covered (62.16%)

16510.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.24
/modules/csv/src/csv-loader.ts
1
// loaders.gl
2
// SPDX-License-Identifier: MIT
3
// Copyright (c) vis.gl contributors
4

5
import type {LoaderWithParser, LoaderOptions} from '@loaders.gl/loader-utils';
6
import type {
7
  Schema,
8
  ArrayRowTable,
9
  ColumnarTable,
10
  ColumnarTableBatch,
11
  ObjectRowTable,
12
  TableBatch,
13
  ArrowTable,
14
  ArrowTableBatch
15
} from '@loaders.gl/schema';
16

17
import {toArrayBufferIterator} from '@loaders.gl/loader-utils';
18
import {
19
  AsyncQueue,
20
  TableBatchBuilder,
21
  convertToArrayRow,
22
  convertToObjectRow
23
} from '@loaders.gl/schema-utils';
24
import Papa from './papaparse/papaparse';
25
import AsyncIteratorStreamer from './papaparse/async-iterator-streamer';
26
import {CSVFormat} from './csv-format';
27
import {DEFAULT_CSV_OPTIONS, DEFAULT_CSV_SHAPE} from './lib/csv-default-options';
28
import {
29
  parseCSVArrayBufferAsArrow,
30
  parseCSVInArrowBatches,
31
  parseCSVTextAsArrow
32
} from './csv-arrow-loader';
33
import {
34
  deduceCSVSchemaFromRows,
35
  detectGeometryColumns,
36
  MAX_GEOMETRY_SNIFF_ROWS,
37
  normalizeGeometryArrayRow,
38
  normalizeGeometryObjectRow,
39
  shouldFinalizeGeometryDetection
40
} from './lib/csv-geometry';
41

42
// __VERSION__ is injected by babel-plugin-version-inline at build time;
// fall back to 'latest' when running unbundled (e.g. in tests).
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';
14!
45

46
/** Options for parsing CSV input into row tables or Arrow tables. */
export type CSVLoaderOptions = LoaderOptions & {
  csv?: {
    /** Selects row-table output or Arrow columnar output. */
    shape?: 'array-row-table' | 'object-row-table' | 'columnar-table' | 'arrow-table';
    /** Optimizes memory usage (re-allocates row strings to drop references to the source text) but increases parsing time. */
    optimizeMemoryUsage?: boolean;
    /** Prefix for generated column names when headers are absent. */
    columnPrefix?: string;
    /** Controls whether the first row is treated as headers; 'auto' sniffs the first row's cell types. */
    header?: boolean | 'auto';

    // CSV options (papaparse)
    // delimiter: auto
    // newline: auto
    /** Character used to quote CSV fields. */
    quoteChar?: string;
    /** Character used to escape quoted CSV fields. */
    escapeChar?: string;
    /** Converts numbers and booleans and, for Arrow output, can infer dates. */
    dynamicTyping?: boolean;
    /** Enables comment line parsing. */
    comments?: boolean;
    /** Skips empty rows; 'greedy' also skips rows containing only whitespace. */
    skipEmptyLines?: boolean | 'greedy';
    // transform: null?
    /** Candidate delimiters for automatic detection. */
    delimitersToGuess?: string[];
    /** Enables geometry column detection and normalization (see ./lib/csv-geometry helpers). */
    detectGeometryColumns?: boolean;
    // fastMode: auto
  };
};
78

79
/** Loader for CSV and other delimiter-separated tabular text formats. */
80
export const CSVLoader = {
14✔
81
  ...CSVFormat,
82

83
  dataType: null as unknown as ObjectRowTable | ArrayRowTable | ColumnarTable | ArrowTable,
84
  batchType: null as unknown as TableBatch | ColumnarTableBatch | ArrowTableBatch,
85
  version: VERSION,
86
  parse: async (arrayBuffer: ArrayBuffer, options?: CSVLoaderOptions) =>
87
    options?.csv?.shape === 'arrow-table'
2!
88
      ? parseCSVArrayBufferAsArrow(arrayBuffer, options)
89
      : parseCSV(new TextDecoder().decode(arrayBuffer), options),
90
  parseText: (text: string, options?: CSVLoaderOptions) =>
91
    options?.csv?.shape === 'arrow-table'
48✔
92
      ? parseCSVTextAsArrow(text, options)
93
      : parseCSV(text, options),
94
  parseInBatches: (asyncIterator, options?: CSVLoaderOptions) =>
95
    options?.csv?.shape === 'arrow-table'
46✔
96
      ? parseCSVInArrowBatches(asyncIterator, options)
97
      : parseCSVInBatches(asyncIterator, options),
98
  // @ts-ignore
99
  // testText: null,
100
  options: {
101
    csv: DEFAULT_CSV_OPTIONS
102
  }
103
} as const satisfies LoaderWithParser<
104
  ObjectRowTable | ArrayRowTable | ColumnarTable | ArrowTable,
105
  TableBatch | ColumnarTableBatch | ArrowTableBatch,
106
  CSVLoaderOptions
107
>;
108

109
async function parseCSV(
110
  csvText: string,
111
  options?: CSVLoaderOptions
112
): Promise<ObjectRowTable | ArrayRowTable> {
113
  // Apps can call the parse method directly, so we apply default options here
114
  const csvOptions = {...CSVLoader.options.csv, ...options?.csv};
48✔
115

116
  const firstRow = readFirstRow(csvText);
48✔
117
  const header: boolean =
118
    csvOptions.header === 'auto' ? isHeaderRow(firstRow) : Boolean(csvOptions.header);
48✔
119

120
  const parseWithHeader = header;
48✔
121

122
  const papaparseConfig = {
48✔
123
    // dynamicTyping: true,
124
    ...csvOptions,
125
    header: parseWithHeader,
126
    download: false, // We handle loading, no need for papaparse to do it for us
127
    transformHeader: parseWithHeader ? duplicateColumnTransformer() : undefined,
48✔
128
    error: e => {
UNCOV
129
      throw new Error(e);
×
130
    }
131
  };
132

133
  const result = Papa.parse(csvText, papaparseConfig);
48✔
134
  const rows = result.data as any[];
48✔
135

136
  const headerRow = result.meta.fields || generateHeader(csvOptions.columnPrefix, firstRow.length);
48✔
137

138
  const shape = csvOptions.shape || DEFAULT_CSV_SHAPE;
48!
139
  let table: ArrayRowTable | ObjectRowTable;
140
  switch (shape) {
48!
141
    case 'object-row-table':
142
      table = {
38✔
143
        shape: 'object-row-table',
144
        data: rows.map(row => (Array.isArray(row) ? convertToObjectRow(row, headerRow) : row))
174,894✔
145
      };
146
      break;
38✔
147
    case 'array-row-table':
148
      table = {
8✔
149
        shape: 'array-row-table',
150
        data: rows.map(row => (Array.isArray(row) ? row : convertToArrayRow(row, headerRow)))
22✔
151
      };
152
      break;
8✔
153
    default:
UNCOV
154
      throw new Error(shape);
×
155
  }
156
  const detectedGeometryColumns = csvOptions.detectGeometryColumns
46✔
157
    ? detectGeometryColumns(
158
        headerRow,
159
        rows.map(row => (Array.isArray(row) ? row : convertToArrayRow(row, headerRow)))
12!
160
      )
161
    : [];
162

163
  if (detectedGeometryColumns.length > 0) {
48✔
164
    table =
4✔
165
      table.shape === 'array-row-table'
4✔
166
        ? {
167
            ...table,
168
            data: table.data.map(row => normalizeGeometryArrayRow(row, detectedGeometryColumns))
6✔
169
          }
170
        : {
171
            ...table,
172
            data: table.data.map(row => normalizeGeometryObjectRow(row, detectedGeometryColumns))
6✔
173
          };
174
  }
175

176
  table.schema = deduceCSVSchemaFromRows(table.data, headerRow, detectedGeometryColumns);
46✔
177
  return table;
46✔
178
}
179

180
// TODO - support batch size 0 = no batching/single batch?
/**
 * Streams CSV input and yields table batches through an AsyncQueue.
 * Header detection, schema deduction and (optional) geometry-column sniffing
 * are performed incrementally as rows arrive in the papaparse `step` callback.
 * @param asyncIterator binary chunks of the CSV input
 * @param options loader options; `options.core.batchSize` controls batch size
 * @returns an async iterable of table batches (errors are enqueued as Error values)
 */
function parseCSVInBatches(
  asyncIterator:
    | AsyncIterable<ArrayBufferLike | ArrayBufferView>
    | Iterable<ArrayBufferLike | ArrayBufferView>,
  options?: CSVLoaderOptions
): AsyncIterable<TableBatch> {
  // Papaparse does not support standard batch size handling
  // TODO - investigate papaparse chunks mode
  options = {...options};
  if (options?.core?.batchSize === 'auto') {
    options.core.batchSize = 4000;
  }

  // Apps can call the parse method directly, so we apply default options here
  const csvOptions = {...CSVLoader.options.csv, ...options?.csv};

  const asyncQueue = new AsyncQueue<TableBatch>();

  let isFirstRow: boolean = true; // true until the first data (non-header) row is processed
  let headerRow: string[] | null = null; // detected or generated column names
  let tableBatchBuilder: TableBatchBuilder | null = null; // created lazily once schema is known
  let schema: Schema | null = null; // deduced from the first data row(s)
  let sniffedRows: unknown[][] = []; // rows buffered while geometry detection is pending
  let detectedGeometryColumns = [] as ReturnType<typeof detectGeometryColumns>;
  let geometryDetectionFinalized = !csvOptions.detectGeometryColumns; // already "done" when detection is disabled

  const config = {
    // dynamicTyping: true, // Convert numbers and boolean values in rows from strings,
    ...csvOptions,
    header: false, // Unfortunately, header detection is not automatic and does not infer shapes
    download: false, // We handle loading, no need for papaparse to do it for us
    // chunkSize is set to 5MB explicitly (same as Papaparse default) due to a bug where the
    // streaming parser gets stuck if skipEmptyLines and a step callback are both supplied.
    // See https://github.com/mholt/PapaParse/issues/465
    chunkSize: 1024 * 1024 * 5,
    // skipEmptyLines is set to a boolean value if supplied. Greedy is set to true
    // skipEmptyLines is handled manually given two bugs where the streaming parser gets stuck if
    // both of the skipEmptyLines and step callback options are provided:
    // - true doesn't work unless chunkSize is set: https://github.com/mholt/PapaParse/issues/465
    // - greedy doesn't work: https://github.com/mholt/PapaParse/issues/825
    skipEmptyLines: false,

    // step is called on every row
    // eslint-disable-next-line complexity, max-statements
    step(results) {
      let row = results.data;

      if (csvOptions.skipEmptyLines === 'greedy') {
        // Manually reject lines that are empty or whitespace-only
        const collapsedRow = row.flat().join('').trim();
        if (collapsedRow === '') {
          return;
        }
      } else if (csvOptions.skipEmptyLines === true) {
        row = normalizePapaStreamingRow(row);
        // After normalization an empty line is a single null cell — skip it
        if (row.length === 1 && row[0] === null) {
          return;
        }
      }
      const bytesUsed = results.meta.cursor;

      // Check if we need to save a header row
      if (isFirstRow && !headerRow) {
        // Auto detects or can be forced with csvOptions.header
        const header = csvOptions.header === 'auto' ? isHeaderRow(row) : Boolean(csvOptions.header);
        if (header) {
          // De-duplicate repeated column names while capturing the header
          headerRow = row.map(duplicateColumnTransformer());
          return;
        }
      }

      // If first data row, we can deduce the schema
      if (isFirstRow) {
        if (!headerRow) {
          // No header present: synthesize column names from the row width
          headerRow = generateHeader(csvOptions.columnPrefix, row.length);
        }
      }

      if (csvOptions.optimizeMemoryUsage) {
        // A workaround to allocate new strings and don't retain pointers to original strings.
        // https://bugs.chromium.org/p/v8/issues/detail?id=2869
        row = JSON.parse(JSON.stringify(row));
      }

      const shape = getBatchShape();

      // Buffer rows until geometry detection has seen enough data to decide
      if (!geometryDetectionFinalized && headerRow) {
        sniffedRows.push(row);
        geometryDetectionFinalized = shouldFinalizeGeometryDetection(
          headerRow,
          sniffedRows,
          MAX_GEOMETRY_SNIFF_ROWS
        );
        if (geometryDetectionFinalized) {
          // Detection is done: normalize and flush everything buffered so far
          detectedGeometryColumns = detectGeometryColumns(headerRow, sniffedRows);
          const normalizedSniffedRows = sniffedRows.map(sniffedRow =>
            normalizeGeometryArrayRow(sniffedRow, detectedGeometryColumns)
          );
          schema = deduceCSVSchemaFromRows(
            normalizedSniffedRows,
            headerRow,
            detectedGeometryColumns
          );
          isFirstRow = false;
          for (const normalizedSniffedRow of normalizedSniffedRows) {
            addCSVBatchRow(normalizedSniffedRow, shape, bytesUsed);
          }
          sniffedRows = [];
        }
        // Current row was either buffered or flushed above — nothing more to do
        return;
      }

      if (isFirstRow) {
        if (!headerRow) {
          return;
        }
        // Deduce the schema from the first (normalized) data row
        schema = deduceCSVSchemaFromRows(
          [normalizeGeometryArrayRow(row, detectedGeometryColumns)],
          headerRow,
          detectedGeometryColumns
        );
        isFirstRow = false;
      }

      const normalizedRow = normalizeGeometryArrayRow(row, detectedGeometryColumns);
      addCSVBatchRow(normalizedRow, shape, bytesUsed);
    },

    // complete is called when all rows have been read
    complete(results) {
      try {
        // Input ended before the geometry sniff limit was reached:
        // finalize detection with whatever rows were buffered and flush them
        if (!geometryDetectionFinalized && headerRow) {
          detectedGeometryColumns = detectGeometryColumns(headerRow, sniffedRows);
          const normalizedSniffedRows = sniffedRows.map(row =>
            normalizeGeometryArrayRow(row, detectedGeometryColumns)
          );
          schema = deduceCSVSchemaFromRows(
            normalizedSniffedRows,
            headerRow,
            detectedGeometryColumns
          );
          const shape = getBatchShape();
          tableBatchBuilder =
            tableBatchBuilder ||
            new TableBatchBuilder(schema, {
              ...(options?.core || {}),
              shape
            });
          for (const normalizedSniffedRow of normalizedSniffedRows) {
            // Rows wider than the header carry overflow cells in __parsed_extra
            const batchRow =
              shape === 'object-row-table' && normalizedSniffedRow.length > headerRow.length
                ? convertToPapaObjectRow(normalizedSniffedRow, headerRow)
                : normalizedSniffedRow;
            tableBatchBuilder.addRow(batchRow);
          }
        }
        const bytesUsed = results.meta.cursor;
        // Ensure any final (partial) batch gets emitted
        const batch = tableBatchBuilder && tableBatchBuilder.getFinalBatch({bytesUsed});
        if (batch) {
          asyncQueue.enqueue(batch);
        }
      } catch (error) {
        asyncQueue.enqueue(error as Error);
      }

      asyncQueue.close();
    }
  };

  Papa.parse(toArrayBufferIterator(asyncIterator), config, AsyncIteratorStreamer);

  // TODO - Does it matter if we return asyncIterable or asyncIterator
  // return asyncQueue[Symbol.asyncIterator]();
  return asyncQueue;

  /** Adds one row to the (lazily created) batch builder and emits a batch when one is full. */
  function addCSVBatchRow(rowToAdd: unknown[], shape: CSVBatchShape, bytesUsed: number): void {
    let batchRow: unknown[] | {[columnName: string]: unknown} = rowToAdd;
    if (shape === 'object-row-table' && headerRow && rowToAdd.length > headerRow.length) {
      batchRow = convertToPapaObjectRow(rowToAdd, headerRow);
    }

    // schema is deduced before the first call on every code path above
    tableBatchBuilder =
      tableBatchBuilder ||
      new TableBatchBuilder(schema!, {
        ...(options?.core || {}),
        shape
      });

    try {
      tableBatchBuilder.addRow(batchRow);
      const batch = tableBatchBuilder && tableBatchBuilder.getFullBatch({bytesUsed});
      if (batch) {
        asyncQueue.enqueue(batch);
      }
    } catch (error) {
      asyncQueue.enqueue(error as Error);
    }
  }

  /** Resolves the output batch shape, honoring the deprecated top-level `options.shape`. */
  function getBatchShape(): CSVBatchShape {
    const deprecatedShape = (options as {shape?: CSVBatchShape} | undefined)?.shape;
    const shape = deprecatedShape || csvOptions.shape || DEFAULT_CSV_SHAPE;
    switch (shape) {
      case 'array-row-table':
      case 'columnar-table':
        return shape;
      default:
        return DEFAULT_CSV_SHAPE;
    }
  }
}
393

394
/** Table shapes the batched CSV parser can emit ('arrow-table' is handled on a separate path). */
type CSVBatchShape = 'array-row-table' | 'object-row-table' | 'columnar-table';
395

396
/**
397
 * Checks if a certain row is a header row
398
 * @param row the row to check
399
 * @returns true if the row looks like a header
400
 */
401
function isHeaderRow(row: string[]): boolean {
402
  return row && row.every(value => typeof value === 'string');
286✔
403
}
404

405
/**
406
 * Reads, parses, and returns the first row of a CSV text
407
 * @param csvText the csv text to parse
408
 * @returns the first row
409
 */
410
function readFirstRow(csvText: string): any[] {
411
  const result = Papa.parse(csvText, {
48✔
412
    dynamicTyping: true,
413
    preview: 1
414
  });
415
  return result.data[0];
48✔
416
}
417

418
/**
419
 * Creates a transformer that renames duplicate columns. This is needed as Papaparse doesn't handle
420
 * duplicate header columns and would use the latest occurrence by default.
421
 * See the header option in https://www.papaparse.com/docs#config
422
 * @returns a transform function that returns sanitized names for duplicate fields
423
 */
424
function duplicateColumnTransformer(): (column: string) => string {
425
  const observedColumns = new Set<string>();
58✔
426
  return col => {
58✔
427
    let colName = col;
320✔
428
    let counter = 1;
320✔
429
    while (observedColumns.has(colName)) {
320✔
430
      colName = `${col}.${counter}`;
50✔
431
      counter++;
50✔
432
    }
433
    observedColumns.add(colName);
320✔
434
    return colName;
320✔
435
  };
436
}
437

438
/**
439
 * Generates the header of a CSV given a prefix and a column count
440
 * @param columnPrefix the columnPrefix to use
441
 * @param count the count of column names to generate
442
 * @returns an array of column names
443
 */
444
function generateHeader(columnPrefix: string, count: number = 0): string[] {
32✔
445
  const headers: string[] = [];
32✔
446
  for (let i = 0; i < count; i++) {
32✔
447
    headers.push(`${columnPrefix}${i + 1}`);
102✔
448
  }
449
  return headers;
32✔
450
}
451

452
function normalizePapaStreamingRow(row: unknown[]): unknown[] {
453
  return row.map(value => (Array.isArray(value) && value.length === 0 ? null : value));
57,510✔
454
}
455

456
function convertToPapaObjectRow(
457
  row: unknown[],
458
  headerRow: string[]
459
): {[columnName: string]: unknown} {
460
  const objectRow = convertToObjectRow(row, headerRow);
4✔
461
  const parsedExtra = row.slice(headerRow.length);
4✔
462
  if (parsedExtra.length > 0) {
4!
463
    objectRow.__parsed_extra = parsedExtra;
4✔
464
  }
465
  return objectRow;
4✔
466
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc