• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9634

pending completion
#9634

push

travis_ci

web-flow
Add session interface of faster last query in one device

190 of 190 new or added lines in 5 files covered. (100.0%)

79025 of 165424 relevant lines covered (47.77%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.87
/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.session;
21

22
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.isession.ISession;
25
import org.apache.iotdb.isession.SessionConfig;
26
import org.apache.iotdb.isession.SessionDataSet;
27
import org.apache.iotdb.isession.template.Template;
28
import org.apache.iotdb.isession.util.Version;
29
import org.apache.iotdb.rpc.BatchExecutionException;
30
import org.apache.iotdb.rpc.IoTDBConnectionException;
31
import org.apache.iotdb.rpc.NoValidValueException;
32
import org.apache.iotdb.rpc.RedirectException;
33
import org.apache.iotdb.rpc.StatementExecutionException;
34
import org.apache.iotdb.service.rpc.thrift.TCreateTimeseriesUsingSchemaTemplateReq;
35
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
36
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
37
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
38
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
39
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
40
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
41
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
42
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
43
import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq;
44
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
45
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
46
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
47
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
48
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
49
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
50
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
51
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
52
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
53
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
54
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
55
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
56
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
57
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
58
import org.apache.iotdb.session.template.MeasurementNode;
59
import org.apache.iotdb.session.template.TemplateQueryType;
60
import org.apache.iotdb.session.util.SessionUtils;
61
import org.apache.iotdb.session.util.ThreadUtils;
62
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
63
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
64
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
65
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
66
import org.apache.iotdb.tsfile.utils.Binary;
67
import org.apache.iotdb.tsfile.utils.BitMap;
68
import org.apache.iotdb.tsfile.utils.Pair;
69
import org.apache.iotdb.tsfile.write.record.Tablet;
70
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
71
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
72

73
import org.apache.thrift.TException;
74
import org.slf4j.Logger;
75
import org.slf4j.LoggerFactory;
76

77
import java.io.ByteArrayOutputStream;
78
import java.io.IOException;
79
import java.nio.ByteBuffer;
80
import java.time.ZoneId;
81
import java.util.ArrayList;
82
import java.util.Arrays;
83
import java.util.Collections;
84
import java.util.Comparator;
85
import java.util.HashMap;
86
import java.util.Iterator;
87
import java.util.List;
88
import java.util.Map;
89
import java.util.Map.Entry;
90
import java.util.concurrent.CompletableFuture;
91
import java.util.concurrent.CompletionException;
92
import java.util.concurrent.ConcurrentHashMap;
93
import java.util.concurrent.LinkedBlockingQueue;
94
import java.util.concurrent.ThreadPoolExecutor;
95
import java.util.concurrent.TimeUnit;
96
import java.util.concurrent.atomic.AtomicReference;
97
import java.util.stream.Collectors;
98

99
@SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
100
public class Session implements ISession {
101

102
  private static final Logger logger = LoggerFactory.getLogger(Session.class);
1✔
103
  protected static final TSProtocolVersion protocolVersion =
1✔
104
      TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
105
  public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data type:";
106
  public static final String MSG_DONOT_ENABLE_REDIRECT =
107
      "Query do not enable redirect," + " please confirm the session and server conf.";
108
  private static final ThreadPoolExecutor OPERATION_EXECUTOR =
1✔
109
      new ThreadPoolExecutor(
110
          SessionConfig.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
111
          SessionConfig.DEFAULT_SESSION_EXECUTOR_THREAD_NUM,
112
          0,
113
          TimeUnit.MILLISECONDS,
114
          new LinkedBlockingQueue<>(SessionConfig.DEFAULT_SESSION_EXECUTOR_TASK_NUM),
115
          ThreadUtils.createThreadFactory("SessionExecutor", true));
1✔
116
  protected List<String> nodeUrls;
117
  protected String username;
118
  protected String password;
119
  protected int fetchSize;
120
  /**
121
   * Timeout of query can be set by users. A negative number means using the default configuration
122
   * of server. And value 0 will disable the function of query timeout.
123
   */
124
  private long queryTimeoutInMs = -1;
1✔
125

126
  protected boolean enableRPCCompression;
127
  protected int connectionTimeoutInMs;
128
  protected ZoneId zoneId;
129

130
  protected int thriftDefaultBufferSize;
131
  protected int thriftMaxFrameSize;
132

133
  protected TEndPoint defaultEndPoint;
134
  protected SessionConnection defaultSessionConnection;
135
  private boolean isClosed = true;
1✔
136

137
  // Cluster version cache
138
  protected boolean enableRedirection;
139

140
  @SuppressWarnings("squid:S3077") // Non-primitive fields should not be "volatile"
141
  protected volatile Map<String, TEndPoint> deviceIdToEndpoint;
142

143
  @SuppressWarnings("squid:S3077") // Non-primitive fields should not be "volatile"
144
  protected volatile Map<TEndPoint, SessionConnection> endPointToSessionConnection;
145

146
  protected boolean enableQueryRedirection = false;
1✔
147

148
  // The version number of the client which used for compatibility in the server
149
  protected Version version;
150

151
  private static final String REDIRECT_TWICE = "redirect twice";
152

153
  private static final String REDIRECT_TWICE_RETRY = "redirect twice, please try again.";
154

155
  private static final String VALUES_SIZE_SHOULD_BE_EQUAL =
156
      "times, measurementsList and valuesList's size should be equal";
157

158
  private static final String SESSION_CANNOT_CONNECT = "Session can not connect to {}";
159

160
  private static final String ALL_VALUES_ARE_NULL =
161
      "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]";
162

163
  private static final String ALL_VALUES_ARE_NULL_WITH_TIME =
164
      "All values are null and this submission is ignored,deviceId is [{}],times are [{}],measurements are [{}]";
165
  private static final String ALL_VALUES_ARE_NULL_MULTI_DEVICES =
166
      "All values are null and this submission is ignored,deviceIds are [{}],times are [{}],measurements are [{}]";
167
  private static final String ALL_INSERT_DATA_IS_NULL = "All inserted data is null.";
168

169
  /**
   * Creates a session for a single node using the default user and password from {@link
   * SessionConfig}.
   *
   * @param host host of the node to connect to
   * @param rpcPort RPC port of the node
   */
  public Session(String host, int rpcPort) {
    // The remaining defaults are supplied by the constructors further down the chain.
    this(host, rpcPort, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
  }
1✔
182

183
  /**
   * Creates a session for a single node whose port is given as text.
   *
   * @param host host of the node to connect to
   * @param rpcPort RPC port as a decimal string, parsed with {@link Integer#parseInt(String)}
   * @param username user name used to log in
   * @param password password used to log in
   * @throws NumberFormatException if {@code rpcPort} is not a parsable integer
   */
  public Session(String host, String rpcPort, String username, String password) {
    this(host, Integer.parseInt(rpcPort), username, password);
  }
×
196

197
  /**
   * Creates a session for a single node with explicit credentials and the default fetch size.
   *
   * @param host host of the node to connect to
   * @param rpcPort RPC port of the node
   * @param username user name used to log in
   * @param password password used to log in
   */
  public Session(String host, int rpcPort, String username, String password) {
    this(host, rpcPort, username, password, SessionConfig.DEFAULT_FETCH_SIZE);
  }
×
210

211
  /**
   * Creates a session for a single node with explicit credentials and fetch size. No zone id is
   * supplied and the default redirection mode is used.
   *
   * @param host host of the node to connect to
   * @param rpcPort RPC port of the node
   * @param username user name used to log in
   * @param password password used to log in
   * @param fetchSize fetch size stored on this session
   */
  public Session(String host, int rpcPort, String username, String password, int fetchSize) {
    this(
        host,
        rpcPort,
        username,
        password,
        fetchSize,
        (ZoneId) null,
        SessionConfig.DEFAULT_REDIRECTION_MODE);
  }
×
224

225
  /**
   * Creates a session for a single node with an explicit query timeout.
   *
   * @param host host of the node to connect to
   * @param rpcPort RPC port of the node
   * @param username user name used to log in
   * @param password password used to log in
   * @param fetchSize fetch size stored on this session
   * @param queryTimeoutInMs query timeout in milliseconds; replaces the field's initial value
   */
  public Session(
      String host,
      int rpcPort,
      String username,
      String password,
      int fetchSize,
      long queryTimeoutInMs) {
    this(host, rpcPort, username, password, fetchSize);
    this.queryTimeoutInMs = queryTimeoutInMs;
  }
×
245

246
  /**
   * Creates a session for a single node with an explicit time zone and otherwise default
   * settings.
   *
   * @param host host of the node to connect to
   * @param rpcPort RPC port of the node
   * @param username user name used to log in
   * @param password password used to log in
   * @param zoneId time zone stored on this session
   */
  public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
    this(
        host,
        rpcPort,
        username,
        password,
        SessionConfig.DEFAULT_FETCH_SIZE,
        zoneId,
        SessionConfig.DEFAULT_REDIRECTION_MODE);
  }
×
259

260
  /**
   * Creates a session for a single node, choosing explicitly whether redirection is enabled.
   *
   * @param host host of the node to connect to
   * @param rpcPort RPC port of the node
   * @param username user name used to log in
   * @param password password used to log in
   * @param enableRedirection value stored in the session's redirection flag
   */
  public Session(
      String host, int rpcPort, String username, String password, boolean enableRedirection) {
    this(
        host,
        rpcPort,
        username,
        password,
        SessionConfig.DEFAULT_FETCH_SIZE,
        (ZoneId) null,
        enableRedirection);
  }
×
274

275
  /**
   * Creates a session for a single node with explicit fetch size, time zone and redirection
   * flag. Thrift buffer sizes and the client version take their defaults from {@link
   * SessionConfig}.
   *
   * @param host host of the node to connect to
   * @param rpcPort RPC port of the node
   * @param username user name used to log in
   * @param password password used to log in
   * @param fetchSize fetch size stored on this session
   * @param zoneId time zone stored on this session
   * @param enableRedirection value stored in the session's redirection flag
   */
  public Session(
      String host,
      int rpcPort,
      String username,
      String password,
      int fetchSize,
      ZoneId zoneId,
      boolean enableRedirection) {
    this(
        host, rpcPort, username, password, fetchSize, zoneId,
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
        SessionConfig.DEFAULT_MAX_FRAME_SIZE,
        enableRedirection,
        SessionConfig.DEFAULT_VERSION);
  }
×
295

296
  /**
   * Fully parameterized single-node constructor. It only records the settings; no connection is
   * created here.
   *
   * @param host host of the node, stored together with {@code rpcPort} as the default endpoint
   * @param rpcPort RPC port of the node
   * @param username user name used to log in
   * @param password password used to log in
   * @param fetchSize fetch size stored on this session
   * @param zoneId time zone stored on this session
   * @param thriftDefaultBufferSize Thrift default buffer size
   * @param thriftMaxFrameSize Thrift maximum frame size
   * @param enableRedirection value stored in the session's redirection flag
   * @param version client version
   */
  @SuppressWarnings("squid:S107")
  public Session(
      String host,
      int rpcPort,
      String username,
      String password,
      int fetchSize,
      ZoneId zoneId,
      int thriftDefaultBufferSize,
      int thriftMaxFrameSize,
      boolean enableRedirection,
      Version version) {
    // Target node.
    this.defaultEndPoint = new TEndPoint(host, rpcPort);

    // Credentials.
    this.username = username;
    this.password = password;

    // Query and connection settings.
    this.fetchSize = fetchSize;
    this.zoneId = zoneId;
    this.thriftDefaultBufferSize = thriftDefaultBufferSize;
    this.thriftMaxFrameSize = thriftMaxFrameSize;

    // Behavior flags and compatibility version.
    this.enableRedirection = enableRedirection;
    this.version = version;
  }
1✔
318

319
  /**
   * Creates a session over several candidate nodes with the default fetch size.
   *
   * @param nodeUrls node addresses stored on this session
   * @param username user name used to log in
   * @param password password used to log in
   */
  public Session(List<String> nodeUrls, String username, String password) {
    this(nodeUrls, username, password, SessionConfig.DEFAULT_FETCH_SIZE);
  }
×
331

332
  /**
   * Creates a session over multiple node URLs; if one node is down, the next one is tried.
   *
   * @param nodeUrls multiple {@code ip:rpcPort} addresses, e.g. {@code 127.0.0.1:9001}
   * @param username user name used to log in
   * @param password password used to log in
   * @param fetchSize fetch size stored on this session
   */
  public Session(List<String> nodeUrls, String username, String password, int fetchSize) {
    this(
        nodeUrls, username, password, fetchSize,
        null,
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
        SessionConfig.DEFAULT_MAX_FRAME_SIZE,
        SessionConfig.DEFAULT_REDIRECTION_MODE,
        SessionConfig.DEFAULT_VERSION);
  }
×
349

350
  /**
   * Creates a session over several candidate nodes with an explicit time zone and otherwise
   * default settings.
   *
   * @param nodeUrls node addresses stored on this session
   * @param username user name used to log in
   * @param password password used to log in
   * @param zoneId time zone stored on this session
   */
  public Session(List<String> nodeUrls, String username, String password, ZoneId zoneId) {
    this(
        nodeUrls, username, password,
        SessionConfig.DEFAULT_FETCH_SIZE,
        zoneId,
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
        SessionConfig.DEFAULT_MAX_FRAME_SIZE,
        SessionConfig.DEFAULT_REDIRECTION_MODE,
        SessionConfig.DEFAULT_VERSION);
  }
×
362

363
  /**
   * Fully parameterized multi-node constructor. It only records the settings; no connection is
   * created here and the default endpoint is left unset.
   *
   * @param nodeUrls node addresses stored on this session
   * @param username user name used to log in
   * @param password password used to log in
   * @param fetchSize fetch size stored on this session
   * @param zoneId time zone stored on this session
   * @param thriftDefaultBufferSize Thrift default buffer size
   * @param thriftMaxFrameSize Thrift maximum frame size
   * @param enableRedirection value stored in the session's redirection flag
   * @param version client version
   */
  public Session(
      List<String> nodeUrls,
      String username,
      String password,
      int fetchSize,
      ZoneId zoneId,
      int thriftDefaultBufferSize,
      int thriftMaxFrameSize,
      boolean enableRedirection,
      Version version) {
    // Candidate nodes.
    this.nodeUrls = nodeUrls;

    // Credentials.
    this.username = username;
    this.password = password;

    // Query and connection settings.
    this.fetchSize = fetchSize;
    this.zoneId = zoneId;
    this.thriftDefaultBufferSize = thriftDefaultBufferSize;
    this.thriftMaxFrameSize = thriftMaxFrameSize;

    // Behavior flags and compatibility version.
    this.enableRedirection = enableRedirection;
    this.version = version;
  }
×
383

384
  @Override
385
  public void setFetchSize(int fetchSize) {
386
    this.fetchSize = fetchSize;
×
387
  }
×
388

389
  @Override
390
  public int getFetchSize() {
391
    return this.fetchSize;
×
392
  }
393

394
  @Override
395
  public Version getVersion() {
396
    return version;
×
397
  }
398

399
  @Override
400
  public void setVersion(Version version) {
401
    this.version = version;
×
402
  }
×
403

404
  @Override
405
  public synchronized void open() throws IoTDBConnectionException {
406
    open(false, SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
1✔
407
  }
1✔
408

409
  @Override
410
  public synchronized void open(boolean enableRPCCompression) throws IoTDBConnectionException {
411
    open(enableRPCCompression, SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
×
412
  }
×
413

414
  @Override
415
  public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
416
      throws IoTDBConnectionException {
417
    if (!isClosed) {
1✔
418
      return;
×
419
    }
420

421
    this.enableRPCCompression = enableRPCCompression;
1✔
422
    this.connectionTimeoutInMs = connectionTimeoutInMs;
1✔
423
    defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
1✔
424
    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
1✔
425
    isClosed = false;
1✔
426
    if (enableRedirection || enableQueryRedirection) {
1✔
427
      deviceIdToEndpoint = new ConcurrentHashMap<>();
1✔
428
      endPointToSessionConnection = new ConcurrentHashMap<>();
1✔
429
      endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
1✔
430
    }
431
  }
1✔
432

433
  @Override
434
  public synchronized void open(
435
      boolean enableRPCCompression,
436
      int connectionTimeoutInMs,
437
      Map<String, TEndPoint> deviceIdToEndpoint)
438
      throws IoTDBConnectionException {
439
    if (!isClosed) {
×
440
      return;
×
441
    }
442

443
    this.enableRPCCompression = enableRPCCompression;
×
444
    this.connectionTimeoutInMs = connectionTimeoutInMs;
×
445
    defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
×
446
    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
×
447
    isClosed = false;
×
448
    if (enableRedirection || enableQueryRedirection) {
×
449
      this.deviceIdToEndpoint = deviceIdToEndpoint;
×
450
      endPointToSessionConnection = new ConcurrentHashMap<>();
×
451
      endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
×
452
    }
453
  }
×
454

455
  @Override
456
  public synchronized void close() throws IoTDBConnectionException {
457
    if (isClosed) {
1✔
458
      return;
×
459
    }
460
    try {
461
      if (enableRedirection) {
1✔
462
        for (SessionConnection sessionConnection : endPointToSessionConnection.values()) {
1✔
463
          sessionConnection.close();
1✔
464
        }
1✔
465
      } else {
466
        defaultSessionConnection.close();
1✔
467
      }
468
    } finally {
469
      isClosed = true;
1✔
470
    }
471
  }
1✔
472

473
  public SessionConnection constructSessionConnection(
474
      Session session, TEndPoint endpoint, ZoneId zoneId) throws IoTDBConnectionException {
475
    if (endpoint == null) {
×
476
      return new SessionConnection(session, zoneId);
×
477
    }
478
    return new SessionConnection(session, endpoint, zoneId);
×
479
  }
480

481
  @Override
482
  public synchronized String getTimeZone() {
483
    return defaultSessionConnection.getTimeZone();
×
484
  }
485

486
  @Override
487
  public synchronized void setTimeZone(String zoneId)
488
      throws StatementExecutionException, IoTDBConnectionException {
489
    defaultSessionConnection.setTimeZone(zoneId);
×
490
    this.zoneId = ZoneId.of(zoneId);
×
491
  }
×
492

493
  /** Only changes the member variable of the Session object without sending it to server. */
494
  @Override
495
  public void setTimeZoneOfSession(String zoneId) {
496
    defaultSessionConnection.setTimeZoneOfSession(zoneId);
×
497
    this.zoneId = ZoneId.of(zoneId);
×
498
  }
×
499

500
  @Override
501
  public void setStorageGroup(String storageGroup)
502
      throws IoTDBConnectionException, StatementExecutionException {
503
    defaultSessionConnection.setStorageGroup(storageGroup);
×
504
  }
×
505

506
  @Override
507
  public void deleteStorageGroup(String storageGroup)
508
      throws IoTDBConnectionException, StatementExecutionException {
509
    defaultSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
×
510
  }
×
511

512
  @Override
513
  public void deleteStorageGroups(List<String> storageGroups)
514
      throws IoTDBConnectionException, StatementExecutionException {
515
    defaultSessionConnection.deleteStorageGroups(storageGroups);
×
516
  }
×
517

518
  @Override
519
  public void createDatabase(String database)
520
      throws IoTDBConnectionException, StatementExecutionException {
521
    defaultSessionConnection.setStorageGroup(database);
×
522
  }
×
523

524
  @Override
525
  public void deleteDatabase(String database)
526
      throws IoTDBConnectionException, StatementExecutionException {
527
    defaultSessionConnection.deleteStorageGroups(Collections.singletonList(database));
×
528
  }
×
529

530
  @Override
531
  public void deleteDatabases(List<String> databases)
532
      throws IoTDBConnectionException, StatementExecutionException {
533
    defaultSessionConnection.deleteStorageGroups(databases);
×
534
  }
×
535

536
  @Override
537
  public void createTimeseries(
538
      String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)
539
      throws IoTDBConnectionException, StatementExecutionException {
540
    TSCreateTimeseriesReq request =
×
541
        genTSCreateTimeseriesReq(path, dataType, encoding, compressor, null, null, null, null);
×
542
    defaultSessionConnection.createTimeseries(request);
×
543
  }
×
544

545
  @Override
546
  public void createTimeseries(
547
      String path,
548
      TSDataType dataType,
549
      TSEncoding encoding,
550
      CompressionType compressor,
551
      Map<String, String> props,
552
      Map<String, String> tags,
553
      Map<String, String> attributes,
554
      String measurementAlias)
555
      throws IoTDBConnectionException, StatementExecutionException {
556
    TSCreateTimeseriesReq request =
×
557
        genTSCreateTimeseriesReq(
×
558
            path, dataType, encoding, compressor, props, tags, attributes, measurementAlias);
559
    defaultSessionConnection.createTimeseries(request);
×
560
  }
×
561

562
  private TSCreateTimeseriesReq genTSCreateTimeseriesReq(
563
      String path,
564
      TSDataType dataType,
565
      TSEncoding encoding,
566
      CompressionType compressor,
567
      Map<String, String> props,
568
      Map<String, String> tags,
569
      Map<String, String> attributes,
570
      String measurementAlias) {
571
    TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
×
572
    request.setPath(path);
×
573
    request.setDataType(dataType.ordinal());
×
574
    request.setEncoding(encoding.ordinal());
×
575
    request.setCompressor(compressor.serialize());
×
576
    request.setProps(props);
×
577
    request.setTags(tags);
×
578
    request.setAttributes(attributes);
×
579
    request.setMeasurementAlias(measurementAlias);
×
580
    return request;
×
581
  }
582

583
  @Override
584
  public void createAlignedTimeseries(
585
      String deviceId,
586
      List<String> measurements,
587
      List<TSDataType> dataTypes,
588
      List<TSEncoding> encodings,
589
      List<CompressionType> compressors,
590
      List<String> measurementAliasList)
591
      throws IoTDBConnectionException, StatementExecutionException {
592
    TSCreateAlignedTimeseriesReq request =
×
593
        getTSCreateAlignedTimeseriesReq(
×
594
            deviceId,
595
            measurements,
596
            dataTypes,
597
            encodings,
598
            compressors,
599
            measurementAliasList,
600
            null,
601
            null);
602
    defaultSessionConnection.createAlignedTimeseries(request);
×
603
  }
×
604

605
  @Override
606
  public void createAlignedTimeseries(
607
      String deviceId,
608
      List<String> measurements,
609
      List<TSDataType> dataTypes,
610
      List<TSEncoding> encodings,
611
      List<CompressionType> compressors,
612
      List<String> measurementAliasList,
613
      List<Map<String, String>> tagsList,
614
      List<Map<String, String>> attributesList)
615
      throws IoTDBConnectionException, StatementExecutionException {
616
    TSCreateAlignedTimeseriesReq request =
×
617
        getTSCreateAlignedTimeseriesReq(
×
618
            deviceId,
619
            measurements,
620
            dataTypes,
621
            encodings,
622
            compressors,
623
            measurementAliasList,
624
            tagsList,
625
            attributesList);
626
    defaultSessionConnection.createAlignedTimeseries(request);
×
627
  }
×
628

629
  private TSCreateAlignedTimeseriesReq getTSCreateAlignedTimeseriesReq(
630
      String prefixPath,
631
      List<String> measurements,
632
      List<TSDataType> dataTypes,
633
      List<TSEncoding> encodings,
634
      List<CompressionType> compressors,
635
      List<String> measurementAliasList,
636
      List<Map<String, String>> tagsList,
637
      List<Map<String, String>> attributesList) {
638
    TSCreateAlignedTimeseriesReq request = new TSCreateAlignedTimeseriesReq();
×
639
    request.setPrefixPath(prefixPath);
×
640
    request.setMeasurements(measurements);
×
641
    request.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
×
642
    request.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
×
643
    request.setCompressors(
×
644
        compressors.stream().map(i -> (int) i.serialize()).collect(Collectors.toList()));
×
645
    request.setMeasurementAlias(measurementAliasList);
×
646
    request.setTagsList(tagsList);
×
647
    request.setAttributesList(attributesList);
×
648
    return request;
×
649
  }
650

651
  @Override
652
  public void createMultiTimeseries(
653
      List<String> paths,
654
      List<TSDataType> dataTypes,
655
      List<TSEncoding> encodings,
656
      List<CompressionType> compressors,
657
      List<Map<String, String>> propsList,
658
      List<Map<String, String>> tagsList,
659
      List<Map<String, String>> attributesList,
660
      List<String> measurementAliasList)
661
      throws IoTDBConnectionException, StatementExecutionException {
662
    TSCreateMultiTimeseriesReq request =
×
663
        genTSCreateMultiTimeseriesReq(
×
664
            paths,
665
            dataTypes,
666
            encodings,
667
            compressors,
668
            propsList,
669
            tagsList,
670
            attributesList,
671
            measurementAliasList);
672
    defaultSessionConnection.createMultiTimeseries(request);
×
673
  }
×
674

675
  private TSCreateMultiTimeseriesReq genTSCreateMultiTimeseriesReq(
676
      List<String> paths,
677
      List<TSDataType> dataTypes,
678
      List<TSEncoding> encodings,
679
      List<CompressionType> compressors,
680
      List<Map<String, String>> propsList,
681
      List<Map<String, String>> tagsList,
682
      List<Map<String, String>> attributesList,
683
      List<String> measurementAliasList) {
684
    TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
×
685

686
    request.setPaths(paths);
×
687

688
    List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size());
×
689
    for (TSDataType dataType : dataTypes) {
×
690
      dataTypeOrdinals.add(dataType.ordinal());
×
691
    }
×
692
    request.setDataTypes(dataTypeOrdinals);
×
693

694
    List<Integer> encodingOrdinals = new ArrayList<>(dataTypes.size());
×
695
    for (TSEncoding encoding : encodings) {
×
696
      encodingOrdinals.add(encoding.ordinal());
×
697
    }
×
698
    request.setEncodings(encodingOrdinals);
×
699

700
    List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
×
701
    for (CompressionType compression : compressors) {
×
702
      compressionOrdinals.add((int) compression.serialize());
×
703
    }
×
704
    request.setCompressors(compressionOrdinals);
×
705

706
    request.setPropsList(propsList);
×
707
    request.setTagsList(tagsList);
×
708
    request.setAttributesList(attributesList);
×
709
    request.setMeasurementAliasList(measurementAliasList);
×
710

711
    return request;
×
712
  }
713

714
  @Override
715
  public boolean checkTimeseriesExists(String path)
716
      throws IoTDBConnectionException, StatementExecutionException {
717
    return defaultSessionConnection.checkTimeseriesExists(path, queryTimeoutInMs);
×
718
  }
719

720
  @Override
721
  public void setQueryTimeout(long timeoutInMs) {
722
    this.queryTimeoutInMs = timeoutInMs;
×
723
  }
×
724

725
  @Override
726
  public long getQueryTimeout() {
727
    return queryTimeoutInMs;
×
728
  }
729

730
  /**
731
   * execute query sql
732
   *
733
   * @param sql query statement
734
   * @return result set
735
   */
736
  @Override
737
  public SessionDataSet executeQueryStatement(String sql)
738
      throws StatementExecutionException, IoTDBConnectionException {
739
    return executeStatementMayRedirect(sql, queryTimeoutInMs);
×
740
  }
741

742
  /**
743
   * execute query sql with explicit timeout
744
   *
745
   * @param sql query statement
746
   * @param timeoutInMs the timeout of this query, in milliseconds
747
   * @return result set
748
   */
749
  @Override
750
  public SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
751
      throws StatementExecutionException, IoTDBConnectionException {
752
    return executeStatementMayRedirect(sql, timeoutInMs);
×
753
  }
754

755
  /**
756
   * execute the query, may redirect query to other node.
757
   *
758
   * @param sql the query statement
759
   * @param timeoutInMs time in ms
760
   * @return data set
761
   * @throws StatementExecutionException statement is not right
762
   * @throws IoTDBConnectionException the network is not good
763
   */
764
  private SessionDataSet executeStatementMayRedirect(String sql, long timeoutInMs)
765
      throws StatementExecutionException, IoTDBConnectionException {
766
    try {
767
      return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
×
768
    } catch (RedirectException e) {
×
769
      handleQueryRedirection(e.getEndPoint());
×
770
      if (enableQueryRedirection) {
×
771
        // retry
772
        try {
773
          return defaultSessionConnection.executeQueryStatement(sql, queryTimeoutInMs);
×
774
        } catch (RedirectException redirectException) {
×
775
          logger.error("{} redirect twice", sql, redirectException);
×
776
          throw new StatementExecutionException(sql + " redirect twice, please try again.");
×
777
        }
778
      } else {
779
        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
×
780
      }
781
    }
782
  }
783

784
  /**
785
   * execute non query statement
786
   *
787
   * @param sql non query statement
788
   */
789
  @Override
790
  public void executeNonQueryStatement(String sql)
791
      throws IoTDBConnectionException, StatementExecutionException {
792
    defaultSessionConnection.executeNonQueryStatement(sql);
×
793
  }
×
794

795
  /**
796
   * query eg. select * from paths where time >= startTime and time < endTime time interval include
797
   * startTime and exclude endTime
798
   *
799
   * @param paths series path
800
   * @param startTime included
801
   * @param endTime excluded
802
   * @return data set
803
   * @throws StatementExecutionException statement is not right
804
   * @throws IoTDBConnectionException the network is not good
805
   */
806
  @Override
807
  public SessionDataSet executeRawDataQuery(
808
      List<String> paths, long startTime, long endTime, long timeOut)
809
      throws StatementExecutionException, IoTDBConnectionException {
810
    try {
811
      return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
×
812
    } catch (RedirectException e) {
×
813
      handleQueryRedirection(e.getEndPoint());
×
814
      if (enableQueryRedirection) {
×
815
        // retry
816
        try {
817
          return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
×
818
        } catch (RedirectException redirectException) {
×
819
          logger.error(REDIRECT_TWICE, redirectException);
×
820
          throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
×
821
        }
822
      } else {
823
        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
×
824
      }
825
    }
826
  }
827

828
  @Override
829
  public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
830
      throws StatementExecutionException, IoTDBConnectionException {
831
    return executeRawDataQuery(paths, startTime, endTime, queryTimeoutInMs);
×
832
  }
833

834
  @Override
835
  public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime)
836
      throws StatementExecutionException, IoTDBConnectionException {
837
    return executeLastDataQuery(paths, lastTime, queryTimeoutInMs);
×
838
  }
839

840
  /**
841
   * query e.g. select last data from paths where time >= lastTime
842
   *
843
   * @param paths timeSeries eg. root.ln.d1.s1,root.ln.d1.s2
844
   * @param lastTime get the last data, whose timestamp is greater than or equal lastTime e.g.
845
   *     1621326244168
846
   */
847
  @Override
848
  public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime, long timeOut)
849
      throws StatementExecutionException, IoTDBConnectionException {
850
    try {
851
      return defaultSessionConnection.executeLastDataQuery(paths, lastTime, timeOut);
×
852
    } catch (RedirectException e) {
×
853
      handleQueryRedirection(e.getEndPoint());
×
854
      if (enableQueryRedirection) {
×
855
        // retry
856
        try {
857
          return defaultSessionConnection.executeLastDataQuery(paths, lastTime, timeOut);
×
858
        } catch (RedirectException redirectException) {
×
859
          logger.error(REDIRECT_TWICE, redirectException);
×
860
          throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
×
861
        }
862
      } else {
863
        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
×
864
      }
865
    }
866
  }
867

868
  /**
869
   * query eg. select last status from root.ln.wf01.wt01; <PrefixPath> + <suffixPath> = <TimeSeries>
870
   *
871
   * @param paths timeSeries. eg.root.ln.d1.s1,root.ln.d1.s2
872
   */
873
  @Override
874
  public SessionDataSet executeLastDataQuery(List<String> paths)
875
      throws StatementExecutionException, IoTDBConnectionException {
876
    long time = 0L;
×
877
    return executeLastDataQuery(paths, time, queryTimeoutInMs);
×
878
  }
879

880
  @Override
881
  public SessionDataSet executeLastDataQueryForOneDevice(
882
      String db, String device, List<String> sensors, boolean isLegalPathNodes)
883
      throws StatementExecutionException, IoTDBConnectionException {
884
    Pair<SessionDataSet, TEndPoint> pair;
885
    try {
886
      pair =
×
887
          getSessionConnection(device)
×
888
              .executeLastDataQueryForOneDevice(
×
889
                  db, device, sensors, isLegalPathNodes, queryTimeoutInMs);
890
      if (pair.right != null) {
×
891
        handleRedirection(device, pair.right);
×
892
      }
893
      return pair.left;
×
894
    } catch (IoTDBConnectionException e) {
×
895
      if (enableRedirection
×
896
          && !deviceIdToEndpoint.isEmpty()
×
897
          && deviceIdToEndpoint.get(device) != null) {
×
898
        logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(device));
×
899
        deviceIdToEndpoint.remove(device);
×
900

901
        // reconnect with default connection
902
        return defaultSessionConnection.executeLastDataQueryForOneDevice(
×
903
                db, device, sensors, isLegalPathNodes, queryTimeoutInMs)
904
            .left;
905
      } else {
906
        throw e;
×
907
      }
908
    }
909
  }
910

911
  @Override
912
  public SessionDataSet executeAggregationQuery(
913
      List<String> paths, List<TAggregationType> aggregations)
914
      throws StatementExecutionException, IoTDBConnectionException {
915
    try {
916
      return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
×
917
    } catch (RedirectException e) {
×
918
      handleQueryRedirection(e.getEndPoint());
×
919
      if (enableQueryRedirection) {
×
920
        // retry
921
        try {
922
          return defaultSessionConnection.executeAggregationQuery(paths, aggregations);
×
923
        } catch (RedirectException redirectException) {
×
924
          logger.error(REDIRECT_TWICE, redirectException);
×
925
          throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
×
926
        }
927
      } else {
928
        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
×
929
      }
930
    }
931
  }
932

933
  @Override
934
  public SessionDataSet executeAggregationQuery(
935
      List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime)
936
      throws StatementExecutionException, IoTDBConnectionException {
937
    try {
938
      return defaultSessionConnection.executeAggregationQuery(
×
939
          paths, aggregations, startTime, endTime);
940
    } catch (RedirectException e) {
×
941
      handleQueryRedirection(e.getEndPoint());
×
942
      if (enableQueryRedirection) {
×
943
        // retry
944
        try {
945
          return defaultSessionConnection.executeAggregationQuery(
×
946
              paths, aggregations, startTime, endTime);
947
        } catch (RedirectException redirectException) {
×
948
          logger.error(REDIRECT_TWICE, redirectException);
×
949
          throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
×
950
        }
951
      } else {
952
        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
×
953
      }
954
    }
955
  }
956

957
  @Override
958
  public SessionDataSet executeAggregationQuery(
959
      List<String> paths,
960
      List<TAggregationType> aggregations,
961
      long startTime,
962
      long endTime,
963
      long interval)
964
      throws StatementExecutionException, IoTDBConnectionException {
965
    try {
966
      return defaultSessionConnection.executeAggregationQuery(
×
967
          paths, aggregations, startTime, endTime, interval);
968
    } catch (RedirectException e) {
×
969
      handleQueryRedirection(e.getEndPoint());
×
970
      if (enableQueryRedirection) {
×
971
        // retry
972
        try {
973
          return defaultSessionConnection.executeAggregationQuery(
×
974
              paths, aggregations, startTime, endTime, interval);
975
        } catch (RedirectException redirectException) {
×
976
          logger.error(REDIRECT_TWICE, redirectException);
×
977
          throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
×
978
        }
979
      } else {
980
        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
×
981
      }
982
    }
983
  }
984

985
  @Override
986
  public SessionDataSet executeAggregationQuery(
987
      List<String> paths,
988
      List<TAggregationType> aggregations,
989
      long startTime,
990
      long endTime,
991
      long interval,
992
      long slidingStep)
993
      throws StatementExecutionException, IoTDBConnectionException {
994
    try {
995
      return defaultSessionConnection.executeAggregationQuery(
×
996
          paths, aggregations, startTime, endTime, interval, slidingStep);
997
    } catch (RedirectException e) {
×
998
      handleQueryRedirection(e.getEndPoint());
×
999
      if (enableQueryRedirection) {
×
1000
        // retry
1001
        try {
1002
          return defaultSessionConnection.executeAggregationQuery(
×
1003
              paths, aggregations, startTime, endTime, interval, slidingStep);
1004
        } catch (RedirectException redirectException) {
×
1005
          logger.error(REDIRECT_TWICE, redirectException);
×
1006
          throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
×
1007
        }
1008
      } else {
1009
        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
×
1010
      }
1011
    }
1012
  }
1013

1014
  /**
1015
   * insert data in one row, if you want to improve your performance, please use insertRecords
1016
   * method or insertTablet method
1017
   *
1018
   * @see Session#insertRecords(List, List, List, List, List)
1019
   * @see Session#insertTablet(Tablet)
1020
   */
1021
  @Override
1022
  public void insertRecord(
1023
      String deviceId,
1024
      long time,
1025
      List<String> measurements,
1026
      List<TSDataType> types,
1027
      Object... values)
1028
      throws IoTDBConnectionException, StatementExecutionException {
1029
    TSInsertRecordReq request;
1030
    try {
1031
      request =
×
1032
          filterAndGenTSInsertRecordReq(
×
1033
              deviceId, time, measurements, types, Arrays.asList(values), false);
×
1034
    } catch (NoValidValueException e) {
×
1035
      logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
×
1036
      return;
×
1037
    }
×
1038

1039
    insertRecord(deviceId, request);
×
1040
  }
×
1041

1042
  private void insertRecord(String prefixPath, TSInsertRecordReq request)
1043
      throws IoTDBConnectionException, StatementExecutionException {
1044
    try {
1045
      getSessionConnection(prefixPath).insertRecord(request);
×
1046
    } catch (RedirectException e) {
1✔
1047
      handleRedirection(prefixPath, e.getEndPoint());
1✔
1048
    } catch (IoTDBConnectionException e) {
×
1049
      if (enableRedirection
×
1050
          && !deviceIdToEndpoint.isEmpty()
×
1051
          && deviceIdToEndpoint.get(prefixPath) != null) {
×
1052
        logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(prefixPath));
×
1053
        deviceIdToEndpoint.remove(prefixPath);
×
1054

1055
        // reconnect with default connection
1056
        try {
1057
          defaultSessionConnection.insertRecord(request);
×
1058
        } catch (RedirectException ignored) {
×
1059
          logger.warn("session insertRecord fail:{}", ignored.getMessage());
×
1060
        }
×
1061
      } else {
1062
        throw e;
×
1063
      }
1064
    }
1✔
1065
  }
1✔
1066

1067
  private void insertRecord(String deviceId, TSInsertStringRecordReq request)
1068
      throws IoTDBConnectionException, StatementExecutionException {
1069
    try {
1070
      getSessionConnection(deviceId).insertRecord(request);
×
1071
    } catch (RedirectException e) {
1✔
1072
      handleRedirection(deviceId, e.getEndPoint());
1✔
1073
    } catch (IoTDBConnectionException e) {
×
1074
      if (enableRedirection
×
1075
          && !deviceIdToEndpoint.isEmpty()
×
1076
          && deviceIdToEndpoint.get(deviceId) != null) {
×
1077
        logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
×
1078
        deviceIdToEndpoint.remove(deviceId);
×
1079

1080
        // reconnect with default connection
1081
        try {
1082
          defaultSessionConnection.insertRecord(request);
×
1083
        } catch (RedirectException ignored) {
×
1084
          logger.warn("session insertRecord fail:{}", ignored.getMessage());
×
1085
        }
×
1086
      } else {
1087
        throw e;
×
1088
      }
1089
    }
1✔
1090
  }
1✔
1091

1092
  private SessionConnection getSessionConnection(String deviceId) {
1093
    TEndPoint endPoint;
1094
    if (enableRedirection
1✔
1095
        && !deviceIdToEndpoint.isEmpty()
1✔
1096
        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null
1✔
1097
        && endPointToSessionConnection.containsKey(endPoint)) {
1✔
1098
      return endPointToSessionConnection.get(endPoint);
1✔
1099
    } else {
1100
      return defaultSessionConnection;
1✔
1101
    }
1102
  }
1103

1104
  @Override
1105
  public String getTimestampPrecision() throws TException {
1106
    return defaultSessionConnection.getClient().getProperties().getTimestampPrecision();
×
1107
  }
1108

1109
  // TODO https://issues.apache.org/jira/browse/IOTDB-1399
1110
  private void removeBrokenSessionConnection(SessionConnection sessionConnection) {
1111
    // remove the cached broken leader session
1112
    if (enableRedirection) {
1✔
1113
      TEndPoint endPoint = null;
1✔
1114
      for (Iterator<Entry<TEndPoint, SessionConnection>> it =
1✔
1115
              endPointToSessionConnection.entrySet().iterator();
1✔
1116
          it.hasNext(); ) {
1✔
1117
        Entry<TEndPoint, SessionConnection> entry = it.next();
1✔
1118
        if (entry.getValue().equals(sessionConnection)) {
1✔
1119
          endPoint = entry.getKey();
1✔
1120
          it.remove();
1✔
1121
          break;
1✔
1122
        }
1123
      }
1✔
1124

1125
      for (Iterator<Entry<String, TEndPoint>> it = deviceIdToEndpoint.entrySet().iterator();
1✔
1126
          it.hasNext(); ) {
1✔
1127
        Entry<String, TEndPoint> entry = it.next();
1✔
1128
        if (entry.getValue().equals(endPoint)) {
1✔
1129
          it.remove();
1✔
1130
        }
1131
      }
1✔
1132
    }
1133
  }
1✔
1134

1135
  private void handleRedirection(String deviceId, TEndPoint endpoint) {
1136
    if (enableRedirection) {
1✔
1137
      // no need to redirection
1138
      if (endpoint.ip.equals("0.0.0.0")) {
1✔
1139
        return;
×
1140
      }
1141
      AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
1✔
1142
      if (!deviceIdToEndpoint.containsKey(deviceId)
1✔
1143
          || !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
1✔
1144
        deviceIdToEndpoint.put(deviceId, endpoint);
1✔
1145
      }
1146
      SessionConnection connection =
1✔
1147
          endPointToSessionConnection.computeIfAbsent(
1✔
1148
              endpoint,
1149
              k -> {
1150
                try {
1151
                  return constructSessionConnection(this, endpoint, zoneId);
1✔
1152
                } catch (IoTDBConnectionException ex) {
×
1153
                  exceptionReference.set(ex);
×
1154
                  return null;
×
1155
                }
1156
              });
1157
      if (connection == null) {
1✔
1158
        deviceIdToEndpoint.remove(deviceId);
×
1159
        logger.warn("Can not redirect to {}, because session can not connect to it.", endpoint);
×
1160
      }
1161
    }
1162
  }
1✔
1163

1164
  private void handleQueryRedirection(TEndPoint endPoint) throws IoTDBConnectionException {
1165
    if (enableQueryRedirection) {
×
1166
      AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
×
1167
      SessionConnection connection =
×
1168
          endPointToSessionConnection.computeIfAbsent(
×
1169
              endPoint,
1170
              k -> {
1171
                try {
1172
                  SessionConnection sessionConnection =
×
1173
                      constructSessionConnection(this, endPoint, zoneId);
×
1174
                  sessionConnection.setEnableRedirect(enableQueryRedirection);
×
1175
                  return sessionConnection;
×
1176
                } catch (IoTDBConnectionException ex) {
×
1177
                  exceptionReference.set(ex);
×
1178
                  return null;
×
1179
                }
1180
              });
1181
      if (connection == null) {
×
1182
        throw new IoTDBConnectionException(exceptionReference.get());
×
1183
      }
1184
      defaultSessionConnection = connection;
×
1185
    }
1186
  }
×
1187

1188
  /**
1189
   * insert data in one row, if you want improve your performance, please use insertRecords method
1190
   * or insertTablet method
1191
   *
1192
   * @see Session#insertRecords(List, List, List, List, List)
1193
   * @see Session#insertTablet(Tablet)
1194
   */
1195
  @Override
1196
  public void insertRecord(
1197
      String deviceId,
1198
      long time,
1199
      List<String> measurements,
1200
      List<TSDataType> types,
1201
      List<Object> values)
1202
      throws IoTDBConnectionException, StatementExecutionException {
1203
    // not vector by default
1204
    TSInsertRecordReq request;
1205
    try {
1206
      request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, false);
1✔
1207
    } catch (NoValidValueException e) {
×
1208
      logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
×
1209
      return;
×
1210
    }
1✔
1211

1212
    insertRecord(deviceId, request);
1✔
1213
  }
1✔
1214

1215
  /**
1216
   * insert aligned data in one row, if you want improve your performance, please use
1217
   * insertAlignedRecords method or insertTablet method.
1218
   *
1219
   * @see Session#insertAlignedRecords(List, List, List, List, List)
1220
   * @see Session#insertTablet(Tablet)
1221
   */
1222
  @Override
1223
  public void insertAlignedRecord(
1224
      String deviceId,
1225
      long time,
1226
      List<String> measurements,
1227
      List<TSDataType> types,
1228
      List<Object> values)
1229
      throws IoTDBConnectionException, StatementExecutionException {
1230
    TSInsertRecordReq request;
1231
    try {
1232
      request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, true);
×
1233
    } catch (NoValidValueException e) {
×
1234
      logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
×
1235
      return;
×
1236
    }
×
1237
    insertRecord(deviceId, request);
×
1238
  }
×
1239

1240
  private TSInsertRecordReq filterAndGenTSInsertRecordReq(
1241
      String prefixPath,
1242
      long time,
1243
      List<String> measurements,
1244
      List<TSDataType> types,
1245
      List<Object> values,
1246
      boolean isAligned)
1247
      throws IoTDBConnectionException {
1248
    if (hasNull(values)) {
1✔
1249
      measurements = new ArrayList<>(measurements);
×
1250
      values = new ArrayList<>(values);
×
1251
      types = new ArrayList<>(types);
×
1252
      boolean isAllValuesNull =
×
1253
          filterNullValueAndMeasurement(prefixPath, measurements, types, values);
×
1254
      if (isAllValuesNull) {
×
1255
        throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
×
1256
      }
1257
    }
1258
    return genTSInsertRecordReq(prefixPath, time, measurements, types, values, isAligned);
1✔
1259
  }
1260

1261
  private TSInsertRecordReq genTSInsertRecordReq(
1262
      String prefixPath,
1263
      long time,
1264
      List<String> measurements,
1265
      List<TSDataType> types,
1266
      List<Object> values,
1267
      boolean isAligned)
1268
      throws IoTDBConnectionException {
1269
    TSInsertRecordReq request = new TSInsertRecordReq();
1✔
1270
    request.setPrefixPath(prefixPath);
1✔
1271
    request.setTimestamp(time);
1✔
1272
    request.setMeasurements(measurements);
1✔
1273
    ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
1✔
1274
    request.setValues(buffer);
1✔
1275
    request.setIsAligned(isAligned);
1✔
1276
    return request;
1✔
1277
  }
1278

1279
  /**
1280
   * insert data in one row, if you want improve your performance, please use insertRecords method
1281
   * or insertTablet method
1282
   *
1283
   * @see Session#insertRecords(List, List, List, List, List)
1284
   * @see Session#insertTablet(Tablet)
1285
   */
1286
  @Override
1287
  public void insertRecord(
1288
      String deviceId, long time, List<String> measurements, List<String> values)
1289
      throws IoTDBConnectionException, StatementExecutionException {
1290
    TSInsertStringRecordReq request;
1291
    try {
1292
      request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, false);
1✔
1293
    } catch (NoValidValueException e) {
×
1294
      logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
×
1295
      return;
×
1296
    }
1✔
1297
    insertRecord(deviceId, request);
1✔
1298
  }
1✔
1299

1300
  /**
1301
   * insert aligned data in one row, if you want improve your performance, please use
1302
   * insertAlignedRecords method or insertTablet method.
1303
   *
1304
   * @see Session#insertAlignedRecords(List, List, List, List, List)
1305
   * @see Session#insertTablet(Tablet)
1306
   */
1307
  @Override
1308
  public void insertAlignedRecord(
1309
      String deviceId, long time, List<String> measurements, List<String> values)
1310
      throws IoTDBConnectionException, StatementExecutionException {
1311
    TSInsertStringRecordReq request;
1312
    try {
1313
      request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, true);
×
1314
    } catch (NoValidValueException e) {
×
1315
      logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
×
1316
      return;
×
1317
    }
×
1318
    insertRecord(deviceId, request);
×
1319
  }
×
1320

1321
  private TSInsertStringRecordReq filterAndGenTSInsertStringRecordReq(
1322
      String prefixPath,
1323
      long time,
1324
      List<String> measurements,
1325
      List<String> values,
1326
      boolean isAligned) {
1327
    if (hasNull(values)) {
1✔
1328
      measurements = new ArrayList<>(measurements);
×
1329
      values = new ArrayList<>(values);
×
1330
      boolean isAllValueNull =
×
1331
          filterNullValueAndMeasurementWithStringType(values, prefixPath, measurements);
×
1332
      if (isAllValueNull) {
×
1333
        throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
×
1334
      }
1335
    }
1336
    return genTSInsertStringRecordReq(prefixPath, time, measurements, values, isAligned);
1✔
1337
  }
1338

1339
  private TSInsertStringRecordReq genTSInsertStringRecordReq(
1340
      String prefixPath,
1341
      long time,
1342
      List<String> measurements,
1343
      List<String> values,
1344
      boolean isAligned) {
1345
    TSInsertStringRecordReq request = new TSInsertStringRecordReq();
1✔
1346
    request.setPrefixPath(prefixPath);
1✔
1347
    request.setTimestamp(time);
1✔
1348
    request.setMeasurements(measurements);
1✔
1349
    request.setValues(values);
1✔
1350
    request.setIsAligned(isAligned);
1✔
1351
    return request;
1✔
1352
  }
1353

1354
  /**
1355
   * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
1356
   * executeBatch, we pack some insert request in batch and send them to server. If you want improve
1357
   * your performance, please see insertTablet method
1358
   *
1359
   * <p>Each row is independent, which could have different deviceId, time, number of measurements
1360
   *
1361
   * @see Session#insertTablet(Tablet)
1362
   */
1363
  @Override
1364
  public void insertRecords(
1365
      List<String> deviceIds,
1366
      List<Long> times,
1367
      List<List<String>> measurementsList,
1368
      List<List<String>> valuesList)
1369
      throws IoTDBConnectionException, StatementExecutionException {
1370
    int len = deviceIds.size();
1✔
1371
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
1✔
1372
      throw new IllegalArgumentException(
×
1373
          "deviceIds, times, measurementsList and valuesList's size should be equal");
1374
    }
1375
    if (enableRedirection) {
1✔
1376
      insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, false);
1✔
1377
    } else {
1378
      TSInsertStringRecordsReq request;
1379
      try {
1380
        request =
1✔
1381
            filterAndGenTSInsertStringRecordsReq(
1✔
1382
                deviceIds, times, measurementsList, valuesList, false);
1383
      } catch (NoValidValueException e) {
×
1384
        logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
×
1385
        return;
×
1386
      }
1✔
1387
      try {
1388
        defaultSessionConnection.insertRecords(request);
×
1389
      } catch (RedirectException ignored) {
1✔
1390
        logger.warn("session insertRecords fail:{}", ignored.getMessage());
1✔
1391
      }
×
1392
    }
1393
  }
1✔
1394

1395
  /**
1396
   * When the value is null,filter this,don't use this measurement.
1397
   *
1398
   * @param times
1399
   * @param measurementsList
1400
   * @param valuesList
1401
   * @param typesList
1402
   */
1403
  private void filterNullValueAndMeasurement(
1404
      List<String> deviceIds,
1405
      List<Long> times,
1406
      List<List<String>> measurementsList,
1407
      List<List<Object>> valuesList,
1408
      List<List<TSDataType>> typesList) {
1409
    for (int i = valuesList.size() - 1; i >= 0; i--) {
×
1410
      List<Object> values = valuesList.get(i);
×
1411
      List<String> measurements = measurementsList.get(i);
×
1412
      List<TSDataType> types = typesList.get(i);
×
1413
      boolean isAllValuesNull =
×
1414
          filterNullValueAndMeasurement(deviceIds.get(i), measurements, types, values);
×
1415
      if (isAllValuesNull) {
×
1416
        valuesList.remove(i);
×
1417
        measurementsList.remove(i);
×
1418
        deviceIds.remove(i);
×
1419
        times.remove(i);
×
1420
        typesList.remove(i);
×
1421
      }
1422
    }
1423
    if (valuesList.isEmpty()) {
×
1424
      throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
×
1425
    }
1426
  }
×
1427

1428
  /**
1429
   * Filter the null value of list。
1430
   *
1431
   * @param deviceId
1432
   * @param times
1433
   * @param measurementsList
1434
   * @param typesList
1435
   * @param valuesList
1436
   */
1437
  private void filterNullValueAndMeasurementOfOneDevice(
1438
      String deviceId,
1439
      List<Long> times,
1440
      List<List<String>> measurementsList,
1441
      List<List<TSDataType>> typesList,
1442
      List<List<Object>> valuesList) {
1443
    for (int i = valuesList.size() - 1; i >= 0; i--) {
×
1444
      List<Object> values = valuesList.get(i);
×
1445
      List<String> measurements = measurementsList.get(i);
×
1446
      List<TSDataType> types = typesList.get(i);
×
1447
      boolean isAllValuesNull =
×
1448
          filterNullValueAndMeasurement(deviceId, measurements, types, values);
×
1449
      if (isAllValuesNull) {
×
1450
        valuesList.remove(i);
×
1451
        measurementsList.remove(i);
×
1452
        typesList.remove(i);
×
1453
        times.remove(i);
×
1454
      }
1455
    }
1456
    if (valuesList.isEmpty()) {
×
1457
      throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
×
1458
    }
1459
  }
×
1460

1461
  /**
1462
   * Filter the null value of list。
1463
   *
1464
   * @param times
1465
   * @param deviceId
1466
   * @param measurementsList
1467
   * @param valuesList
1468
   */
1469
  private void filterNullValueAndMeasurementWithStringTypeOfOneDevice(
1470
      List<Long> times,
1471
      String deviceId,
1472
      List<List<String>> measurementsList,
1473
      List<List<String>> valuesList) {
1474
    for (int i = valuesList.size() - 1; i >= 0; i--) {
×
1475
      List<String> values = valuesList.get(i);
×
1476
      List<String> measurements = measurementsList.get(i);
×
1477
      boolean isAllValuesNull =
×
1478
          filterNullValueAndMeasurementWithStringType(values, deviceId, measurements);
×
1479
      if (isAllValuesNull) {
×
1480
        valuesList.remove(i);
×
1481
        measurementsList.remove(i);
×
1482
        times.remove(i);
×
1483
      }
1484
    }
1485
    if (valuesList.isEmpty()) {
×
1486
      throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
×
1487
    }
1488
  }
×
1489

1490
  /**
1491
   * Filter the null object of list。
1492
   *
1493
   * @param deviceId
1494
   * @param measurementsList
1495
   * @param types
1496
   * @param valuesList
1497
   * @return true:all value is null;false:not all null value is null.
1498
   */
1499
  private boolean filterNullValueAndMeasurement(
1500
      String deviceId,
1501
      List<String> measurementsList,
1502
      List<TSDataType> types,
1503
      List<Object> valuesList) {
1504
    Map<String, Object> nullMap = new HashMap<>();
×
1505
    for (int i = valuesList.size() - 1; i >= 0; i--) {
×
1506
      if (valuesList.get(i) == null) {
×
1507
        nullMap.put(measurementsList.get(i), valuesList.get(i));
×
1508
        valuesList.remove(i);
×
1509
        measurementsList.remove(i);
×
1510
        types.remove(i);
×
1511
      }
1512
    }
1513
    if (valuesList.isEmpty()) {
×
1514
      logger.info("All values of the {} are null,null values are {}", deviceId, nullMap);
×
1515
      return true;
×
1516
    } else {
1517
      logger.info("Some values of {} are null,null values are {}", deviceId, nullMap);
×
1518
    }
1519
    return false;
×
1520
  }
1521

1522
  /**
1523
   * Filter the null object of list。
1524
   *
1525
   * @param prefixPaths devices path。
1526
   * @param times
1527
   * @param measurementsList
1528
   * @param valuesList
1529
   * @return true:all values of valuesList are null;false:Not all values of valuesList are null.
1530
   */
1531
  private void filterNullValueAndMeasurementWithStringType(
1532
      List<String> prefixPaths,
1533
      List<Long> times,
1534
      List<List<String>> measurementsList,
1535
      List<List<String>> valuesList) {
1536
    for (int i = valuesList.size() - 1; i >= 0; i--) {
×
1537
      List<String> values = valuesList.get(i);
×
1538
      List<String> measurements = measurementsList.get(i);
×
1539
      boolean isAllValueNull =
×
1540
          filterNullValueAndMeasurementWithStringType(values, prefixPaths.get(i), measurements);
×
1541
      if (isAllValueNull) {
×
1542
        valuesList.remove(i);
×
1543
        measurementsList.remove(i);
×
1544
        times.remove(i);
×
1545
        prefixPaths.remove(i);
×
1546
      }
1547
    }
1548
    if (valuesList.isEmpty()) {
×
1549
      throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
×
1550
    }
1551
  }
×
1552

1553
  /**
1554
   * When the value is null,filter this,don't use this measurement.
1555
   *
1556
   * @param valuesList
1557
   * @param measurementsList
1558
   * @return true:all value is null;false:not all null value is null.
1559
   */
1560
  private boolean filterNullValueAndMeasurementWithStringType(
1561
      List<String> valuesList, String deviceId, List<String> measurementsList) {
1562
    Map<String, Object> nullMap = new HashMap<>();
×
1563
    for (int i = valuesList.size() - 1; i >= 0; i--) {
×
1564
      if (valuesList.get(i) == null) {
×
1565
        nullMap.put(measurementsList.get(i), valuesList.get(i));
×
1566
        valuesList.remove(i);
×
1567
        measurementsList.remove(i);
×
1568
      }
1569
    }
1570
    if (valuesList.isEmpty()) {
×
1571
      logger.info("All values of the {} are null,null values are {}", deviceId, nullMap);
×
1572
      return true;
×
1573
    } else {
1574
      logger.info("Some values of {} are null,null values are {}", deviceId, nullMap);
×
1575
    }
1576
    return false;
×
1577
  }
1578

1579
  private boolean hasNull(List valuesList) {
1580
    boolean haveNull = false;
1✔
1581
    for (int i1 = 0; i1 < valuesList.size(); i1++) {
1✔
1582
      Object o = valuesList.get(i1);
1✔
1583
      if (o instanceof List) {
1✔
1584
        List o1 = (List) o;
1✔
1585
        if (hasNull(o1)) {
1✔
1586
          haveNull = true;
×
1587
          break;
×
1588
        }
1589
      } else {
1✔
1590
        if (o == null) {
1✔
1591
          haveNull = true;
×
1592
          break;
×
1593
        }
1594
      }
1595
    }
1596
    return haveNull;
1✔
1597
  }
1598

1599
  /**
1600
   * Insert aligned multiple rows, which can reduce the overhead of network. This method is just
1601
   * like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
1602
   * want improve your performance, please see insertTablet method
1603
   *
1604
   * <p>Each row is independent, which could have different prefixPath, time, number of
1605
   * subMeasurements
1606
   *
1607
   * @see Session#insertTablet(Tablet)
1608
   */
1609
  @Override
1610
  public void insertAlignedRecords(
1611
      List<String> deviceIds,
1612
      List<Long> times,
1613
      List<List<String>> measurementsList,
1614
      List<List<String>> valuesList)
1615
      throws IoTDBConnectionException, StatementExecutionException {
1616
    int len = deviceIds.size();
×
1617
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
×
1618
      throw new IllegalArgumentException(
×
1619
          "prefixPaths, times, subMeasurementsList and valuesList's size should be equal");
1620
    }
1621
    if (enableRedirection) {
×
1622
      insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, true);
×
1623
    } else {
1624
      TSInsertStringRecordsReq request;
1625
      try {
1626
        request =
×
1627
            filterAndGenTSInsertStringRecordsReq(
×
1628
                deviceIds, times, measurementsList, valuesList, true);
1629
      } catch (NoValidValueException e) {
×
1630
        logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
×
1631
        return;
×
1632
      }
×
1633

1634
      try {
1635
        defaultSessionConnection.insertRecords(request);
×
1636
      } catch (RedirectException ignored) {
×
1637
        logger.warn("session insertRecords fail:{}", ignored.getMessage());
×
1638
      }
×
1639
    }
1640
  }
×
1641

1642
  private void insertStringRecordsWithLeaderCache(
1643
      List<String> deviceIds,
1644
      List<Long> times,
1645
      List<List<String>> measurementsList,
1646
      List<List<String>> valuesList,
1647
      boolean isAligned)
1648
      throws IoTDBConnectionException, StatementExecutionException {
1649
    Map<SessionConnection, TSInsertStringRecordsReq> recordsGroup = new HashMap<>();
1✔
1650
    for (int i = 0; i < deviceIds.size(); i++) {
1✔
1651
      final SessionConnection connection = getSessionConnection(deviceIds.get(i));
1✔
1652
      TSInsertStringRecordsReq request =
1✔
1653
          recordsGroup.getOrDefault(connection, new TSInsertStringRecordsReq());
1✔
1654
      request.setIsAligned(isAligned);
1✔
1655
      try {
1656
        filterAndUpdateTSInsertStringRecordsReq(
1✔
1657
            request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
1✔
1658
        recordsGroup.putIfAbsent(connection, request);
1✔
1659
      } catch (NoValidValueException e) {
×
1660
        logger.warn(
×
1661
            ALL_VALUES_ARE_NULL,
1662
            deviceIds.get(i),
×
1663
            times.get(i),
×
1664
            measurementsList.get(i).toString());
×
1665
      }
1✔
1666
    }
1667

1668
    insertByGroup(recordsGroup, SessionConnection::insertRecords);
1✔
1669
  }
1✔
1670

1671
  private TSInsertStringRecordsReq filterAndGenTSInsertStringRecordsReq(
1672
      List<String> prefixPaths,
1673
      List<Long> time,
1674
      List<List<String>> measurements,
1675
      List<List<String>> values,
1676
      boolean isAligned) {
1677
    if (hasNull(values)) {
1✔
1678
      values = changeToArrayListWithStringType(values);
×
1679
      measurements = changeToArrayListWithStringType(measurements);
×
1680
      prefixPaths = new ArrayList<>(prefixPaths);
×
1681
      time = new ArrayList<>(time);
×
1682
      filterNullValueAndMeasurementWithStringType(prefixPaths, time, measurements, values);
×
1683
    }
1684
    return genTSInsertStringRecordsReq(prefixPaths, time, measurements, values, isAligned);
1✔
1685
  }
1686

1687
  private List<List<String>> changeToArrayListWithStringType(List<List<String>> values) {
1688
    if (!(values instanceof ArrayList)) {
×
1689
      values = new ArrayList<>(values);
×
1690
    }
1691
    for (int i = 0; i < values.size(); i++) {
×
1692
      List<String> currentValue = values.get(i);
×
1693
      if (!(currentValue instanceof ArrayList)) {
×
1694
        values.set(i, new ArrayList<>(currentValue));
×
1695
      }
1696
    }
1697
    return values;
×
1698
  }
1699

1700
  private List<List<Object>> changeToArrayList(List<List<Object>> values) {
1701
    if (!(values instanceof ArrayList)) {
×
1702
      values = new ArrayList<>(values);
×
1703
    }
1704
    for (int i = 0; i < values.size(); i++) {
×
1705
      List<Object> currentValue = values.get(i);
×
1706
      if (!(currentValue instanceof ArrayList)) {
×
1707
        values.set(i, new ArrayList<>(currentValue));
×
1708
      }
1709
    }
1710
    return values;
×
1711
  }
1712

1713
  private List<List<TSDataType>> changeToArrayListWithTSDataType(List<List<TSDataType>> values) {
1714
    if (!(values instanceof ArrayList)) {
×
1715
      values = new ArrayList<>(values);
×
1716
    }
1717
    for (int i = 0; i < values.size(); i++) {
×
1718
      List<TSDataType> currentValue = values.get(i);
×
1719
      if (!(currentValue instanceof ArrayList)) {
×
1720
        values.set(i, new ArrayList<>(currentValue));
×
1721
      }
1722
    }
1723
    return values;
×
1724
  }
1725

1726
  private TSInsertStringRecordsReq genTSInsertStringRecordsReq(
1727
      List<String> prefixPaths,
1728
      List<Long> time,
1729
      List<List<String>> measurements,
1730
      List<List<String>> values,
1731
      boolean isAligned) {
1732
    TSInsertStringRecordsReq request = new TSInsertStringRecordsReq();
1✔
1733

1734
    request.setPrefixPaths(prefixPaths);
1✔
1735
    request.setTimestamps(time);
1✔
1736
    request.setMeasurementsList(measurements);
1✔
1737
    request.setValuesList(values);
1✔
1738
    request.setIsAligned(isAligned);
1✔
1739
    return request;
1✔
1740
  }
1741

1742
  private void filterAndUpdateTSInsertStringRecordsReq(
1743
      TSInsertStringRecordsReq request,
1744
      String deviceId,
1745
      long time,
1746
      List<String> measurements,
1747
      List<String> values) {
1748
    if (hasNull(values)) {
1✔
1749
      measurements = new ArrayList<>(measurements);
×
1750
      values = new ArrayList<>(values);
×
1751
      boolean isAllValueNull =
×
1752
          filterNullValueAndMeasurementWithStringType(values, deviceId, measurements);
×
1753
      if (isAllValueNull) {
×
1754
        throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
×
1755
      }
1756
    }
1757
    updateTSInsertStringRecordsReq(request, deviceId, time, measurements, values);
1✔
1758
  }
1✔
1759

1760
  private void updateTSInsertStringRecordsReq(
1761
      TSInsertStringRecordsReq request,
1762
      String deviceId,
1763
      long time,
1764
      List<String> measurements,
1765
      List<String> values) {
1766
    request.addToPrefixPaths(deviceId);
1✔
1767
    request.addToTimestamps(time);
1✔
1768
    request.addToMeasurementsList(measurements);
1✔
1769
    request.addToValuesList(values);
1✔
1770
  }
1✔
1771

1772
  /**
1773
   * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
1774
   * executeBatch, we pack some insert request in batch and send them to server. If you want improve
1775
   * your performance, please see insertTablet method
1776
   *
1777
   * <p>Each row is independent, which could have different deviceId, time, number of measurements
1778
   *
1779
   * @see Session#insertTablet(Tablet)
1780
   */
1781
  @Override
1782
  public void insertRecords(
1783
      List<String> deviceIds,
1784
      List<Long> times,
1785
      List<List<String>> measurementsList,
1786
      List<List<TSDataType>> typesList,
1787
      List<List<Object>> valuesList)
1788
      throws IoTDBConnectionException, StatementExecutionException {
1789
    int len = deviceIds.size();
1✔
1790
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
1✔
1791
      throw new IllegalArgumentException(
×
1792
          "deviceIds, times, measurementsList and valuesList's size should be equal");
1793
    }
1794
    if (enableRedirection) {
1✔
1795
      insertRecordsWithLeaderCache(
1✔
1796
          deviceIds, times, measurementsList, typesList, valuesList, false);
1797
    } else {
1798
      TSInsertRecordsReq request;
1799
      try {
1800
        request =
1✔
1801
            filterAndGenTSInsertRecordsReq(
1✔
1802
                deviceIds, times, measurementsList, typesList, valuesList, false);
1803
      } catch (NoValidValueException e) {
×
1804
        logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
×
1805
        return;
×
1806
      }
1✔
1807
      try {
1808
        defaultSessionConnection.insertRecords(request);
×
1809
      } catch (RedirectException ignored) {
1✔
1810
        logger.warn("session insertRecords fail:{}", ignored.getMessage());
1✔
1811
      }
×
1812
    }
1813
  }
1✔
1814

1815
  /**
1816
   * Insert aligned multiple rows, which can reduce the overhead of network. This method is just
1817
   * like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
1818
   * want improve your performance, please see insertTablet method
1819
   *
1820
   * <p>Each row is independent, which could have different prefixPath, time, number of
1821
   * subMeasurements
1822
   *
1823
   * @see Session#insertTablet(Tablet)
1824
   */
1825
  @Override
1826
  public void insertAlignedRecords(
1827
      List<String> deviceIds,
1828
      List<Long> times,
1829
      List<List<String>> measurementsList,
1830
      List<List<TSDataType>> typesList,
1831
      List<List<Object>> valuesList)
1832
      throws IoTDBConnectionException, StatementExecutionException {
1833
    int len = deviceIds.size();
×
1834
    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
×
1835
      throw new IllegalArgumentException(
×
1836
          "prefixPaths, times, subMeasurementsList and valuesList's size should be equal");
1837
    }
1838
    if (enableRedirection) {
×
1839
      insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList, true);
×
1840
    } else {
1841
      TSInsertRecordsReq request;
1842
      try {
1843
        request =
×
1844
            filterAndGenTSInsertRecordsReq(
×
1845
                deviceIds, times, measurementsList, typesList, valuesList, true);
1846
      } catch (NoValidValueException e) {
×
1847
        logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
×
1848
        return;
×
1849
      }
×
1850
      try {
1851
        defaultSessionConnection.insertRecords(request);
×
1852
      } catch (RedirectException ignored) {
×
1853
        logger.warn("session insertRecords fail:{}", ignored.getMessage());
×
1854
      }
×
1855
    }
1856
  }
×
1857

1858
  /**
1859
   * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
1860
   * executeBatch, we pack some insert request in batch and send them to server. If you want improve
1861
   * your performance, please see insertTablet method
1862
   *
1863
   * <p>Each row could have same deviceId but different time, number of measurements
1864
   *
1865
   * @see Session#insertTablet(Tablet)
1866
   */
1867
  @Override
1868
  public void insertRecordsOfOneDevice(
1869
      String deviceId,
1870
      List<Long> times,
1871
      List<List<String>> measurementsList,
1872
      List<List<TSDataType>> typesList,
1873
      List<List<Object>> valuesList)
1874
      throws IoTDBConnectionException, StatementExecutionException {
1875
    insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
1✔
1876
  }
1✔
1877

1878
  /**
1879
   * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
1880
   * executeBatch, we pack some insert request in batch and send them to server. If you want improve
1881
   * your performance, please see insertTablet method
1882
   *
1883
   * <p>Each row could have same deviceId but different time, number of measurements
1884
   *
1885
   * @param haveSorted deprecated, whether the times have been sorted
1886
   * @see Session#insertTablet(Tablet)
1887
   */
1888
  @Override
1889
  public void insertRecordsOfOneDevice(
1890
      String deviceId,
1891
      List<Long> times,
1892
      List<List<String>> measurementsList,
1893
      List<List<TSDataType>> typesList,
1894
      List<List<Object>> valuesList,
1895
      boolean haveSorted)
1896
      throws IoTDBConnectionException, StatementExecutionException {
1897
    int len = times.size();
1✔
1898
    if (len != measurementsList.size() || len != valuesList.size()) {
1✔
1899
      throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
×
1900
    }
1901
    TSInsertRecordsOfOneDeviceReq request;
1902
    try {
1903
      request =
1✔
1904
          filterAndGenTSInsertRecordsOfOneDeviceReq(
1✔
1905
              deviceId, times, measurementsList, typesList, valuesList, haveSorted, false);
1906
    } catch (NoValidValueException e) {
×
1907
      logger.warn(ALL_VALUES_ARE_NULL_WITH_TIME, deviceId, times, measurementsList);
×
1908
      return;
×
1909
    }
1✔
1910
    try {
1911
      getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
×
1912
    } catch (RedirectException e) {
1✔
1913
      handleRedirection(deviceId, e.getEndPoint());
1✔
1914
    } catch (IoTDBConnectionException e) {
×
1915
      if (enableRedirection
×
1916
          && !deviceIdToEndpoint.isEmpty()
×
1917
          && deviceIdToEndpoint.get(deviceId) != null) {
×
1918
        logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
×
1919
        deviceIdToEndpoint.remove(deviceId);
×
1920

1921
        // reconnect with default connection
1922
        try {
1923
          defaultSessionConnection.insertRecordsOfOneDevice(request);
×
1924
        } catch (RedirectException ignored) {
×
1925
          logger.warn("session insertRecordsOfOneDevice fail:{}", ignored.getMessage());
×
1926
        }
×
1927
      } else {
1928
        throw e;
×
1929
      }
1930
    }
1✔
1931
  }
1✔
1932

1933
  /**
1934
   * Insert multiple rows with String format data, which can reduce the overhead of network. This
1935
   * method is just like jdbc executeBatch, we pack some insert request in batch and send them to
1936
   * server. If you want improve your performance, please see insertTablet method
1937
   *
1938
   * <p>Each row could have same deviceId but different time, number of measurements, number of
1939
   * values as String
1940
   *
1941
   * @param haveSorted deprecated, whether the times have been sorted
1942
   */
1943
  @Override
1944
  public void insertStringRecordsOfOneDevice(
1945
      String deviceId,
1946
      List<Long> times,
1947
      List<List<String>> measurementsList,
1948
      List<List<String>> valuesList,
1949
      boolean haveSorted)
1950
      throws IoTDBConnectionException, StatementExecutionException {
1951
    int len = times.size();
×
1952
    if (len != measurementsList.size() || len != valuesList.size()) {
×
1953
      throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
×
1954
    }
1955
    TSInsertStringRecordsOfOneDeviceReq req;
1956
    try {
1957
      req =
×
1958
          filterAndGenTSInsertStringRecordsOfOneDeviceReq(
×
1959
              deviceId, times, measurementsList, valuesList, haveSorted, false);
1960
    } catch (NoValidValueException e) {
×
1961
      logger.warn(ALL_VALUES_ARE_NULL_WITH_TIME, deviceId, times, measurementsList);
×
1962
      return;
×
1963
    }
×
1964
    try {
1965
      getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
×
1966
    } catch (RedirectException e) {
×
1967
      handleRedirection(deviceId, e.getEndPoint());
×
1968
    } catch (IoTDBConnectionException e) {
×
1969
      if (enableRedirection
×
1970
          && !deviceIdToEndpoint.isEmpty()
×
1971
          && deviceIdToEndpoint.get(deviceId) != null) {
×
1972
        logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
×
1973
        deviceIdToEndpoint.remove(deviceId);
×
1974

1975
        // reconnect with default connection
1976
        try {
1977
          defaultSessionConnection.insertStringRecordsOfOneDevice(req);
×
1978
        } catch (RedirectException ignored) {
×
1979
          logger.warn("session insertStringRecordsOfOneDevice fail:{}", ignored.getMessage());
×
1980
        }
×
1981
      } else {
1982
        throw e;
×
1983
      }
1984
    }
×
1985
  }
×
1986

1987
  /**
1988
   * Insert multiple rows with String format data, which can reduce the overhead of network. This
1989
   * method is just like jdbc executeBatch, we pack some insert request in batch and send them to
1990
   * server. If you want improve your performance, please see insertTablet method
1991
   *
1992
   * <p>Each row could have same deviceId but different time, number of measurements, number of
1993
   * values as String
1994
   */
1995
  @Override
1996
  public void insertStringRecordsOfOneDevice(
1997
      String deviceId,
1998
      List<Long> times,
1999
      List<List<String>> measurementsList,
2000
      List<List<String>> valuesList)
2001
      throws IoTDBConnectionException, StatementExecutionException {
2002
    insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList, false);
×
2003
  }
×
2004

2005
  /**
2006
   * Insert aligned multiple rows, which can reduce the overhead of network. This method is just
2007
   * like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
2008
   * want improve your performance, please see insertTablet method
2009
   *
2010
   * <p>Each row could have same prefixPath but different time, number of measurements
2011
   *
2012
   * @see Session#insertTablet(Tablet)
2013
   */
2014
  @Override
2015
  public void insertAlignedRecordsOfOneDevice(
2016
      String deviceId,
2017
      List<Long> times,
2018
      List<List<String>> measurementsList,
2019
      List<List<TSDataType>> typesList,
2020
      List<List<Object>> valuesList)
2021
      throws IoTDBConnectionException, StatementExecutionException {
2022
    insertAlignedRecordsOfOneDevice(
×
2023
        deviceId, times, measurementsList, typesList, valuesList, false);
2024
  }
×
2025

2026
  /**
2027
   * Insert aligned multiple rows, which can reduce the overhead of network. This method is just
2028
   * like jdbc executeBatch, we pack some insert request in batch and send them to server. If you
2029
   * want improve your performance, please see insertTablet method
2030
   *
2031
   * <p>Each row could have same prefixPath but different time, number of measurements
2032
   *
2033
   * @param haveSorted deprecated, whether the times have been sorted
2034
   * @see Session#insertTablet(Tablet)
2035
   */
2036
  @Override
2037
  public void insertAlignedRecordsOfOneDevice(
2038
      String deviceId,
2039
      List<Long> times,
2040
      List<List<String>> measurementsList,
2041
      List<List<TSDataType>> typesList,
2042
      List<List<Object>> valuesList,
2043
      boolean haveSorted)
2044
      throws IoTDBConnectionException, StatementExecutionException {
2045
    int len = times.size();
×
2046
    if (len != measurementsList.size() || len != valuesList.size()) {
×
2047
      throw new IllegalArgumentException(
×
2048
          "times, subMeasurementsList and valuesList's size should be equal");
2049
    }
2050
    TSInsertRecordsOfOneDeviceReq request;
2051
    try {
2052
      request =
×
2053
          filterAndGenTSInsertRecordsOfOneDeviceReq(
×
2054
              deviceId, times, measurementsList, typesList, valuesList, haveSorted, true);
2055
    } catch (NoValidValueException e) {
×
2056
      logger.warn(ALL_VALUES_ARE_NULL_WITH_TIME, deviceId, times, measurementsList);
×
2057
      return;
×
2058
    }
×
2059
    try {
2060
      getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
×
2061
    } catch (RedirectException e) {
×
2062
      handleRedirection(deviceId, e.getEndPoint());
×
2063
    } catch (IoTDBConnectionException e) {
×
2064
      if (enableRedirection
×
2065
          && !deviceIdToEndpoint.isEmpty()
×
2066
          && deviceIdToEndpoint.get(deviceId) != null) {
×
2067
        logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
×
2068
        deviceIdToEndpoint.remove(deviceId);
×
2069

2070
        // reconnect with default connection
2071
        try {
2072
          defaultSessionConnection.insertRecordsOfOneDevice(request);
×
2073
        } catch (RedirectException ignored) {
×
2074
          logger.warn("session insertRecordsOfOneDevice fail:{}", ignored.getMessage());
×
2075
        }
×
2076
      } else {
2077
        throw e;
×
2078
      }
2079
    }
×
2080
  }
×
2081

2082
  /**
2083
   * Insert multiple rows with String format data, which can reduce the overhead of network. This
2084
   * method is just like jdbc executeBatch, we pack some insert request in batch and send them to
2085
   * server. If you want improve your performance, please see insertTablet method
2086
   *
2087
   * <p>Each row could have same deviceId but different time, number of measurements, number of
2088
   * values as String
2089
   *
2090
   * @param haveSorted deprecated, whether the times have been sorted
2091
   */
2092
  @Override
2093
  public void insertAlignedStringRecordsOfOneDevice(
2094
      String deviceId,
2095
      List<Long> times,
2096
      List<List<String>> measurementsList,
2097
      List<List<String>> valuesList,
2098
      boolean haveSorted)
2099
      throws IoTDBConnectionException, StatementExecutionException {
2100
    int len = times.size();
×
2101
    if (len != measurementsList.size() || len != valuesList.size()) {
×
2102
      throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
×
2103
    }
2104
    TSInsertStringRecordsOfOneDeviceReq req;
2105
    try {
2106
      req =
×
2107
          filterAndGenTSInsertStringRecordsOfOneDeviceReq(
×
2108
              deviceId, times, measurementsList, valuesList, haveSorted, true);
2109
    } catch (NoValidValueException e) {
×
2110
      logger.warn(ALL_VALUES_ARE_NULL_WITH_TIME, deviceId, times, measurementsList);
×
2111
      return;
×
2112
    }
×
2113
    try {
2114
      getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
×
2115
    } catch (RedirectException e) {
×
2116
      handleRedirection(deviceId, e.getEndPoint());
×
2117
    } catch (IoTDBConnectionException e) {
×
2118
      if (enableRedirection
×
2119
          && !deviceIdToEndpoint.isEmpty()
×
2120
          && deviceIdToEndpoint.get(deviceId) != null) {
×
2121
        logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
×
2122
        deviceIdToEndpoint.remove(deviceId);
×
2123

2124
        // reconnect with default connection
2125
        try {
2126
          defaultSessionConnection.insertStringRecordsOfOneDevice(req);
×
2127
        } catch (RedirectException ignored) {
×
2128
          logger.warn("session insertStringRecordsOfOneDevice fail:{}", ignored.getMessage());
×
2129
        }
×
2130
      } else {
2131
        throw e;
×
2132
      }
2133
    }
×
2134
  }
×
2135

2136
  /**
2137
   * Insert aligned multiple rows with String format data, which can reduce the overhead of network.
2138
   * This method is just like jdbc executeBatch, we pack some insert request in batch and send them
2139
   * to server. If you want improve your performance, please see insertTablet method
2140
   *
2141
   * <p>Each row could have same prefixPath but different time, number of measurements, number of
2142
   * values as String
2143
   *
2144
   * @see Session#insertTablet(Tablet)
2145
   */
2146
  @Override
2147
  public void insertAlignedStringRecordsOfOneDevice(
2148
      String deviceId,
2149
      List<Long> times,
2150
      List<List<String>> measurementsList,
2151
      List<List<String>> valuesList)
2152
      throws IoTDBConnectionException, StatementExecutionException {
2153
    insertAlignedStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList, false);
×
2154
  }
×
2155

2156
  private TSInsertRecordsOfOneDeviceReq filterAndGenTSInsertRecordsOfOneDeviceReq(
2157
      String prefixPath,
2158
      List<Long> times,
2159
      List<List<String>> measurementsList,
2160
      List<List<TSDataType>> typesList,
2161
      List<List<Object>> valuesList,
2162
      boolean haveSorted,
2163
      boolean isAligned)
2164
      throws IoTDBConnectionException {
2165
    if (hasNull(valuesList)) {
1✔
2166
      measurementsList = changeToArrayListWithStringType(measurementsList);
×
2167
      valuesList = changeToArrayList(valuesList);
×
2168
      typesList = changeToArrayListWithTSDataType(typesList);
×
2169
      times = new ArrayList<>(times);
×
2170
      filterNullValueAndMeasurementOfOneDevice(
×
2171
          prefixPath, times, measurementsList, typesList, valuesList);
2172
    }
2173
    return genTSInsertRecordsOfOneDeviceReq(
1✔
2174
        prefixPath, times, measurementsList, typesList, valuesList, haveSorted, isAligned);
2175
  }
2176

2177
  private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(
2178
      String prefixPath,
2179
      List<Long> times,
2180
      List<List<String>> measurementsList,
2181
      List<List<TSDataType>> typesList,
2182
      List<List<Object>> valuesList,
2183
      boolean haveSorted,
2184
      boolean isAligned)
2185
      throws IoTDBConnectionException {
2186
    // check params size
2187
    int len = times.size();
1✔
2188
    if (len != measurementsList.size() || len != valuesList.size()) {
1✔
2189
      throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
×
2190
    }
2191

2192
    if (!checkSorted(times)) {
1✔
2193
      // sort
2194
      Integer[] index = new Integer[times.size()];
1✔
2195
      for (int i = 0; i < times.size(); i++) {
1✔
2196
        index[i] = i;
1✔
2197
      }
2198
      Arrays.sort(index, Comparator.comparingLong(times::get));
1✔
2199
      times.sort(Long::compareTo);
1✔
2200
      // sort measurementList
2201
      measurementsList = sortList(measurementsList, index);
1✔
2202
      // sort typesList
2203
      typesList = sortList(typesList, index);
1✔
2204
      // sort values
2205
      valuesList = sortList(valuesList, index);
1✔
2206
    }
2207

2208
    TSInsertRecordsOfOneDeviceReq request = new TSInsertRecordsOfOneDeviceReq();
1✔
2209
    request.setPrefixPath(prefixPath);
1✔
2210
    request.setTimestamps(times);
1✔
2211
    request.setMeasurementsList(measurementsList);
1✔
2212
    List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
1✔
2213
    request.setValuesList(buffersList);
1✔
2214
    request.setIsAligned(isAligned);
1✔
2215
    return request;
1✔
2216
  }
2217

2218
  private TSInsertStringRecordsOfOneDeviceReq filterAndGenTSInsertStringRecordsOfOneDeviceReq(
2219
      String prefixPath,
2220
      List<Long> times,
2221
      List<List<String>> measurementsList,
2222
      List<List<String>> valuesList,
2223
      boolean haveSorted,
2224
      boolean isAligned) {
2225
    if (hasNull(valuesList)) {
×
2226
      measurementsList = changeToArrayListWithStringType(measurementsList);
×
2227
      valuesList = changeToArrayListWithStringType(valuesList);
×
2228
      times = new ArrayList<>(times);
×
2229
      filterNullValueAndMeasurementWithStringTypeOfOneDevice(
×
2230
          times, prefixPath, measurementsList, valuesList);
2231
    }
2232
    return genTSInsertStringRecordsOfOneDeviceReq(
×
2233
        prefixPath, times, measurementsList, valuesList, haveSorted, isAligned);
2234
  }
2235

2236
  private TSInsertStringRecordsOfOneDeviceReq genTSInsertStringRecordsOfOneDeviceReq(
2237
      String prefixPath,
2238
      List<Long> times,
2239
      List<List<String>> measurementsList,
2240
      List<List<String>> valuesList,
2241
      boolean haveSorted,
2242
      boolean isAligned) {
2243
    // check params size
2244
    int len = times.size();
×
2245
    if (len != measurementsList.size() || len != valuesList.size()) {
×
2246
      throw new IllegalArgumentException(VALUES_SIZE_SHOULD_BE_EQUAL);
×
2247
    }
2248

2249
    if (!checkSorted(times)) {
×
2250
      Integer[] index = new Integer[times.size()];
×
2251
      for (int i = 0; i < index.length; i++) {
×
2252
        index[i] = i;
×
2253
      }
2254
      Arrays.sort(index, Comparator.comparingLong(times::get));
×
2255
      times.sort(Long::compareTo);
×
2256
      // sort measurementsList
2257
      measurementsList = sortList(measurementsList, index);
×
2258
      // sort valuesList
2259
      valuesList = sortList(valuesList, index);
×
2260
    }
2261

2262
    TSInsertStringRecordsOfOneDeviceReq req = new TSInsertStringRecordsOfOneDeviceReq();
×
2263
    req.setPrefixPath(prefixPath);
×
2264
    req.setTimestamps(times);
×
2265
    req.setMeasurementsList(measurementsList);
×
2266
    req.setValuesList(valuesList);
×
2267
    req.setIsAligned(isAligned);
×
2268
    return req;
×
2269
  }
2270

2271
  /**
2272
   * Sort the input source list.
2273
   *
2274
   * <p>e.g. source: [1,2,3,4,5], index:[1,0,3,2,4], return : [2,1,4,3,5]
2275
   *
2276
   * @param source Input list
2277
   * @param index retuen order
2278
   * @param <T> Input type
2279
   * @return ordered list
2280
   */
2281
  private static <T> List<T> sortList(List<T> source, Integer[] index) {
2282
    return Arrays.stream(index).map(source::get).collect(Collectors.toList());
1✔
2283
  }
2284

2285
  private List<ByteBuffer> objectValuesListToByteBufferList(
2286
      List<List<Object>> valuesList, List<List<TSDataType>> typesList)
2287
      throws IoTDBConnectionException {
2288
    List<ByteBuffer> buffersList = new ArrayList<>();
1✔
2289
    for (int i = 0; i < valuesList.size(); i++) {
1✔
2290
      ByteBuffer buffer = SessionUtils.getValueBuffer(typesList.get(i), valuesList.get(i));
1✔
2291
      buffersList.add(buffer);
1✔
2292
    }
2293
    return buffersList;
1✔
2294
  }
2295

2296
  private void insertRecordsWithLeaderCache(
2297
      List<String> deviceIds,
2298
      List<Long> times,
2299
      List<List<String>> measurementsList,
2300
      List<List<TSDataType>> typesList,
2301
      List<List<Object>> valuesList,
2302
      boolean isAligned)
2303
      throws IoTDBConnectionException, StatementExecutionException {
2304
    Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
1✔
2305
    for (int i = 0; i < deviceIds.size(); i++) {
1✔
2306
      final SessionConnection connection = getSessionConnection(deviceIds.get(i));
1✔
2307
      TSInsertRecordsReq request = recordsGroup.getOrDefault(connection, new TSInsertRecordsReq());
1✔
2308
      request.setIsAligned(isAligned);
1✔
2309
      try {
2310
        filterAndUpdateTSInsertRecordsReq(
1✔
2311
            request,
2312
            deviceIds.get(i),
1✔
2313
            times.get(i),
1✔
2314
            measurementsList.get(i),
1✔
2315
            typesList.get(i),
1✔
2316
            valuesList.get(i));
1✔
2317
        recordsGroup.putIfAbsent(connection, request);
1✔
2318
      } catch (NoValidValueException e) {
×
2319
        logger.warn(
×
2320
            "All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements are [{}]",
2321
            deviceIds.get(i),
×
2322
            times.get(i),
×
2323
            measurementsList.get(i));
×
2324
      }
1✔
2325
    }
2326
    insertByGroup(recordsGroup, SessionConnection::insertRecords);
1✔
2327
  }
1✔
2328

2329
  private TSInsertRecordsReq filterAndGenTSInsertRecordsReq(
2330
      List<String> deviceIds,
2331
      List<Long> times,
2332
      List<List<String>> measurementsList,
2333
      List<List<TSDataType>> typesList,
2334
      List<List<Object>> valuesList,
2335
      boolean isAligned)
2336
      throws IoTDBConnectionException {
2337
    if (hasNull(valuesList)) {
1✔
2338
      measurementsList = changeToArrayListWithStringType(measurementsList);
×
2339
      valuesList = changeToArrayList(valuesList);
×
2340
      deviceIds = new ArrayList<>(deviceIds);
×
2341
      times = new ArrayList<>(times);
×
2342
      typesList = changeToArrayListWithTSDataType(typesList);
×
2343
      filterNullValueAndMeasurement(deviceIds, times, measurementsList, valuesList, typesList);
×
2344
    }
2345
    return genTSInsertRecordsReq(
1✔
2346
        deviceIds, times, measurementsList, typesList, valuesList, isAligned);
2347
  }
2348

2349
  private TSInsertRecordsReq genTSInsertRecordsReq(
2350
      List<String> deviceIds,
2351
      List<Long> times,
2352
      List<List<String>> measurementsList,
2353
      List<List<TSDataType>> typesList,
2354
      List<List<Object>> valuesList,
2355
      boolean isAligned)
2356
      throws IoTDBConnectionException {
2357
    TSInsertRecordsReq request = new TSInsertRecordsReq();
1✔
2358
    request.setPrefixPaths(deviceIds);
1✔
2359
    request.setTimestamps(times);
1✔
2360
    request.setMeasurementsList(measurementsList);
1✔
2361
    request.setIsAligned(isAligned);
1✔
2362
    List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
1✔
2363
    request.setValuesList(buffersList);
1✔
2364
    return request;
1✔
2365
  }
2366

2367
  private void filterAndUpdateTSInsertRecordsReq(
2368
      TSInsertRecordsReq request,
2369
      String deviceId,
2370
      Long time,
2371
      List<String> measurements,
2372
      List<TSDataType> types,
2373
      List<Object> values)
2374
      throws IoTDBConnectionException {
2375
    if (hasNull(values)) {
1✔
2376
      measurements = new ArrayList<>(measurements);
×
2377
      types = new ArrayList<>(types);
×
2378
      values = new ArrayList<>(values);
×
2379
      boolean isAllValuesNull =
×
2380
          filterNullValueAndMeasurement(deviceId, measurements, types, values);
×
2381
      if (isAllValuesNull) {
×
2382
        throw new NoValidValueException(ALL_INSERT_DATA_IS_NULL);
×
2383
      }
2384
    }
2385
    updateTSInsertRecordsReq(request, deviceId, time, measurements, types, values);
1✔
2386
  }
1✔
2387

2388
  private void updateTSInsertRecordsReq(
2389
      TSInsertRecordsReq request,
2390
      String deviceId,
2391
      Long time,
2392
      List<String> measurements,
2393
      List<TSDataType> types,
2394
      List<Object> values)
2395
      throws IoTDBConnectionException {
2396
    request.addToPrefixPaths(deviceId);
1✔
2397
    request.addToTimestamps(time);
1✔
2398
    request.addToMeasurementsList(measurements);
1✔
2399
    ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
1✔
2400
    request.addToValuesList(buffer);
1✔
2401
  }
1✔
2402

2403
  /**
2404
   * insert the data of a device. For each timestamp, the number of measurements is the same.
2405
   *
2406
   * <p>a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
2407
   *
2408
   * <p>times in Tablet may be not in ascending order
2409
   *
2410
   * @param tablet data batch
2411
   */
2412
  @Override
2413
  public void insertTablet(Tablet tablet)
2414
      throws StatementExecutionException, IoTDBConnectionException {
2415
    insertTablet(tablet, false);
×
2416
  }
×
2417

2418
  /**
2419
   * insert a Tablet
2420
   *
2421
   * @param tablet data batch
2422
   * @param sorted deprecated, whether times in Tablet are in ascending order
2423
   */
2424
  @Override
2425
  public void insertTablet(Tablet tablet, boolean sorted)
2426
      throws IoTDBConnectionException, StatementExecutionException {
2427
    TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
1✔
2428
    try {
2429
      getSessionConnection(tablet.deviceId).insertTablet(request);
×
2430
    } catch (RedirectException e) {
1✔
2431
      handleRedirection(tablet.deviceId, e.getEndPoint());
1✔
2432
    } catch (IoTDBConnectionException e) {
×
2433
      if (enableRedirection
×
2434
          && !deviceIdToEndpoint.isEmpty()
×
2435
          && deviceIdToEndpoint.get(tablet.deviceId) != null) {
×
2436
        logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(tablet.deviceId));
×
2437
        deviceIdToEndpoint.remove(tablet.deviceId);
×
2438

2439
        // reconnect with default connection
2440
        try {
2441
          defaultSessionConnection.insertTablet(request);
×
2442
        } catch (RedirectException ignored) {
×
2443
          logger.warn("session insertTablet fail:{}", ignored.getMessage());
×
2444
        }
×
2445
      } else {
2446
        throw e;
×
2447
      }
2448
    }
1✔
2449
  }
1✔
2450

2451
  /**
2452
   * insert the aligned timeseries data of a device. For each timestamp, the number of measurements
2453
   * is the same.
2454
   *
2455
   * <p>a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3
2456
   *
2457
   * <p>times in Tablet may be not in ascending order
2458
   *
2459
   * @param tablet data batch
2460
   */
2461
  @Override
2462
  public void insertAlignedTablet(Tablet tablet)
2463
      throws StatementExecutionException, IoTDBConnectionException {
2464
    insertAlignedTablet(tablet, false);
×
2465
  }
×
2466

2467
  /**
2468
   * insert the aligned timeseries data of a device.
2469
   *
2470
   * @param tablet data batch
2471
   * @param sorted deprecated, whether times in Tablet are in ascending order
2472
   */
2473
  @Override
2474
  public void insertAlignedTablet(Tablet tablet, boolean sorted)
2475
      throws IoTDBConnectionException, StatementExecutionException {
2476
    TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, true);
×
2477
    try {
2478
      getSessionConnection(tablet.deviceId).insertTablet(request);
×
2479
    } catch (RedirectException e) {
×
2480
      handleRedirection(tablet.deviceId, e.getEndPoint());
×
2481
    } catch (IoTDBConnectionException e) {
×
2482
      if (enableRedirection
×
2483
          && !deviceIdToEndpoint.isEmpty()
×
2484
          && deviceIdToEndpoint.get(tablet.deviceId) != null) {
×
2485
        logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(tablet.deviceId));
×
2486
        deviceIdToEndpoint.remove(tablet.deviceId);
×
2487

2488
        // reconnect with default connection
2489
        try {
2490
          defaultSessionConnection.insertTablet(request);
×
2491
        } catch (RedirectException ignored) {
×
2492
          logger.warn("session insertTablet fail:{}", ignored.getMessage());
×
2493
        }
×
2494
      } else {
2495
        throw e;
×
2496
      }
2497
    }
×
2498
  }
×
2499

2500
  private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted, boolean isAligned) {
2501
    if (!checkSorted(tablet)) {
1✔
2502
      sortTablet(tablet);
×
2503
    }
2504

2505
    TSInsertTabletReq request = new TSInsertTabletReq();
1✔
2506

2507
    for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
1✔
2508
      request.addToMeasurements(measurementSchema.getMeasurementId());
1✔
2509
      request.addToTypes(measurementSchema.getType().ordinal());
1✔
2510
    }
1✔
2511

2512
    request.setPrefixPath(tablet.deviceId);
1✔
2513
    request.setIsAligned(isAligned);
1✔
2514
    request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
1✔
2515
    request.setValues(SessionUtils.getValueBuffer(tablet));
1✔
2516
    request.setSize(tablet.rowSize);
1✔
2517
    return request;
1✔
2518
  }
2519

2520
  /**
2521
   * insert the data of several deivces. Given a deivce, for each timestamp, the number of
2522
   * measurements is the same.
2523
   *
2524
   * <p>Times in each Tablet may not be in ascending order
2525
   *
2526
   * @param tablets data batch in multiple device
2527
   */
2528
  @Override
2529
  public void insertTablets(Map<String, Tablet> tablets)
2530
      throws IoTDBConnectionException, StatementExecutionException {
2531
    insertTablets(tablets, false);
×
2532
  }
×
2533

2534
  /**
2535
   * insert the data of several devices. Given a device, for each timestamp, the number of
2536
   * measurements is the same.
2537
   *
2538
   * @param tablets data batch in multiple device
2539
   * @param sorted deprecated, whether times in each Tablet are in ascending order
2540
   */
2541
  @Override
2542
  public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
2543
      throws IoTDBConnectionException, StatementExecutionException {
2544
    if (enableRedirection) {
1✔
2545
      insertTabletsWithLeaderCache(tablets, sorted, false);
1✔
2546
    } else {
2547
      TSInsertTabletsReq request =
1✔
2548
          genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, false);
1✔
2549
      try {
2550
        defaultSessionConnection.insertTablets(request);
×
2551
      } catch (RedirectException ignored) {
1✔
2552
        logger.warn("session insertTablets fail:{}", ignored.getMessage());
1✔
2553
      }
×
2554
    }
2555
  }
1✔
2556

2557
  /**
2558
   * insert aligned data of several deivces. Given a deivce, for each timestamp, the number of
2559
   * measurements is the same.
2560
   *
2561
   * <p>Times in each Tablet may not be in ascending order
2562
   *
2563
   * @param tablets data batch in multiple device
2564
   */
2565
  @Override
2566
  public void insertAlignedTablets(Map<String, Tablet> tablets)
2567
      throws IoTDBConnectionException, StatementExecutionException {
2568
    insertAlignedTablets(tablets, false);
×
2569
  }
×
2570

2571
  /**
2572
   * insert aligned data of several devices. Given a device, for each timestamp, the number of
2573
   * measurements is the same.
2574
   *
2575
   * @param tablets data batch in multiple device
2576
   * @param sorted deprecated, whether times in each Tablet are in ascending order
2577
   */
2578
  @Override
2579
  public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
2580
      throws IoTDBConnectionException, StatementExecutionException {
2581
    if (enableRedirection) {
×
2582
      insertTabletsWithLeaderCache(tablets, sorted, true);
×
2583
    } else {
2584
      TSInsertTabletsReq request =
×
2585
          genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, true);
×
2586
      try {
2587
        defaultSessionConnection.insertTablets(request);
×
2588
      } catch (RedirectException ignored) {
×
2589
        logger.warn("session insertTablets fail:{}", ignored.getMessage());
×
2590
      }
×
2591
    }
2592
  }
×
2593

2594
  private void insertTabletsWithLeaderCache(
2595
      Map<String, Tablet> tablets, boolean sorted, boolean isAligned)
2596
      throws IoTDBConnectionException, StatementExecutionException {
2597
    Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
1✔
2598
    for (Entry<String, Tablet> entry : tablets.entrySet()) {
1✔
2599
      final SessionConnection connection = getSessionConnection(entry.getKey());
1✔
2600
      TSInsertTabletsReq request =
1✔
2601
          tabletGroup.computeIfAbsent(connection, k -> new TSInsertTabletsReq());
1✔
2602
      updateTSInsertTabletsReq(request, entry.getValue(), sorted, isAligned);
1✔
2603
    }
1✔
2604

2605
    insertByGroup(tabletGroup, SessionConnection::insertTablets);
1✔
2606
  }
1✔
2607

2608
  private TSInsertTabletsReq genTSInsertTabletsReq(
2609
      List<Tablet> tablets, boolean sorted, boolean isAligned) throws BatchExecutionException {
2610
    TSInsertTabletsReq request = new TSInsertTabletsReq();
1✔
2611
    if (tablets.isEmpty()) {
1✔
2612
      throw new BatchExecutionException("No tablet is inserting!");
×
2613
    }
2614
    for (Tablet tablet : tablets) {
1✔
2615
      updateTSInsertTabletsReq(request, tablet, sorted, isAligned);
1✔
2616
    }
1✔
2617
    return request;
1✔
2618
  }
2619

2620
  private void updateTSInsertTabletsReq(
2621
      TSInsertTabletsReq request, Tablet tablet, boolean sorted, boolean isAligned) {
2622
    if (!checkSorted(tablet)) {
1✔
2623
      sortTablet(tablet);
×
2624
    }
2625
    request.addToPrefixPaths(tablet.deviceId);
1✔
2626
    List<String> measurements = new ArrayList<>();
1✔
2627
    List<Integer> dataTypes = new ArrayList<>();
1✔
2628
    request.setIsAligned(isAligned);
1✔
2629
    for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
1✔
2630
      measurements.add(measurementSchema.getMeasurementId());
1✔
2631
      dataTypes.add(measurementSchema.getType().ordinal());
1✔
2632
    }
1✔
2633
    request.addToMeasurementsList(measurements);
1✔
2634
    request.addToTypesList(dataTypes);
1✔
2635
    request.addToTimestampsList(SessionUtils.getTimeBuffer(tablet));
1✔
2636
    request.addToValuesList(SessionUtils.getValueBuffer(tablet));
1✔
2637
    request.addToSizeList(tablet.rowSize);
1✔
2638
  }
1✔
2639

2640
  /**
2641
   * This method NOT insert data into database and the server just return after accept the request,
2642
   * this method should be used to test other time cost in client
2643
   */
2644
  @Override
2645
  public void testInsertTablet(Tablet tablet)
2646
      throws IoTDBConnectionException, StatementExecutionException {
2647
    testInsertTablet(tablet, false);
×
2648
  }
×
2649

2650
  /**
2651
   * This method NOT insert data into database and the server just return after accept the request,
2652
   * this method should be used to test other time cost in client
2653
   */
2654
  @Override
2655
  public void testInsertTablet(Tablet tablet, boolean sorted)
2656
      throws IoTDBConnectionException, StatementExecutionException {
2657
    TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
×
2658
    defaultSessionConnection.testInsertTablet(request);
×
2659
  }
×
2660

2661
  /**
2662
   * This method NOT insert data into database and the server just return after accept the request,
2663
   * this method should be used to test other time cost in client
2664
   */
2665
  @Override
2666
  public void testInsertTablets(Map<String, Tablet> tablets)
2667
      throws IoTDBConnectionException, StatementExecutionException {
2668
    testInsertTablets(tablets, false);
×
2669
  }
×
2670

2671
  /**
2672
   * This method NOT insert data into database and the server just return after accept the request,
2673
   * this method should be used to test other time cost in client
2674
   */
2675
  @Override
2676
  public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
2677
      throws IoTDBConnectionException, StatementExecutionException {
2678
    TSInsertTabletsReq request =
×
2679
        genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, false);
×
2680
    defaultSessionConnection.testInsertTablets(request);
×
2681
  }
×
2682

2683
  /**
2684
   * This method NOT insert data into database and the server just return after accept the request,
2685
   * this method should be used to test other time cost in client
2686
   */
2687
  @Override
2688
  public void testInsertRecords(
2689
      List<String> deviceIds,
2690
      List<Long> times,
2691
      List<List<String>> measurementsList,
2692
      List<List<String>> valuesList)
2693
      throws IoTDBConnectionException, StatementExecutionException {
2694
    TSInsertStringRecordsReq request;
2695
    try {
2696
      request =
×
2697
          filterAndGenTSInsertStringRecordsReq(
×
2698
              deviceIds, times, measurementsList, valuesList, false);
2699
    } catch (NoValidValueException e) {
×
2700
      logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
×
2701
      return;
×
2702
    }
×
2703

2704
    defaultSessionConnection.testInsertRecords(request);
×
2705
  }
×
2706

2707
  /**
2708
   * This method NOT insert data into database and the server just return after accept the request,
2709
   * this method should be used to test other time cost in client
2710
   */
2711
  @Override
2712
  public void testInsertRecords(
2713
      List<String> deviceIds,
2714
      List<Long> times,
2715
      List<List<String>> measurementsList,
2716
      List<List<TSDataType>> typesList,
2717
      List<List<Object>> valuesList)
2718
      throws IoTDBConnectionException, StatementExecutionException {
2719
    TSInsertRecordsReq request;
2720
    try {
2721
      request =
×
2722
          filterAndGenTSInsertRecordsReq(
×
2723
              deviceIds, times, measurementsList, typesList, valuesList, false);
2724
    } catch (NoValidValueException e) {
×
2725
      logger.warn(ALL_VALUES_ARE_NULL_MULTI_DEVICES, deviceIds, times, measurementsList);
×
2726
      return;
×
2727
    }
×
2728

2729
    defaultSessionConnection.testInsertRecords(request);
×
2730
  }
×
2731

2732
  /**
2733
   * This method NOT insert data into database and the server just return after accept the request,
2734
   * this method should be used to test other time cost in client
2735
   */
2736
  @Override
2737
  public void testInsertRecord(
2738
      String deviceId, long time, List<String> measurements, List<String> values)
2739
      throws IoTDBConnectionException, StatementExecutionException {
2740
    TSInsertStringRecordReq request;
2741
    try {
2742
      request = filterAndGenTSInsertStringRecordReq(deviceId, time, measurements, values, false);
×
2743
    } catch (NoValidValueException e) {
×
2744
      logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
×
2745
      return;
×
2746
    }
×
2747
    defaultSessionConnection.testInsertRecord(request);
×
2748
  }
×
2749

2750
  /**
2751
   * This method NOT insert data into database and the server just return after accept the request,
2752
   * this method should be used to test other time cost in client
2753
   */
2754
  @Override
2755
  public void testInsertRecord(
2756
      String deviceId,
2757
      long time,
2758
      List<String> measurements,
2759
      List<TSDataType> types,
2760
      List<Object> values)
2761
      throws IoTDBConnectionException, StatementExecutionException {
2762
    TSInsertRecordReq request;
2763
    try {
2764
      request = filterAndGenTSInsertRecordReq(deviceId, time, measurements, types, values, false);
×
2765
    } catch (NoValidValueException e) {
×
2766
      logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
×
2767
      return;
×
2768
    }
×
2769
    defaultSessionConnection.testInsertRecord(request);
×
2770
  }
×
2771

2772
  /**
2773
   * delete a timeseries, including data and schema
2774
   *
2775
   * @param path timeseries to delete, should be a whole path
2776
   */
2777
  @Override
2778
  public void deleteTimeseries(String path)
2779
      throws IoTDBConnectionException, StatementExecutionException {
2780
    defaultSessionConnection.deleteTimeseries(Collections.singletonList(path));
×
2781
  }
×
2782

2783
  /**
2784
   * delete some timeseries, including data and schema
2785
   *
2786
   * @param paths timeseries to delete, should be a whole path
2787
   */
2788
  @Override
2789
  public void deleteTimeseries(List<String> paths)
2790
      throws IoTDBConnectionException, StatementExecutionException {
2791
    defaultSessionConnection.deleteTimeseries(paths);
×
2792
  }
×
2793

2794
  /**
2795
   * delete data <= time in one timeseries
2796
   *
2797
   * @param path data in which time series to delete
2798
   * @param endTime data with time stamp less than or equal to time will be deleted
2799
   */
2800
  @Override
2801
  public void deleteData(String path, long endTime)
2802
      throws IoTDBConnectionException, StatementExecutionException {
2803
    deleteData(Collections.singletonList(path), Long.MIN_VALUE, endTime);
×
2804
  }
×
2805

2806
  /**
2807
   * delete data <= time in multiple timeseries
2808
   *
2809
   * @param paths data in which time series to delete
2810
   * @param endTime data with time stamp less than or equal to time will be deleted
2811
   */
2812
  @Override
2813
  public void deleteData(List<String> paths, long endTime)
2814
      throws IoTDBConnectionException, StatementExecutionException {
2815
    deleteData(paths, Long.MIN_VALUE, endTime);
×
2816
  }
×
2817

2818
  /**
2819
   * delete data >= startTime and data <= endTime in multiple timeseries
2820
   *
2821
   * @param paths data in which time series to delete
2822
   * @param startTime delete range start time
2823
   * @param endTime delete range end time
2824
   */
2825
  @Override
2826
  public void deleteData(List<String> paths, long startTime, long endTime)
2827
      throws IoTDBConnectionException, StatementExecutionException {
2828
    TSDeleteDataReq request = genTSDeleteDataReq(paths, startTime, endTime);
×
2829
    defaultSessionConnection.deleteData(request);
×
2830
  }
×
2831

2832
  private TSDeleteDataReq genTSDeleteDataReq(List<String> paths, long startTime, long endTime) {
2833
    TSDeleteDataReq request = new TSDeleteDataReq();
×
2834
    request.setPaths(paths);
×
2835
    request.setStartTime(startTime);
×
2836
    request.setEndTime(endTime);
×
2837
    return request;
×
2838
  }
2839

2840
  /**
2841
   * check whether the batch has been sorted
2842
   *
2843
   * @return whether the batch has been sorted
2844
   */
2845
  private boolean checkSorted(Tablet tablet) {
2846
    for (int i = 1; i < tablet.rowSize; i++) {
1✔
2847
      if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
1✔
2848
        return false;
×
2849
      }
2850
    }
2851
    return true;
1✔
2852
  }
2853

2854
  private boolean checkSorted(List<Long> times) {
2855
    for (int i = 1; i < times.size(); i++) {
1✔
2856
      if (times.get(i) < times.get(i - 1)) {
1✔
2857
        return false;
1✔
2858
      }
2859
    }
2860
    return true;
1✔
2861
  }
2862

2863
  @SuppressWarnings({
2864
    "squid:S3776"
2865
  }) // ignore Cognitive Complexity of methods should not be too high
2866
  public void sortTablet(Tablet tablet) {
2867
    /*
2868
     * following part of code sort the batch data by time,
2869
     * so we can insert continuous data in value list to get a better performance
2870
     */
2871
    // sort to get index, and use index to sort value list
2872
    Integer[] index = new Integer[tablet.rowSize];
1✔
2873
    for (int i = 0; i < tablet.rowSize; i++) {
1✔
2874
      index[i] = i;
1✔
2875
    }
2876
    Arrays.sort(index, Comparator.comparingLong(o -> tablet.timestamps[o]));
1✔
2877
    Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
1✔
2878
    int columnIndex = 0;
1✔
2879
    for (int i = 0; i < tablet.getSchemas().size(); i++) {
1✔
2880
      IMeasurementSchema schema = tablet.getSchemas().get(i);
1✔
2881
      if (schema instanceof MeasurementSchema) {
1✔
2882
        tablet.values[columnIndex] = sortList(tablet.values[columnIndex], schema.getType(), index);
1✔
2883
        if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
1✔
2884
          tablet.bitMaps[columnIndex] = sortBitMap(tablet.bitMaps[columnIndex], index);
×
2885
        }
2886
        columnIndex++;
1✔
2887
      } else {
2888
        int measurementSize = schema.getSubMeasurementsList().size();
×
2889
        for (int j = 0; j < measurementSize; j++) {
×
2890
          tablet.values[columnIndex] =
×
2891
              sortList(
×
2892
                  tablet.values[columnIndex],
2893
                  schema.getSubMeasurementsTSDataTypeList().get(j),
×
2894
                  index);
2895
          if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
×
2896
            tablet.bitMaps[columnIndex] = sortBitMap(tablet.bitMaps[columnIndex], index);
×
2897
          }
2898
          columnIndex++;
×
2899
        }
2900
      }
2901
    }
2902
  }
1✔
2903

2904
  /**
2905
   * sort value list by index
2906
   *
2907
   * @param valueList value list
2908
   * @param dataType data type
2909
   * @param index index
2910
   * @return sorted list
2911
   */
2912
  private Object sortList(Object valueList, TSDataType dataType, Integer[] index) {
2913
    switch (dataType) {
1✔
2914
      case BOOLEAN:
2915
        boolean[] boolValues = (boolean[]) valueList;
×
2916
        boolean[] sortedValues = new boolean[boolValues.length];
×
2917
        for (int i = 0; i < index.length; i++) {
×
2918
          sortedValues[i] = boolValues[index[i]];
×
2919
        }
2920
        return sortedValues;
×
2921
      case INT32:
2922
        int[] intValues = (int[]) valueList;
×
2923
        int[] sortedIntValues = new int[intValues.length];
×
2924
        for (int i = 0; i < index.length; i++) {
×
2925
          sortedIntValues[i] = intValues[index[i]];
×
2926
        }
2927
        return sortedIntValues;
×
2928
      case INT64:
2929
        long[] longValues = (long[]) valueList;
1✔
2930
        long[] sortedLongValues = new long[longValues.length];
1✔
2931
        for (int i = 0; i < index.length; i++) {
1✔
2932
          sortedLongValues[i] = longValues[index[i]];
1✔
2933
        }
2934
        return sortedLongValues;
1✔
2935
      case FLOAT:
2936
        float[] floatValues = (float[]) valueList;
×
2937
        float[] sortedFloatValues = new float[floatValues.length];
×
2938
        for (int i = 0; i < index.length; i++) {
×
2939
          sortedFloatValues[i] = floatValues[index[i]];
×
2940
        }
2941
        return sortedFloatValues;
×
2942
      case DOUBLE:
2943
        double[] doubleValues = (double[]) valueList;
×
2944
        double[] sortedDoubleValues = new double[doubleValues.length];
×
2945
        for (int i = 0; i < index.length; i++) {
×
2946
          sortedDoubleValues[i] = doubleValues[index[i]];
×
2947
        }
2948
        return sortedDoubleValues;
×
2949
      case TEXT:
2950
        Binary[] binaryValues = (Binary[]) valueList;
×
2951
        Binary[] sortedBinaryValues = new Binary[binaryValues.length];
×
2952
        for (int i = 0; i < index.length; i++) {
×
2953
          sortedBinaryValues[i] = binaryValues[index[i]];
×
2954
        }
2955
        return sortedBinaryValues;
×
2956
      default:
2957
        throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType);
×
2958
    }
2959
  }
2960

2961
  /**
2962
   * sort BitMap by index
2963
   *
2964
   * @param bitMap BitMap to be sorted
2965
   * @param index index
2966
   * @return sorted bitMap
2967
   */
2968
  private BitMap sortBitMap(BitMap bitMap, Integer[] index) {
2969
    BitMap sortedBitMap = new BitMap(bitMap.getSize());
×
2970
    for (int i = 0; i < index.length; i++) {
×
2971
      if (bitMap.isMarked(index[i])) {
×
2972
        sortedBitMap.mark(i);
×
2973
      }
2974
    }
2975
    return sortedBitMap;
×
2976
  }
2977

2978
  @Override
2979
  public void setSchemaTemplate(String templateName, String prefixPath)
2980
      throws IoTDBConnectionException, StatementExecutionException {
2981
    TSSetSchemaTemplateReq request = getTSSetSchemaTemplateReq(templateName, prefixPath);
×
2982
    defaultSessionConnection.setSchemaTemplate(request);
×
2983
  }
×
2984

2985
  /**
2986
   * Construct Template at session and create it at server.
2987
   *
2988
   * <p>The template instance constructed within session is SUGGESTED to be a flat measurement
2989
   * template, which has no internal nodes inside a template.
2990
   *
2991
   * <p>For example, template(s1, s2, s3) is a flat measurement template, while template2(GPS.x,
2992
   * GPS.y, s1) is not.
2993
   *
2994
   * <p>Tree-structured template, which is contrary to flat measurement template, may not be
2995
   * supported in further version of IoTDB
2996
   *
2997
   * @see Template
2998
   */
2999
  @Override
3000
  public void createSchemaTemplate(Template template)
3001
      throws IOException, IoTDBConnectionException, StatementExecutionException {
3002
    TSCreateSchemaTemplateReq req = new TSCreateSchemaTemplateReq();
×
3003
    req.setName(template.getName());
×
3004
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
×
3005
    template.serialize(baos);
×
3006
    req.setSerializedTemplate(baos.toByteArray());
×
3007
    baos.close();
×
3008
    defaultSessionConnection.createSchemaTemplate(req);
×
3009
  }
×
3010

3011
  /**
3012
   * Create a template with flat measurements, not tree structured. Need to specify datatype,
3013
   * encoding and compressor of each measurement, and alignment of these measurements at once.
3014
   *
3015
   * @param measurements flat measurements of the template, cannot contain character dot
3016
   * @param dataTypes datatype of each measurement in the template
3017
   * @param encodings encodings of each measurement in the template
3018
   * @param compressors compression type of each measurement in the template
3019
   * @param isAligned specify whether these flat measurements are aligned
3020
   * @oaram templateName name of template to create
3021
   */
3022
  @Override
3023
  public void createSchemaTemplate(
3024
      String templateName,
3025
      List<String> measurements,
3026
      List<TSDataType> dataTypes,
3027
      List<TSEncoding> encodings,
3028
      List<CompressionType> compressors,
3029
      boolean isAligned)
3030
      throws IOException, IoTDBConnectionException, StatementExecutionException {
3031
    Template temp = new Template(templateName, isAligned);
×
3032
    int len = measurements.size();
×
3033
    if (len != dataTypes.size() || len != encodings.size() || len != compressors.size()) {
×
3034
      throw new StatementExecutionException(
×
3035
          "Different length of measurements, datatypes, encodings "
3036
              + "or compressors when create schema tempalte.");
3037
    }
3038
    for (int idx = 0; idx < measurements.size(); idx++) {
×
3039
      MeasurementNode mNode =
×
3040
          new MeasurementNode(
3041
              measurements.get(idx), dataTypes.get(idx), encodings.get(idx), compressors.get(idx));
×
3042
      temp.addToTemplate(mNode);
×
3043
    }
3044
    createSchemaTemplate(temp);
×
3045
  }
×
3046

3047
  /**
3048
   * Compatible for rel/0.12, this method will create an unaligned flat template as a result. Notice
3049
   * that there is no aligned concept in 0.12, so only the first measurement in each nested list
3050
   * matters.
3051
   *
3052
   * @param name name of the template
3053
   * @param schemaNames it works as a virtual layer inside template in 0.12, and makes no difference
3054
   *     after 0.13
3055
   * @param measurements the first measurement in each nested list will constitute the final flat
3056
   *     template
3057
   * @param dataTypes the data type of each measurement, only the first one in each nested list
3058
   *     matters as above
3059
   * @param encodings the encoding of each measurement, only the first one in each nested list
3060
   *     matters as above
3061
   * @param compressors the compressor of each measurement
3062
   * @throws IoTDBConnectionException
3063
   * @throws StatementExecutionException
3064
   * @deprecated
3065
   */
3066
  @Override
3067
  @Deprecated
3068
  public void createSchemaTemplate(
3069
      String name,
3070
      List<String> schemaNames,
3071
      List<List<String>> measurements,
3072
      List<List<TSDataType>> dataTypes,
3073
      List<List<TSEncoding>> encodings,
3074
      List<CompressionType> compressors)
3075
      throws IoTDBConnectionException, StatementExecutionException {
3076
    List<String> flatMeasurements = new ArrayList<>();
×
3077
    List<TSDataType> flatDataTypes = new ArrayList<>();
×
3078
    List<TSEncoding> flatEncodings = new ArrayList<>();
×
3079
    for (int idx = 0; idx < measurements.size(); idx++) {
×
3080
      flatMeasurements.add(measurements.get(idx).get(0));
×
3081
      flatDataTypes.add(dataTypes.get(idx).get(0));
×
3082
      flatEncodings.add(encodings.get(idx).get(0));
×
3083
    }
3084
    try {
3085
      createSchemaTemplate(
×
3086
          name, flatMeasurements, flatDataTypes, flatEncodings, compressors, false);
3087
    } catch (IOException e) {
×
3088
      throw new StatementExecutionException(e.getMessage());
×
3089
    }
×
3090
  }
×
3091

3092
  /**
3093
   * @param templateName Template to add aligned measurements.
3094
   * @param measurementsPath If measurements get different prefix, or the prefix already exists in
3095
   *     template but not aligned, throw exception.
3096
   * @param dataTypes Data type of these measurements.
3097
   * @param encodings Encoding of these measurements.
3098
   * @param compressors CompressionType of these measurements.
3099
   */
3100
  @Override
3101
  public void addAlignedMeasurementsInTemplate(
3102
      String templateName,
3103
      List<String> measurementsPath,
3104
      List<TSDataType> dataTypes,
3105
      List<TSEncoding> encodings,
3106
      List<CompressionType> compressors)
3107
      throws IOException, IoTDBConnectionException, StatementExecutionException {
3108
    TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
×
3109
    req.setName(templateName);
×
3110
    req.setMeasurements(measurementsPath);
×
3111
    req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
×
3112
    req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
×
3113
    req.setCompressors(
×
3114
        compressors.stream().map(i -> (int) i.serialize()).collect(Collectors.toList()));
×
3115
    req.setIsAligned(true);
×
3116
    defaultSessionConnection.appendSchemaTemplate(req);
×
3117
  }
×
3118

3119
  /**
3120
   * @param templateName Template to add a single aligned measurement.
3121
   * @param measurementPath If prefix of the path exists in template and not aligned, throw
3122
   *     exception.
3123
   */
3124
  @Override
3125
  public void addAlignedMeasurementInTemplate(
3126
      String templateName,
3127
      String measurementPath,
3128
      TSDataType dataType,
3129
      TSEncoding encoding,
3130
      CompressionType compressor)
3131
      throws IOException, IoTDBConnectionException, StatementExecutionException {
3132
    TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
×
3133
    req.setName(templateName);
×
3134
    req.setMeasurements(Collections.singletonList(measurementPath));
×
3135
    req.setDataTypes(Collections.singletonList(dataType.ordinal()));
×
3136
    req.setEncodings(Collections.singletonList(encoding.ordinal()));
×
3137
    req.setCompressors(Collections.singletonList((int) compressor.serialize()));
×
3138
    req.setIsAligned(true);
×
3139
    defaultSessionConnection.appendSchemaTemplate(req);
×
3140
  }
×
3141

3142
  /**
3143
   * @param templateName Template to add unaligned measurements.
3144
   * @param measurementsPath If prefix of any path exist in template but aligned, throw exception.
3145
   */
3146
  @Override
3147
  public void addUnalignedMeasurementsInTemplate(
3148
      String templateName,
3149
      List<String> measurementsPath,
3150
      List<TSDataType> dataTypes,
3151
      List<TSEncoding> encodings,
3152
      List<CompressionType> compressors)
3153
      throws IOException, IoTDBConnectionException, StatementExecutionException {
3154
    TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
×
3155
    req.setName(templateName);
×
3156
    req.setMeasurements(measurementsPath);
×
3157
    req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList()));
×
3158
    req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList()));
×
3159
    req.setCompressors(
×
3160
        compressors.stream().map(i -> (int) i.serialize()).collect(Collectors.toList()));
×
3161
    req.setIsAligned(false);
×
3162
    defaultSessionConnection.appendSchemaTemplate(req);
×
3163
  }
×
3164

3165
  /**
3166
   * @param templateName Template to add a single unaligned measurement.
3167
   * @param measurementPath If prefix of path exists in template but aligned, throw exception.
3168
   */
3169
  @Override
3170
  public void addUnalignedMeasurementInTemplate(
3171
      String templateName,
3172
      String measurementPath,
3173
      TSDataType dataType,
3174
      TSEncoding encoding,
3175
      CompressionType compressor)
3176
      throws IOException, IoTDBConnectionException, StatementExecutionException {
3177
    TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq();
×
3178
    req.setName(templateName);
×
3179
    req.setMeasurements(Collections.singletonList(measurementPath));
×
3180
    req.setDataTypes(Collections.singletonList(dataType.ordinal()));
×
3181
    req.setEncodings(Collections.singletonList(encoding.ordinal()));
×
3182
    req.setCompressors(Collections.singletonList((int) compressor.serialize()));
×
3183
    req.setIsAligned(false);
×
3184
    defaultSessionConnection.appendSchemaTemplate(req);
×
3185
  }
×
3186

3187
  /**
3188
   * @param templateName Template to prune.
3189
   * @param path Remove node from template specified by the path, including its children nodes.
3190
   */
3191
  @Override
3192
  public void deleteNodeInTemplate(String templateName, String path)
3193
      throws IOException, IoTDBConnectionException, StatementExecutionException {
3194
    TSPruneSchemaTemplateReq req = new TSPruneSchemaTemplateReq();
×
3195
    req.setName(templateName);
×
3196
    req.setPath(path);
×
3197
    defaultSessionConnection.pruneSchemaTemplate(req);
×
3198
  }
×
3199

3200
  /** @return Amount of measurements in the template */
3201
  @Override
3202
  public int countMeasurementsInTemplate(String name)
3203
      throws StatementExecutionException, IoTDBConnectionException {
3204
    TSQueryTemplateReq req = new TSQueryTemplateReq();
×
3205
    req.setName(name);
×
3206
    req.setQueryType(TemplateQueryType.COUNT_MEASUREMENTS.ordinal());
×
3207
    TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
×
3208
    return resp.getCount();
×
3209
  }
3210

3211
  /** @return If the node specified by the path is a measurement. */
3212
  @Override
3213
  public boolean isMeasurementInTemplate(String templateName, String path)
3214
      throws StatementExecutionException, IoTDBConnectionException {
3215
    TSQueryTemplateReq req = new TSQueryTemplateReq();
×
3216
    req.setName(templateName);
×
3217
    req.setQueryType(TemplateQueryType.IS_MEASUREMENT.ordinal());
×
3218
    req.setMeasurement(path);
×
3219
    TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
×
3220
    return resp.result;
×
3221
  }
3222

3223
  /** @return if there is a node correspond to the path in the template. */
3224
  @Override
3225
  public boolean isPathExistInTemplate(String templateName, String path)
3226
      throws StatementExecutionException, IoTDBConnectionException {
3227
    TSQueryTemplateReq req = new TSQueryTemplateReq();
×
3228
    req.setName(templateName);
×
3229
    req.setQueryType(TemplateQueryType.PATH_EXIST.ordinal());
×
3230
    req.setMeasurement(path);
×
3231
    TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
×
3232
    return resp.result;
×
3233
  }
3234

3235
  /** @return All paths of measurements in the template. */
3236
  @Override
3237
  public List<String> showMeasurementsInTemplate(String templateName)
3238
      throws StatementExecutionException, IoTDBConnectionException {
3239
    TSQueryTemplateReq req = new TSQueryTemplateReq();
×
3240
    req.setName(templateName);
×
3241
    req.setQueryType(TemplateQueryType.SHOW_MEASUREMENTS.ordinal());
×
3242
    req.setMeasurement("");
×
3243
    TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
×
3244
    return resp.getMeasurements();
×
3245
  }
3246

3247
  /** @return All paths of measurements under the pattern in the template. */
3248
  @Override
3249
  public List<String> showMeasurementsInTemplate(String templateName, String pattern)
3250
      throws StatementExecutionException, IoTDBConnectionException {
3251
    TSQueryTemplateReq req = new TSQueryTemplateReq();
×
3252
    req.setName(templateName);
×
3253
    req.setQueryType(TemplateQueryType.SHOW_MEASUREMENTS.ordinal());
×
3254
    req.setMeasurement(pattern);
×
3255
    TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
×
3256
    return resp.getMeasurements();
×
3257
  }
3258

3259
  /** @return All template names. */
3260
  @Override
3261
  public List<String> showAllTemplates()
3262
      throws StatementExecutionException, IoTDBConnectionException {
3263
    TSQueryTemplateReq req = new TSQueryTemplateReq();
×
3264
    req.setName("");
×
3265
    req.setQueryType(TemplateQueryType.SHOW_TEMPLATES.ordinal());
×
3266
    TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
×
3267
    return resp.getMeasurements();
×
3268
  }
3269

3270
  /** @return All paths have been set to designated template. */
3271
  @Override
3272
  public List<String> showPathsTemplateSetOn(String templateName)
3273
      throws StatementExecutionException, IoTDBConnectionException {
3274
    TSQueryTemplateReq req = new TSQueryTemplateReq();
×
3275
    req.setName(templateName);
×
3276
    req.setQueryType(TemplateQueryType.SHOW_SET_TEMPLATES.ordinal());
×
3277
    TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
×
3278
    return resp.getMeasurements();
×
3279
  }
3280

3281
  /** @return All paths are using designated template. */
3282
  @Override
3283
  public List<String> showPathsTemplateUsingOn(String templateName)
3284
      throws StatementExecutionException, IoTDBConnectionException {
3285
    TSQueryTemplateReq req = new TSQueryTemplateReq();
×
3286
    req.setName(templateName);
×
3287
    req.setQueryType(TemplateQueryType.SHOW_USING_TEMPLATES.ordinal());
×
3288
    TSQueryTemplateResp resp = defaultSessionConnection.querySchemaTemplate(req);
×
3289
    return resp.getMeasurements();
×
3290
  }
3291

3292
  @Override
3293
  public void unsetSchemaTemplate(String prefixPath, String templateName)
3294
      throws IoTDBConnectionException, StatementExecutionException {
3295
    TSUnsetSchemaTemplateReq request = getTSUnsetSchemaTemplateReq(prefixPath, templateName);
×
3296
    defaultSessionConnection.unsetSchemaTemplate(request);
×
3297
  }
×
3298

3299
  @Override
3300
  public void dropSchemaTemplate(String templateName)
3301
      throws IoTDBConnectionException, StatementExecutionException {
3302
    TSDropSchemaTemplateReq request = getTSDropSchemaTemplateReq(templateName);
×
3303
    defaultSessionConnection.dropSchemaTemplate(request);
×
3304
  }
×
3305

3306
  private TSSetSchemaTemplateReq getTSSetSchemaTemplateReq(String templateName, String prefixPath) {
3307
    TSSetSchemaTemplateReq request = new TSSetSchemaTemplateReq();
×
3308
    request.setTemplateName(templateName);
×
3309
    request.setPrefixPath(prefixPath);
×
3310
    return request;
×
3311
  }
3312

3313
  private TSUnsetSchemaTemplateReq getTSUnsetSchemaTemplateReq(
3314
      String prefixPath, String templateName) {
3315
    TSUnsetSchemaTemplateReq request = new TSUnsetSchemaTemplateReq();
×
3316
    request.setPrefixPath(prefixPath);
×
3317
    request.setTemplateName(templateName);
×
3318
    return request;
×
3319
  }
3320

3321
  private TSDropSchemaTemplateReq getTSDropSchemaTemplateReq(String templateName) {
3322
    TSDropSchemaTemplateReq request = new TSDropSchemaTemplateReq();
×
3323
    request.setTemplateName(templateName);
×
3324
    return request;
×
3325
  }
3326

3327
  /**
3328
   * Create timeseries represented by schema template under given device paths.
3329
   *
3330
   * @param devicePathList the target device paths used for timeseries creation
3331
   */
3332
  @Override
3333
  public void createTimeseriesUsingSchemaTemplate(List<String> devicePathList)
3334
      throws IoTDBConnectionException, StatementExecutionException {
3335
    if (devicePathList == null || devicePathList.contains(null)) {
×
3336
      throw new StatementExecutionException(
×
3337
          "Given device path list should not be  or contains null.");
3338
    }
3339
    TCreateTimeseriesUsingSchemaTemplateReq request = new TCreateTimeseriesUsingSchemaTemplateReq();
×
3340
    request.setDevicePathList(devicePathList);
×
3341
    defaultSessionConnection.createTimeseriesUsingSchemaTemplate(request);
×
3342
  }
×
3343

3344
  /**
3345
   * @param recordsGroup connection to record map
3346
   * @param insertConsumer insert function
3347
   * @param <T>
3348
   *     <ul>
3349
   *       <li>{@link TSInsertRecordsReq}
3350
   *       <li>{@link TSInsertStringRecordsReq}
3351
   *       <li>{@link TSInsertTabletsReq}
3352
   *     </ul>
3353
   *
3354
   * @throws IoTDBConnectionException
3355
   * @throws StatementExecutionException
3356
   */
3357
  @SuppressWarnings({
3358
    "squid:S3776"
3359
  }) // ignore Cognitive Complexity of methods should not be too high
3360
  private <T> void insertByGroup(
3361
      Map<SessionConnection, T> recordsGroup, InsertConsumer<T> insertConsumer)
3362
      throws IoTDBConnectionException, StatementExecutionException {
3363
    List<CompletableFuture<Void>> completableFutures =
1✔
3364
        recordsGroup.entrySet().stream()
1✔
3365
            .map(
1✔
3366
                entry -> {
3367
                  SessionConnection connection = entry.getKey();
1✔
3368
                  T recordsReq = entry.getValue();
1✔
3369
                  return CompletableFuture.runAsync(
1✔
3370
                      () -> {
3371
                        try {
3372
                          insertConsumer.insert(connection, recordsReq);
×
3373
                        } catch (RedirectException e) {
1✔
3374
                          e.getDeviceEndPointMap().forEach(this::handleRedirection);
1✔
3375
                        } catch (StatementExecutionException e) {
×
3376
                          throw new CompletionException(e);
×
3377
                        } catch (IoTDBConnectionException e) {
1✔
3378
                          // remove the broken session
3379
                          removeBrokenSessionConnection(connection);
1✔
3380
                          try {
3381
                            insertConsumer.insert(defaultSessionConnection, recordsReq);
×
3382
                          } catch (IoTDBConnectionException | StatementExecutionException ex) {
×
3383
                            throw new CompletionException(ex);
×
3384
                          } catch (RedirectException ignored) {
1✔
3385
                            logger.info("insert by group has been redirect");
1✔
3386
                          }
×
3387
                        }
1✔
3388
                      },
1✔
3389
                      OPERATION_EXECUTOR);
3390
                })
3391
            .collect(Collectors.toList());
1✔
3392

3393
    StringBuilder errMsgBuilder = new StringBuilder();
1✔
3394
    for (CompletableFuture<Void> completableFuture : completableFutures) {
1✔
3395
      try {
3396
        completableFuture.join();
1✔
3397
      } catch (CompletionException completionException) {
×
3398
        Throwable cause = completionException.getCause();
×
3399
        logger.error("Meet error when async insert!", cause);
×
3400
        if (cause instanceof IoTDBConnectionException) {
×
3401
          throw (IoTDBConnectionException) cause;
×
3402
        } else {
3403
          errMsgBuilder.append(cause.getMessage());
×
3404
        }
3405
      }
1✔
3406
    }
1✔
3407
    if (errMsgBuilder.length() > 0) {
1✔
3408
      throw new StatementExecutionException(errMsgBuilder.toString());
×
3409
    }
3410
  }
1✔
3411

3412
  @Override
3413
  public boolean isEnableQueryRedirection() {
3414
    return enableQueryRedirection;
×
3415
  }
3416

3417
  @Override
3418
  public void setEnableQueryRedirection(boolean enableQueryRedirection) {
3419
    this.enableQueryRedirection = enableQueryRedirection;
×
3420
  }
×
3421

3422
  @Override
3423
  public boolean isEnableRedirection() {
3424
    return enableRedirection;
×
3425
  }
3426

3427
  @Override
3428
  public void setEnableRedirection(boolean enableRedirection) {
3429
    this.enableRedirection = enableRedirection;
×
3430
  }
×
3431

3432
  @Override
3433
  public TSBackupConfigurationResp getBackupConfiguration()
3434
      throws IoTDBConnectionException, StatementExecutionException {
3435
    return defaultSessionConnection.getBackupConfiguration();
×
3436
  }
3437

3438
  @Override
3439
  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
3440
    return defaultSessionConnection.fetchAllConnections();
×
3441
  }
3442

3443
  public static class Builder {
×
3444
    private String host = SessionConfig.DEFAULT_HOST;
×
3445
    private int rpcPort = SessionConfig.DEFAULT_PORT;
×
3446
    private String username = SessionConfig.DEFAULT_USER;
×
3447
    private String pw = SessionConfig.DEFAULT_PASSWORD;
×
3448
    private int fetchSize = SessionConfig.DEFAULT_FETCH_SIZE;
×
3449
    private ZoneId zoneId = null;
×
3450
    private int thriftDefaultBufferSize = SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY;
×
3451
    private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
×
3452
    private boolean enableRedirection = SessionConfig.DEFAULT_REDIRECTION_MODE;
×
3453
    private Version version = SessionConfig.DEFAULT_VERSION;
×
3454
    private long timeOut = SessionConfig.DEFAULT_QUERY_TIME_OUT;
×
3455

3456
    private List<String> nodeUrls = null;
×
3457

3458
    public Builder host(String host) {
3459
      this.host = host;
×
3460
      return this;
×
3461
    }
3462

3463
    public Builder port(int port) {
3464
      this.rpcPort = port;
×
3465
      return this;
×
3466
    }
3467

3468
    public Builder username(String username) {
3469
      this.username = username;
×
3470
      return this;
×
3471
    }
3472

3473
    public Builder password(String password) {
3474
      this.pw = password;
×
3475
      return this;
×
3476
    }
3477

3478
    public Builder fetchSize(int fetchSize) {
3479
      this.fetchSize = fetchSize;
×
3480
      return this;
×
3481
    }
3482

3483
    public Builder zoneId(ZoneId zoneId) {
3484
      this.zoneId = zoneId;
×
3485
      return this;
×
3486
    }
3487

3488
    public Builder thriftDefaultBufferSize(int thriftDefaultBufferSize) {
3489
      this.thriftDefaultBufferSize = thriftDefaultBufferSize;
×
3490
      return this;
×
3491
    }
3492

3493
    public Builder thriftMaxFrameSize(int thriftMaxFrameSize) {
3494
      this.thriftMaxFrameSize = thriftMaxFrameSize;
×
3495
      return this;
×
3496
    }
3497

3498
    public Builder enableRedirection(boolean enableRedirection) {
3499
      this.enableRedirection = enableRedirection;
×
3500
      return this;
×
3501
    }
3502

3503
    public Builder nodeUrls(List<String> nodeUrls) {
3504
      this.nodeUrls = nodeUrls;
×
3505
      return this;
×
3506
    }
3507

3508
    public Builder version(Version version) {
3509
      this.version = version;
×
3510
      return this;
×
3511
    }
3512

3513
    public Builder timeOut(long timeOut) {
3514
      this.timeOut = timeOut;
×
3515
      return this;
×
3516
    }
3517

3518
    public Session build() {
3519
      if (nodeUrls != null
×
3520
          && (!SessionConfig.DEFAULT_HOST.equals(host) || rpcPort != SessionConfig.DEFAULT_PORT)) {
×
3521
        throw new IllegalArgumentException(
×
3522
            "You should specify either nodeUrls or (host + rpcPort), but not both");
3523
      }
3524

3525
      if (nodeUrls != null) {
×
3526
        Session newSession =
×
3527
            new Session(
3528
                nodeUrls,
3529
                username,
3530
                pw,
3531
                fetchSize,
3532
                zoneId,
3533
                thriftDefaultBufferSize,
3534
                thriftMaxFrameSize,
3535
                enableRedirection,
3536
                version);
3537
        newSession.setEnableQueryRedirection(true);
×
3538
        return newSession;
×
3539
      }
3540

3541
      return new Session(
×
3542
          host,
3543
          rpcPort,
3544
          username,
3545
          pw,
3546
          fetchSize,
3547
          zoneId,
3548
          thriftDefaultBufferSize,
3549
          thriftMaxFrameSize,
3550
          enableRedirection,
3551
          version);
3552
    }
3553
  }
3554
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc