keplergl / kepler.gl / build 12031095165

26 Nov 2024 12:57PM UTC coverage: 69.321% (+22.9%) from 46.466%

push · github · web-flow
[feat] create new dataset action (#2778)

* [feat] create new dataset action

- createNewDataEntry now returns a react-palm task to create or update a dataset asynchronously (see the sketch below).
- updateVisDataUpdater now returns tasks to create or update a dataset asynchronously and, once done, triggers the createNewDatasetSuccess action.
- refactored the demo-app App and Container into functional components

Signed-off-by: Ihor Dykhta <dikhta.igor@gmail.com>
Co-authored-by: Shan He <heshan0131@gmail.com>
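
For readers unfamiliar with react-palm, here is a minimal sketch of the pattern the commit describes. The task name, payload shapes, and the createOrUpdateDataset helper are illustrative assumptions, not the actual kepler.gl implementation:

import Task, {withTask} from 'react-palm/tasks';

// Hypothetical async side effect, standing in for whatever createNewDataEntry wraps.
async function createOrUpdateDataset(dataset: {id: string; data: unknown}): Promise<string> {
  return dataset.id;
}

// Wrap the promise in a task so the updater itself stays pure.
const CREATE_NEW_DATASET_TASK = Task.fromPromise(createOrUpdateDataset, 'CREATE_NEW_DATASET_TASK');

// Assumed success action creator, mirroring createNewDatasetSuccess from the commit.
const createNewDatasetSuccess = (id: string) => ({type: 'CREATE_NEW_DATASET_SUCCESS', payload: id});

// An updater returns the next state together with the task; the react-palm
// middleware runs the task and dispatches the mapped action when it resolves.
function updateVisDataUpdater(state: object, action: {dataset: {id: string; data: unknown}}) {
  const task = CREATE_NEW_DATASET_TASK(action.dataset).map(createNewDatasetSuccess);
  return withTask(state, task);
}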

5436 of 9079 branches covered (59.87%)

Branch coverage included in aggregate %.

91 of 111 new or added lines in 13 files covered. (81.98%)

8 existing lines in 3 files now uncovered.

11368 of 15162 relevant lines covered (74.98%)

95.15 hits per line

Source File: /src/processors/src/file-handler.ts (89.33% covered)
// SPDX-License-Identifier: MIT
// Copyright contributors to the kepler.gl project

import * as arrow from 'apache-arrow';
import {parseInBatches} from '@loaders.gl/core';
import {JSONLoader, _JSONPath} from '@loaders.gl/json';
import {CSVLoader} from '@loaders.gl/csv';
import {GeoArrowLoader} from '@loaders.gl/arrow';
import {ParquetWasmLoader} from '@loaders.gl/parquet';
import {Loader} from '@loaders.gl/loader-utils';
import {
  isPlainObject,
  generateHashIdFromString,
  getApplicationConfig,
  getError
} from '@kepler.gl/utils';
import {generateHashId} from '@kepler.gl/common-utils';
import {DATASET_FORMATS} from '@kepler.gl/constants';
import {AddDataToMapPayload, Feature, LoadedMap, ProcessorResult} from '@kepler.gl/types';
import {KeplerTable} from '@kepler.gl/table';
import {FeatureCollection} from '@turf/helpers';

import {
  processArrowBatches,
  processGeojson,
  processKeplerglJSON,
  processRowObject
} from './data-processor';

import {FileCacheItem, ValidKeplerGlMap} from './types';

const BATCH_TYPE = {
  METADATA: 'metadata',
  PARTIAL_RESULT: 'partial-result',
  FINAL_RESULT: 'final-result'
};

const CSV_LOADER_OPTIONS = {
  shape: 'object-row-table',
  dynamicTyping: false // not working for now
};

const ARROW_LOADER_OPTIONS = {
  shape: 'arrow-table',
  batchDebounceMs: 10 // time to delay between batches, for incremental loading
};

const PARQUET_LOADER_OPTIONS = {
  shape: 'arrow-table'
};

const JSON_LOADER_OPTIONS = {
  shape: 'object-row-table',
  // instruct loaders.gl on what json paths to stream
  jsonpaths: [
    '$', // JSON Row array
    '$.features', // GeoJSON
    '$.datasets' // KeplerGL JSON
  ]
};
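
// Illustrative inputs matched by the paths above (assumed examples):
// '$'           a top-level row array: [{col: 1}, {col: 2}, ...]
// '$.features'  the features of {type: 'FeatureCollection', features: [...]}
// '$.datasets'  the datasets array of a saved kepler.gl map JSON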

export type ProcessFileDataContent = {
  data: unknown;
  fileName: string;
  length?: number;
  progress?: {rowCount?: number; rowCountInBatch?: number; percent?: number};
  /** metadata, e.g. for arrow data, metadata could be the schema.fields */
  metadata?: Map<string, string>;
};

/**
 * Check if table is an ArrowTable object
 * @param table - object to check
 * @returns {boolean} - true if table is an ArrowTable object, type guarded
 */
export function isArrowTable(table: any): table is arrow.Table {
  return Boolean(table instanceof arrow.Table);
}

/**
 * Check if data is an ArrowData object, which is an array of RecordBatch
 * @param data - object to check
 * @returns {boolean} - true if data is an ArrowData object, type guarded
 */
export function isArrowData(data: any): boolean {
  return Array.isArray(data) && Boolean(data.length && data[0].data && data[0].schema);
}

export function isGeoJson(json: unknown): json is Feature | FeatureCollection {
  // json can be a feature collection or a single feature
  return isPlainObject(json) && (isFeature(json) || isFeatureCollection(json));
}

export function isFeature(json: unknown): json is Feature {
  return isPlainObject(json) && json.type === 'Feature' && Boolean(json.geometry);
}

export function isFeatureCollection(json: unknown): json is FeatureCollection {
  return isPlainObject(json) && json.type === 'FeatureCollection' && Boolean(json.features);
}

export function isRowObject(json: any): boolean {
  return Array.isArray(json) && isPlainObject(json[0]);
}

export function isKeplerGlMap(json: unknown): json is ValidKeplerGlMap {
  return Boolean(
    isPlainObject(json) &&
      json.datasets &&
      json.config &&
      json.info &&
      isPlainObject(json.info) &&
      json.info.app === 'kepler.gl'
  );
}
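
// For reference, a minimal object accepted by isKeplerGlMap (illustrative values):
// {datasets: [...], config: {...}, info: {app: 'kepler.gl'}}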

export async function* makeProgressIterator(
  asyncIterator: AsyncIterable<any>,
  info: {size: number}
): AsyncGenerator {
  let rowCount = 0;

  for await (const batch of asyncIterator) {
    // the length could be stored in `batch.length` for an arrow batch
    const rowCountInBatch = (batch.data && (batch.data.length || batch.length)) || 0;
    rowCount += rowCountInBatch;
    const percent = Number.isFinite(batch.bytesUsed) ? batch.bytesUsed / info.size : null;

    // update progress object
    const progress = {
      rowCount,
      rowCountInBatch,
      ...(Number.isFinite(percent) ? {percent} : {})
    };

    yield {...batch, progress};
  }
}
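
// Illustrative shape of a batch yielded by makeProgressIterator (values are
// made up; `bytesUsed` comes from the underlying loaders.gl batch):
// {data: [...], bytesUsed: 524288, progress: {rowCount: 3000, rowCountInBatch: 1000, percent: 0.5}}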

// eslint-disable-next-line complexity
export async function* readBatch(
  asyncIterator: AsyncIterable<any>,
  fileName: string
): AsyncGenerator {
  let result = null;
  const batches = <any>[];
  for await (const batch of asyncIterator) {
    // The last batch will have this special type and will provide all the root
    // properties of the parsed document.
    // Only the json parser emits `FINAL_RESULT`.
    if (batch.batchType === BATCH_TYPE.FINAL_RESULT) {
      if (batch.container) {
        result = {...batch.container};
      }
      // Set the streamed data correctly if a batch json path is set
      // and the path streamed is not the top-level object (jsonpath = '$')
      if (batch.jsonpath && batch.jsonpath.length > 1) {
        const streamingPath = new _JSONPath(batch.jsonpath);
        streamingPath.setFieldAtPath(result, batches);
      } else if (batch.jsonpath && batch.jsonpath.length === 1) {
        // the streamed object is a ROW JSON-batch (jsonpath = '$'),
        // i.e. plain row objects
        result = batches;
      }
    } else {
      const batchData = isArrowTable(batch.data) ? batch.data.batches : batch.data;
      for (let i = 0; i < batchData?.length; i++) {
        batches.push(batchData[i]);
      }
    }

    yield {
      ...batch,
      ...(batch.schema ? {headers: Object.keys(batch.schema)} : {}),
      fileName,
      // if the dataset is CSV, data is set to the raw batches
      data: result ? result : batches
    };
  }
}

export async function readFileInBatches({
  file,
  loaders = [],
  loadOptions = {}
}: {
  file: File;
  fileCache: FileCacheItem[];
  loaders: Loader[];
  loadOptions: any;
}): Promise<AsyncGenerator> {
  loaders = [JSONLoader, CSVLoader, GeoArrowLoader, ParquetWasmLoader, ...loaders];
  loadOptions = {
    csv: CSV_LOADER_OPTIONS,
    arrow: ARROW_LOADER_OPTIONS,
    json: JSON_LOADER_OPTIONS,
    parquet: PARQUET_LOADER_OPTIONS,
    metadata: true,
    ...loadOptions
  };

  const batchIterator = await parseInBatches(file, loaders, loadOptions);
  const progressIterator = makeProgressIterator(batchIterator, {size: file.size});

  return readBatch(progressIterator, file.name);
}

export async function processFileData({
  content,
  fileCache
}: {
  content: ProcessFileDataContent;
  fileCache: FileCacheItem[];
}): Promise<FileCacheItem[]> {
  const {fileName, data} = content;
  let format: string | undefined;
  let processor: ((data: any) => ProcessorResult | LoadedMap | null) | undefined;
  console.log('Processing file', fileName);
  // generate a unique id of length 4 from the fileName string
  const id = generateHashIdFromString(fileName);
  // decide which table class to use based on the application config
  const table = getApplicationConfig().table ?? KeplerTable;

  if (typeof table.getFileProcessor === 'function') {
    // use custom processors from the table class
    const processorResult = table.getFileProcessor(data);
    format = processorResult.format;
    processor = processorResult.processor;
  } else {
    // use default processors
    if (isArrowData(data)) {
      format = DATASET_FORMATS.arrow;
      processor = processArrowBatches;
    } else if (isKeplerGlMap(data)) {
      format = DATASET_FORMATS.keplergl;
      processor = processKeplerglJSON;
    } else if (isRowObject(data)) {
      // csv files end up here
      format = DATASET_FORMATS.row;
      processor = processRowObject;
    } else if (isGeoJson(data)) {
      format = DATASET_FORMATS.geojson;
      processor = processGeojson;
    }
  }

  if (format && processor) {
    let result;
    try {
      result = await processor(data);
    } catch (error) {
      throw new Error(`Cannot process uploaded file, ${getError(error as Error)}`);
    }

    return [
      ...fileCache,
      {
        data: result,
        info: {
          id,
          label: content.fileName,
          format
        }
      }
    ];
  } else {
    throw new Error('Cannot process uploaded file, unknown file format');
  }
}

export function filesToDataPayload(fileCache: FileCacheItem[]): AddDataToMapPayload[] {
  // separate out files which could be single datasets or a keplergl map json
  const collection = fileCache.reduce<{
    datasets: FileCacheItem[];
    keplerMaps: AddDataToMapPayload[];
  }>(
    (accu, file) => {
      const {data, info} = file;
      if (info?.format === DATASET_FORMATS.keplergl) {
        // the file contains a single kepler map dataset & config
        accu.keplerMaps.push({
          ...data,
          options: {
            centerMap: !(data.config && data.config.mapState)
          }
        });
      } else if (DATASET_FORMATS[info?.format]) {
        // the file contains only data
        const newDataset = {
          data,
          info: {
            id: info?.id || generateHashId(4),
            ...(info || {})
          }
        };
        accu.datasets.push(newDataset);
      }
      return accu;
    },
    {datasets: [], keplerMaps: []}
  );

  // add kepler maps first with their config,
  // then add all datasets in one addDataToMap call
  return collection.keplerMaps.concat({datasets: collection.datasets});
}
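
Taken together, the exports above form a small pipeline: readFileInBatches streams a file into batches, processFileData turns the final batch into a cached dataset, and filesToDataPayload converts the cache into addDataToMap payloads. A minimal driver sketch follows (assumed wiring; in kepler.gl this flow is actually driven by the file-upload actions and react-palm tasks, not a loop like this):

// Hypothetical driver for the pipeline above; not part of file-handler.ts.
async function loadFilesForMap(files: File[]): Promise<AddDataToMapPayload[]> {
  let fileCache: FileCacheItem[] = [];

  for (const file of files) {
    // stream the file; each yielded batch carries fileName, progress and the
    // data accumulated so far
    const batchIterator = await readFileInBatches({file, fileCache, loaders: [], loadOptions: {}});

    let lastBatch: ProcessFileDataContent | null = null;
    for await (const batch of batchIterator) {
      lastBatch = batch; // the final batch holds the complete parsed data
    }

    if (lastBatch) {
      // detect the format, run the matching processor, append to the cache
      fileCache = await processFileData({content: lastBatch, fileCache});
    }
  }

  // kepler map files become payloads with config; plain datasets are grouped
  // into one payload for a single addDataToMap call
  return filesToDataPayload(fileCache);
}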