• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10006

06 Sep 2023 05:15AM CUT coverage: 47.697% (+0.006%) from 47.691%
#10006

push

travis_ci

web-flow
[RatisConsensus] retry cache expiration time should be longer than retriable-client wait duration (#11045) (#11052)

13 of 13 new or added lines in 1 file covered. (100.0%)

80213 of 168172 relevant lines covered (47.7%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.tool;
21

22
import org.apache.iotdb.cli.utils.IoTPrinter;
23
import org.apache.iotdb.cli.utils.JlineUtils;
24
import org.apache.iotdb.exception.ArgsErrorException;
25
import org.apache.iotdb.isession.SessionDataSet;
26
import org.apache.iotdb.rpc.IoTDBConnectionException;
27
import org.apache.iotdb.rpc.StatementExecutionException;
28
import org.apache.iotdb.session.Session;
29
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
30
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
31
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
32
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
33
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
34
import org.apache.iotdb.tsfile.read.common.Field;
35
import org.apache.iotdb.tsfile.read.common.Path;
36
import org.apache.iotdb.tsfile.read.common.RowRecord;
37
import org.apache.iotdb.tsfile.write.TsFileWriter;
38
import org.apache.iotdb.tsfile.write.record.Tablet;
39
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
40

41
import org.apache.commons.cli.CommandLine;
42
import org.apache.commons.cli.CommandLineParser;
43
import org.apache.commons.cli.DefaultParser;
44
import org.apache.commons.cli.HelpFormatter;
45
import org.apache.commons.cli.Option;
46
import org.apache.commons.cli.Options;
47
import org.apache.commons.cli.ParseException;
48
import org.jline.reader.LineReader;
49

50
import java.io.BufferedReader;
51
import java.io.File;
52
import java.io.FileReader;
53
import java.io.IOException;
54
import java.nio.file.Files;
55
import java.util.ArrayList;
56
import java.util.HashSet;
57
import java.util.LinkedHashMap;
58
import java.util.List;
59
import java.util.Map;
60

61
public class ExportTsFile extends AbstractTsFileTool {
×
62

63
  private static final String TARGET_DIR_ARGS = "td";
64
  private static final String TARGET_DIR_NAME = "targetDirectory";
65
  private static final String TARGET_FILE_ARGS = "f";
66
  private static final String TARGET_FILE_NAME = "targetFile";
67

68
  private static final String SQL_FILE_ARGS = "s";
69
  private static final String SQL_FILE_NAME = "sqlfile";
70
  private static final String QUERY_COMMAND_ARGS = "q";
71
  private static final String QUERY_COMMAND_NAME = "queryCommand";
72
  private static final String DUMP_FILE_NAME_DEFAULT = "dump";
73
  private static final String TSFILEDB_CLI_PREFIX = "ExportTsFile";
74
  private static String targetDirectory;
75
  private static String targetFile = DUMP_FILE_NAME_DEFAULT;
×
76
  private static String queryCommand;
77

78
  private static long timeout = -1;
×
79

80
  @SuppressWarnings({
81
    "squid:S3776",
82
    "squid:S2093"
83
  }) // Suppress high Cognitive Complexity warning, ignore try-with-resources
84
  /** main function of export tsFile tool. */
85
  public static void main(String[] args) {
86
    Options options = createOptions();
×
87
    HelpFormatter hf = new HelpFormatter();
×
88
    CommandLine commandLine = null;
×
89
    CommandLineParser parser = new DefaultParser();
×
90
    hf.setOptionComparator(null); // avoid reordering
×
91
    hf.setWidth(MAX_HELP_CONSOLE_WIDTH);
×
92

93
    if (args == null || args.length == 0) {
×
94
      IoTPrinter.println("Too few params input, please check the following hint.");
×
95
      hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
×
96
      System.exit(CODE_ERROR);
×
97
    }
98

99
    try {
100
      commandLine = parser.parse(options, args);
×
101
    } catch (ParseException e) {
×
102
      IoTPrinter.println(e.getMessage());
×
103
      hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
×
104
      System.exit(CODE_ERROR);
×
105
    }
×
106
    if (commandLine.hasOption(HELP_ARGS)) {
×
107
      hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
×
108
      System.exit(CODE_ERROR);
×
109
    }
110

111
    int exitCode = CODE_OK;
×
112
    try {
113
      parseBasicParams(commandLine);
×
114
      parseSpecialParams(commandLine);
×
115

116
      session = new Session(host, Integer.parseInt(port), username, password);
×
117
      session.open(false);
×
118

119
      if (queryCommand == null) {
×
120
        String sqlFile = commandLine.getOptionValue(SQL_FILE_ARGS);
×
121
        String sql;
122

123
        if (sqlFile == null) {
×
124
          LineReader lineReader = JlineUtils.getLineReader(username, host, port);
×
125
          sql = lineReader.readLine(TSFILEDB_CLI_PREFIX + "> please input query: ");
×
126
          IoTPrinter.println(sql);
×
127
          String[] values = sql.trim().split(";");
×
128
          for (int i = 0; i < values.length; i++) {
×
129
            legalCheck(values[i]);
×
130
            dumpResult(values[i], i);
×
131
          }
132

133
        } else {
×
134
          dumpFromSqlFile(sqlFile);
×
135
        }
136
      } else {
×
137
        legalCheck(queryCommand);
×
138
        dumpResult(queryCommand, 0);
×
139
      }
140

141
    } catch (IOException e) {
×
142
      IoTPrinter.println("Failed to operate on file, because " + e.getMessage());
×
143
      exitCode = CODE_ERROR;
×
144
    } catch (ArgsErrorException e) {
×
145
      IoTPrinter.println("Invalid args: " + e.getMessage());
×
146
      exitCode = CODE_ERROR;
×
147
    } catch (IoTDBConnectionException e) {
×
148
      IoTPrinter.println("Connect failed because " + e.getMessage());
×
149
      exitCode = CODE_ERROR;
×
150
    } finally {
151
      if (session != null) {
×
152
        try {
153
          session.close();
×
154
        } catch (IoTDBConnectionException e) {
×
155
          exitCode = CODE_ERROR;
×
156
          IoTPrinter.println(
×
157
              "Encounter an error when closing session, error is: " + e.getMessage());
×
158
        }
×
159
      }
160
    }
161
    System.exit(exitCode);
×
162
  }
×
163

164
  private static void legalCheck(String sql) {
165
    String sqlLower = sql.toLowerCase();
×
166
    if (sqlLower.contains("count(")
×
167
        || sqlLower.contains("sum(")
×
168
        || sqlLower.contains("avg(")
×
169
        || sqlLower.contains("extreme(")
×
170
        || sqlLower.contains("max_value(")
×
171
        || sqlLower.contains("min_value(")
×
172
        || sqlLower.contains("first_value(")
×
173
        || sqlLower.contains("last_value(")
×
174
        || sqlLower.contains("max_time(")
×
175
        || sqlLower.contains("min_time(")) {
×
176
      IoTPrinter.println("The sql you entered is invalid, please don't use aggregate query.");
×
177
      System.exit(CODE_ERROR);
×
178
    }
179
  }
×
180

181
  private static void parseSpecialParams(CommandLine commandLine) throws ArgsErrorException {
182
    targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_NAME, commandLine);
×
183
    queryCommand = commandLine.getOptionValue(QUERY_COMMAND_ARGS);
×
184
    targetFile = commandLine.getOptionValue(TARGET_FILE_ARGS);
×
185
    String timeoutString = commandLine.getOptionValue(TIMEOUT_ARGS);
×
186
    if (timeoutString != null) {
×
187
      timeout = Long.parseLong(timeoutString);
×
188
    }
189
    if (targetFile == null) {
×
190
      targetFile = DUMP_FILE_NAME_DEFAULT;
×
191
    }
192

193
    if (!targetDirectory.endsWith("/") && !targetDirectory.endsWith("\\")) {
×
194
      targetDirectory += File.separator;
×
195
    }
196
  }
×
197

198
  /**
199
   * commandline option create.
200
   *
201
   * @return object Options
202
   */
203
  private static Options createOptions() {
204
    Options options = createNewOptions();
×
205

206
    Option opTargetFile =
×
207
        Option.builder(TARGET_DIR_ARGS)
×
208
            .required()
×
209
            .argName(TARGET_DIR_NAME)
×
210
            .hasArg()
×
211
            .desc("Target File Directory (required)")
×
212
            .build();
×
213
    options.addOption(opTargetFile);
×
214

215
    Option targetFileName =
×
216
        Option.builder(TARGET_FILE_ARGS)
×
217
            .argName(TARGET_FILE_NAME)
×
218
            .hasArg()
×
219
            .desc("Export file name (optional)")
×
220
            .build();
×
221
    options.addOption(targetFileName);
×
222

223
    Option opSqlFile =
×
224
        Option.builder(SQL_FILE_ARGS)
×
225
            .argName(SQL_FILE_NAME)
×
226
            .hasArg()
×
227
            .desc("SQL File Path (optional)")
×
228
            .build();
×
229
    options.addOption(opSqlFile);
×
230

231
    Option opQuery =
×
232
        Option.builder(QUERY_COMMAND_ARGS)
×
233
            .argName(QUERY_COMMAND_NAME)
×
234
            .hasArg()
×
235
            .desc("The query command that you want to execute. (optional)")
×
236
            .build();
×
237
    options.addOption(opQuery);
×
238

239
    Option opHelp =
×
240
        Option.builder(HELP_ARGS)
×
241
            .longOpt(HELP_ARGS)
×
242
            .hasArg(false)
×
243
            .desc("Display help information")
×
244
            .build();
×
245
    options.addOption(opHelp);
×
246

247
    Option opTimeout =
×
248
        Option.builder(TIMEOUT_ARGS)
×
249
            .longOpt(TIMEOUT_NAME)
×
250
            .hasArg()
×
251
            .desc("Timeout for session query")
×
252
            .build();
×
253
    options.addOption(opTimeout);
×
254
    return options;
×
255
  }
256

257
  /**
258
   * This method will be called, if the query commands are written in a sql file.
259
   *
260
   * @param filePath:file path
261
   * @throws IOException: exception
262
   */
263
  private static void dumpFromSqlFile(String filePath) throws IOException {
264
    try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
×
265
      String sql;
266
      int i = 0;
×
267
      while ((sql = reader.readLine()) != null) {
×
268
        legalCheck(sql);
×
269
        dumpResult(sql, i++);
×
270
      }
271
    }
272
  }
×
273

274
  /**
275
   * Dump files from database to tsFile.
276
   *
277
   * @param sql export the result of executing the sql
278
   */
279
  private static void dumpResult(String sql, int index) {
280
    final String path = targetDirectory + targetFile + index + ".tsfile";
×
281
    try {
282
      SessionDataSet sessionDataSet = session.executeQueryStatement(sql, timeout);
×
283
      long start = System.currentTimeMillis();
×
284
      writeTsFileFile(sessionDataSet, path);
×
285
      sessionDataSet.closeOperationHandle();
×
286
      long end = System.currentTimeMillis();
×
287
      IoTPrinter.println("Export completely!cost: " + (end - start) + " ms.");
×
288
    } catch (StatementExecutionException
×
289
        | IoTDBConnectionException
290
        | IOException
291
        | WriteProcessException e) {
292
      IoTPrinter.println("Cannot dump result because: " + e.getMessage());
×
293
    }
×
294
  }
×
295

296
  @SuppressWarnings({
297
    "squid:S3776",
298
    "squid:S6541"
299
  }) // Suppress high Cognitive Complexity warning, Suppress many task in one method warning
300
  public static void writeTsFileFile(SessionDataSet sessionDataSet, String filePath)
301
      throws IOException, IoTDBConnectionException, StatementExecutionException,
302
          WriteProcessException {
303
    List<String> columnNames = sessionDataSet.getColumnNames();
×
304
    List<String> columnTypes = sessionDataSet.getColumnTypes();
×
305
    File f = FSFactoryProducer.getFSFactory().getFile(filePath);
×
306
    if (f.exists()) {
×
307
      Files.delete(f.toPath());
×
308
    }
309
    HashSet<String> deviceFilterSet = new HashSet<>();
×
310
    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
×
311
      Map<String, List<MeasurementSchema>> schemaMap = new LinkedHashMap<>();
×
312
      for (int i = 0; i < columnNames.size(); i++) {
×
313
        String column = columnNames.get(i);
×
314
        if (!column.startsWith("root.")) {
×
315
          continue;
×
316
        }
317
        TSDataType tsDataType = getTsDataType(columnTypes.get(i));
×
318
        Path path = new Path(column, true);
×
319
        String deviceId = path.getDevice();
×
320
        List<Field> deviceList =
×
321
            session.executeQueryStatement("show devices " + deviceId, timeout).next().getFields();
×
322
        if (deviceList.size() > 1 && "true".equals(deviceList.get(1).getStringValue())) {
×
323
          deviceFilterSet.add(deviceId);
×
324
        }
325
        MeasurementSchema measurementSchema =
×
326
            new MeasurementSchema(path.getMeasurement(), tsDataType);
×
327

328
        List<Field> seriesList =
×
329
            session.executeQueryStatement("show timeseries " + column, timeout).next().getFields();
×
330

331
        measurementSchema.setEncoding(
×
332
            TSEncoding.valueOf(seriesList.get(4).getStringValue()).serialize());
×
333
        measurementSchema.setCompressor(
×
334
            CompressionType.valueOf(seriesList.get(5).getStringValue()).serialize());
×
335
        schemaMap.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(measurementSchema);
×
336
      }
337
      List<Tablet> tabletList = new ArrayList<>();
×
338
      for (Map.Entry<String, List<MeasurementSchema>> stringListEntry : schemaMap.entrySet()) {
×
339
        String deviceId = stringListEntry.getKey();
×
340
        List<MeasurementSchema> schemaList = stringListEntry.getValue();
×
341
        Tablet tablet = new Tablet(deviceId, schemaList);
×
342
        tablet.initBitMaps();
×
343
        Path path = new Path(tablet.deviceId);
×
344
        if (deviceFilterSet.contains(tablet.deviceId)) {
×
345
          tsFileWriter.registerAlignedTimeseries(path, schemaList);
×
346
        } else {
347
          tsFileWriter.registerTimeseries(path, schemaList);
×
348
        }
349
        tabletList.add(tablet);
×
350
      }
×
351
      if (tabletList.isEmpty()) {
×
352
        IoTPrinter.println("!!!Warning:Tablet is empty,no data can be exported.");
×
353
        System.exit(CODE_ERROR);
×
354
      }
355
      while (sessionDataSet.hasNext()) {
×
356
        RowRecord rowRecord = sessionDataSet.next();
×
357
        List<Field> fields = rowRecord.getFields();
×
358
        int i = 0;
×
359
        while (i < fields.size()) {
×
360
          for (Tablet tablet : tabletList) {
×
361
            int rowIndex = tablet.rowSize++;
×
362
            tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
×
363
            List<MeasurementSchema> schemas = tablet.getSchemas();
×
364
            for (int j = 0; j < schemas.size(); j++) {
×
365
              MeasurementSchema measurementSchema = schemas.get(j);
×
366
              Object value = fields.get(i).getObjectValue(measurementSchema.getType());
×
367
              if (value == null) {
×
368
                tablet.bitMaps[j].mark(rowIndex);
×
369
              }
370
              tablet.addValue(measurementSchema.getMeasurementId(), rowIndex, value);
×
371
              i++;
×
372
            }
373
            if (tablet.rowSize == tablet.getMaxRowNumber()) {
×
374
              writeToTsfile(deviceFilterSet, tsFileWriter, tablet);
×
375
              tablet.initBitMaps();
×
376
              tablet.reset();
×
377
            }
378
          }
×
379
        }
380
      }
×
381
      for (Tablet tablet : tabletList) {
×
382
        if (tablet.rowSize != 0) {
×
383
          writeToTsfile(deviceFilterSet, tsFileWriter, tablet);
×
384
        }
385
      }
×
386
      tsFileWriter.flushAllChunkGroups();
×
387
    }
388
  }
×
389

390
  private static void writeToTsfile(
391
      HashSet<String> deviceFilterSet, TsFileWriter tsFileWriter, Tablet tablet)
392
      throws IOException, WriteProcessException {
393
    if (deviceFilterSet.contains(tablet.deviceId)) {
×
394
      tsFileWriter.writeAligned(tablet);
×
395
    } else {
396
      tsFileWriter.write(tablet);
×
397
    }
398
  }
×
399

400
  private static TSDataType getTsDataType(String type) {
401
    switch (type) {
×
402
      case "INT64":
403
        return TSDataType.INT64;
×
404
      case "INT32":
405
        return TSDataType.INT32;
×
406
      case "FLOAT":
407
        return TSDataType.FLOAT;
×
408
      case "DOUBLE":
409
        return TSDataType.DOUBLE;
×
410
      case "TEXT":
411
        return TSDataType.TEXT;
×
412
      case "BOOLEAN":
413
        return TSDataType.BOOLEAN;
×
414
      default:
415
        throw new IllegalArgumentException("Invalid input: " + type);
×
416
    }
417
  }
418
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc