• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9811

pending completion
#9811

push

travis_ci

web-flow
[To rel/1.2] Fixed the prompt after CSV file import (#10836)

1 of 1 new or added line in 1 file covered. (100.0%)

79681 of 165738 relevant lines covered (48.08%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.tool;
21

22
import org.apache.iotdb.cli.utils.IoTPrinter;
23
import org.apache.iotdb.commons.exception.IllegalPathException;
24
import org.apache.iotdb.commons.utils.PathUtils;
25
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
26
import org.apache.iotdb.db.utils.DateTimeUtils;
27
import org.apache.iotdb.db.utils.constant.SqlConstant;
28
import org.apache.iotdb.exception.ArgsErrorException;
29
import org.apache.iotdb.isession.SessionDataSet;
30
import org.apache.iotdb.rpc.IoTDBConnectionException;
31
import org.apache.iotdb.rpc.StatementExecutionException;
32
import org.apache.iotdb.session.Session;
33
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
34
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
35
import org.apache.iotdb.tsfile.read.common.Field;
36
import org.apache.iotdb.tsfile.read.common.RowRecord;
37

38
import org.apache.commons.cli.CommandLine;
39
import org.apache.commons.cli.CommandLineParser;
40
import org.apache.commons.cli.DefaultParser;
41
import org.apache.commons.cli.HelpFormatter;
42
import org.apache.commons.cli.Option;
43
import org.apache.commons.cli.Options;
44
import org.apache.commons.csv.CSVFormat;
45
import org.apache.commons.csv.CSVParser;
46
import org.apache.commons.csv.CSVRecord;
47
import org.apache.commons.lang3.StringUtils;
48
import org.apache.thrift.annotation.Nullable;
49

50
import java.io.File;
51
import java.io.FileInputStream;
52
import java.io.IOException;
53
import java.io.InputStreamReader;
54
import java.util.ArrayList;
55
import java.util.Arrays;
56
import java.util.HashMap;
57
import java.util.HashSet;
58
import java.util.List;
59
import java.util.Map;
60
import java.util.Objects;
61
import java.util.Set;
62
import java.util.concurrent.atomic.AtomicInteger;
63
import java.util.concurrent.atomic.AtomicReference;
64
import java.util.regex.Matcher;
65
import java.util.regex.Pattern;
66
import java.util.stream.Collectors;
67
import java.util.stream.Stream;
68

69
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN;
70
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.DOUBLE;
71
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.FLOAT;
72
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT32;
73
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.INT64;
74
import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.TEXT;
75

76
public class ImportCsv extends AbstractCsvTool {
×
77

78
  // ---- Command-line option keys and the argument names shown in the help text ----
  private static final String FILE_ARGS = "f";
  private static final String FILE_NAME = "file or folder";

  private static final String FAILED_FILE_ARGS = "fd";
  private static final String FAILED_FILE_NAME = "failed file directory";

  private static final String BATCH_POINT_SIZE_ARGS = "batch";
  private static final String BATCH_POINT_SIZE_NAME = "batch point size";

  private static final String ALIGNED_ARGS = "aligned";
  private static final String ALIGNED_NAME = "use the aligned interface";

  // File-name suffixes accepted by importFromSingleFile.
  private static final String CSV_SUFFIXS = "csv";
  private static final String TXT_SUFFIXS = "txt";

  private static final String TIMESTAMP_PRECISION_ARGS = "tp";
  private static final String TIMESTAMP_PRECISION_NAME = "timestamp precision (ms/us/ns)";

  private static final String TYPE_INFER_ARGS = "typeInfer";
  private static final String TYPE_INFER_ARGS_NAME = "type infer";

  private static final String LINES_PER_FAILED_FILE_ARGS = "linesPerFailedFile";
  private static final String LINES_PER_FAILED_FILE_ARGS_NAME = "Lines Per FailedFile";

  // Program name printed in the usage line of the help output.
  private static final String TSFILEDB_CLI_PREFIX = "ImportCsv";

  // ---- Run configuration; defaults below, overwritten by parseSpecialParams ----
  private static String targetPath;
  private static String failedFileDirectory = null;
  private static int linesPerFailedFile = 10000;
  private static Boolean aligned = false;

  // Expected names of the time column and (for by-device files) the device column.
  private static String timeColumn = "Time";
  private static String deviceColumn = "Device";

  private static int batchPointSize = 100_000;

  private static String timestampPrecision = "ms";

  // ---- Names accepted on either side of a -typeInfer "key=value" entry ----
  private static final String DATATYPE_BOOLEAN = "boolean";
  private static final String DATATYPE_INT = "int";
  private static final String DATATYPE_LONG = "long";
  private static final String DATATYPE_FLOAT = "float";
  private static final String DATATYPE_DOUBLE = "double";
  private static final String DATATYPE_NAN = "NaN";
  private static final String DATATYPE_TEXT = "text";

  private static final String DATATYPE_NULL = "null";

  // Prefix for every insert-failure message.
  private static final String INSERT_CSV_MEET_ERROR_MSG = "Meet error when insert csv because ";

  /**
   * Target type for each literal category (the left side of -typeInfer). Entries are overwritten
   * by applyTypeInferArgs.
   */
  private static final Map<String, TSDataType> TYPE_INFER_KEY_DICT = new HashMap<>();

  /** Type names allowed on the right side of -typeInfer, mapped to their IoTDB data types. */
  private static final Map<String, TSDataType> TYPE_INFER_VALUE_DICT = new HashMap<>();

  static {
    // Defaults: int literals become FLOAT, long literals DOUBLE, NaN DOUBLE.
    TYPE_INFER_KEY_DICT.put(DATATYPE_BOOLEAN, TSDataType.BOOLEAN);
    TYPE_INFER_KEY_DICT.put(DATATYPE_INT, TSDataType.FLOAT);
    TYPE_INFER_KEY_DICT.put(DATATYPE_LONG, TSDataType.DOUBLE);
    TYPE_INFER_KEY_DICT.put(DATATYPE_FLOAT, TSDataType.FLOAT);
    TYPE_INFER_KEY_DICT.put(DATATYPE_DOUBLE, TSDataType.DOUBLE);
    TYPE_INFER_KEY_DICT.put(DATATYPE_NAN, TSDataType.DOUBLE);

    TYPE_INFER_VALUE_DICT.put(DATATYPE_BOOLEAN, TSDataType.BOOLEAN);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_INT, TSDataType.INT32);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_LONG, TSDataType.INT64);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_FLOAT, TSDataType.FLOAT);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_DOUBLE, TSDataType.DOUBLE);
    TYPE_INFER_VALUE_DICT.put(DATATYPE_TEXT, TSDataType.TEXT);
  }
×
149

150
  /**
151
   * create the commandline options.
152
   *
153
   * @return object Options
154
   */
155
  private static Options createOptions() {
156
    Options options = createNewOptions();
×
157

158
    Option opFile =
×
159
        Option.builder(FILE_ARGS)
×
160
            .required()
×
161
            .argName(FILE_NAME)
×
162
            .hasArg()
×
163
            .desc(
×
164
                "If input a file path, load a csv file, "
165
                    + "otherwise load all csv file under this directory (required)")
166
            .build();
×
167
    options.addOption(opFile);
×
168

169
    Option opFailedFile =
×
170
        Option.builder(FAILED_FILE_ARGS)
×
171
            .argName(FAILED_FILE_NAME)
×
172
            .hasArg()
×
173
            .desc(
×
174
                "Specifying a directory to save failed file, default YOUR_CSV_FILE_PATH (optional)")
175
            .build();
×
176
    options.addOption(opFailedFile);
×
177

178
    Option opAligned =
×
179
        Option.builder(ALIGNED_ARGS)
×
180
            .argName(ALIGNED_NAME)
×
181
            .hasArg()
×
182
            .desc("Whether to use the interface of aligned (optional)")
×
183
            .build();
×
184
    options.addOption(opAligned);
×
185

186
    Option opHelp =
×
187
        Option.builder(HELP_ARGS)
×
188
            .longOpt(HELP_ARGS)
×
189
            .hasArg(false)
×
190
            .desc("Display help information")
×
191
            .build();
×
192
    options.addOption(opHelp);
×
193

194
    Option opTimeZone =
×
195
        Option.builder(TIME_ZONE_ARGS)
×
196
            .argName(TIME_ZONE_NAME)
×
197
            .hasArg()
×
198
            .desc("Time Zone eg. +08:00 or -01:00 (optional)")
×
199
            .build();
×
200
    options.addOption(opTimeZone);
×
201

202
    Option opBatchPointSize =
×
203
        Option.builder(BATCH_POINT_SIZE_ARGS)
×
204
            .argName(BATCH_POINT_SIZE_NAME)
×
205
            .hasArg()
×
206
            .desc("100000 (optional)")
×
207
            .build();
×
208
    options.addOption(opBatchPointSize);
×
209

210
    Option opTimestampPrecision =
×
211
        Option.builder(TIMESTAMP_PRECISION_ARGS)
×
212
            .argName(TIMESTAMP_PRECISION_NAME)
×
213
            .hasArg()
×
214
            .desc("Timestamp precision (ms/us/ns)")
×
215
            .build();
×
216

217
    options.addOption(opTimestampPrecision);
×
218

219
    Option opTypeInfer =
×
220
        Option.builder(TYPE_INFER_ARGS)
×
221
            .argName(TYPE_INFER_ARGS_NAME)
×
222
            .numberOfArgs(5)
×
223
            .hasArgs()
×
224
            .valueSeparator(',')
×
225
            .desc("Define type info by option:\"boolean=text,int=long, ...")
×
226
            .build();
×
227
    options.addOption(opTypeInfer);
×
228

229
    Option opFailedLinesPerFile =
×
230
        Option.builder(LINES_PER_FAILED_FILE_ARGS)
×
231
            .argName(LINES_PER_FAILED_FILE_ARGS_NAME)
×
232
            .hasArgs()
×
233
            .desc("Lines per failedfile")
×
234
            .build();
×
235
    options.addOption(opFailedLinesPerFile);
×
236

237
    return options;
×
238
  }
239

240
  /**
241
   * parse optional params
242
   *
243
   * @param commandLine
244
   */
245
  private static void parseSpecialParams(CommandLine commandLine) throws ArgsErrorException {
246
    timeZoneID = commandLine.getOptionValue(TIME_ZONE_ARGS);
×
247
    targetPath = commandLine.getOptionValue(FILE_ARGS);
×
248
    if (commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS) != null) {
×
249
      batchPointSize = Integer.parseInt(commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS));
×
250
    }
251
    if (commandLine.getOptionValue(FAILED_FILE_ARGS) != null) {
×
252
      failedFileDirectory = commandLine.getOptionValue(FAILED_FILE_ARGS);
×
253
      File file = new File(failedFileDirectory);
×
254
      if (!file.isDirectory()) {
×
255
        file.mkdir();
×
256
        failedFileDirectory = file.getAbsolutePath() + File.separator;
×
257
      }
258
    }
259
    if (commandLine.getOptionValue(ALIGNED_ARGS) != null) {
×
260
      aligned = Boolean.valueOf(commandLine.getOptionValue(ALIGNED_ARGS));
×
261
    }
262

263
    if (commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS) != null) {
×
264
      timestampPrecision = commandLine.getOptionValue(TIMESTAMP_PRECISION_ARGS);
×
265
    }
266
    final String[] opTypeInferValues = commandLine.getOptionValues(TYPE_INFER_ARGS);
×
267
    if (opTypeInferValues != null && opTypeInferValues.length > 0) {
×
268
      for (String opTypeInferValue : opTypeInferValues) {
×
269
        if (opTypeInferValue.contains("=")) {
×
270
          final String[] typeInfoExpressionArr = opTypeInferValue.split("=");
×
271
          final String key = typeInfoExpressionArr[0];
×
272
          final String value = typeInfoExpressionArr[1];
×
273
          applyTypeInferArgs(key, value);
×
274
        }
275
      }
276
    }
277
    if (commandLine.getOptionValue(LINES_PER_FAILED_FILE_ARGS) != null) {
×
278
      linesPerFailedFile = Integer.parseInt(commandLine.getOptionValue(LINES_PER_FAILED_FILE_ARGS));
×
279
    }
280
  }
×
281

282
  private static void applyTypeInferArgs(String key, String value) throws ArgsErrorException {
283
    if (!TYPE_INFER_KEY_DICT.containsKey(key)) {
×
284
      throw new ArgsErrorException("Unknown type infer key: " + key);
×
285
    }
286
    if (!TYPE_INFER_VALUE_DICT.containsKey(value)) {
×
287
      throw new ArgsErrorException("Unknown type infer value: " + value);
×
288
    }
289
    if (key.equals(DATATYPE_NAN)
×
290
        && !(value.equals(DATATYPE_FLOAT)
×
291
            || value.equals(DATATYPE_DOUBLE)
×
292
            || value.equals(DATATYPE_TEXT))) {
×
293
      throw new ArgsErrorException("NaN can not convert to " + value);
×
294
    }
295
    if (key.equals(DATATYPE_BOOLEAN)
×
296
        && !(value.equals(DATATYPE_BOOLEAN) || value.equals(DATATYPE_TEXT))) {
×
297
      throw new ArgsErrorException("Boolean can not convert to " + value);
×
298
    }
299
    final TSDataType srcType = TYPE_INFER_VALUE_DICT.get(key);
×
300
    final TSDataType dstType = TYPE_INFER_VALUE_DICT.get(value);
×
301
    if (dstType.getType() < srcType.getType()) {
×
302
      throw new ArgsErrorException(key + " can not convert to " + value);
×
303
    }
304
    TYPE_INFER_KEY_DICT.put(key, TYPE_INFER_VALUE_DICT.get(value));
×
305
  }
×
306

307
  public static void main(String[] args) throws IoTDBConnectionException {
308
    Options options = createOptions();
×
309
    HelpFormatter hf = new HelpFormatter();
×
310
    hf.setOptionComparator(null);
×
311
    hf.setWidth(MAX_HELP_CONSOLE_WIDTH);
×
312
    CommandLine commandLine = null;
×
313
    CommandLineParser parser = new DefaultParser();
×
314

315
    if (args == null || args.length == 0) {
×
316
      IoTPrinter.println("Too few params input, please check the following hint.");
×
317
      hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
×
318
      System.exit(CODE_ERROR);
×
319
    }
320
    try {
321
      commandLine = parser.parse(options, args);
×
322
    } catch (org.apache.commons.cli.ParseException e) {
×
323
      IoTPrinter.println("Parse error: " + e.getMessage());
×
324
      hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
×
325
      System.exit(CODE_ERROR);
×
326
    }
×
327
    if (commandLine.hasOption(HELP_ARGS)) {
×
328
      hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
×
329
      System.exit(CODE_ERROR);
×
330
    }
331

332
    try {
333
      parseBasicParams(commandLine);
×
334
      String filename = commandLine.getOptionValue(FILE_ARGS);
×
335
      if (filename == null) {
×
336
        hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
×
337
        System.exit(CODE_ERROR);
×
338
      }
339
      parseSpecialParams(commandLine);
×
340
    } catch (ArgsErrorException e) {
×
341
      IoTPrinter.println("Args error: " + e.getMessage());
×
342
      System.exit(CODE_ERROR);
×
343
    } catch (Exception e) {
×
344
      IoTPrinter.println("Encounter an error, because: " + e.getMessage());
×
345
      System.exit(CODE_ERROR);
×
346
    }
×
347

348
    System.exit(
×
349
        importFromTargetPath(
×
350
            host, Integer.parseInt(port), username, password, targetPath, timeZoneID));
×
351
  }
×
352

353
  /**
354
   * Specifying a CSV file or a directory including CSV files that you want to import. This method
355
   * can be offered to console cli to implement importing CSV file by command.
356
   *
357
   * @param host
358
   * @param port
359
   * @param username
360
   * @param password
361
   * @param targetPath a CSV file or a directory including CSV files
362
   * @param timeZone
363
   * @return the status code
364
   * @throws IoTDBConnectionException
365
   */
366
  @SuppressWarnings({"squid:S2093"}) // ignore try-with-resources
367
  public static int importFromTargetPath(
368
      String host, int port, String username, String password, String targetPath, String timeZone)
369
      throws IoTDBConnectionException {
370
    try {
371
      session = new Session(host, port, username, password, false);
×
372
      session.open(false);
×
373
      timeZoneID = timeZone;
×
374
      setTimeZone();
×
375

376
      File file = new File(targetPath);
×
377
      if (file.isFile()) {
×
378
        importFromSingleFile(file);
×
379
      } else if (file.isDirectory()) {
×
380
        File[] files = file.listFiles();
×
381
        if (files == null) {
×
382
          return CODE_OK;
×
383
        }
384

385
        for (File subFile : files) {
×
386
          if (subFile.isFile()) {
×
387
            importFromSingleFile(subFile);
×
388
          }
389
        }
390
      } else {
×
391
        IoTPrinter.println("File not found!");
×
392
        return CODE_ERROR;
×
393
      }
394
    } catch (IoTDBConnectionException | StatementExecutionException e) {
×
395
      IoTPrinter.println("Encounter an error when connecting to server, because " + e.getMessage());
×
396
      return CODE_ERROR;
×
397
    } finally {
398
      if (session != null) {
×
399
        session.close();
×
400
      }
401
    }
402
    return CODE_OK;
×
403
  }
404

405
  /**
406
   * import the CSV file and load headers and records.
407
   *
408
   * @param file the File object of the CSV file that you want to import.
409
   */
410
  private static void importFromSingleFile(File file) {
411
    if (file.getName().endsWith(CSV_SUFFIXS) || file.getName().endsWith(TXT_SUFFIXS)) {
×
412
      try {
413
        CSVParser csvRecords = readCsvFile(file.getAbsolutePath());
×
414
        List<String> headerNames = csvRecords.getHeaderNames();
×
415
        Stream<CSVRecord> records = csvRecords.stream();
×
416
        if (headerNames.isEmpty()) {
×
417
          IoTPrinter.println("Empty file!");
×
418
          return;
×
419
        }
420
        if (!timeColumn.equalsIgnoreCase(headerNames.get(0))) {
×
421
          IoTPrinter.println("No headers!");
×
422
          return;
×
423
        }
424
        String failedFilePath = null;
×
425
        if (failedFileDirectory == null) {
×
426
          failedFilePath = file.getAbsolutePath() + ".failed";
×
427
        } else {
428
          failedFilePath = failedFileDirectory + file.getName() + ".failed";
×
429
        }
430
        if (!deviceColumn.equalsIgnoreCase(headerNames.get(1))) {
×
431
          writeDataAlignedByTime(headerNames, records, failedFilePath);
×
432
        } else {
433
          writeDataAlignedByDevice(headerNames, records, failedFilePath);
×
434
        }
435
      } catch (IOException | IllegalPathException e) {
×
436
        IoTPrinter.println("CSV file read exception because: " + e.getMessage());
×
437
      }
×
438
    } else {
439
      IoTPrinter.println("The file name must end with \"csv\" or \"txt\"!");
×
440
    }
441
  }
×
442

443
  /**
444
   * if the data is aligned by time, the data will be written by this method.
445
   *
446
   * @param headerNames the header names of CSV file
447
   * @param records the records of CSV file
448
   * @param failedFilePath the directory to save the failed files
449
   */
450
  @SuppressWarnings("squid:S3776")
451
  private static void writeDataAlignedByTime(
452
      List<String> headerNames, Stream<CSVRecord> records, String failedFilePath)
453
      throws IllegalPathException {
454
    HashMap<String, List<String>> deviceAndMeasurementNames = new HashMap<>();
×
455
    HashMap<String, TSDataType> headerTypeMap = new HashMap<>();
×
456
    HashMap<String, String> headerNameMap = new HashMap<>();
×
457
    parseHeaders(headerNames, deviceAndMeasurementNames, headerTypeMap, headerNameMap);
×
458

459
    Set<String> devices = deviceAndMeasurementNames.keySet();
×
460
    if (headerTypeMap.isEmpty()) {
×
461
      try {
462
        queryType(devices, headerTypeMap, "Time");
×
463
      } catch (IoTDBConnectionException e) {
×
464
        IoTPrinter.printException(e);
×
465
      }
×
466
    }
467

468
    List<String> deviceIds = new ArrayList<>();
×
469
    List<Long> times = new ArrayList<>();
×
470
    List<List<String>> measurementsList = new ArrayList<>();
×
471
    List<List<TSDataType>> typesList = new ArrayList<>();
×
472
    List<List<Object>> valuesList = new ArrayList<>();
×
473

474
    AtomicReference<Boolean> hasStarted = new AtomicReference<>(false);
×
475
    AtomicInteger pointSize = new AtomicInteger(0);
×
476

477
    ArrayList<List<Object>> failedRecords = new ArrayList<>();
×
478

479
    records.forEach(
×
480
        recordObj -> {
481
          if (Boolean.FALSE.equals(hasStarted.get())) {
×
482
            hasStarted.set(true);
×
483
          } else if (pointSize.get() >= batchPointSize) {
×
484
            writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, measurementsList, 3);
×
485
            pointSize.set(0);
×
486
          }
487

488
          boolean isFail = false;
×
489

490
          for (Map.Entry<String, List<String>> entry : deviceAndMeasurementNames.entrySet()) {
×
491
            String deviceId = entry.getKey();
×
492
            List<String> measurementNames = entry.getValue();
×
493
            ArrayList<TSDataType> types = new ArrayList<>();
×
494
            ArrayList<Object> values = new ArrayList<>();
×
495
            ArrayList<String> measurements = new ArrayList<>();
×
496
            for (String measurement : measurementNames) {
×
497
              String header = deviceId + "." + measurement;
×
498
              String value = recordObj.get(headerNameMap.get(header));
×
499
              if (!"".equals(value)) {
×
500
                TSDataType type;
501
                if (!headerTypeMap.containsKey(header)) {
×
502
                  type = typeInfer(value);
×
503
                  if (type != null) {
×
504
                    headerTypeMap.put(header, type);
×
505
                  } else {
506
                    IoTPrinter.printf(
×
507
                        "Line '%s', column '%s': '%s' unknown type%n",
508
                        recordObj.getRecordNumber(), header, value);
×
509
                    isFail = true;
×
510
                  }
511
                }
512
                type = headerTypeMap.get(header);
×
513
                if (type != null) {
×
514
                  Object valueTrans = typeTrans(value, type);
×
515
                  if (valueTrans == null) {
×
516
                    isFail = true;
×
517
                    IoTPrinter.printf(
×
518
                        "Line '%s', column '%s': '%s' can't convert to '%s'%n",
519
                        recordObj.getRecordNumber(), header, value, type);
×
520
                  } else {
521
                    measurements.add(header.replace(deviceId + '.', ""));
×
522
                    types.add(type);
×
523
                    values.add(valueTrans);
×
524
                    pointSize.getAndIncrement();
×
525
                  }
526
                }
527
              }
528
            }
×
529
            if (!measurements.isEmpty()) {
×
530
              times.add(parseTimestamp(recordObj.get(timeColumn)));
×
531
              deviceIds.add(deviceId);
×
532
              typesList.add(types);
×
533
              valuesList.add(values);
×
534
              measurementsList.add(measurements);
×
535
            }
536
          }
×
537
          if (isFail) {
×
538
            failedRecords.add(recordObj.stream().collect(Collectors.toList()));
×
539
          }
540
        });
×
541
    if (!deviceIds.isEmpty()) {
×
542
      writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, measurementsList, 3);
×
543
      pointSize.set(0);
×
544
    }
545

546
    if (!failedRecords.isEmpty()) {
×
547
      writeFailedLinesFile(headerNames, failedFilePath, failedRecords);
×
548
    }
549
    if (Boolean.TRUE.equals(hasStarted.get())) {
×
550
      IoTPrinter.println("Import completely!");
×
551
    } else {
552
      IoTPrinter.println("No records!");
×
553
    }
554
  }
×
555

556
  /**
557
   * if the data is aligned by device, the data will be written by this method.
558
   *
559
   * @param headerNames the header names of CSV file
560
   * @param records the records of CSV file
561
   * @param failedFilePath the directory to save the failed files
562
   */
563
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
564
  private static void writeDataAlignedByDevice(
565
      List<String> headerNames, Stream<CSVRecord> records, String failedFilePath)
566
      throws IllegalPathException {
567
    HashMap<String, TSDataType> headerTypeMap = new HashMap<>();
×
568
    HashMap<String, String> headerNameMap = new HashMap<>();
×
569
    parseHeaders(headerNames, null, headerTypeMap, headerNameMap);
×
570

571
    AtomicReference<String> deviceName = new AtomicReference<>(null);
×
572

573
    HashSet<String> typeQueriedDevice = new HashSet<>();
×
574

575
    // the data that interface need
576
    List<Long> times = new ArrayList<>();
×
577
    List<List<TSDataType>> typesList = new ArrayList<>();
×
578
    List<List<Object>> valuesList = new ArrayList<>();
×
579
    List<List<String>> measurementsList = new ArrayList<>();
×
580

581
    AtomicInteger pointSize = new AtomicInteger(0);
×
582

583
    ArrayList<List<Object>> failedRecords = new ArrayList<>();
×
584

585
    records.forEach(
×
586
        recordObj -> {
587
          // only run in first record
588
          if (deviceName.get() == null) {
×
589
            deviceName.set(recordObj.get(1));
×
590
          } else if (!Objects.equals(deviceName.get(), recordObj.get(1))) {
×
591
            // if device changed
592
            writeAndEmptyDataSet(
×
593
                deviceName.get(), times, typesList, valuesList, measurementsList, 3);
×
594
            deviceName.set(recordObj.get(1));
×
595
            pointSize.set(0);
×
596
          } else if (pointSize.get() >= batchPointSize) {
×
597
            // insert a batch
598
            writeAndEmptyDataSet(
×
599
                deviceName.get(), times, typesList, valuesList, measurementsList, 3);
×
600
            pointSize.set(0);
×
601
          }
602

603
          // the data of the record
604
          ArrayList<TSDataType> types = new ArrayList<>();
×
605
          ArrayList<Object> values = new ArrayList<>();
×
606
          ArrayList<String> measurements = new ArrayList<>();
×
607

608
          AtomicReference<Boolean> isFail = new AtomicReference<>(false);
×
609

610
          // read data from record
611
          for (Map.Entry<String, String> headerNameEntry : headerNameMap.entrySet()) {
×
612
            // headerNameWithoutType is equal to headerName if the CSV column do not have data type.
613
            String headerNameWithoutType = headerNameEntry.getKey();
×
614
            String headerName = headerNameEntry.getValue();
×
615
            String value = recordObj.get(headerName);
×
616
            if (!"".equals(value)) {
×
617
              TSDataType type;
618
              // Get the data type directly if the CSV column have data type.
619
              if (!headerTypeMap.containsKey(headerNameWithoutType)) {
×
620
                boolean hasResult = false;
×
621
                // query the data type in iotdb
622
                if (!typeQueriedDevice.contains(deviceName.get())) {
×
623
                  try {
624
                    if (headerTypeMap.isEmpty()) {
×
625
                      Set<String> devices = new HashSet<>();
×
626
                      devices.add(deviceName.get());
×
627
                      hasResult = queryType(devices, headerTypeMap, deviceColumn);
×
628
                    }
629
                    typeQueriedDevice.add(deviceName.get());
×
630
                  } catch (IoTDBConnectionException e) {
×
631
                    IoTPrinter.printException(e);
×
632
                  }
×
633
                }
634
                if (!hasResult) {
×
635
                  type = typeInfer(value);
×
636
                  if (type != null) {
×
637
                    headerTypeMap.put(headerNameWithoutType, type);
×
638
                  } else {
639
                    IoTPrinter.printf(
×
640
                        "Line '%s', column '%s': '%s' unknown type%n",
641
                        recordObj.getRecordNumber(), headerNameWithoutType, value);
×
642
                    isFail.set(true);
×
643
                  }
644
                }
645
              }
646
              type = headerTypeMap.get(headerNameWithoutType);
×
647
              if (type != null) {
×
648
                Object valueTrans = typeTrans(value, type);
×
649
                if (valueTrans == null) {
×
650
                  isFail.set(true);
×
651
                  IoTPrinter.printf(
×
652
                      "Line '%s', column '%s': '%s' can't convert to '%s'%n",
653
                      recordObj.getRecordNumber(), headerNameWithoutType, value, type);
×
654
                } else {
655
                  values.add(valueTrans);
×
656
                  measurements.add(headerNameWithoutType);
×
657
                  types.add(type);
×
658
                  pointSize.getAndIncrement();
×
659
                }
660
              }
661
            }
662
          }
×
663
          if (Boolean.TRUE.equals(isFail.get())) {
×
664
            failedRecords.add(recordObj.stream().collect(Collectors.toList()));
×
665
          }
666
          if (!measurements.isEmpty()) {
×
667
            times.add(parseTimestamp(recordObj.get(timeColumn)));
×
668
            typesList.add(types);
×
669
            valuesList.add(values);
×
670
            measurementsList.add(measurements);
×
671
          }
672
        });
×
673
    if (times.isEmpty()) {
×
674
      writeAndEmptyDataSet(deviceName.get(), times, typesList, valuesList, measurementsList, 3);
×
675
      pointSize.set(0);
×
676
    }
677
    if (!failedRecords.isEmpty()) {
×
678
      writeFailedLinesFile(headerNames, failedFilePath, failedRecords);
×
679
    }
680
    IoTPrinter.println("Import completely!");
×
681
  }
×
682

683
  private static void writeFailedLinesFile(
684
      List<String> headerNames, String failedFilePath, ArrayList<List<Object>> failedRecords) {
685
    int fileIndex = 0;
×
686
    int from = 0;
×
687
    int failedRecordsSize = failedRecords.size();
×
688
    int restFailedRecords = failedRecordsSize;
×
689
    while (from < failedRecordsSize) {
×
690
      int step = Math.min(restFailedRecords, linesPerFailedFile);
×
691
      writeCsvFile(
×
692
          headerNames,
693
          failedRecords.subList(from, from + step),
×
694
          failedFilePath + "_" + fileIndex++);
695
      from += step;
×
696
      restFailedRecords -= step;
×
697
    }
×
698
  }
×
699

700
  private static void writeAndEmptyDataSet(
701
      String device,
702
      List<Long> times,
703
      List<List<TSDataType>> typesList,
704
      List<List<Object>> valuesList,
705
      List<List<String>> measurementsList,
706
      int retryTime) {
707
    try {
708
      if (Boolean.FALSE.equals(aligned)) {
×
709
        session.insertRecordsOfOneDevice(device, times, measurementsList, typesList, valuesList);
×
710
      } else {
711
        session.insertAlignedRecordsOfOneDevice(
×
712
            device, times, measurementsList, typesList, valuesList);
713
      }
714
    } catch (IoTDBConnectionException e) {
×
715
      if (retryTime > 0) {
×
716
        try {
717
          session.open();
×
718
        } catch (IoTDBConnectionException ex) {
×
719
          IoTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
×
720
        }
×
721
        writeAndEmptyDataSet(device, times, typesList, valuesList, measurementsList, --retryTime);
×
722
      }
723
    } catch (StatementExecutionException e) {
×
724
      IoTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
×
725
    } finally {
726
      times.clear();
×
727
      typesList.clear();
×
728
      valuesList.clear();
×
729
      measurementsList.clear();
×
730
    }
731
  }
×
732

733
  private static void writeAndEmptyDataSet(
734
      List<String> deviceIds,
735
      List<Long> times,
736
      List<List<TSDataType>> typesList,
737
      List<List<Object>> valuesList,
738
      List<List<String>> measurementsList,
739
      int retryTime) {
740
    try {
741
      if (Boolean.FALSE.equals(aligned)) {
×
742
        session.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
×
743
      } else {
744
        session.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList);
×
745
      }
746
    } catch (IoTDBConnectionException e) {
×
747
      if (retryTime > 0) {
×
748
        try {
749
          session.open();
×
750
        } catch (IoTDBConnectionException ex) {
×
751
          IoTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
×
752
        }
×
753
        writeAndEmptyDataSet(
×
754
            deviceIds, times, typesList, valuesList, measurementsList, --retryTime);
755
      }
756
    } catch (StatementExecutionException e) {
×
757
      IoTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
×
758
    } finally {
759
      deviceIds.clear();
×
760
      times.clear();
×
761
      typesList.clear();
×
762
      valuesList.clear();
×
763
      measurementsList.clear();
×
764
    }
765
  }
×
766

767
  /**
768
   * read data from the CSV file
769
   *
770
   * @param path
771
   * @return CSVParser csv parser
772
   * @throws IOException when reading the csv file failed.
773
   */
774
  private static CSVParser readCsvFile(String path) throws IOException {
775
    return CSVFormat.Builder.create(CSVFormat.DEFAULT)
×
776
        .setHeader()
×
777
        .setSkipHeaderRecord(true)
×
778
        .setQuote('`')
×
779
        .setEscape('\\')
×
780
        .setIgnoreEmptyLines(true)
×
781
        .build()
×
782
        .parse(new InputStreamReader(new FileInputStream(path)));
×
783
  }
784

785
  /**
786
   * parse deviceNames, measurementNames(aligned by time), headerType from headers
787
   *
788
   * @param headerNames
789
   * @param deviceAndMeasurementNames
790
   * @param headerTypeMap
791
   * @param headerNameMap
792
   */
793
  @SuppressWarnings(
794
      "squid:S135") // ignore for loops should not contain more than a single "break" or "continue"
795
  // statement
796
  private static void parseHeaders(
797
      List<String> headerNames,
798
      @Nullable HashMap<String, List<String>> deviceAndMeasurementNames,
799
      HashMap<String, TSDataType> headerTypeMap,
800
      HashMap<String, String> headerNameMap)
801
      throws IllegalPathException {
802
    String regex = "(?<=\\()\\S+(?=\\))";
×
803
    Pattern pattern = Pattern.compile(regex);
×
804
    for (String headerName : headerNames) {
×
805
      if ("Time".equalsIgnoreCase(headerName)) {
×
806
        timeColumn = headerName;
×
807
        continue;
×
808
      } else if ("Device".equalsIgnoreCase(headerName)) {
×
809
        deviceColumn = headerName;
×
810
        continue;
×
811
      }
812
      Matcher matcher = pattern.matcher(headerName);
×
813
      String type;
814
      String headerNameWithoutType;
815
      if (matcher.find()) {
×
816
        type = matcher.group();
×
817
        headerNameWithoutType = headerName.replace("(" + type + ")", "").replaceAll("\\s+", "");
×
818
        headerNameMap.put(headerNameWithoutType, headerName);
×
819
        headerTypeMap.put(headerNameWithoutType, getType(type));
×
820
      } else {
821
        headerNameWithoutType = headerName;
×
822
        headerNameMap.put(headerName, headerName);
×
823
      }
824
      String[] split = PathUtils.splitPathToDetachedNodes(headerNameWithoutType);
×
825
      String measurementName = split[split.length - 1];
×
826
      String deviceName = StringUtils.join(Arrays.copyOfRange(split, 0, split.length - 1), '.');
×
827
      if (deviceAndMeasurementNames != null) {
×
828
        deviceAndMeasurementNames.putIfAbsent(deviceName, new ArrayList<>());
×
829
        deviceAndMeasurementNames.get(deviceName).add(measurementName);
×
830
      }
831
    }
×
832
  }
×
833

834
  /**
835
   * query data type of timeseries from IoTDB
836
   *
837
   * @param deviceNames
838
   * @param headerTypeMap
839
   * @param alignedType
840
   * @throws IoTDBConnectionException
841
   * @throws StatementExecutionException
842
   */
843
  private static boolean queryType(
844
      Set<String> deviceNames, HashMap<String, TSDataType> headerTypeMap, String alignedType)
845
      throws IoTDBConnectionException {
846
    boolean hasResult = false;
×
847
    for (String deviceName : deviceNames) {
×
848
      String sql = "show timeseries " + deviceName + ".*";
×
849
      SessionDataSet sessionDataSet = null;
×
850
      try {
851
        sessionDataSet = session.executeQueryStatement(sql);
×
852
        int tsIndex = sessionDataSet.getColumnNames().indexOf(ColumnHeaderConstant.TIMESERIES);
×
853
        int dtIndex = sessionDataSet.getColumnNames().indexOf(ColumnHeaderConstant.DATATYPE);
×
854
        while (sessionDataSet.hasNext()) {
×
855
          hasResult = true;
×
856
          RowRecord rowRecord = sessionDataSet.next();
×
857
          List<Field> fields = rowRecord.getFields();
×
858
          String timeseries = fields.get(tsIndex).getStringValue();
×
859
          String dataType = fields.get(dtIndex).getStringValue();
×
860
          if (Objects.equals(alignedType, "Time")) {
×
861
            headerTypeMap.put(timeseries, getType(dataType));
×
862
          } else if (Objects.equals(alignedType, deviceColumn)) {
×
863
            String[] split = PathUtils.splitPathToDetachedNodes(timeseries);
×
864
            String measurement = split[split.length - 1];
×
865
            headerTypeMap.put(measurement, getType(dataType));
×
866
          }
867
        }
×
868
      } catch (StatementExecutionException | IllegalPathException e) {
×
869
        IoTPrinter.println(
×
870
            "Meet error when query the type of timeseries because " + e.getMessage());
×
871
        return false;
×
872
      }
×
873
    }
×
874
    return hasResult;
×
875
  }
876

877
  /**
878
   * return the TSDataType
879
   *
880
   * @param typeStr
881
   * @return
882
   */
883
  private static TSDataType getType(String typeStr) {
884
    switch (typeStr) {
×
885
      case "TEXT":
886
        return TEXT;
×
887
      case "BOOLEAN":
888
        return BOOLEAN;
×
889
      case "INT32":
890
        return INT32;
×
891
      case "INT64":
892
        return INT64;
×
893
      case "FLOAT":
894
        return FLOAT;
×
895
      case "DOUBLE":
896
        return DOUBLE;
×
897
      default:
898
        return null;
×
899
    }
900
  }
901

902
  /**
903
   * if data type of timeseries is not defined in headers of schema, this method will be called to
904
   * do type inference
905
   *
906
   * @param strValue
907
   * @return
908
   */
909
  private static TSDataType typeInfer(String strValue) {
910
    if (strValue.contains("\"")) {
×
911
      return TEXT;
×
912
    }
913
    if (isBoolean(strValue)) {
×
914
      return TYPE_INFER_KEY_DICT.get(DATATYPE_BOOLEAN);
×
915
    } else if (isNumber(strValue)) {
×
916
      if (!strValue.contains(TsFileConstant.PATH_SEPARATOR)) {
×
917
        if (isConvertFloatPrecisionLack(StringUtils.trim(strValue))) {
×
918
          return TYPE_INFER_KEY_DICT.get(DATATYPE_LONG);
×
919
        }
920
        return TYPE_INFER_KEY_DICT.get(DATATYPE_INT);
×
921
      } else {
922
        return TYPE_INFER_KEY_DICT.get(DATATYPE_FLOAT);
×
923
      }
924
    } else if (DATATYPE_NULL.equals(strValue) || DATATYPE_NULL.toUpperCase().equals(strValue)) {
×
925
      return null;
×
926
      // "NaN" is returned if the NaN Literal is given in Parser
927
    } else if (DATATYPE_NAN.equals(strValue)) {
×
928
      return TYPE_INFER_KEY_DICT.get(DATATYPE_NAN);
×
929
    } else {
930
      return TSDataType.TEXT;
×
931
    }
932
  }
933

934
  static boolean isNumber(String s) {
935
    if (s == null || s.equals(DATATYPE_NAN)) {
×
936
      return false;
×
937
    }
938
    try {
939
      Double.parseDouble(s);
×
940
    } catch (NumberFormatException e) {
×
941
      return false;
×
942
    }
×
943
    return true;
×
944
  }
945

946
  private static boolean isBoolean(String s) {
947
    return s.equalsIgnoreCase(SqlConstant.BOOLEAN_TRUE)
×
948
        || s.equalsIgnoreCase(SqlConstant.BOOLEAN_FALSE);
×
949
  }
950

951
  private static boolean isConvertFloatPrecisionLack(String s) {
952
    return Long.parseLong(s) > (2 << 24);
×
953
  }
954

955
  /**
956
   * @param value
957
   * @param type
958
   * @return
959
   */
960
  private static Object typeTrans(String value, TSDataType type) {
961
    try {
962
      switch (type) {
×
963
        case TEXT:
964
          if (value.startsWith("\"") && value.endsWith("\"")) {
×
965
            return value.substring(1, value.length() - 1);
×
966
          }
967
          return value;
×
968
        case BOOLEAN:
969
          if (!"true".equalsIgnoreCase(value) && !"false".equalsIgnoreCase(value)) {
×
970
            return null;
×
971
          }
972
          return Boolean.parseBoolean(value);
×
973
        case INT32:
974
          return Integer.parseInt(value);
×
975
        case INT64:
976
          return Long.parseLong(value);
×
977
        case FLOAT:
978
          return Float.parseFloat(value);
×
979
        case DOUBLE:
980
          return Double.parseDouble(value);
×
981
        default:
982
          return null;
×
983
      }
984
    } catch (NumberFormatException e) {
×
985
      return null;
×
986
    }
987
  }
988

989
  private static long parseTimestamp(String str) {
990
    long timestamp;
991
    try {
992
      timestamp = Long.parseLong(str);
×
993
    } catch (NumberFormatException e) {
×
994
      timestamp = DateTimeUtils.convertDatetimeStrToLong(str, zoneId, timestampPrecision);
×
995
    }
×
996
    return timestamp;
×
997
  }
998
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc