• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9634

pending completion
#9634

push

travis_ci

web-flow
Add session interface of faster last query in one device

190 of 190 new or added lines in 5 files covered. (100.0%)

79025 of 165424 relevant lines covered (47.77%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.61
/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.session.pool;
21

22
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.isession.ISession;
25
import org.apache.iotdb.isession.SessionConfig;
26
import org.apache.iotdb.isession.SessionDataSet;
27
import org.apache.iotdb.isession.pool.ISessionPool;
28
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
29
import org.apache.iotdb.isession.template.Template;
30
import org.apache.iotdb.isession.util.Version;
31
import org.apache.iotdb.rpc.IoTDBConnectionException;
32
import org.apache.iotdb.rpc.StatementExecutionException;
33
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
34
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
35
import org.apache.iotdb.session.Session;
36
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
37
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
38
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
39
import org.apache.iotdb.tsfile.write.record.Tablet;
40

41
import org.apache.thrift.TException;
42
import org.slf4j.Logger;
43
import org.slf4j.LoggerFactory;
44

45
import java.io.IOException;
46
import java.time.ZoneId;
47
import java.util.List;
48
import java.util.Map;
49
import java.util.concurrent.ConcurrentHashMap;
50
import java.util.concurrent.ConcurrentLinkedDeque;
51
import java.util.concurrent.ConcurrentMap;
52

53
/**
54
 * SessionPool is a wrapper of a Session Set. Using SessionPool, the user does not need to consider
55
 * how to reuse a session connection. Even if the session is disconnected, the session pool can
56
 * recognize it and remove the broken session connection and create a new one.
57
 *
58
 * <p>If there are no available connections and the pool reaches its max size, all methods will
59
 * hang until there is an available connection.
60
 *
61
 * <p>If a user has waited for a session for more than 60 seconds, a warn log will be printed.
62
 *
63
 * <p>The only thing you have to remember is that:
64
 *
65
 * <p>For a query, if you have got all data, i.e., SessionDataSetWrapper.hasNext() == false, it is
66
 * ok. Otherwise, i.e., you want to stop the query before you get all data
67
 * (SessionDataSetWrapper.hasNext() == true), then you have to call
68
 * closeResultSet(SessionDataSetWrapper wrapper) manually. Otherwise the connection is occupied by
69
 * the query.
70
 *
71
 * <p>Another case that you have to manually call closeResultSet() is that when there is exception
72
 * when you call SessionDataSetWrapper.hasNext() or next()
73
 */
74

75
// ignore Generic exceptions & Throwable should never be throw, ignore Either log or rethrow this
76
// exception.
77
@SuppressWarnings({"squid:S112", "java:S1181", "java:S2139"})
78
public class SessionPool implements ISessionPool {
79

80
  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
1✔
81
  public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed";
82
  public static final String CLOSE_THE_SESSION_FAILED = "close the session failed.";
83

84
  private static final int RETRY = 3;
85
  private static final int FINAL_RETRY = RETRY - 1;
86

87
  private final ConcurrentLinkedDeque<ISession> queue = new ConcurrentLinkedDeque<>();
1✔
88
  // for session whose resultSet is not released.
89
  private final ConcurrentMap<ISession, ISession> occupied = new ConcurrentHashMap<>();
1✔
90
  private int size = 0;
1✔
91
  private int maxSize = 0;
1✔
92
  private final long waitToGetSessionTimeoutInMs;
93

94
  // parameters for Session constructor
95
  private final String host;
96
  private final int port;
97
  private final String user;
98
  private final String password;
99
  private int fetchSize;
100
  private ZoneId zoneId;
101
  private boolean enableRedirection;
102
  private boolean enableQueryRedirection = false;
1✔
103

104
  private Map<String, TEndPoint> deviceIdToEndpoint;
105

106
  private int thriftDefaultBufferSize;
107
  private int thriftMaxFrameSize;
108

109
  /**
110
   * Timeout of query can be set by users. A negative number means using the default configuration
111
   * of server. And value 0 will disable the function of query timeout.
112
   */
113
  private long queryTimeoutInMs = -1;
1✔
114

115
  // The version number of the client which used for compatibility in the server
116
  private Version version;
117

118
  // parameters for Session#open()
119
  private final int connectionTimeoutInMs;
120
  private final boolean enableCompression;
121

122
  // whether the queue is closed.
123
  private boolean closed;
124

125
  // Redirect-able SessionPool
126
  private final List<String> nodeUrls;
127

128
  // formatted nodeUrls for logging e.g. "host:port" or "[host:port, host:port, host:port]"
129
  private final String formattedNodeUrls;
130

131
  private static final String INSERT_RECORD_FAIL = "insertRecord failed";
132

133
  private static final String INSERT_RECORD_ERROR_MSG = "unexpected error in insertRecord";
134

135
  private static final String INSERT_RECORDS_ERROR_MSG = "unexpected error in insertRecords";
136

137
  private static final String EXECUTE_LASTDATAQUERY_FAIL = "executeLastDataQuery failed";
138

139
  private static final String EXECUTE_LASTDATAQUERY_ERROR =
140
      "unexpected error in executeLastDataQuery";
141

142
  private static final String EXECUTE_AGGREGATION_QUERY_FAIL = "executeAggregationQuery failed";
143

144
  private static final String INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG =
145
      "unexpected error in insertRecordsOfOneDevice";
146

147
  private static final String DELETE_DATA_ERROR_MSG = "unexpected error in deleteData";
148

149
  private static final String CREATE_SCHEMA_TEMPLATE_ERROR_MSG =
150
      "unexpected error in createSchemaTemplate";
151

152
  private static final String EXECUTE_AGGREGATION_QUERY_ERROR_MSG =
153
      "unexpected error in executeAggregationQuery";
154

155
  private static final String DELETE_DATA_FAIL = "deleteData failed";
156

157
  private static final String INSERT_RECORDS_OF_ONE_DEVICE_FAIL = "insertRecordsOfOneDevice failed";
158

159
  private static final String CREATE_SCHEMA_TEMPLATE_FAIL = "createSchemaTemplate failed";
160

161
  public SessionPool(String host, int port, String user, String password, int maxSize) {
162
    this(
×
163
        host,
164
        port,
165
        user,
166
        password,
167
        maxSize,
168
        SessionConfig.DEFAULT_FETCH_SIZE,
169
        60_000,
170
        false,
171
        null,
172
        SessionConfig.DEFAULT_REDIRECTION_MODE,
173
        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS,
174
        SessionConfig.DEFAULT_VERSION,
175
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
176
        SessionConfig.DEFAULT_MAX_FRAME_SIZE);
177
  }
×
178

179
  public SessionPool(List<String> nodeUrls, String user, String password, int maxSize) {
180
    this(
×
181
        nodeUrls,
182
        user,
183
        password,
184
        maxSize,
185
        SessionConfig.DEFAULT_FETCH_SIZE,
186
        60_000,
187
        false,
188
        null,
189
        SessionConfig.DEFAULT_REDIRECTION_MODE,
190
        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS,
191
        SessionConfig.DEFAULT_VERSION,
192
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
193
        SessionConfig.DEFAULT_MAX_FRAME_SIZE);
194
  }
×
195

196
  public SessionPool(
197
      String host, int port, String user, String password, int maxSize, boolean enableCompression) {
198
    this(
×
199
        host,
200
        port,
201
        user,
202
        password,
203
        maxSize,
204
        SessionConfig.DEFAULT_FETCH_SIZE,
205
        60_000,
206
        enableCompression,
207
        null,
208
        SessionConfig.DEFAULT_REDIRECTION_MODE,
209
        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS,
210
        SessionConfig.DEFAULT_VERSION,
211
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
212
        SessionConfig.DEFAULT_MAX_FRAME_SIZE);
213
  }
×
214

215
  public SessionPool(
216
      List<String> nodeUrls, String user, String password, int maxSize, boolean enableCompression) {
217
    this(
×
218
        nodeUrls,
219
        user,
220
        password,
221
        maxSize,
222
        SessionConfig.DEFAULT_FETCH_SIZE,
223
        60_000,
224
        enableCompression,
225
        null,
226
        SessionConfig.DEFAULT_REDIRECTION_MODE,
227
        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS,
228
        SessionConfig.DEFAULT_VERSION,
229
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
230
        SessionConfig.DEFAULT_MAX_FRAME_SIZE);
231
  }
×
232

233
  public SessionPool(
234
      String host,
235
      int port,
236
      String user,
237
      String password,
238
      int maxSize,
239
      boolean enableCompression,
240
      boolean enableRedirection) {
241
    this(
×
242
        host,
243
        port,
244
        user,
245
        password,
246
        maxSize,
247
        SessionConfig.DEFAULT_FETCH_SIZE,
248
        60_000,
249
        enableCompression,
250
        null,
251
        enableRedirection,
252
        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS,
253
        SessionConfig.DEFAULT_VERSION,
254
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
255
        SessionConfig.DEFAULT_MAX_FRAME_SIZE);
256
  }
×
257

258
  public SessionPool(
259
      List<String> nodeUrls,
260
      String user,
261
      String password,
262
      int maxSize,
263
      boolean enableCompression,
264
      boolean enableRedirection) {
265
    this(
×
266
        nodeUrls,
267
        user,
268
        password,
269
        maxSize,
270
        SessionConfig.DEFAULT_FETCH_SIZE,
271
        60_000,
272
        enableCompression,
273
        null,
274
        enableRedirection,
275
        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS,
276
        SessionConfig.DEFAULT_VERSION,
277
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
278
        SessionConfig.DEFAULT_MAX_FRAME_SIZE);
279
  }
×
280

281
  public SessionPool(
282
      String host, int port, String user, String password, int maxSize, ZoneId zoneId) {
283
    this(
×
284
        host,
285
        port,
286
        user,
287
        password,
288
        maxSize,
289
        SessionConfig.DEFAULT_FETCH_SIZE,
290
        60_000,
291
        false,
292
        zoneId,
293
        SessionConfig.DEFAULT_REDIRECTION_MODE,
294
        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS,
295
        SessionConfig.DEFAULT_VERSION,
296
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
297
        SessionConfig.DEFAULT_MAX_FRAME_SIZE);
298
  }
×
299

300
  public SessionPool(
301
      List<String> nodeUrls, String user, String password, int maxSize, ZoneId zoneId) {
302
    this(
×
303
        nodeUrls,
304
        user,
305
        password,
306
        maxSize,
307
        SessionConfig.DEFAULT_FETCH_SIZE,
308
        60_000,
309
        false,
310
        zoneId,
311
        SessionConfig.DEFAULT_REDIRECTION_MODE,
312
        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS,
313
        SessionConfig.DEFAULT_VERSION,
314
        SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
315
        SessionConfig.DEFAULT_MAX_FRAME_SIZE);
316
  }
×
317

318
  @SuppressWarnings("squid:S107")
319
  public SessionPool(
320
      String host,
321
      int port,
322
      String user,
323
      String password,
324
      int maxSize,
325
      int fetchSize,
326
      long waitToGetSessionTimeoutInMs,
327
      boolean enableCompression,
328
      ZoneId zoneId,
329
      boolean enableRedirection,
330
      int connectionTimeoutInMs,
331
      Version version,
332
      int thriftDefaultBufferSize,
333
      int thriftMaxFrameSize) {
1✔
334
    this.maxSize = maxSize;
1✔
335
    this.host = host;
1✔
336
    this.port = port;
1✔
337
    this.nodeUrls = null;
1✔
338
    this.user = user;
1✔
339
    this.password = password;
1✔
340
    this.fetchSize = fetchSize;
1✔
341
    this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
1✔
342
    this.enableCompression = enableCompression;
1✔
343
    this.zoneId = zoneId;
1✔
344
    this.enableRedirection = enableRedirection;
1✔
345
    if (this.enableRedirection) {
1✔
346
      deviceIdToEndpoint = new ConcurrentHashMap<>();
1✔
347
    }
348
    this.connectionTimeoutInMs = connectionTimeoutInMs;
1✔
349
    this.version = version;
1✔
350
    this.thriftDefaultBufferSize = thriftDefaultBufferSize;
1✔
351
    this.thriftMaxFrameSize = thriftMaxFrameSize;
1✔
352
    this.formattedNodeUrls = String.format("%s:%s", host, port);
1✔
353
  }
1✔
354

355
  @SuppressWarnings("squid:S107") // ignore Methods should not have too many parameters
356
  public SessionPool(
357
      List<String> nodeUrls,
358
      String user,
359
      String password,
360
      int maxSize,
361
      int fetchSize,
362
      long waitToGetSessionTimeoutInMs,
363
      boolean enableCompression,
364
      ZoneId zoneId,
365
      boolean enableRedirection,
366
      int connectionTimeoutInMs,
367
      Version version,
368
      int thriftDefaultBufferSize,
369
      int thriftMaxFrameSize) {
×
370
    this.maxSize = maxSize;
×
371
    this.host = null;
×
372
    this.port = -1;
×
373
    this.nodeUrls = nodeUrls;
×
374
    this.user = user;
×
375
    this.password = password;
×
376
    this.fetchSize = fetchSize;
×
377
    this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
×
378
    this.enableCompression = enableCompression;
×
379
    this.zoneId = zoneId;
×
380
    this.enableRedirection = enableRedirection;
×
381
    if (this.enableRedirection) {
×
382
      deviceIdToEndpoint = new ConcurrentHashMap<>();
×
383
    }
384
    this.connectionTimeoutInMs = connectionTimeoutInMs;
×
385
    this.version = version;
×
386
    this.thriftDefaultBufferSize = thriftDefaultBufferSize;
×
387
    this.thriftMaxFrameSize = thriftMaxFrameSize;
×
388
    this.formattedNodeUrls = nodeUrls.toString();
×
389
  }
×
390

391
  private Session constructNewSession() {
392
    Session session;
393
    if (nodeUrls == null) {
×
394
      // Construct custom Session
395
      session =
×
396
          new Session.Builder()
397
              .host(host)
×
398
              .port(port)
×
399
              .username(user)
×
400
              .password(password)
×
401
              .fetchSize(fetchSize)
×
402
              .zoneId(zoneId)
×
403
              .thriftDefaultBufferSize(thriftDefaultBufferSize)
×
404
              .thriftMaxFrameSize(thriftMaxFrameSize)
×
405
              .enableRedirection(enableRedirection)
×
406
              .version(version)
×
407
              .build();
×
408
    } else {
409
      // Construct redirect-able Session
410
      session =
×
411
          new Session.Builder()
412
              .nodeUrls(nodeUrls)
×
413
              .username(user)
×
414
              .password(password)
×
415
              .fetchSize(fetchSize)
×
416
              .zoneId(zoneId)
×
417
              .thriftDefaultBufferSize(thriftDefaultBufferSize)
×
418
              .thriftMaxFrameSize(thriftMaxFrameSize)
×
419
              .enableRedirection(enableRedirection)
×
420
              .version(version)
×
421
              .build();
×
422
    }
423
    session.setEnableQueryRedirection(enableQueryRedirection);
×
424
    return session;
×
425
  }
426

427
  // if this method throws an exception, either the server is broken, or the ip/port/user/password
428
  // is incorrect.
429
  @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
430
  private ISession getSession() throws IoTDBConnectionException {
431
    ISession session = queue.poll();
×
432
    if (closed) {
×
433
      throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
×
434
    }
435
    if (session != null) {
×
436
      return session;
×
437
    }
438

439
    boolean shouldCreate = false;
×
440

441
    long start = System.currentTimeMillis();
×
442
    while (session == null) {
×
443
      synchronized (this) {
×
444
        if (size < maxSize) {
×
445
          // we can create more session
446
          size++;
×
447
          shouldCreate = true;
×
448
          // but we do it after skip synchronized block because connection a session is time
449
          // consuming.
450
          break;
×
451
        }
452

453
        // we have to wait for someone returns a session.
454
        try {
455
          this.wait(1000);
×
456
          long timeOut = Math.min(waitToGetSessionTimeoutInMs, 60_000);
×
457
          if (System.currentTimeMillis() - start > timeOut) {
×
458
            logger.warn(
×
459
                "the SessionPool has wait for {} seconds to get a new connection: {} with {}, {}",
460
                (System.currentTimeMillis() - start) / 1000,
×
461
                formattedNodeUrls,
462
                user,
463
                password);
464
            logger.warn(
×
465
                "current occupied size {}, queue size {}, considered size {} ",
466
                occupied.size(),
×
467
                queue.size(),
×
468
                size);
×
469
            if (System.currentTimeMillis() - start > waitToGetSessionTimeoutInMs) {
×
470
              throw new IoTDBConnectionException(
×
471
                  String.format("timeout to get a connection from %s", formattedNodeUrls));
×
472
            }
473
          }
474
        } catch (InterruptedException e) {
×
475
          logger.warn("Interrupted!", e);
×
476
          Thread.currentThread().interrupt();
×
477
          // wake up from this.wait(1000) by this.notify()
478
        }
×
479

480
        session = queue.poll();
×
481

482
        // for putBack or size--
483
        this.notify();
×
484

485
        if (closed) {
×
486
          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
×
487
        }
488
      }
×
489
    }
490

491
    if (shouldCreate) {
×
492
      // create a new one.
493

494
      session = constructNewSession();
×
495

496
      try {
497
        session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
×
498
        // avoid someone has called close() the session pool
499
        synchronized (this) {
×
500
          if (closed) {
×
501
            // have to release the connection...
502
            session.close();
×
503
            throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
×
504
          }
505
        }
×
506
      } catch (IoTDBConnectionException e) {
×
507
        // if exception, we will throw the exception.
508
        // Meanwhile, we have to set size--
509
        synchronized (this) {
×
510
          size--;
×
511
          // we do not need to notifyAll as any waited thread can continue to work after waked up.
512
          this.notify();
×
513
        }
×
514
        throw e;
×
515
      }
×
516
    }
517

518
    return session;
×
519
  }
520

521
  @Override
522
  public int currentAvailableSize() {
523
    return queue.size();
×
524
  }
525

526
  @Override
527
  public int currentOccupiedSize() {
528
    return occupied.size();
×
529
  }
530

531
  @SuppressWarnings({"squid:S2446"})
532
  private void putBack(ISession session) {
533
    queue.push(session);
×
534
    synchronized (this) {
×
535
      // we do not need to notifyAll as any waited thread can continue to work after waked up.
536
      this.notify();
×
537
      // comment the following codes as putBack is too frequently called.
538
      //      if (logger.isTraceEnabled()) {
539
      //        logger.trace("put a session back and notify others..., queue.size = {}",
540
      // queue.size());
541
      //      }
542
    }
×
543
  }
×
544

545
  private void occupy(ISession session) {
546
    occupied.put(session, session);
×
547
  }
×
548

549
  /** close all connections in the pool */
550
  @Override
551
  public synchronized void close() {
552
    for (ISession session : queue) {
×
553
      try {
554
        session.close();
×
555
      } catch (IoTDBConnectionException e) {
×
556
        // do nothing
557
        logger.warn(CLOSE_THE_SESSION_FAILED, e);
×
558
      }
×
559
    }
×
560
    for (ISession session : occupied.keySet()) {
×
561
      try {
562
        session.close();
×
563
      } catch (IoTDBConnectionException e) {
×
564
        // do nothing
565
        logger.warn(CLOSE_THE_SESSION_FAILED, e);
×
566
      }
×
567
    }
×
568
    logger.info("closing the session pool, cleaning queues...");
×
569
    this.closed = true;
×
570
    queue.clear();
×
571
    occupied.clear();
×
572
  }
×
573

574
  @Override
575
  public void closeResultSet(SessionDataSetWrapper wrapper) {
576
    boolean putback = true;
×
577
    try {
578
      wrapper.getSessionDataSet().closeOperationHandle();
×
579
    } catch (IoTDBConnectionException | StatementExecutionException e) {
×
580
      tryConstructNewSession();
×
581
      putback = false;
×
582
    } finally {
583
      ISession session = occupied.remove(wrapper.getSession());
×
584
      if (putback && session != null) {
×
585
        putBack(wrapper.getSession());
×
586
      }
587
    }
588
  }
×
589

590
  @SuppressWarnings({"squid:S2446"})
591
  private void tryConstructNewSession() {
592
    Session session = constructNewSession();
×
593
    try {
594
      session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
×
595
      // avoid someone has called close() the session pool
596
      synchronized (this) {
×
597
        if (closed) {
×
598
          // have to release the connection...
599
          session.close();
×
600
          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
×
601
        }
602
        queue.push(session);
×
603
        this.notify();
×
604
      }
×
605
    } catch (IoTDBConnectionException e) {
×
606
      synchronized (this) {
×
607
        size--;
×
608
        // we do not need to notifyAll as any waited thread can continue to work after waked up.
609
        this.notify();
×
610
      }
×
611
    }
×
612
  }
×
613

614
  private void closeSession(ISession session) {
615
    if (session != null) {
×
616
      try {
617
        session.close();
×
618
      } catch (Exception e2) {
×
619
        // do nothing. We just want to guarantee the session is closed.
620
        logger.warn(CLOSE_THE_SESSION_FAILED, e2);
×
621
      }
×
622
    }
623
  }
×
624

625
  private void cleanSessionAndMayThrowConnectionException(
626
      ISession session, int times, IoTDBConnectionException e) throws IoTDBConnectionException {
627
    closeSession(session);
×
628
    tryConstructNewSession();
×
629
    if (times == FINAL_RETRY) {
×
630
      throw new IoTDBConnectionException(
×
631
          String.format(
×
632
              "retry to execute statement on %s failed %d times: %s",
633
              formattedNodeUrls, RETRY, e.getMessage()),
×
634
          e);
635
    }
636
  }
×
637

638
  /**
639
   * insert the data of a device. For each timestamp, the number of measurements is the same.
640
   *
641
   * @param tablet data batch
642
   */
643
  @Override
644
  public void insertTablet(Tablet tablet)
645
      throws IoTDBConnectionException, StatementExecutionException {
646
    /*
647
     *  A Tablet example:
648
     *        device1
649
     *     time s1, s2, s3
650
     *     1,   1,  1,  1
651
     *     2,   2,  2,  2
652
     *     3,   3,  3,  3
653
     *
654
     * times in Tablet may be not in ascending order
655
     */
656
    insertTablet(tablet, false);
×
657
  }
×
658

659
  /**
660
   * insert the data of a device. For each timestamp, the number of measurements is the same.
661
   *
662
   * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
663
   *
664
   * @param tablet a tablet data of one device
665
   * @param sorted whether times in Tablet are in ascending order
666
   */
667
  @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never be throw
668
  @Override
669
  public void insertTablet(Tablet tablet, boolean sorted)
670
      throws IoTDBConnectionException, StatementExecutionException {
671
    /*
672
     *  A Tablet example:
673
     *        device1
674
     *     time s1, s2, s3
675
     *     1,   1,  1,  1
676
     *     2,   2,  2,  2
677
     *     3,   3,  3,  3
678
     */
679

680
    for (int i = 0; i < RETRY; i++) {
×
681
      ISession session = getSession();
×
682
      try {
683
        session.insertTablet(tablet, sorted);
×
684
        putBack(session);
×
685
        return;
×
686
      } catch (IoTDBConnectionException e) {
×
687
        // TException means the connection is broken, remove it and get a new one.
688
        logger.warn("insertTablet failed", e);
×
689
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
690
      } catch (StatementExecutionException | RuntimeException e) {
×
691
        putBack(session);
×
692
        throw e;
×
693
      } catch (Throwable e) {
×
694
        logger.error("unexpected error in insertTablet", e);
×
695
        putBack(session);
×
696
        throw new RuntimeException(e);
×
697
      }
×
698
    }
699
  }
×
700

701
  /**
702
   * insert the data of a device. For each timestamp, the number of measurements is the same.
703
   *
704
   * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
705
   *
706
   * @param tablet a tablet data of one device
707
   */
708
  @Override
709
  public void insertAlignedTablet(Tablet tablet)
710
      throws IoTDBConnectionException, StatementExecutionException {
711
    insertAlignedTablet(tablet, false);
×
712
  }
×
713

714
  /**
715
   * insert the data of a device. For each timestamp, the number of measurements is the same.
716
   *
717
   * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
718
   *
719
   * @param tablet a tablet data of one device
720
   * @param sorted whether times in Tablet are in ascending order
721
   */
722
  @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never be throw
723
  @Override
724
  public void insertAlignedTablet(Tablet tablet, boolean sorted)
725
      throws IoTDBConnectionException, StatementExecutionException {
726
    for (int i = 0; i < RETRY; i++) {
×
727
      ISession session = getSession();
×
728
      try {
729
        session.insertAlignedTablet(tablet, sorted);
×
730
        putBack(session);
×
731
        return;
×
732
      } catch (IoTDBConnectionException e) {
×
733
        // TException means the connection is broken, remove it and get a new one.
734
        logger.warn("insertAlignedTablet failed", e);
×
735
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
736
      } catch (StatementExecutionException | RuntimeException e) {
×
737
        putBack(session);
×
738
        throw e;
×
739
      } catch (Throwable e) {
×
740
        logger.error("unexpected error in insertAlignedTablet", e);
×
741
        putBack(session);
×
742
        throw new RuntimeException(e);
×
743
      }
×
744
    }
745
  }
×
746

747
  /**
748
   * use batch interface to insert data
749
   *
750
   * @param tablets multiple batch
751
   */
752
  @Override
753
  public void insertTablets(Map<String, Tablet> tablets)
754
      throws IoTDBConnectionException, StatementExecutionException {
755
    insertTablets(tablets, false);
×
756
  }
×
757

758
  /**
759
   * use batch interface to insert aligned data
760
   *
761
   * @param tablets multiple batch
762
   */
763
  @Override
764
  public void insertAlignedTablets(Map<String, Tablet> tablets)
765
      throws IoTDBConnectionException, StatementExecutionException {
766
    insertAlignedTablets(tablets, false);
×
767
  }
×
768

769
  /**
770
   * use batch interface to insert data
771
   *
772
   * @param tablets multiple batch
773
   */
774
  @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never be throw
775
  @Override
776
  public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
777
      throws IoTDBConnectionException, StatementExecutionException {
778
    for (int i = 0; i < RETRY; i++) {
×
779
      ISession session = getSession();
×
780
      try {
781
        session.insertTablets(tablets, sorted);
×
782
        putBack(session);
×
783
        return;
×
784
      } catch (IoTDBConnectionException e) {
×
785
        // TException means the connection is broken, remove it and get a new one.
786
        logger.warn("insertTablets failed", e);
×
787
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
788
      } catch (StatementExecutionException | RuntimeException e) {
×
789
        putBack(session);
×
790
        throw e;
×
791
      } catch (Throwable e) {
×
792
        logger.error("unexpected error in insertTablets", e);
×
793
        putBack(session);
×
794
        throw new RuntimeException(e);
×
795
      }
×
796
    }
797
  }
×
798

799
  /**
800
   * use batch interface to insert aligned data
801
   *
802
   * @param tablets multiple batch
803
   */
804
  @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never be throw
805
  @Override
806
  public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
807
      throws IoTDBConnectionException, StatementExecutionException {
808
    for (int i = 0; i < RETRY; i++) {
×
809
      ISession session = getSession();
×
810
      try {
811
        session.insertAlignedTablets(tablets, sorted);
×
812
        putBack(session);
×
813
        return;
×
814
      } catch (IoTDBConnectionException e) {
×
815
        // TException means the connection is broken, remove it and get a new one.
816
        logger.warn("insertAlignedTablets failed", e);
×
817
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
818
      } catch (StatementExecutionException | RuntimeException e) {
×
819
        putBack(session);
×
820
        throw e;
×
821
      } catch (Throwable e) {
×
822
        logger.error("unexpected error in insertAlignedTablets", e);
×
823
        putBack(session);
×
824
        throw new RuntimeException(e);
×
825
      }
×
826
    }
827
  }
×
828

829
  /**
830
   * Insert data in batch format, which can reduce the overhead of network. This method is just like
831
   * jdbc batch insert, we pack some insert request in batch and send them to server If you want
832
   * improve your performance, please see insertTablet method
833
   *
834
   * @see Session#insertTablet(Tablet)
835
   */
836
  @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never be throw
837
  @Override
838
  public void insertRecords(
839
      List<String> deviceIds,
840
      List<Long> times,
841
      List<List<String>> measurementsList,
842
      List<List<TSDataType>> typesList,
843
      List<List<Object>> valuesList)
844
      throws IoTDBConnectionException, StatementExecutionException {
845
    for (int i = 0; i < RETRY; i++) {
×
846
      ISession session = getSession();
×
847
      try {
848
        session.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
×
849
        putBack(session);
×
850
        return;
×
851
      } catch (IoTDBConnectionException e) {
×
852
        // TException means the connection is broken, remove it and get a new one.
853
        logger.warn("insertRecords failed", e);
×
854
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
855
      } catch (StatementExecutionException | RuntimeException e) {
×
856
        putBack(session);
×
857
        throw e;
×
858
      } catch (Throwable e) {
×
859
        logger.error(INSERT_RECORDS_ERROR_MSG, e);
×
860
        putBack(session);
×
861
        throw new RuntimeException(e);
×
862
      }
×
863
    }
864
  }
×
865

866
  /**
867
   * Insert aligned data in batch format, which can reduce the overhead of network. This method is
868
   * just like jdbc batch insert, we pack some insert request in batch and send them to server If
869
   * you want improve your performance, please see insertTablet method.
870
   *
871
   * @see Session#insertTablet(Tablet)
872
   */
873
  @SuppressWarnings({"squid:S112"}) // ignore Generic exceptions should never be throw
874
  @Override
875
  public void insertAlignedRecords(
876
      List<String> multiSeriesIds,
877
      List<Long> times,
878
      List<List<String>> multiMeasurementComponentsList,
879
      List<List<TSDataType>> typesList,
880
      List<List<Object>> valuesList)
881
      throws IoTDBConnectionException, StatementExecutionException {
882
    for (int i = 0; i < RETRY; i++) {
×
883
      ISession session = getSession();
×
884
      try {
885
        session.insertAlignedRecords(
×
886
            multiSeriesIds, times, multiMeasurementComponentsList, typesList, valuesList);
887
        putBack(session);
×
888
        return;
×
889
      } catch (IoTDBConnectionException e) {
×
890
        // TException means the connection is broken, remove it and get a new one.
891
        logger.warn("insertAlignedRecords failed", e);
×
892
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
893
      } catch (StatementExecutionException | RuntimeException e) {
×
894
        putBack(session);
×
895
        throw e;
×
896
      } catch (Throwable e) {
×
897
        logger.error("unexpected error in insertAlignedRecords", e);
×
898
        putBack(session);
×
899
        throw new RuntimeException(e);
×
900
      }
×
901
    }
902
  }
×
903

904
  /**
905
   * Insert data that belong to the same device in batch format, which can reduce the overhead of
906
   * network. This method is just like jdbc batch insert, we pack some insert request in batch and
907
   * send them to server If you want improve your performance, please see insertTablet method
908
   *
909
   * @see Session#insertTablet(Tablet)
910
   */
911
  @Override
912
  public void insertRecordsOfOneDevice(
913
      String deviceId,
914
      List<Long> times,
915
      List<List<String>> measurementsList,
916
      List<List<TSDataType>> typesList,
917
      List<List<Object>> valuesList)
918
      throws IoTDBConnectionException, StatementExecutionException {
919
    for (int i = 0; i < RETRY; i++) {
×
920
      ISession session = getSession();
×
921
      try {
922
        session.insertRecordsOfOneDevice(
×
923
            deviceId, times, measurementsList, typesList, valuesList, false);
924
        putBack(session);
×
925
        return;
×
926
      } catch (IoTDBConnectionException e) {
×
927
        // TException means the connection is broken, remove it and get a new one.
928
        logger.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
×
929
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
930
      } catch (StatementExecutionException | RuntimeException e) {
×
931
        putBack(session);
×
932
        throw e;
×
933
      } catch (Throwable e) {
×
934
        logger.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
×
935
        putBack(session);
×
936
        throw new RuntimeException(e);
×
937
      }
×
938
    }
939
  }
×
940

941
  /**
942
   * Insert data that belong to the same device in batch format, which can reduce the overhead of
943
   * network. This method is just like jdbc batch insert, we pack some insert request in batch and
944
   * send them to server If you want improve your performance, please see insertTablet method
945
   *
946
   * @see Session#insertTablet(Tablet)
947
   * @deprecated
948
   */
949
  @Deprecated
950
  @Override
951
  public void insertOneDeviceRecords(
952
      String deviceId,
953
      List<Long> times,
954
      List<List<String>> measurementsList,
955
      List<List<TSDataType>> typesList,
956
      List<List<Object>> valuesList)
957
      throws IoTDBConnectionException, StatementExecutionException {
958
    for (int i = 0; i < RETRY; i++) {
×
959
      ISession session = getSession();
×
960
      try {
961
        session.insertRecordsOfOneDevice(
×
962
            deviceId, times, measurementsList, typesList, valuesList, false);
963
        putBack(session);
×
964
        return;
×
965
      } catch (IoTDBConnectionException e) {
×
966
        // TException means the connection is broken, remove it and get a new one.
967
        logger.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
×
968
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
969
      } catch (StatementExecutionException | RuntimeException e) {
×
970
        putBack(session);
×
971
        throw e;
×
972
      } catch (Throwable e) {
×
973
        logger.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
×
974
        putBack(session);
×
975
        throw new RuntimeException(e);
×
976
      }
×
977
    }
978
  }
×
979

980
  /**
981
   * Insert String format data that belong to the same device in batch format, which can reduce the
982
   * overhead of network. This method is just like jdbc batch insert, we pack some insert request in
983
   * batch and send them to server If you want improve your performance, please see insertTablet
984
   * method
985
   *
986
   * @see Session#insertTablet(Tablet)
987
   */
988
  @Override
989
  public void insertStringRecordsOfOneDevice(
990
      String deviceId,
991
      List<Long> times,
992
      List<List<String>> measurementsList,
993
      List<List<String>> valuesList)
994
      throws IoTDBConnectionException, StatementExecutionException {
995
    for (int i = 0; i < RETRY; i++) {
×
996
      ISession session = getSession();
×
997
      try {
998
        session.insertStringRecordsOfOneDevice(
×
999
            deviceId, times, measurementsList, valuesList, false);
1000
        putBack(session);
×
1001
        return;
×
1002
      } catch (IoTDBConnectionException e) {
×
1003
        // TException means the connection is broken, remove it and get a new one.
1004
        logger.warn("insertStringRecordsOfOneDevice failed", e);
×
1005
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1006
      } catch (StatementExecutionException | RuntimeException e) {
×
1007
        putBack(session);
×
1008
        throw e;
×
1009
      } catch (Throwable e) {
×
1010
        logger.error("unexpected error in insertStringRecordsOfOneDevice", e);
×
1011
        putBack(session);
×
1012
        throw new RuntimeException(e);
×
1013
      }
×
1014
    }
1015
  }
×
1016

1017
  /**
1018
   * Insert data that belong to the same device in batch format, which can reduce the overhead of
1019
   * network. This method is just like jdbc batch insert, we pack some insert request in batch and
1020
   * send them to server If you want improve your performance, please see insertTablet method
1021
   *
1022
   * @param haveSorted whether the times list has been ordered.
1023
   * @see Session#insertTablet(Tablet)
1024
   */
1025
  @Override
1026
  public void insertRecordsOfOneDevice(
1027
      String deviceId,
1028
      List<Long> times,
1029
      List<List<String>> measurementsList,
1030
      List<List<TSDataType>> typesList,
1031
      List<List<Object>> valuesList,
1032
      boolean haveSorted)
1033
      throws IoTDBConnectionException, StatementExecutionException {
1034
    for (int i = 0; i < RETRY; i++) {
×
1035
      ISession session = getSession();
×
1036
      try {
1037
        session.insertRecordsOfOneDevice(
×
1038
            deviceId, times, measurementsList, typesList, valuesList, haveSorted);
1039
        putBack(session);
×
1040
        return;
×
1041
      } catch (IoTDBConnectionException e) {
×
1042
        // TException means the connection is broken, remove it and get a new one.
1043
        logger.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
×
1044
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1045
      } catch (StatementExecutionException | RuntimeException e) {
×
1046
        putBack(session);
×
1047
        throw e;
×
1048
      } catch (Throwable e) {
×
1049
        logger.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
×
1050
        putBack(session);
×
1051
        throw new RuntimeException(e);
×
1052
      }
×
1053
    }
1054
  }
×
1055

1056
  /**
1057
   * Insert data that belong to the same device in batch format, which can reduce the overhead of
1058
   * network. This method is just like jdbc batch insert, we pack some insert request in batch and
1059
   * send them to server If you want improve your performance, please see insertTablet method
1060
   *
1061
   * @param haveSorted whether the times list has been ordered.
1062
   * @see Session#insertTablet(Tablet)
1063
   * @deprecated
1064
   */
1065
  @Override
1066
  @Deprecated
1067
  public void insertOneDeviceRecords(
1068
      String deviceId,
1069
      List<Long> times,
1070
      List<List<String>> measurementsList,
1071
      List<List<TSDataType>> typesList,
1072
      List<List<Object>> valuesList,
1073
      boolean haveSorted)
1074
      throws IoTDBConnectionException, StatementExecutionException {
1075
    for (int i = 0; i < RETRY; i++) {
×
1076
      ISession session = getSession();
×
1077
      try {
1078
        session.insertRecordsOfOneDevice(
×
1079
            deviceId, times, measurementsList, typesList, valuesList, haveSorted);
1080
        putBack(session);
×
1081
        return;
×
1082
      } catch (IoTDBConnectionException e) {
×
1083
        // TException means the connection is broken, remove it and get a new one.
1084
        logger.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
×
1085
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1086
      } catch (StatementExecutionException | RuntimeException e) {
×
1087
        putBack(session);
×
1088
        throw e;
×
1089
      } catch (Throwable e) {
×
1090
        logger.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
×
1091
        putBack(session);
×
1092
        throw new RuntimeException(e);
×
1093
      }
×
1094
    }
1095
  }
×
1096

1097
  /**
1098
   * Insert String format data that belong to the same device in batch format, which can reduce the
1099
   * overhead of network. This method is just like jdbc batch insert, we pack some insert request in
1100
   * batch and send them to server If you want improve your performance, please see insertTablet
1101
   * method
1102
   *
1103
   * @param haveSorted whether the times list has been ordered.
1104
   * @see Session#insertTablet(Tablet)
1105
   */
1106
  @Override
1107
  public void insertStringRecordsOfOneDevice(
1108
      String deviceId,
1109
      List<Long> times,
1110
      List<List<String>> measurementsList,
1111
      List<List<String>> valuesList,
1112
      boolean haveSorted)
1113
      throws IoTDBConnectionException, StatementExecutionException {
1114
    for (int i = 0; i < RETRY; i++) {
×
1115
      ISession session = getSession();
×
1116
      try {
1117
        session.insertStringRecordsOfOneDevice(
×
1118
            deviceId, times, measurementsList, valuesList, haveSorted);
1119
        putBack(session);
×
1120
        return;
×
1121
      } catch (IoTDBConnectionException e) {
×
1122
        // TException means the connection is broken, remove it and get a new one.
1123
        logger.warn("insertStringRecordsOfOneDevice failed", e);
×
1124
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1125
      } catch (StatementExecutionException | RuntimeException e) {
×
1126
        putBack(session);
×
1127
        throw e;
×
1128
      } catch (Throwable e) {
×
1129
        logger.error("unexpected error in insertStringRecordsOfOneDevice", e);
×
1130
        putBack(session);
×
1131
        throw new RuntimeException(e);
×
1132
      }
×
1133
    }
1134
  }
×
1135

1136
  /**
1137
   * Insert aligned data that belong to the same device in batch format, which can reduce the
1138
   * overhead of network. This method is just like jdbc batch insert, we pack some insert request in
1139
   * batch and send them to server If you want improve your performance, please see insertTablet
1140
   * method.
1141
   *
1142
   * @see Session#insertTablet(Tablet)
1143
   */
1144
  @Override
1145
  public void insertAlignedRecordsOfOneDevice(
1146
      String deviceId,
1147
      List<Long> times,
1148
      List<List<String>> measurementsList,
1149
      List<List<TSDataType>> typesList,
1150
      List<List<Object>> valuesList)
1151
      throws IoTDBConnectionException, StatementExecutionException {
1152
    for (int i = 0; i < RETRY; i++) {
×
1153
      ISession session = getSession();
×
1154
      try {
1155
        session.insertAlignedRecordsOfOneDevice(
×
1156
            deviceId, times, measurementsList, typesList, valuesList, false);
1157
        putBack(session);
×
1158
        return;
×
1159
      } catch (IoTDBConnectionException e) {
×
1160
        // TException means the connection is broken, remove it and get a new one.
1161
        logger.warn("insertAlignedRecordsOfOneDevice failed", e);
×
1162
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1163
      } catch (StatementExecutionException | RuntimeException e) {
×
1164
        putBack(session);
×
1165
        throw e;
×
1166
      } catch (Throwable e) {
×
1167
        logger.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
×
1168
        putBack(session);
×
1169
        throw new RuntimeException(e);
×
1170
      }
×
1171
    }
1172
  }
×
1173

1174
  /**
1175
   * Insert aligned data as String format that belong to the same device in batch format, which can
1176
   * reduce the overhead of network. This method is just like jdbc batch insert, we pack some insert
1177
   * request in batch and send them to server If you want improve your performance, please see
1178
   * insertTablet method.
1179
   *
1180
   * @see Session#insertTablet(Tablet)
1181
   */
1182
  @Override
1183
  public void insertAlignedStringRecordsOfOneDevice(
1184
      String deviceId,
1185
      List<Long> times,
1186
      List<List<String>> measurementsList,
1187
      List<List<String>> valuesList)
1188
      throws IoTDBConnectionException, StatementExecutionException {
1189
    for (int i = 0; i < RETRY; i++) {
×
1190
      ISession session = getSession();
×
1191
      try {
1192
        session.insertAlignedStringRecordsOfOneDevice(
×
1193
            deviceId, times, measurementsList, valuesList);
1194
        putBack(session);
×
1195
        return;
×
1196
      } catch (IoTDBConnectionException e) {
×
1197
        // TException means the connection is broken, remove it and get a new one.
1198
        logger.warn("insertAlignedStringRecordsOfOneDevice failed", e);
×
1199
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1200
      } catch (StatementExecutionException | RuntimeException e) {
×
1201
        putBack(session);
×
1202
        throw e;
×
1203
      } catch (Throwable e) {
×
1204
        logger.error("unexpected error in insertAlignedStringRecordsOfOneDevice", e);
×
1205
        putBack(session);
×
1206
        throw new RuntimeException(e);
×
1207
      }
×
1208
    }
1209
  }
×
1210

1211
  /**
1212
   * Insert aligned data that belong to the same device in batch format, which can reduce the
1213
   * overhead of network. This method is just like jdbc batch insert, we pack some insert request in
1214
   * batch and send them to server If you want improve your performance, please see insertTablet
1215
   * method.
1216
   *
1217
   * @param haveSorted whether the times list has been ordered.
1218
   * @see Session#insertTablet(Tablet)
1219
   */
1220
  @Override
1221
  public void insertAlignedRecordsOfOneDevice(
1222
      String deviceId,
1223
      List<Long> times,
1224
      List<List<String>> measurementsList,
1225
      List<List<TSDataType>> typesList,
1226
      List<List<Object>> valuesList,
1227
      boolean haveSorted)
1228
      throws IoTDBConnectionException, StatementExecutionException {
1229
    for (int i = 0; i < RETRY; i++) {
×
1230
      ISession session = getSession();
×
1231
      try {
1232
        session.insertAlignedRecordsOfOneDevice(
×
1233
            deviceId, times, measurementsList, typesList, valuesList, haveSorted);
1234
        putBack(session);
×
1235
        return;
×
1236
      } catch (IoTDBConnectionException e) {
×
1237
        // TException means the connection is broken, remove it and get a new one.
1238
        logger.warn("insertAlignedRecordsOfOneDevice failed", e);
×
1239
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1240
      } catch (StatementExecutionException | RuntimeException e) {
×
1241
        putBack(session);
×
1242
        throw e;
×
1243
      } catch (Throwable e) {
×
1244
        logger.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
×
1245
        putBack(session);
×
1246
        throw new RuntimeException(e);
×
1247
      }
×
1248
    }
1249
  }
×
1250

1251
  /**
1252
   * Insert aligned data as String format that belong to the same device in batch format, which can
1253
   * reduce the overhead of network. This method is just like jdbc batch insert, we pack some insert
1254
   * request in batch and send them to server If you want improve your performance, please see
1255
   * insertTablet method.
1256
   *
1257
   * @param haveSorted whether the times list has been ordered.
1258
   * @see Session#insertTablet(Tablet)
1259
   */
1260
  @Override
1261
  public void insertAlignedStringRecordsOfOneDevice(
1262
      String deviceId,
1263
      List<Long> times,
1264
      List<List<String>> measurementsList,
1265
      List<List<String>> valuesList,
1266
      boolean haveSorted)
1267
      throws IoTDBConnectionException, StatementExecutionException {
1268
    for (int i = 0; i < RETRY; i++) {
×
1269
      ISession session = getSession();
×
1270
      try {
1271
        session.insertAlignedStringRecordsOfOneDevice(
×
1272
            deviceId, times, measurementsList, valuesList, haveSorted);
1273
        putBack(session);
×
1274
        return;
×
1275
      } catch (IoTDBConnectionException e) {
×
1276
        // TException means the connection is broken, remove it and get a new one.
1277
        logger.warn("insertAlignedStringRecordsOfOneDevice failed", e);
×
1278
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1279
      } catch (StatementExecutionException | RuntimeException e) {
×
1280
        putBack(session);
×
1281
        throw e;
×
1282
      } catch (Throwable e) {
×
1283
        logger.error("unexpected error in insertAlignedStringRecordsOfOneDevice", e);
×
1284
        putBack(session);
×
1285
        throw new RuntimeException(e);
×
1286
      }
×
1287
    }
1288
  }
×
1289

1290
  /**
1291
   * Insert data in batch format, which can reduce the overhead of network. This method is just like
1292
   * jdbc batch insert, we pack some insert request in batch and send them to server If you want
1293
   * improve your performance, please see insertTablet method
1294
   *
1295
   * @see Session#insertTablet(Tablet)
1296
   */
1297
  @Override
1298
  public void insertRecords(
1299
      List<String> deviceIds,
1300
      List<Long> times,
1301
      List<List<String>> measurementsList,
1302
      List<List<String>> valuesList)
1303
      throws IoTDBConnectionException, StatementExecutionException {
1304
    for (int i = 0; i < RETRY; i++) {
×
1305
      ISession session = getSession();
×
1306
      try {
1307
        session.insertRecords(deviceIds, times, measurementsList, valuesList);
×
1308
        putBack(session);
×
1309
        return;
×
1310
      } catch (IoTDBConnectionException e) {
×
1311
        // TException means the connection is broken, remove it and get a new one.
1312
        logger.warn("insertRecords failed", e);
×
1313
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1314
      } catch (StatementExecutionException | RuntimeException e) {
×
1315
        putBack(session);
×
1316
        throw e;
×
1317
      } catch (Throwable e) {
×
1318
        logger.error(INSERT_RECORDS_ERROR_MSG, e);
×
1319
        putBack(session);
×
1320
        throw new RuntimeException(e);
×
1321
      }
×
1322
    }
1323
  }
×
1324

1325
  /**
1326
   * Insert aligned data in batch format, which can reduce the overhead of network. This method is
1327
   * just like jdbc batch insert, we pack some insert request in batch and send them to server If
1328
   * you want improve your performance, please see insertTablet method.
1329
   *
1330
   * @see Session#insertTablet(Tablet)
1331
   */
1332
  @Override
1333
  public void insertAlignedRecords(
1334
      List<String> multiSeriesIds,
1335
      List<Long> times,
1336
      List<List<String>> multiMeasurementComponentsList,
1337
      List<List<String>> valuesList)
1338
      throws IoTDBConnectionException, StatementExecutionException {
1339
    for (int i = 0; i < RETRY; i++) {
×
1340
      ISession session = getSession();
×
1341
      try {
1342
        session.insertAlignedRecords(
×
1343
            multiSeriesIds, times, multiMeasurementComponentsList, valuesList);
1344
        putBack(session);
×
1345
        return;
×
1346
      } catch (IoTDBConnectionException e) {
×
1347
        // TException means the connection is broken, remove it and get a new one.
1348
        logger.warn("insertAlignedRecords failed", e);
×
1349
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1350
      } catch (StatementExecutionException | RuntimeException e) {
×
1351
        putBack(session);
×
1352
        throw e;
×
1353
      } catch (Throwable e) {
×
1354
        logger.error("unexpected error in insertAlignedRecords", e);
×
1355
        putBack(session);
×
1356
        throw new RuntimeException(e);
×
1357
      }
×
1358
    }
1359
  }
×
1360

1361
  /**
1362
   * insert data in one row, if you want improve your performance, please use insertRecords method
1363
   * or insertTablet method
1364
   *
1365
   * @see Session#insertRecords(List, List, List, List, List)
1366
   * @see Session#insertTablet(Tablet)
1367
   */
1368
  @Override
1369
  public void insertRecord(
1370
      String deviceId,
1371
      long time,
1372
      List<String> measurements,
1373
      List<TSDataType> types,
1374
      Object... values)
1375
      throws IoTDBConnectionException, StatementExecutionException {
1376
    for (int i = 0; i < RETRY; i++) {
×
1377
      ISession session = getSession();
×
1378
      try {
1379
        session.insertRecord(deviceId, time, measurements, types, values);
×
1380
        putBack(session);
×
1381
        return;
×
1382
      } catch (IoTDBConnectionException e) {
×
1383
        // TException means the connection is broken, remove it and get a new one.
1384
        logger.error(INSERT_RECORD_FAIL, e);
×
1385
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1386
      } catch (StatementExecutionException | RuntimeException e) {
×
1387
        putBack(session);
×
1388
        throw e;
×
1389
      } catch (Throwable e) {
×
1390
        logger.error(INSERT_RECORD_ERROR_MSG, e);
×
1391
        putBack(session);
×
1392
        throw new RuntimeException(e);
×
1393
      }
×
1394
    }
1395
  }
×
1396

1397
  /**
1398
   * insert data in one row, if you want improve your performance, please use insertRecords method
1399
   * or insertTablet method
1400
   *
1401
   * @see Session#insertRecords(List, List, List, List, List)
1402
   * @see Session#insertTablet(Tablet)
1403
   */
1404
  @Override
1405
  public void insertRecord(
1406
      String deviceId,
1407
      long time,
1408
      List<String> measurements,
1409
      List<TSDataType> types,
1410
      List<Object> values)
1411
      throws IoTDBConnectionException, StatementExecutionException {
1412
    for (int i = 0; i < RETRY; i++) {
×
1413
      ISession session = getSession();
×
1414
      try {
1415
        session.insertRecord(deviceId, time, measurements, types, values);
×
1416
        putBack(session);
×
1417
        return;
×
1418
      } catch (IoTDBConnectionException e) {
×
1419
        // TException means the connection is broken, remove it and get a new one.
1420
        logger.warn(INSERT_RECORD_FAIL, e);
×
1421
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1422
      } catch (StatementExecutionException | RuntimeException e) {
×
1423
        putBack(session);
×
1424
        throw e;
×
1425
      } catch (Throwable e) {
×
1426
        logger.error(INSERT_RECORD_ERROR_MSG, e);
×
1427
        putBack(session);
×
1428
        throw new RuntimeException(e);
×
1429
      }
×
1430
    }
1431
  }
×
1432

1433
  @Override
1434
  public String getTimestampPrecision()
1435
      throws IoTDBConnectionException, StatementExecutionException {
1436
    String timestampPrecision = "ms";
×
1437
    for (int i = 0; i < RETRY; i++) {
×
1438
      ISession session = getSession();
×
1439
      try {
1440
        timestampPrecision = session.getTimestampPrecision();
×
1441
        putBack(session);
×
1442
        return timestampPrecision;
×
1443
      } catch (TException e) {
×
1444
        // TException means the connection is broken, remove it and get a new one.
1445
        logger.warn("getTimestampPrecision failed", e);
×
1446
        cleanSessionAndMayThrowConnectionException(session, i, new IoTDBConnectionException(e));
×
1447
      } catch (RuntimeException e) {
×
1448
        putBack(session);
×
1449
        throw e;
×
1450
      } catch (Throwable e) {
×
1451
        logger.error("unexpected error in getTimestampPrecision", e);
×
1452
        putBack(session);
×
1453
        throw new RuntimeException(e);
×
1454
      }
×
1455
    }
1456
    return timestampPrecision;
×
1457
  }
1458

1459
  /**
1460
   * insert aligned data in one row, if you want improve your performance, please use
1461
   * insertAlignedRecords method or insertTablet method.
1462
   *
1463
   * @see Session#insertAlignedRecords(List, List, List, List, List)
1464
   * @see Session#insertTablet(Tablet)
1465
   */
1466
  @Override
1467
  public void insertAlignedRecord(
1468
      String multiSeriesId,
1469
      long time,
1470
      List<String> multiMeasurementComponents,
1471
      List<TSDataType> types,
1472
      List<Object> values)
1473
      throws IoTDBConnectionException, StatementExecutionException {
1474
    for (int i = 0; i < RETRY; i++) {
×
1475
      ISession session = getSession();
×
1476
      try {
1477
        session.insertAlignedRecord(multiSeriesId, time, multiMeasurementComponents, types, values);
×
1478
        putBack(session);
×
1479
        return;
×
1480
      } catch (IoTDBConnectionException e) {
×
1481
        // TException means the connection is broken, remove it and get a new one.
1482
        logger.warn("insertAlignedRecord failed", e);
×
1483
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1484
      } catch (StatementExecutionException | RuntimeException e) {
×
1485
        putBack(session);
×
1486
        throw e;
×
1487
      } catch (Throwable e) {
×
1488
        logger.error("unexpected error in insertAlignedRecord", e);
×
1489
        putBack(session);
×
1490
        throw new RuntimeException(e);
×
1491
      }
×
1492
    }
1493
  }
×
1494

1495
  /**
1496
   * insert data in one row, if you want improve your performance, please use insertRecords method
1497
   * or insertTablet method
1498
   *
1499
   * @see Session#insertRecords(List, List, List, List, List)
1500
   * @see Session#insertTablet(Tablet)
1501
   */
1502
  @Override
1503
  public void insertRecord(
1504
      String deviceId, long time, List<String> measurements, List<String> values)
1505
      throws IoTDBConnectionException, StatementExecutionException {
1506
    for (int i = 0; i < RETRY; i++) {
×
1507
      ISession session = getSession();
×
1508
      try {
1509
        session.insertRecord(deviceId, time, measurements, values);
×
1510
        putBack(session);
×
1511
        return;
×
1512
      } catch (IoTDBConnectionException e) {
×
1513
        // TException means the connection is broken, remove it and get a new one.
1514
        logger.warn(INSERT_RECORD_FAIL, e);
×
1515
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1516
      } catch (StatementExecutionException | RuntimeException e) {
×
1517
        putBack(session);
×
1518
        throw e;
×
1519
      } catch (Throwable e) {
×
1520
        logger.error(INSERT_RECORD_ERROR_MSG, e);
×
1521
        putBack(session);
×
1522
        throw new RuntimeException(e);
×
1523
      }
×
1524
    }
1525
  }
×
1526

1527
  /**
1528
   * insert aligned data in one row, if you want improve your performance, please use
1529
   * insertAlignedRecords method or insertTablet method.
1530
   *
1531
   * @see Session#insertAlignedRecords(List, List, List, List, List)
1532
   * @see Session#insertTablet(Tablet)
1533
   */
1534
  @Override
1535
  public void insertAlignedRecord(
1536
      String multiSeriesId, long time, List<String> multiMeasurementComponents, List<String> values)
1537
      throws IoTDBConnectionException, StatementExecutionException {
1538
    for (int i = 0; i < RETRY; i++) {
×
1539
      ISession session = getSession();
×
1540
      try {
1541
        session.insertAlignedRecord(multiSeriesId, time, multiMeasurementComponents, values);
×
1542
        putBack(session);
×
1543
        return;
×
1544
      } catch (IoTDBConnectionException e) {
×
1545
        // TException means the connection is broken, remove it and get a new one.
1546
        logger.warn("insertAlignedRecord failed", e);
×
1547
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1548
      } catch (StatementExecutionException | RuntimeException e) {
×
1549
        putBack(session);
×
1550
        throw e;
×
1551
      } catch (Throwable e) {
×
1552
        logger.error("unexpected error in insertAlignedRecord", e);
×
1553
        putBack(session);
×
1554
        throw new RuntimeException(e);
×
1555
      }
×
1556
    }
1557
  }
×
1558

1559
  /**
1560
   * This method NOT insert data into database and the server just return after accept the request,
1561
   * this method should be used to test other time cost in client
1562
   */
1563
  @Override
1564
  public void testInsertTablet(Tablet tablet)
1565
      throws IoTDBConnectionException, StatementExecutionException {
1566
    for (int i = 0; i < RETRY; i++) {
×
1567
      ISession session = getSession();
×
1568
      try {
1569
        session.testInsertTablet(tablet);
×
1570
        putBack(session);
×
1571
        return;
×
1572
      } catch (IoTDBConnectionException e) {
×
1573
        // TException means the connection is broken, remove it and get a new one.
1574
        logger.warn("testInsertTablet failed", e);
×
1575
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1576
      } catch (StatementExecutionException | RuntimeException e) {
×
1577
        putBack(session);
×
1578
        throw e;
×
1579
      } catch (Throwable e) {
×
1580
        logger.error("unexpected error in testInsertTablet", e);
×
1581
        putBack(session);
×
1582
        throw new RuntimeException(e);
×
1583
      }
×
1584
    }
1585
  }
×
1586

1587
  /**
1588
   * This method NOT insert data into database and the server just return after accept the request,
1589
   * this method should be used to test other time cost in client
1590
   */
1591
  @Override
1592
  public void testInsertTablet(Tablet tablet, boolean sorted)
1593
      throws IoTDBConnectionException, StatementExecutionException {
1594
    for (int i = 0; i < RETRY; i++) {
×
1595
      ISession session = getSession();
×
1596
      try {
1597
        session.testInsertTablet(tablet, sorted);
×
1598
        putBack(session);
×
1599
        return;
×
1600
      } catch (IoTDBConnectionException e) {
×
1601
        // TException means the connection is broken, remove it and get a new one.
1602
        logger.warn("testInsertTablet failed", e);
×
1603
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1604
      } catch (StatementExecutionException | RuntimeException e) {
×
1605
        putBack(session);
×
1606
        throw e;
×
1607
      } catch (Throwable e) {
×
1608
        logger.error("unexpected error in testInsertTablet", e);
×
1609
        putBack(session);
×
1610
        throw new RuntimeException(e);
×
1611
      }
×
1612
    }
1613
  }
×
1614

1615
  /**
1616
   * This method NOT insert data into database and the server just return after accept the request,
1617
   * this method should be used to test other time cost in client
1618
   */
1619
  @Override
1620
  public void testInsertTablets(Map<String, Tablet> tablets)
1621
      throws IoTDBConnectionException, StatementExecutionException {
1622
    for (int i = 0; i < RETRY; i++) {
×
1623
      ISession session = getSession();
×
1624
      try {
1625
        session.testInsertTablets(tablets);
×
1626
        putBack(session);
×
1627
        return;
×
1628
      } catch (IoTDBConnectionException e) {
×
1629
        // TException means the connection is broken, remove it and get a new one.
1630
        logger.warn("testInsertTablets failed", e);
×
1631
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1632
      } catch (StatementExecutionException | RuntimeException e) {
×
1633
        putBack(session);
×
1634
        throw e;
×
1635
      } catch (Throwable e) {
×
1636
        logger.error("unexpected error in testInsertTablets", e);
×
1637
        putBack(session);
×
1638
        throw new RuntimeException(e);
×
1639
      }
×
1640
    }
1641
  }
×
1642

1643
  /**
1644
   * This method NOT insert data into database and the server just return after accept the request,
1645
   * this method should be used to test other time cost in client
1646
   */
1647
  @Override
1648
  public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
1649
      throws IoTDBConnectionException, StatementExecutionException {
1650
    for (int i = 0; i < RETRY; i++) {
×
1651
      ISession session = getSession();
×
1652
      try {
1653
        session.testInsertTablets(tablets, sorted);
×
1654
        putBack(session);
×
1655
        return;
×
1656
      } catch (IoTDBConnectionException e) {
×
1657
        // TException means the connection is broken, remove it and get a new one.
1658
        logger.warn("testInsertTablets failed", e);
×
1659
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1660
      } catch (StatementExecutionException | RuntimeException e) {
×
1661
        putBack(session);
×
1662
        throw e;
×
1663
      } catch (Throwable e) {
×
1664
        logger.error("unexpected error in testInsertTablets", e);
×
1665
        putBack(session);
×
1666
        throw new RuntimeException(e);
×
1667
      }
×
1668
    }
1669
  }
×
1670

1671
  /**
1672
   * This method NOT insert data into database and the server just return after accept the request,
1673
   * this method should be used to test other time cost in client
1674
   */
1675
  @Override
1676
  public void testInsertRecords(
1677
      List<String> deviceIds,
1678
      List<Long> times,
1679
      List<List<String>> measurementsList,
1680
      List<List<String>> valuesList)
1681
      throws IoTDBConnectionException, StatementExecutionException {
1682
    for (int i = 0; i < RETRY; i++) {
×
1683
      ISession session = getSession();
×
1684
      try {
1685
        session.testInsertRecords(deviceIds, times, measurementsList, valuesList);
×
1686
        putBack(session);
×
1687
        return;
×
1688
      } catch (IoTDBConnectionException e) {
×
1689
        // TException means the connection is broken, remove it and get a new one.
1690
        logger.warn("testInsertRecords failed", e);
×
1691
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1692
      } catch (StatementExecutionException | RuntimeException e) {
×
1693
        putBack(session);
×
1694
        throw e;
×
1695
      } catch (Throwable e) {
×
1696
        logger.error("unexpected error in testInsertRecords", e);
×
1697
        putBack(session);
×
1698
        throw new RuntimeException(e);
×
1699
      }
×
1700
    }
1701
  }
×
1702

1703
  /**
1704
   * This method NOT insert data into database and the server just return after accept the request,
1705
   * this method should be used to test other time cost in client
1706
   */
1707
  @Override
1708
  public void testInsertRecords(
1709
      List<String> deviceIds,
1710
      List<Long> times,
1711
      List<List<String>> measurementsList,
1712
      List<List<TSDataType>> typesList,
1713
      List<List<Object>> valuesList)
1714
      throws IoTDBConnectionException, StatementExecutionException {
1715
    for (int i = 0; i < RETRY; i++) {
×
1716
      ISession session = getSession();
×
1717
      try {
1718
        session.testInsertRecords(deviceIds, times, measurementsList, typesList, valuesList);
×
1719
        putBack(session);
×
1720
        return;
×
1721
      } catch (IoTDBConnectionException e) {
×
1722
        // TException means the connection is broken, remove it and get a new one.
1723
        logger.warn("testInsertRecords failed", e);
×
1724
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1725
      } catch (StatementExecutionException | RuntimeException e) {
×
1726
        putBack(session);
×
1727
        throw e;
×
1728
      } catch (Throwable e) {
×
1729
        logger.error("unexpected error in testInsertRecords", e);
×
1730
        putBack(session);
×
1731
        throw new RuntimeException(e);
×
1732
      }
×
1733
    }
1734
  }
×
1735

1736
  /**
1737
   * This method NOT insert data into database and the server just return after accept the request,
1738
   * this method should be used to test other time cost in client
1739
   */
1740
  @Override
1741
  public void testInsertRecord(
1742
      String deviceId, long time, List<String> measurements, List<String> values)
1743
      throws IoTDBConnectionException, StatementExecutionException {
1744
    for (int i = 0; i < RETRY; i++) {
×
1745
      ISession session = getSession();
×
1746
      try {
1747
        session.testInsertRecord(deviceId, time, measurements, values);
×
1748
        putBack(session);
×
1749
        return;
×
1750
      } catch (IoTDBConnectionException e) {
×
1751
        // TException means the connection is broken, remove it and get a new one.
1752
        logger.warn("testInsertRecord failed", e);
×
1753
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1754
      } catch (StatementExecutionException | RuntimeException e) {
×
1755
        putBack(session);
×
1756
        throw e;
×
1757
      } catch (Throwable e) {
×
1758
        logger.error("unexpected error in testInsertRecord", e);
×
1759
        putBack(session);
×
1760
        throw new RuntimeException(e);
×
1761
      }
×
1762
    }
1763
  }
×
1764

1765
  /**
1766
   * This method NOT insert data into database and the server just return after accept the request,
1767
   * this method should be used to test other time cost in client
1768
   */
1769
  @Override
1770
  public void testInsertRecord(
1771
      String deviceId,
1772
      long time,
1773
      List<String> measurements,
1774
      List<TSDataType> types,
1775
      List<Object> values)
1776
      throws IoTDBConnectionException, StatementExecutionException {
1777
    for (int i = 0; i < RETRY; i++) {
×
1778
      ISession session = getSession();
×
1779
      try {
1780
        session.testInsertRecord(deviceId, time, measurements, types, values);
×
1781
        putBack(session);
×
1782
        return;
×
1783
      } catch (IoTDBConnectionException e) {
×
1784
        // TException means the connection is broken, remove it and get a new one.
1785
        logger.warn("testInsertRecord failed", e);
×
1786
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1787
      } catch (StatementExecutionException | RuntimeException e) {
×
1788
        putBack(session);
×
1789
        throw e;
×
1790
      } catch (Throwable e) {
×
1791
        logger.error("unexpected error in testInsertRecord", e);
×
1792
        putBack(session);
×
1793
        throw new RuntimeException(e);
×
1794
      }
×
1795
    }
1796
  }
×
1797

1798
  /**
1799
   * delete a timeseries, including data and schema
1800
   *
1801
   * @param path timeseries to delete, should be a whole path
1802
   */
1803
  @Override
1804
  public void deleteTimeseries(String path)
1805
      throws IoTDBConnectionException, StatementExecutionException {
1806
    for (int i = 0; i < RETRY; i++) {
×
1807
      ISession session = getSession();
×
1808
      try {
1809
        session.deleteTimeseries(path);
×
1810
        putBack(session);
×
1811
        return;
×
1812
      } catch (IoTDBConnectionException e) {
×
1813
        // TException means the connection is broken, remove it and get a new one.
1814
        logger.warn("deleteTimeseries failed", e);
×
1815
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1816
      } catch (StatementExecutionException | RuntimeException e) {
×
1817
        putBack(session);
×
1818
        throw e;
×
1819
      } catch (Throwable e) {
×
1820
        logger.error("unexpected error in deleteTimeseries", e);
×
1821
        putBack(session);
×
1822
        throw new RuntimeException(e);
×
1823
      }
×
1824
    }
1825
  }
×
1826

1827
  /**
1828
   * delete a timeseries, including data and schema
1829
   *
1830
   * @param paths timeseries to delete, should be a whole path
1831
   */
1832
  @Override
1833
  public void deleteTimeseries(List<String> paths)
1834
      throws IoTDBConnectionException, StatementExecutionException {
1835
    for (int i = 0; i < RETRY; i++) {
×
1836
      ISession session = getSession();
×
1837
      try {
1838
        session.deleteTimeseries(paths);
×
1839
        putBack(session);
×
1840
        return;
×
1841
      } catch (IoTDBConnectionException e) {
×
1842
        // TException means the connection is broken, remove it and get a new one.
1843
        logger.warn("deleteTimeseries failed", e);
×
1844
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1845
      } catch (StatementExecutionException | RuntimeException e) {
×
1846
        putBack(session);
×
1847
        throw e;
×
1848
      } catch (Throwable e) {
×
1849
        logger.error("unexpected error in deleteTimeseries", e);
×
1850
        putBack(session);
×
1851
        throw new RuntimeException(e);
×
1852
      }
×
1853
    }
1854
  }
×
1855

1856
  /**
1857
   * delete data <= time in one timeseries
1858
   *
1859
   * @param path data in which time series to delete
1860
   * @param time data with time stamp less than or equal to time will be deleted
1861
   */
1862
  @Override
1863
  public void deleteData(String path, long time)
1864
      throws IoTDBConnectionException, StatementExecutionException {
1865
    for (int i = 0; i < RETRY; i++) {
×
1866
      ISession session = getSession();
×
1867
      try {
1868
        session.deleteData(path, time);
×
1869
        putBack(session);
×
1870
        return;
×
1871
      } catch (IoTDBConnectionException e) {
×
1872
        // TException means the connection is broken, remove it and get a new one.
1873
        logger.warn(DELETE_DATA_FAIL, e);
×
1874
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1875
      } catch (StatementExecutionException | RuntimeException e) {
×
1876
        putBack(session);
×
1877
        throw e;
×
1878
      } catch (Throwable e) {
×
1879
        logger.error(DELETE_DATA_ERROR_MSG, e);
×
1880
        putBack(session);
×
1881
        throw new RuntimeException(e);
×
1882
      }
×
1883
    }
1884
  }
×
1885

1886
  /**
1887
   * delete data <= time in multiple timeseries
1888
   *
1889
   * @param paths data in which time series to delete
1890
   * @param time data with time stamp less than or equal to time will be deleted
1891
   */
1892
  @Override
1893
  public void deleteData(List<String> paths, long time)
1894
      throws IoTDBConnectionException, StatementExecutionException {
1895
    for (int i = 0; i < RETRY; i++) {
×
1896
      ISession session = getSession();
×
1897
      try {
1898
        session.deleteData(paths, time);
×
1899
        putBack(session);
×
1900
        return;
×
1901
      } catch (IoTDBConnectionException e) {
×
1902
        // TException means the connection is broken, remove it and get a new one.
1903
        logger.warn(DELETE_DATA_FAIL, e);
×
1904
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1905
      } catch (StatementExecutionException | RuntimeException e) {
×
1906
        putBack(session);
×
1907
        throw e;
×
1908
      } catch (Throwable e) {
×
1909
        logger.error(DELETE_DATA_ERROR_MSG, e);
×
1910
        putBack(session);
×
1911
        throw new RuntimeException(e);
×
1912
      }
×
1913
    }
1914
  }
×
1915

1916
  /**
1917
   * delete data >= startTime and data <= endTime in multiple timeseries
1918
   *
1919
   * @param paths data in which time series to delete
1920
   * @param startTime delete range start time
1921
   * @param endTime delete range end time
1922
   */
1923
  @Override
1924
  public void deleteData(List<String> paths, long startTime, long endTime)
1925
      throws IoTDBConnectionException, StatementExecutionException {
1926
    for (int i = 0; i < RETRY; i++) {
×
1927
      ISession session = getSession();
×
1928
      try {
1929
        session.deleteData(paths, startTime, endTime);
×
1930
        putBack(session);
×
1931
        return;
×
1932
      } catch (IoTDBConnectionException e) {
×
1933
        // TException means the connection is broken, remove it and get a new one.
1934
        logger.warn(DELETE_DATA_FAIL, e);
×
1935
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1936
      } catch (StatementExecutionException | RuntimeException e) {
×
1937
        putBack(session);
×
1938
        throw e;
×
1939
      } catch (Throwable e) {
×
1940
        logger.error(DELETE_DATA_ERROR_MSG, e);
×
1941
        putBack(session);
×
1942
        throw new RuntimeException(e);
×
1943
      }
×
1944
    }
1945
  }
×
1946

1947
  /** @deprecated Use {@link #createDatabase(String)} instead. */
1948
  @Deprecated
1949
  @Override
1950
  public void setStorageGroup(String storageGroupId)
1951
      throws IoTDBConnectionException, StatementExecutionException {
1952
    for (int i = 0; i < RETRY; i++) {
×
1953
      ISession session = getSession();
×
1954
      try {
1955
        session.setStorageGroup(storageGroupId);
×
1956
        putBack(session);
×
1957
        return;
×
1958
      } catch (IoTDBConnectionException e) {
×
1959
        // TException means the connection is broken, remove it and get a new one.
1960
        logger.warn("setStorageGroup failed", e);
×
1961
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1962
      } catch (StatementExecutionException | RuntimeException e) {
×
1963
        putBack(session);
×
1964
        throw e;
×
1965
      } catch (Throwable e) {
×
1966
        logger.error("unexpected error in setStorageGroup", e);
×
1967
        putBack(session);
×
1968
        throw new RuntimeException(e);
×
1969
      }
×
1970
    }
1971
  }
×
1972

1973
  /** @deprecated Use {@link #deleteDatabase(String)} instead. */
1974
  @Deprecated
1975
  @Override
1976
  public void deleteStorageGroup(String storageGroup)
1977
      throws IoTDBConnectionException, StatementExecutionException {
1978
    for (int i = 0; i < RETRY; i++) {
×
1979
      ISession session = getSession();
×
1980
      try {
1981
        session.deleteStorageGroup(storageGroup);
×
1982
        putBack(session);
×
1983
        return;
×
1984
      } catch (IoTDBConnectionException e) {
×
1985
        // TException means the connection is broken, remove it and get a new one.
1986
        logger.warn("deleteStorageGroup failed", e);
×
1987
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
1988
      } catch (StatementExecutionException | RuntimeException e) {
×
1989
        putBack(session);
×
1990
        throw e;
×
1991
      } catch (Throwable e) {
×
1992
        logger.error("unexpected error in deleteStorageGroup", e);
×
1993
        putBack(session);
×
1994
        throw new RuntimeException(e);
×
1995
      }
×
1996
    }
1997
  }
×
1998

1999
  /** @deprecated Use {@link #deleteDatabases(List)} instead. */
2000
  @Deprecated
2001
  @Override
2002
  public void deleteStorageGroups(List<String> storageGroup)
2003
      throws IoTDBConnectionException, StatementExecutionException {
2004
    for (int i = 0; i < RETRY; i++) {
×
2005
      ISession session = getSession();
×
2006
      try {
2007
        session.deleteStorageGroups(storageGroup);
×
2008
        putBack(session);
×
2009
        return;
×
2010
      } catch (IoTDBConnectionException e) {
×
2011
        // TException means the connection is broken, remove it and get a new one.
2012
        logger.warn("deleteStorageGroups failed", e);
×
2013
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2014
      } catch (StatementExecutionException | RuntimeException e) {
×
2015
        putBack(session);
×
2016
        throw e;
×
2017
      } catch (Throwable e) {
×
2018
        logger.error("unexpected error in deleteStorageGroups", e);
×
2019
        putBack(session);
×
2020
        throw new RuntimeException(e);
×
2021
      }
×
2022
    }
2023
  }
×
2024

2025
  @Override
2026
  public void createDatabase(String database)
2027
      throws IoTDBConnectionException, StatementExecutionException {
2028
    for (int i = 0; i < RETRY; i++) {
×
2029
      ISession session = getSession();
×
2030
      try {
2031
        session.createDatabase(database);
×
2032
        putBack(session);
×
2033
        return;
×
2034
      } catch (IoTDBConnectionException e) {
×
2035
        // TException means the connection is broken, remove it and get a new one.
2036
        logger.warn("createDatabase failed", e);
×
2037
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2038
      } catch (StatementExecutionException | RuntimeException e) {
×
2039
        putBack(session);
×
2040
        throw e;
×
2041
      } catch (Throwable e) {
×
2042
        logger.error("unexpected error in createDatabase", e);
×
2043
        putBack(session);
×
2044
        throw new RuntimeException(e);
×
2045
      }
×
2046
    }
2047
  }
×
2048

2049
  @Override
2050
  public void deleteDatabase(String database)
2051
      throws IoTDBConnectionException, StatementExecutionException {
2052
    for (int i = 0; i < RETRY; i++) {
×
2053
      ISession session = getSession();
×
2054
      try {
2055
        session.deleteDatabase(database);
×
2056
        putBack(session);
×
2057
        return;
×
2058
      } catch (IoTDBConnectionException e) {
×
2059
        // TException means the connection is broken, remove it and get a new one.
2060
        logger.warn("deleteDatabase failed", e);
×
2061
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2062
      } catch (StatementExecutionException | RuntimeException e) {
×
2063
        putBack(session);
×
2064
        throw e;
×
2065
      } catch (Throwable e) {
×
2066
        logger.error("unexpected error in deleteDatabase", e);
×
2067
        putBack(session);
×
2068
        throw new RuntimeException(e);
×
2069
      }
×
2070
    }
2071
  }
×
2072

2073
  @Override
2074
  public void deleteDatabases(List<String> databases)
2075
      throws IoTDBConnectionException, StatementExecutionException {
2076
    for (int i = 0; i < RETRY; i++) {
×
2077
      ISession session = getSession();
×
2078
      try {
2079
        session.deleteDatabases(databases);
×
2080
        putBack(session);
×
2081
        return;
×
2082
      } catch (IoTDBConnectionException e) {
×
2083
        // TException means the connection is broken, remove it and get a new one.
2084
        logger.warn("deleteDatabases failed", e);
×
2085
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2086
      } catch (StatementExecutionException | RuntimeException e) {
×
2087
        putBack(session);
×
2088
        throw e;
×
2089
      } catch (Throwable e) {
×
2090
        logger.error("unexpected error in deleteDatabases", e);
×
2091
        putBack(session);
×
2092
        throw new RuntimeException(e);
×
2093
      }
×
2094
    }
2095
  }
×
2096

2097
  @Override
2098
  public void createTimeseries(
2099
      String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)
2100
      throws IoTDBConnectionException, StatementExecutionException {
2101
    for (int i = 0; i < RETRY; i++) {
×
2102
      ISession session = getSession();
×
2103
      try {
2104
        session.createTimeseries(path, dataType, encoding, compressor);
×
2105
        putBack(session);
×
2106
        return;
×
2107
      } catch (IoTDBConnectionException e) {
×
2108
        // TException means the connection is broken, remove it and get a new one.
2109
        logger.warn("createTimeseries failed", e);
×
2110
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2111
      } catch (StatementExecutionException | RuntimeException e) {
×
2112
        putBack(session);
×
2113
        throw e;
×
2114
      } catch (Throwable e) {
×
2115
        logger.error("unexpected error in createTimeseries", e);
×
2116
        putBack(session);
×
2117
        throw new RuntimeException(e);
×
2118
      }
×
2119
    }
2120
  }
×
2121

2122
  @Override
2123
  public void createTimeseries(
2124
      String path,
2125
      TSDataType dataType,
2126
      TSEncoding encoding,
2127
      CompressionType compressor,
2128
      Map<String, String> props,
2129
      Map<String, String> tags,
2130
      Map<String, String> attributes,
2131
      String measurementAlias)
2132
      throws IoTDBConnectionException, StatementExecutionException {
2133
    for (int i = 0; i < RETRY; i++) {
×
2134
      ISession session = getSession();
×
2135
      try {
2136
        session.createTimeseries(
×
2137
            path, dataType, encoding, compressor, props, tags, attributes, measurementAlias);
2138
        putBack(session);
×
2139
        return;
×
2140
      } catch (IoTDBConnectionException e) {
×
2141
        // TException means the connection is broken, remove it and get a new one.
2142
        logger.warn("createTimeseries failed", e);
×
2143
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2144
      } catch (StatementExecutionException | RuntimeException e) {
×
2145
        putBack(session);
×
2146
        throw e;
×
2147
      } catch (Throwable e) {
×
2148
        logger.error("unexpected error in createTimeseries", e);
×
2149
        putBack(session);
×
2150
        throw new RuntimeException(e);
×
2151
      }
×
2152
    }
2153
  }
×
2154

2155
  @Override
2156
  public void createAlignedTimeseries(
2157
      String deviceId,
2158
      List<String> measurements,
2159
      List<TSDataType> dataTypes,
2160
      List<TSEncoding> encodings,
2161
      List<CompressionType> compressors,
2162
      List<String> measurementAliasList)
2163
      throws IoTDBConnectionException, StatementExecutionException {
2164
    for (int i = 0; i < RETRY; i++) {
×
2165
      ISession session = getSession();
×
2166
      try {
2167
        session.createAlignedTimeseries(
×
2168
            deviceId, measurements, dataTypes, encodings, compressors, measurementAliasList);
2169
        putBack(session);
×
2170
        return;
×
2171
      } catch (IoTDBConnectionException e) {
×
2172
        // TException means the connection is broken, remove it and get a new one.
2173
        logger.warn("createAlignedTimeseries failed", e);
×
2174
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2175
      } catch (StatementExecutionException | RuntimeException e) {
×
2176
        putBack(session);
×
2177
        throw e;
×
2178
      } catch (Throwable e) {
×
2179
        logger.error("unexpected error in createAlignedTimeseries", e);
×
2180
        putBack(session);
×
2181
        throw new RuntimeException(e);
×
2182
      }
×
2183
    }
2184
  }
×
2185

2186
  @Override
2187
  public void createAlignedTimeseries(
2188
      String deviceId,
2189
      List<String> measurements,
2190
      List<TSDataType> dataTypes,
2191
      List<TSEncoding> encodings,
2192
      List<CompressionType> compressors,
2193
      List<String> measurementAliasList,
2194
      List<Map<String, String>> tagsList,
2195
      List<Map<String, String>> attributesList)
2196
      throws IoTDBConnectionException, StatementExecutionException {
2197
    for (int i = 0; i < RETRY; i++) {
×
2198
      ISession session = getSession();
×
2199
      try {
2200
        session.createAlignedTimeseries(
×
2201
            deviceId,
2202
            measurements,
2203
            dataTypes,
2204
            encodings,
2205
            compressors,
2206
            measurementAliasList,
2207
            tagsList,
2208
            attributesList);
2209
        putBack(session);
×
2210
        return;
×
2211
      } catch (IoTDBConnectionException e) {
×
2212
        // TException means the connection is broken, remove it and get a new one.
2213
        logger.warn("createAlignedTimeseries failed", e);
×
2214
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2215
      } catch (StatementExecutionException | RuntimeException e) {
×
2216
        putBack(session);
×
2217
        throw e;
×
2218
      } catch (Throwable e) {
×
2219
        logger.error("unexpected error in createAlignedTimeseries", e);
×
2220
        putBack(session);
×
2221
        throw new RuntimeException(e);
×
2222
      }
×
2223
    }
2224
  }
×
2225

2226
  @Override
2227
  public void createMultiTimeseries(
2228
      List<String> paths,
2229
      List<TSDataType> dataTypes,
2230
      List<TSEncoding> encodings,
2231
      List<CompressionType> compressors,
2232
      List<Map<String, String>> propsList,
2233
      List<Map<String, String>> tagsList,
2234
      List<Map<String, String>> attributesList,
2235
      List<String> measurementAliasList)
2236
      throws IoTDBConnectionException, StatementExecutionException {
2237
    for (int i = 0; i < RETRY; i++) {
×
2238
      ISession session = getSession();
×
2239
      try {
2240
        session.createMultiTimeseries(
×
2241
            paths,
2242
            dataTypes,
2243
            encodings,
2244
            compressors,
2245
            propsList,
2246
            tagsList,
2247
            attributesList,
2248
            measurementAliasList);
2249
        putBack(session);
×
2250
        return;
×
2251
      } catch (IoTDBConnectionException e) {
×
2252
        // TException means the connection is broken, remove it and get a new one.
2253
        logger.warn("createMultiTimeseries failed", e);
×
2254
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2255
      } catch (StatementExecutionException | RuntimeException e) {
×
2256
        putBack(session);
×
2257
        throw e;
×
2258
      } catch (Throwable e) {
×
2259
        logger.error("unexpected error in createMultiTimeseries", e);
×
2260
        putBack(session);
×
2261
        throw new RuntimeException(e);
×
2262
      }
×
2263
    }
2264
  }
×
2265

2266
  @Override
2267
  public boolean checkTimeseriesExists(String path)
2268
      throws IoTDBConnectionException, StatementExecutionException {
2269
    for (int i = 0; i < RETRY; i++) {
×
2270
      ISession session = getSession();
×
2271
      try {
2272
        boolean resp = session.checkTimeseriesExists(path);
×
2273
        putBack(session);
×
2274
        return resp;
×
2275
      } catch (IoTDBConnectionException e) {
×
2276
        // TException means the connection is broken, remove it and get a new one.
2277
        logger.warn("checkTimeseriesExists failed", e);
×
2278
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2279
      } catch (StatementExecutionException | RuntimeException e) {
×
2280
        putBack(session);
×
2281
        throw e;
×
2282
      } catch (Throwable e) {
×
2283
        logger.error("unexpected error in checkTimeseriesExists", e);
×
2284
        putBack(session);
×
2285
        throw new RuntimeException(e);
×
2286
      }
×
2287
    }
2288
    // never go here.
2289
    return false;
×
2290
  }
2291

2292
  /**
2293
   * Construct Template at session and create it at server.
2294
   *
2295
   * @see Template
2296
   */
2297
  @Override
2298
  public void createSchemaTemplate(Template template)
2299
      throws IOException, IoTDBConnectionException, StatementExecutionException {
2300
    for (int i = 0; i < RETRY; i++) {
×
2301
      ISession session = getSession();
×
2302
      try {
2303
        session.createSchemaTemplate(template);
×
2304
        putBack(session);
×
2305
        return;
×
2306
      } catch (IoTDBConnectionException e) {
×
2307
        // TException means the connection is broken, remove it and get a new one.
2308
        logger.warn(CREATE_SCHEMA_TEMPLATE_FAIL, e);
×
2309
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2310
      } catch (StatementExecutionException | RuntimeException e) {
×
2311
        putBack(session);
×
2312
        throw e;
×
2313
      } catch (Throwable e) {
×
2314
        logger.error(CREATE_SCHEMA_TEMPLATE_ERROR_MSG, e);
×
2315
        putBack(session);
×
2316
        throw new RuntimeException(e);
×
2317
      }
×
2318
    }
2319
  }
×
2320

2321
  /**
2322
   * Create a template with flat measurements, not tree structured. Need to specify datatype,
2323
   * encoding and compressor of each measurement, and alignment of these measurements at once.
2324
   *
2325
   * @oaram templateName name of template to create
2326
   * @param measurements flat measurements of the template, cannot contain character dot
2327
   * @param dataTypes datatype of each measurement in the template
2328
   * @param encodings encodings of each measurement in the template
2329
   * @param compressors compression type of each measurement in the template
2330
   * @param isAligned specify whether these flat measurements are aligned
2331
   */
2332
  @Override
2333
  public void createSchemaTemplate(
2334
      String templateName,
2335
      List<String> measurements,
2336
      List<TSDataType> dataTypes,
2337
      List<TSEncoding> encodings,
2338
      List<CompressionType> compressors,
2339
      boolean isAligned)
2340
      throws IOException, IoTDBConnectionException, StatementExecutionException {
2341
    for (int i = 0; i < RETRY; i++) {
×
2342
      ISession session = getSession();
×
2343
      try {
2344
        session.createSchemaTemplate(
×
2345
            templateName, measurements, dataTypes, encodings, compressors, isAligned);
2346
        putBack(session);
×
2347
        return;
×
2348
      } catch (IoTDBConnectionException e) {
×
2349
        // TException means the connection is broken, remove it and get a new one.
2350
        logger.warn(CREATE_SCHEMA_TEMPLATE_FAIL, e);
×
2351
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2352
      } catch (StatementExecutionException | RuntimeException e) {
×
2353
        putBack(session);
×
2354
        throw e;
×
2355
      } catch (Throwable e) {
×
2356
        logger.error(CREATE_SCHEMA_TEMPLATE_ERROR_MSG, e);
×
2357
        putBack(session);
×
2358
        throw new RuntimeException(e);
×
2359
      }
×
2360
    }
2361
  }
×
2362

2363
  /**
2364
   * Compatible for rel/0.12, this method will create an unaligned flat template as a result. Notice
2365
   * that there is no aligned concept in 0.12, so only the first measurement in each nested list
2366
   * matters.
2367
   *
2368
   * @param name name of the template
2369
   * @param schemaNames it works as a virtual layer inside template in 0.12, and makes no difference
2370
   *     after 0.13
2371
   * @param measurements the first measurement in each nested list will constitute the final flat
2372
   *     template
2373
   * @param dataTypes the data type of each measurement, only the first one in each nested list
2374
   *     matters as above
2375
   * @param encodings the encoding of each measurement, only the first one in each nested list
2376
   *     matters as above
2377
   * @param compressors the compressor of each measurement
2378
   * @throws IoTDBConnectionException
2379
   * @throws StatementExecutionException
2380
   * @deprecated
2381
   */
2382
  @Deprecated
2383
  @Override
2384
  public void createSchemaTemplate(
2385
      String name,
2386
      List<String> schemaNames,
2387
      List<List<String>> measurements,
2388
      List<List<TSDataType>> dataTypes,
2389
      List<List<TSEncoding>> encodings,
2390
      List<CompressionType> compressors)
2391
      throws IoTDBConnectionException, StatementExecutionException {
2392
    for (int i = 0; i < RETRY; i++) {
×
2393
      ISession session = getSession();
×
2394
      try {
2395
        session.createSchemaTemplate(
×
2396
            name, schemaNames, measurements, dataTypes, encodings, compressors);
2397
        putBack(session);
×
2398
        return;
×
2399
      } catch (IoTDBConnectionException e) {
×
2400
        // TException means the connection is broken, remove it and get a new one.
2401
        logger.warn(CREATE_SCHEMA_TEMPLATE_FAIL, e);
×
2402
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2403
      } catch (StatementExecutionException | RuntimeException e) {
×
2404
        putBack(session);
×
2405
        throw e;
×
2406
      } catch (Throwable e) {
×
2407
        logger.error(CREATE_SCHEMA_TEMPLATE_ERROR_MSG, e);
×
2408
        putBack(session);
×
2409
        throw new RuntimeException(e);
×
2410
      }
×
2411
    }
2412
  }
×
2413

2414
  @Override
2415
  public void addAlignedMeasurementsInTemplate(
2416
      String templateName,
2417
      List<String> measurementsPath,
2418
      List<TSDataType> dataTypes,
2419
      List<TSEncoding> encodings,
2420
      List<CompressionType> compressors)
2421
      throws IOException, IoTDBConnectionException, StatementExecutionException {
2422
    for (int i = 0; i < RETRY; i++) {
×
2423
      ISession session = getSession();
×
2424
      try {
2425
        session.addAlignedMeasurementsInTemplate(
×
2426
            templateName, measurementsPath, dataTypes, encodings, compressors);
2427
        putBack(session);
×
2428
        return;
×
2429
      } catch (IoTDBConnectionException e) {
×
2430
        // TException means the connection is broken, remove it and get a new one.
2431
        logger.warn("addAlignedMeasurementsInTemplate failed", e);
×
2432
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2433
      } catch (StatementExecutionException | RuntimeException e) {
×
2434
        putBack(session);
×
2435
        throw e;
×
2436
      } catch (Throwable e) {
×
2437
        logger.error("unexpected error in addAlignedMeasurementsInTemplate", e);
×
2438
        putBack(session);
×
2439
        throw new RuntimeException(e);
×
2440
      }
×
2441
    }
2442
  }
×
2443

2444
  @Override
2445
  public void addAlignedMeasurementInTemplate(
2446
      String templateName,
2447
      String measurementPath,
2448
      TSDataType dataType,
2449
      TSEncoding encoding,
2450
      CompressionType compressor)
2451
      throws IOException, IoTDBConnectionException, StatementExecutionException {
2452
    for (int i = 0; i < RETRY; i++) {
×
2453
      ISession session = getSession();
×
2454
      try {
2455
        session.addAlignedMeasurementInTemplate(
×
2456
            templateName, measurementPath, dataType, encoding, compressor);
2457
        putBack(session);
×
2458
        return;
×
2459
      } catch (IoTDBConnectionException e) {
×
2460
        // TException means the connection is broken, remove it and get a new one.
2461
        logger.warn("addAlignedMeasurementInTemplate failed", e);
×
2462
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2463
      } catch (StatementExecutionException | RuntimeException e) {
×
2464
        putBack(session);
×
2465
        throw e;
×
2466
      } catch (Throwable e) {
×
2467
        logger.error("unexpected error in addAlignedMeasurementInTemplate", e);
×
2468
        putBack(session);
×
2469
        throw new RuntimeException(e);
×
2470
      }
×
2471
    }
2472
  }
×
2473

2474
  @Override
2475
  public void addUnalignedMeasurementsInTemplate(
2476
      String templateName,
2477
      List<String> measurementsPath,
2478
      List<TSDataType> dataTypes,
2479
      List<TSEncoding> encodings,
2480
      List<CompressionType> compressors)
2481
      throws IOException, IoTDBConnectionException, StatementExecutionException {
2482
    for (int i = 0; i < RETRY; i++) {
×
2483
      ISession session = getSession();
×
2484
      try {
2485
        session.addUnalignedMeasurementsInTemplate(
×
2486
            templateName, measurementsPath, dataTypes, encodings, compressors);
2487
        putBack(session);
×
2488
        return;
×
2489
      } catch (IoTDBConnectionException e) {
×
2490
        // TException means the connection is broken, remove it and get a new one.
2491
        logger.warn("addUnalignedMeasurementsInTemplate failed", e);
×
2492
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2493
      } catch (StatementExecutionException | RuntimeException e) {
×
2494
        putBack(session);
×
2495
        throw e;
×
2496
      } catch (Throwable e) {
×
2497
        logger.error("unexpected error in addUnalignedMeasurementsInTemplate", e);
×
2498
        putBack(session);
×
2499
        throw new RuntimeException(e);
×
2500
      }
×
2501
    }
2502
  }
×
2503

2504
  @Override
2505
  public void addUnalignedMeasurementInTemplate(
2506
      String templateName,
2507
      String measurementPath,
2508
      TSDataType dataType,
2509
      TSEncoding encoding,
2510
      CompressionType compressor)
2511
      throws IOException, IoTDBConnectionException, StatementExecutionException {
2512
    for (int i = 0; i < RETRY; i++) {
×
2513
      ISession session = getSession();
×
2514
      try {
2515
        session.addUnalignedMeasurementInTemplate(
×
2516
            templateName, measurementPath, dataType, encoding, compressor);
2517
        putBack(session);
×
2518
        return;
×
2519
      } catch (IoTDBConnectionException e) {
×
2520
        // TException means the connection is broken, remove it and get a new one.
2521
        logger.warn("addUnalignedMeasurementInTemplate failed", e);
×
2522
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2523
      } catch (StatementExecutionException | RuntimeException e) {
×
2524
        putBack(session);
×
2525
        throw e;
×
2526
      } catch (Throwable e) {
×
2527
        logger.error("unexpected error in addUnalignedMeasurementInTemplate", e);
×
2528
        putBack(session);
×
2529
        throw new RuntimeException(e);
×
2530
      }
×
2531
    }
2532
  }
×
2533

2534
  @Override
2535
  public void deleteNodeInTemplate(String templateName, String path)
2536
      throws IOException, IoTDBConnectionException, StatementExecutionException {
2537
    for (int i = 0; i < RETRY; i++) {
×
2538
      ISession session = getSession();
×
2539
      try {
2540
        session.deleteNodeInTemplate(templateName, path);
×
2541
        putBack(session);
×
2542
        return;
×
2543
      } catch (IoTDBConnectionException e) {
×
2544
        // TException means the connection is broken, remove it and get a new one.
2545
        logger.warn("deleteNodeInTemplate failed", e);
×
2546
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2547
      } catch (StatementExecutionException | RuntimeException e) {
×
2548
        putBack(session);
×
2549
        throw e;
×
2550
      } catch (Throwable e) {
×
2551
        logger.error("unexpected error in deleteNodeInTemplate", e);
×
2552
        putBack(session);
×
2553
        throw new RuntimeException(e);
×
2554
      }
×
2555
    }
2556
  }
×
2557

2558
  @Override
2559
  public int countMeasurementsInTemplate(String name)
2560
      throws StatementExecutionException, IoTDBConnectionException {
2561
    for (int i = 0; i < RETRY; i++) {
×
2562
      ISession session = getSession();
×
2563
      try {
2564
        int resp = session.countMeasurementsInTemplate(name);
×
2565
        putBack(session);
×
2566
        return resp;
×
2567
      } catch (IoTDBConnectionException e) {
×
2568
        // TException means the connection is broken, remove it and get a new one.
2569
        logger.warn("countMeasurementsInTemplate failed", e);
×
2570
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2571
      } catch (StatementExecutionException | RuntimeException e) {
×
2572
        putBack(session);
×
2573
        throw e;
×
2574
      } catch (Throwable e) {
×
2575
        logger.error("unexpected error in countMeasurementsInTemplate", e);
×
2576
        putBack(session);
×
2577
        throw new RuntimeException(e);
×
2578
      }
×
2579
    }
2580
    return -1;
×
2581
  }
2582

2583
  @Override
2584
  public boolean isMeasurementInTemplate(String templateName, String path)
2585
      throws StatementExecutionException, IoTDBConnectionException {
2586
    for (int i = 0; i < RETRY; i++) {
×
2587
      ISession session = getSession();
×
2588
      try {
2589
        boolean resp = session.isMeasurementInTemplate(templateName, path);
×
2590
        putBack(session);
×
2591
        return resp;
×
2592
      } catch (IoTDBConnectionException e) {
×
2593
        // TException means the connection is broken, remove it and get a new one.
2594
        logger.warn("isMeasurementInTemplate failed", e);
×
2595
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2596
      } catch (StatementExecutionException | RuntimeException e) {
×
2597
        putBack(session);
×
2598
        throw e;
×
2599
      } catch (Throwable e) {
×
2600
        logger.error("unexpected error in isMeasurementInTemplate", e);
×
2601
        putBack(session);
×
2602
        throw new RuntimeException(e);
×
2603
      }
×
2604
    }
2605
    return false;
×
2606
  }
2607

2608
  @Override
2609
  public boolean isPathExistInTemplate(String templateName, String path)
2610
      throws StatementExecutionException, IoTDBConnectionException {
2611
    for (int i = 0; i < RETRY; i++) {
×
2612
      ISession session = getSession();
×
2613
      try {
2614
        boolean resp = session.isPathExistInTemplate(templateName, path);
×
2615
        putBack(session);
×
2616
        return resp;
×
2617
      } catch (IoTDBConnectionException e) {
×
2618
        // TException means the connection is broken, remove it and get a new one.
2619
        logger.warn("isPathExistInTemplata failed", e);
×
2620
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2621
      } catch (StatementExecutionException | RuntimeException e) {
×
2622
        putBack(session);
×
2623
        throw e;
×
2624
      } catch (Throwable e) {
×
2625
        logger.error("unexpected error in isPathExistInTemplate", e);
×
2626
        putBack(session);
×
2627
        throw new RuntimeException(e);
×
2628
      }
×
2629
    }
2630
    return false;
×
2631
  }
2632

2633
  @Override
2634
  public List<String> showMeasurementsInTemplate(String templateName)
2635
      throws StatementExecutionException, IoTDBConnectionException {
2636
    for (int i = 0; i < RETRY; i++) {
×
2637
      ISession session = getSession();
×
2638
      try {
2639
        List<String> resp = session.showMeasurementsInTemplate(templateName);
×
2640
        putBack(session);
×
2641
        return resp;
×
2642
      } catch (IoTDBConnectionException e) {
×
2643
        // TException means the connection is broken, remove it and get a new one.
2644
        logger.warn("showMeasurementsInTemplate failed", e);
×
2645
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2646
      } catch (StatementExecutionException | RuntimeException e) {
×
2647
        putBack(session);
×
2648
        throw e;
×
2649
      } catch (Throwable e) {
×
2650
        logger.error("unexpected error in showMeasurementsInTemplate", e);
×
2651
        putBack(session);
×
2652
        throw new RuntimeException(e);
×
2653
      }
×
2654
    }
2655
    return null;
×
2656
  }
2657

2658
  @Override
2659
  public List<String> showMeasurementsInTemplate(String templateName, String pattern)
2660
      throws StatementExecutionException, IoTDBConnectionException {
2661
    for (int i = 0; i < RETRY; i++) {
×
2662
      ISession session = getSession();
×
2663
      try {
2664
        List<String> resp = session.showMeasurementsInTemplate(templateName, pattern);
×
2665
        putBack(session);
×
2666
        return resp;
×
2667
      } catch (IoTDBConnectionException e) {
×
2668
        // TException means the connection is broken, remove it and get a new one.
2669
        logger.warn("showMeasurementsInTemplate failed", e);
×
2670
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2671
      } catch (StatementExecutionException | RuntimeException e) {
×
2672
        putBack(session);
×
2673
        throw e;
×
2674
      } catch (Throwable e) {
×
2675
        logger.error("unexpected error in showMeasurementsInTemplate", e);
×
2676
        putBack(session);
×
2677
        throw new RuntimeException(e);
×
2678
      }
×
2679
    }
2680
    return null;
×
2681
  }
2682

2683
  @Override
2684
  public List<String> showAllTemplates()
2685
      throws StatementExecutionException, IoTDBConnectionException {
2686
    for (int i = 0; i < RETRY; i++) {
×
2687
      ISession session = getSession();
×
2688
      try {
2689
        List<String> resp = session.showAllTemplates();
×
2690
        putBack(session);
×
2691
        return resp;
×
2692
      } catch (IoTDBConnectionException e) {
×
2693
        // TException means the connection is broken, remove it and get a new one.
2694
        logger.warn("showAllTemplates failed", e);
×
2695
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2696
      } catch (StatementExecutionException | RuntimeException e) {
×
2697
        putBack(session);
×
2698
        throw e;
×
2699
      } catch (Throwable e) {
×
2700
        logger.error("unexpected error in showAllTemplates", e);
×
2701
        putBack(session);
×
2702
        throw new RuntimeException(e);
×
2703
      }
×
2704
    }
2705
    return null;
×
2706
  }
2707

2708
  @Override
2709
  public List<String> showPathsTemplateSetOn(String templateName)
2710
      throws StatementExecutionException, IoTDBConnectionException {
2711
    for (int i = 0; i < RETRY; i++) {
×
2712
      ISession session = getSession();
×
2713
      try {
2714
        List<String> resp = session.showPathsTemplateSetOn(templateName);
×
2715
        putBack(session);
×
2716
        return resp;
×
2717
      } catch (IoTDBConnectionException e) {
×
2718
        // TException means the connection is broken, remove it and get a new one.
2719
        logger.warn("showPathsTemplateSetOn failed", e);
×
2720
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2721
      } catch (StatementExecutionException | RuntimeException e) {
×
2722
        putBack(session);
×
2723
        throw e;
×
2724
      } catch (Throwable e) {
×
2725
        logger.error("unexpected error in showPathsTemplateSetOn", e);
×
2726
        putBack(session);
×
2727
        throw new RuntimeException(e);
×
2728
      }
×
2729
    }
2730
    return null;
×
2731
  }
2732

2733
  @Override
2734
  public List<String> showPathsTemplateUsingOn(String templateName)
2735
      throws StatementExecutionException, IoTDBConnectionException {
2736
    for (int i = 0; i < RETRY; i++) {
×
2737
      ISession session = getSession();
×
2738
      try {
2739
        List<String> resp = session.showPathsTemplateUsingOn(templateName);
×
2740
        putBack(session);
×
2741
        return resp;
×
2742
      } catch (IoTDBConnectionException e) {
×
2743
        // TException means the connection is broken, remove it and get a new one.
2744
        logger.warn("showPathsTemplateUsingOn failed", e);
×
2745
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2746
      } catch (StatementExecutionException | RuntimeException e) {
×
2747
        putBack(session);
×
2748
        throw e;
×
2749
      } catch (Throwable e) {
×
2750
        logger.error("unexpected error in showPathsTemplateUsingOn", e);
×
2751
        putBack(session);
×
2752
        throw new RuntimeException(e);
×
2753
      }
×
2754
    }
2755
    return null;
×
2756
  }
2757

2758
  @Override
2759
  public void sortTablet(Tablet tablet) throws IoTDBConnectionException {
2760
    ISession session = getSession();
×
2761
    session.sortTablet(tablet);
×
2762
    putBack(session);
×
2763
  }
×
2764

2765
  @Override
2766
  public void setSchemaTemplate(String templateName, String prefixPath)
2767
      throws StatementExecutionException, IoTDBConnectionException {
2768
    for (int i = 0; i < RETRY; i++) {
×
2769
      ISession session = getSession();
×
2770
      try {
2771
        session.setSchemaTemplate(templateName, prefixPath);
×
2772
        putBack(session);
×
2773
        return;
×
2774
      } catch (IoTDBConnectionException e) {
×
2775
        // TException means the connection is broken, remove it and get a new one.
2776
        logger.warn(
×
2777
            String.format("setSchemaTemplate [%s] on [%s] failed", templateName, prefixPath), e);
×
2778
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2779
      } catch (StatementExecutionException | RuntimeException e) {
×
2780
        putBack(session);
×
2781
        throw e;
×
2782
      } catch (Throwable e) {
×
2783
        logger.error("unexpected error in setSchemaTemplate", e);
×
2784
        putBack(session);
×
2785
        throw new RuntimeException(e);
×
2786
      }
×
2787
    }
2788
  }
×
2789

2790
  @Override
2791
  public void unsetSchemaTemplate(String prefixPath, String templateName)
2792
      throws StatementExecutionException, IoTDBConnectionException {
2793
    for (int i = 0; i < RETRY; i++) {
×
2794
      ISession session = getSession();
×
2795
      try {
2796
        session.unsetSchemaTemplate(prefixPath, templateName);
×
2797
        putBack(session);
×
2798
        return;
×
2799
      } catch (IoTDBConnectionException e) {
×
2800
        // TException means the connection is broken, remove it and get a new one.
2801
        logger.warn(
×
2802
            String.format("unsetSchemaTemplate [%s] on [%s] failed", templateName, prefixPath), e);
×
2803
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2804
      } catch (StatementExecutionException | RuntimeException e) {
×
2805
        putBack(session);
×
2806
        throw e;
×
2807
      } catch (Throwable e) {
×
2808
        logger.error("unexpected error in unsetSchemaTemplate", e);
×
2809
        putBack(session);
×
2810
        throw new RuntimeException(e);
×
2811
      }
×
2812
    }
2813
  }
×
2814

2815
  @Override
2816
  public void dropSchemaTemplate(String templateName)
2817
      throws StatementExecutionException, IoTDBConnectionException {
2818
    for (int i = 0; i < RETRY; i++) {
×
2819
      ISession session = getSession();
×
2820
      try {
2821
        session.dropSchemaTemplate(templateName);
×
2822
        putBack(session);
×
2823
        return;
×
2824
      } catch (IoTDBConnectionException e) {
×
2825
        // TException means the connection is broken, remove it and get a new one.
2826
        logger.warn(String.format("dropSchemaTemplate [%s] failed", templateName), e);
×
2827
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2828
      } catch (StatementExecutionException | RuntimeException e) {
×
2829
        putBack(session);
×
2830
        throw e;
×
2831
      } catch (Throwable e) {
×
2832
        logger.error("unexpected error in dropSchemaTemplate", e);
×
2833
        putBack(session);
×
2834
        throw new RuntimeException(e);
×
2835
      }
×
2836
    }
2837
  }
×
2838

2839
  public void createTimeseriesUsingSchemaTemplate(List<String> devicePathList)
2840
      throws StatementExecutionException, IoTDBConnectionException {
2841
    for (int i = 0; i < RETRY; i++) {
×
2842
      ISession session = getSession();
×
2843
      try {
2844
        session.createTimeseriesUsingSchemaTemplate(devicePathList);
×
2845
        putBack(session);
×
2846
        return;
×
2847
      } catch (IoTDBConnectionException e) {
×
2848
        // TException means the connection is broken, remove it and get a new one.
2849
        logger.warn(
×
2850
            String.format("createTimeseriesOfSchemaTemplate [%s] failed", devicePathList), e);
×
2851
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2852
      } catch (StatementExecutionException | RuntimeException e) {
×
2853
        putBack(session);
×
2854
        throw e;
×
2855
      } catch (Throwable e) {
×
2856
        logger.error("unexpected error in createTimeseriesUsingSchemaTemplate", e);
×
2857
        putBack(session);
×
2858
        throw new RuntimeException(e);
×
2859
      }
×
2860
    }
2861
  }
×
2862

2863
  /**
2864
   * execute query sql users must call closeResultSet(SessionDataSetWrapper) if they do not use the
2865
   * SessionDataSet any more. users do not need to call sessionDataSet.closeOperationHandle() any
2866
   * more.
2867
   *
2868
   * @param sql query statement
2869
   * @return result set Notice that you must get the result instance. Otherwise a data leakage will
2870
   *     happen
2871
   */
2872
  @SuppressWarnings("squid:S2095") // Suppress wrapper not closed warning
2873
  @Override
2874
  public SessionDataSetWrapper executeQueryStatement(String sql)
2875
      throws IoTDBConnectionException, StatementExecutionException {
2876
    for (int i = 0; i < RETRY; i++) {
×
2877
      ISession session = getSession();
×
2878
      try {
2879
        SessionDataSet resp = session.executeQueryStatement(sql);
×
2880
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
2881
        occupy(session);
×
2882
        return wrapper;
×
2883
      } catch (IoTDBConnectionException e) {
×
2884
        // TException means the connection is broken, remove it and get a new one.
2885
        logger.warn("executeQueryStatement failed", e);
×
2886
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2887
      } catch (StatementExecutionException | RuntimeException e) {
×
2888
        putBack(session);
×
2889
        throw e;
×
2890
      } catch (Throwable e) {
×
2891
        logger.error("unexpected error in executeQueryStatement", e);
×
2892
        putBack(session);
×
2893
        throw new RuntimeException(e);
×
2894
      }
×
2895
    }
2896
    // never go here
2897
    return null;
×
2898
  }
2899

2900
  /**
2901
   * execute query sql users must call closeResultSet(SessionDataSetWrapper) if they do not use the
2902
   * SessionDataSet any more. users do not need to call sessionDataSet.closeOperationHandle() any
2903
   * more.
2904
   *
2905
   * @param sql query statement
2906
   * @param timeoutInMs the timeout of this query, in milliseconds
2907
   * @return result set Notice that you must get the result instance. Otherwise a data leakage will
2908
   *     happen
2909
   */
2910
  @SuppressWarnings("squid:S2095") // Suppress wrapper not closed warning
2911
  @Override
2912
  public SessionDataSetWrapper executeQueryStatement(String sql, long timeoutInMs)
2913
      throws IoTDBConnectionException, StatementExecutionException {
2914
    for (int i = 0; i < RETRY; i++) {
×
2915
      ISession session = getSession();
×
2916
      try {
2917
        SessionDataSet resp = session.executeQueryStatement(sql, timeoutInMs);
×
2918
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
2919
        occupy(session);
×
2920
        return wrapper;
×
2921
      } catch (IoTDBConnectionException e) {
×
2922
        // TException means the connection is broken, remove it and get a new one.
2923
        logger.warn("executeQueryStatement failed", e);
×
2924
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2925
      } catch (StatementExecutionException | RuntimeException e) {
×
2926
        putBack(session);
×
2927
        throw e;
×
2928
      } catch (Throwable e) {
×
2929
        logger.error("unexpected error in executeQueryStatement", e);
×
2930
        putBack(session);
×
2931
        throw new RuntimeException(e);
×
2932
      }
×
2933
    }
2934
    // never go here
2935
    return null;
×
2936
  }
2937

2938
  /**
2939
   * execute non query statement
2940
   *
2941
   * @param sql non query statement
2942
   */
2943
  @Override
2944
  public void executeNonQueryStatement(String sql)
2945
      throws StatementExecutionException, IoTDBConnectionException {
2946
    for (int i = 0; i < RETRY; i++) {
×
2947
      ISession session = getSession();
×
2948
      try {
2949
        session.executeNonQueryStatement(sql);
×
2950
        putBack(session);
×
2951
        return;
×
2952
      } catch (IoTDBConnectionException e) {
×
2953
        // TException means the connection is broken, remove it and get a new one.
2954
        logger.warn("executeNonQueryStatement failed", e);
×
2955
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2956
      } catch (StatementExecutionException | RuntimeException e) {
×
2957
        putBack(session);
×
2958
        throw e;
×
2959
      } catch (Throwable e) {
×
2960
        logger.error("unexpected error in executeNonQueryStatement", e);
×
2961
        putBack(session);
×
2962
        throw new RuntimeException(e);
×
2963
      }
×
2964
    }
2965
  }
×
2966

2967
  @SuppressWarnings("squid:S2095") // Suppress wrapper not closed warning
2968
  @Override
2969
  public SessionDataSetWrapper executeRawDataQuery(
2970
      List<String> paths, long startTime, long endTime, long timeOut)
2971
      throws IoTDBConnectionException, StatementExecutionException {
2972
    for (int i = 0; i < RETRY; i++) {
×
2973
      ISession session = getSession();
×
2974
      try {
2975
        SessionDataSet resp = session.executeRawDataQuery(paths, startTime, endTime, timeOut);
×
2976
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
2977
        occupy(session);
×
2978
        return wrapper;
×
2979
      } catch (IoTDBConnectionException e) {
×
2980
        // TException means the connection is broken, remove it and get a new one.
2981
        logger.warn("executeRawDataQuery failed", e);
×
2982
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
2983
      } catch (StatementExecutionException | RuntimeException e) {
×
2984
        putBack(session);
×
2985
        throw e;
×
2986
      } catch (Throwable e) {
×
2987
        logger.error("unexpected error in executeRawDataQuery", e);
×
2988
        putBack(session);
×
2989
        throw new RuntimeException(e);
×
2990
      }
×
2991
    }
2992
    // never go here
2993
    return null;
×
2994
  }
2995

2996
  @Override
2997
  public SessionDataSetWrapper executeLastDataQuery(List<String> paths, long lastTime)
2998
      throws StatementExecutionException, IoTDBConnectionException {
2999
    for (int i = 0; i < RETRY; i++) {
×
3000
      ISession session = getSession();
×
3001
      try {
3002
        SessionDataSet resp = session.executeLastDataQuery(paths, lastTime);
×
3003
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
3004
        occupy(session);
×
3005
        return wrapper;
×
3006
      } catch (IoTDBConnectionException e) {
×
3007
        // TException means the connection is broken, remove it and get a new one.
3008
        logger.warn(EXECUTE_LASTDATAQUERY_FAIL, e);
×
3009
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3010
      } catch (StatementExecutionException | RuntimeException e) {
×
3011
        putBack(session);
×
3012
        throw e;
×
3013
      } catch (Throwable e) {
×
3014
        logger.error(EXECUTE_LASTDATAQUERY_ERROR, e);
×
3015
        putBack(session);
×
3016
        throw new RuntimeException(e);
×
3017
      }
×
3018
    }
3019
    // never go here
3020
    return null;
×
3021
  }
3022

3023
  @Override
3024
  public SessionDataSetWrapper executeLastDataQuery(List<String> paths, long lastTime, long timeOut)
3025
      throws StatementExecutionException, IoTDBConnectionException {
3026
    for (int i = 0; i < RETRY; i++) {
×
3027
      ISession session = getSession();
×
3028
      try {
3029
        SessionDataSet resp = session.executeLastDataQuery(paths, lastTime, timeOut);
×
3030
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
3031
        occupy(session);
×
3032
        return wrapper;
×
3033
      } catch (IoTDBConnectionException e) {
×
3034
        // TException means the connection is broken, remove it and get a new one.
3035
        logger.warn(EXECUTE_LASTDATAQUERY_FAIL, e);
×
3036
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3037
      } catch (StatementExecutionException | RuntimeException e) {
×
3038
        putBack(session);
×
3039
        throw e;
×
3040
      } catch (Throwable e) {
×
3041
        logger.error(EXECUTE_LASTDATAQUERY_ERROR, e);
×
3042
        putBack(session);
×
3043
        throw new RuntimeException(e);
×
3044
      }
×
3045
    }
3046
    // never go here
3047
    return null;
×
3048
  }
3049

3050
  @Override
3051
  public SessionDataSetWrapper executeLastDataQuery(List<String> paths)
3052
      throws StatementExecutionException, IoTDBConnectionException {
3053
    for (int i = 0; i < RETRY; i++) {
×
3054
      ISession session = getSession();
×
3055
      try {
3056
        SessionDataSet resp = session.executeLastDataQuery(paths);
×
3057
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
3058
        occupy(session);
×
3059
        return wrapper;
×
3060
      } catch (IoTDBConnectionException e) {
×
3061
        // TException means the connection is broken, remove it and get a new one.
3062
        logger.warn(EXECUTE_LASTDATAQUERY_FAIL, e);
×
3063
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3064
      } catch (StatementExecutionException | RuntimeException e) {
×
3065
        putBack(session);
×
3066
        throw e;
×
3067
      } catch (Throwable e) {
×
3068
        logger.error(EXECUTE_LASTDATAQUERY_ERROR, e);
×
3069
        putBack(session);
×
3070
        throw new RuntimeException(e);
×
3071
      }
×
3072
    }
3073
    // never go here
3074
    return null;
×
3075
  }
3076

3077
  @Override
3078
  public SessionDataSetWrapper executeLastDataQueryForOneDevice(
3079
      String db, String device, List<String> sensors, boolean isLegalPathNodes)
3080
      throws StatementExecutionException, IoTDBConnectionException {
3081
    for (int i = 0; i < RETRY; i++) {
×
3082
      ISession session = getSession();
×
3083
      try {
3084
        SessionDataSet resp =
×
3085
            session.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes);
×
3086
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
3087
        occupy(session);
×
3088
        return wrapper;
×
3089
      } catch (IoTDBConnectionException e) {
×
3090
        // TException means the connection is broken, remove it and get a new one.
3091
        logger.warn("executeLastDataQuery failed", e);
×
3092
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3093
      } catch (StatementExecutionException | RuntimeException e) {
×
3094
        putBack(session);
×
3095
        throw e;
×
3096
      }
×
3097
    }
3098
    // never go here
3099
    return null;
×
3100
  }
3101

3102
  @Override
3103
  public SessionDataSetWrapper executeAggregationQuery(
3104
      List<String> paths, List<TAggregationType> aggregations)
3105
      throws StatementExecutionException, IoTDBConnectionException {
3106
    for (int i = 0; i < RETRY; i++) {
×
3107
      ISession session = getSession();
×
3108
      try {
3109
        SessionDataSet resp = session.executeAggregationQuery(paths, aggregations);
×
3110
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
3111
        occupy(session);
×
3112
        return wrapper;
×
3113
      } catch (IoTDBConnectionException e) {
×
3114
        // TException means the connection is broken, remove it and get a new one.
3115
        logger.warn(EXECUTE_AGGREGATION_QUERY_FAIL, e);
×
3116
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3117
      } catch (StatementExecutionException | RuntimeException e) {
×
3118
        putBack(session);
×
3119
        throw e;
×
3120
      } catch (Throwable e) {
×
3121
        logger.error(EXECUTE_AGGREGATION_QUERY_ERROR_MSG, e);
×
3122
        putBack(session);
×
3123
        throw new RuntimeException(e);
×
3124
      }
×
3125
    }
3126
    // never go here
3127
    return null;
×
3128
  }
3129

3130
  @Override
3131
  public SessionDataSetWrapper executeAggregationQuery(
3132
      List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime)
3133
      throws StatementExecutionException, IoTDBConnectionException {
3134
    for (int i = 0; i < RETRY; i++) {
×
3135
      ISession session = getSession();
×
3136
      try {
3137
        SessionDataSet resp =
×
3138
            session.executeAggregationQuery(paths, aggregations, startTime, endTime);
×
3139
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
3140
        occupy(session);
×
3141
        return wrapper;
×
3142
      } catch (IoTDBConnectionException e) {
×
3143
        // TException means the connection is broken, remove it and get a new one.
3144
        logger.warn(EXECUTE_AGGREGATION_QUERY_FAIL, e);
×
3145
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3146
      } catch (StatementExecutionException | RuntimeException e) {
×
3147
        putBack(session);
×
3148
        throw e;
×
3149
      } catch (Throwable e) {
×
3150
        logger.error(EXECUTE_AGGREGATION_QUERY_ERROR_MSG, e);
×
3151
        putBack(session);
×
3152
        throw new RuntimeException(e);
×
3153
      }
×
3154
    }
3155
    // never go here
3156
    return null;
×
3157
  }
3158

3159
  @Override
3160
  public SessionDataSetWrapper executeAggregationQuery(
3161
      List<String> paths,
3162
      List<TAggregationType> aggregations,
3163
      long startTime,
3164
      long endTime,
3165
      long interval)
3166
      throws StatementExecutionException, IoTDBConnectionException {
3167
    for (int i = 0; i < RETRY; i++) {
×
3168
      ISession session = getSession();
×
3169
      try {
3170
        SessionDataSet resp =
×
3171
            session.executeAggregationQuery(paths, aggregations, startTime, endTime, interval);
×
3172
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
3173
        occupy(session);
×
3174
        return wrapper;
×
3175
      } catch (IoTDBConnectionException e) {
×
3176
        // TException means the connection is broken, remove it and get a new one.
3177
        logger.warn(EXECUTE_AGGREGATION_QUERY_FAIL, e);
×
3178
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3179
      } catch (StatementExecutionException | RuntimeException e) {
×
3180
        putBack(session);
×
3181
        throw e;
×
3182
      } catch (Throwable e) {
×
3183
        logger.error(EXECUTE_AGGREGATION_QUERY_ERROR_MSG, e);
×
3184
        putBack(session);
×
3185
        throw new RuntimeException(e);
×
3186
      }
×
3187
    }
3188
    // never go here
3189
    return null;
×
3190
  }
3191

3192
  @Override
3193
  public SessionDataSetWrapper executeAggregationQuery(
3194
      List<String> paths,
3195
      List<TAggregationType> aggregations,
3196
      long startTime,
3197
      long endTime,
3198
      long interval,
3199
      long slidingStep)
3200
      throws StatementExecutionException, IoTDBConnectionException {
3201
    for (int i = 0; i < RETRY; i++) {
×
3202
      ISession session = getSession();
×
3203
      try {
3204
        SessionDataSet resp =
×
3205
            session.executeAggregationQuery(
×
3206
                paths, aggregations, startTime, endTime, interval, slidingStep);
3207
        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
×
3208
        occupy(session);
×
3209
        return wrapper;
×
3210
      } catch (IoTDBConnectionException e) {
×
3211
        // TException means the connection is broken, remove it and get a new one.
3212
        logger.warn(EXECUTE_AGGREGATION_QUERY_FAIL, e);
×
3213
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3214
      } catch (StatementExecutionException | RuntimeException e) {
×
3215
        putBack(session);
×
3216
        throw e;
×
3217
      } catch (Throwable e) {
×
3218
        logger.error(EXECUTE_AGGREGATION_QUERY_ERROR_MSG, e);
×
3219
        putBack(session);
×
3220
        throw new RuntimeException(e);
×
3221
      }
×
3222
    }
3223
    // never go here
3224
    return null;
×
3225
  }
3226

3227
  @Override
3228
  public int getMaxSize() {
3229
    return maxSize;
1✔
3230
  }
3231

3232
  @Override
3233
  public String getHost() {
3234
    return host;
1✔
3235
  }
3236

3237
  @Override
3238
  public int getPort() {
3239
    return port;
1✔
3240
  }
3241

3242
  @Override
3243
  public String getUser() {
3244
    return user;
1✔
3245
  }
3246

3247
  @Override
3248
  public String getPassword() {
3249
    return password;
1✔
3250
  }
3251

3252
  @Override
3253
  public void setFetchSize(int fetchSize) {
3254
    this.fetchSize = fetchSize;
×
3255
    for (ISession session : queue) {
×
3256
      session.setFetchSize(fetchSize);
×
3257
    }
×
3258
    for (ISession session : occupied.keySet()) {
×
3259
      session.setFetchSize(fetchSize);
×
3260
    }
×
3261
  }
×
3262

3263
  @Override
3264
  public int getFetchSize() {
3265
    return fetchSize;
1✔
3266
  }
3267

3268
  @Override
3269
  public void setTimeZone(String zoneId)
3270
      throws StatementExecutionException, IoTDBConnectionException {
3271
    for (int i = 0; i < RETRY; i++) {
×
3272
      ISession session = getSession();
×
3273
      try {
3274
        session.setTimeZone(zoneId);
×
3275
        putBack(session);
×
3276
      } catch (IoTDBConnectionException e) {
×
3277
        // TException means the connection is broken, remove it and get a new one.
3278
        logger.warn(String.format("setTimeZone to [%s] failed", zoneId), e);
×
3279
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3280
      } catch (StatementExecutionException | RuntimeException e) {
×
3281
        putBack(session);
×
3282
        throw e;
×
3283
      }
×
3284
    }
3285
    this.zoneId = ZoneId.of(zoneId);
×
3286
    for (ISession session : queue) {
×
3287
      session.setTimeZoneOfSession(zoneId);
×
3288
    }
×
3289
    for (ISession session : occupied.keySet()) {
×
3290
      session.setTimeZoneOfSession(zoneId);
×
3291
    }
×
3292
  }
×
3293

3294
  @Override
3295
  public ZoneId getZoneId() {
3296
    return zoneId;
1✔
3297
  }
3298

3299
  @Override
3300
  public long getWaitToGetSessionTimeoutInMs() {
3301
    return waitToGetSessionTimeoutInMs;
1✔
3302
  }
3303

3304
  @Override
3305
  public boolean isEnableCompression() {
3306
    return enableCompression;
1✔
3307
  }
3308

3309
  @Override
3310
  public void setEnableRedirection(boolean enableRedirection) {
3311
    this.enableRedirection = enableRedirection;
×
3312
    if (this.enableRedirection) {
×
3313
      deviceIdToEndpoint = new ConcurrentHashMap<>();
×
3314
    }
3315
    for (ISession session : queue) {
×
3316
      session.setEnableRedirection(enableRedirection);
×
3317
    }
×
3318
    for (ISession session : occupied.keySet()) {
×
3319
      session.setEnableRedirection(enableRedirection);
×
3320
    }
×
3321
  }
×
3322

3323
  @Override
3324
  public boolean isEnableRedirection() {
3325
    return enableRedirection;
1✔
3326
  }
3327

3328
  @Override
3329
  public void setEnableQueryRedirection(boolean enableQueryRedirection) {
3330
    this.enableQueryRedirection = enableQueryRedirection;
×
3331
    for (ISession session : queue) {
×
3332
      session.setEnableQueryRedirection(enableQueryRedirection);
×
3333
    }
×
3334
    for (ISession session : occupied.keySet()) {
×
3335
      session.setEnableQueryRedirection(enableQueryRedirection);
×
3336
    }
×
3337
  }
×
3338

3339
  @Override
3340
  public boolean isEnableQueryRedirection() {
3341
    return enableQueryRedirection;
×
3342
  }
3343

3344
  @Override
3345
  public int getConnectionTimeoutInMs() {
3346
    return connectionTimeoutInMs;
1✔
3347
  }
3348

3349
  @Override
3350
  public TSBackupConfigurationResp getBackupConfiguration()
3351
      throws IoTDBConnectionException, StatementExecutionException {
3352
    for (int i = 0; i < RETRY; i++) {
×
3353
      ISession session = getSession();
×
3354
      try {
3355
        TSBackupConfigurationResp resp = session.getBackupConfiguration();
×
3356
        putBack(session);
×
3357
        return resp;
×
3358
      } catch (IoTDBConnectionException e) {
×
3359
        // TException means the connection is broken, remove it and get a new one.
3360
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3361
      } catch (RuntimeException e) {
×
3362
        putBack(session);
×
3363
        throw e;
×
3364
      }
×
3365
    }
3366
    return null;
×
3367
  }
3368

3369
  @Override
3370
  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
3371

3372
    for (int i = 0; i < RETRY; i++) {
×
3373
      ISession session = getSession();
×
3374
      try {
3375
        TSConnectionInfoResp resp = session.fetchAllConnections();
×
3376
        putBack(session);
×
3377
        return resp;
×
3378
      } catch (IoTDBConnectionException e) {
×
3379
        // TException means the connection is broken, remove it and get a new one.
3380
        logger.warn("fetchAllConnections failed", e);
×
3381
        cleanSessionAndMayThrowConnectionException(session, i, e);
×
3382
      } catch (Throwable t) {
×
3383
        putBack(session);
×
3384
        throw t;
×
3385
      }
×
3386
    }
3387
    return null;
×
3388
  }
3389

3390
  @Override
3391
  public void setVersion(Version version) {
3392
    this.version = version;
×
3393
    for (ISession session : queue) {
×
3394
      session.setVersion(version);
×
3395
    }
×
3396
    for (ISession session : occupied.keySet()) {
×
3397
      session.setVersion(version);
×
3398
    }
×
3399
  }
×
3400

3401
  @Override
3402
  public Version getVersion() {
3403
    return version;
1✔
3404
  }
3405

3406
  @Override
3407
  public void setQueryTimeout(long timeoutInMs) {
3408
    this.queryTimeoutInMs = timeoutInMs;
×
3409
    for (ISession session : queue) {
×
3410
      session.setQueryTimeout(timeoutInMs);
×
3411
    }
×
3412
    for (ISession session : occupied.keySet()) {
×
3413
      session.setQueryTimeout(timeoutInMs);
×
3414
    }
×
3415
  }
×
3416

3417
  @Override
3418
  public long getQueryTimeout() {
3419
    return queryTimeoutInMs;
×
3420
  }
3421

3422
  public static class Builder {
1✔
3423

3424
    private String host = SessionConfig.DEFAULT_HOST;
1✔
3425
    private int port = SessionConfig.DEFAULT_PORT;
1✔
3426
    private List<String> nodeUrls = null;
1✔
3427
    private int maxSize = SessionConfig.DEFAULT_SESSION_POOL_MAX_SIZE;
1✔
3428
    private String user = SessionConfig.DEFAULT_USER;
1✔
3429
    private String pw = SessionConfig.DEFAULT_PASSWORD;
1✔
3430
    private int fetchSize = SessionConfig.DEFAULT_FETCH_SIZE;
1✔
3431
    private long waitToGetSessionTimeoutInMs = 60_000;
1✔
3432
    private int thriftDefaultBufferSize = SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY;
1✔
3433
    private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
1✔
3434
    private boolean enableCompression = false;
1✔
3435
    private ZoneId zoneId = null;
1✔
3436
    private boolean enableRedirection = SessionConfig.DEFAULT_REDIRECTION_MODE;
1✔
3437
    private int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
1✔
3438
    private Version version = SessionConfig.DEFAULT_VERSION;
1✔
3439

3440
    public Builder host(String host) {
3441
      this.host = host;
1✔
3442
      return this;
1✔
3443
    }
3444

3445
    public Builder port(int port) {
3446
      this.port = port;
1✔
3447
      return this;
1✔
3448
    }
3449

3450
    public Builder nodeUrls(List<String> nodeUrls) {
3451
      this.nodeUrls = nodeUrls;
×
3452
      return this;
×
3453
    }
3454

3455
    public Builder maxSize(int maxSize) {
3456
      this.maxSize = maxSize;
1✔
3457
      return this;
1✔
3458
    }
3459

3460
    public Builder user(String user) {
3461
      this.user = user;
1✔
3462
      return this;
1✔
3463
    }
3464

3465
    public Builder password(String password) {
3466
      this.pw = password;
1✔
3467
      return this;
1✔
3468
    }
3469

3470
    public Builder fetchSize(int fetchSize) {
3471
      this.fetchSize = fetchSize;
1✔
3472
      return this;
1✔
3473
    }
3474

3475
    public Builder zoneId(ZoneId zoneId) {
3476
      this.zoneId = zoneId;
1✔
3477
      return this;
1✔
3478
    }
3479

3480
    public Builder waitToGetSessionTimeoutInMs(long waitToGetSessionTimeoutInMs) {
3481
      this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
1✔
3482
      return this;
1✔
3483
    }
3484

3485
    public Builder thriftDefaultBufferSize(int thriftDefaultBufferSize) {
3486
      this.thriftDefaultBufferSize = thriftDefaultBufferSize;
×
3487
      return this;
×
3488
    }
3489

3490
    public Builder thriftMaxFrameSize(int thriftMaxFrameSize) {
3491
      this.thriftMaxFrameSize = thriftMaxFrameSize;
×
3492
      return this;
×
3493
    }
3494

3495
    public Builder enableCompression(boolean enableCompression) {
3496
      this.enableCompression = enableCompression;
1✔
3497
      return this;
1✔
3498
    }
3499

3500
    public Builder enableRedirection(boolean enableRedirection) {
3501
      this.enableRedirection = enableRedirection;
1✔
3502
      return this;
1✔
3503
    }
3504

3505
    public Builder connectionTimeoutInMs(int connectionTimeoutInMs) {
3506
      this.connectionTimeoutInMs = connectionTimeoutInMs;
1✔
3507
      return this;
1✔
3508
    }
3509

3510
    public Builder version(Version version) {
3511
      this.version = version;
1✔
3512
      return this;
1✔
3513
    }
3514

3515
    public SessionPool build() {
3516
      if (nodeUrls == null) {
1✔
3517
        return new SessionPool(
1✔
3518
            host,
3519
            port,
3520
            user,
3521
            pw,
3522
            maxSize,
3523
            fetchSize,
3524
            waitToGetSessionTimeoutInMs,
3525
            enableCompression,
3526
            zoneId,
3527
            enableRedirection,
3528
            connectionTimeoutInMs,
3529
            version,
3530
            thriftDefaultBufferSize,
3531
            thriftMaxFrameSize);
3532
      } else {
3533
        return new SessionPool(
×
3534
            nodeUrls,
3535
            user,
3536
            pw,
3537
            maxSize,
3538
            fetchSize,
3539
            waitToGetSessionTimeoutInMs,
3540
            enableCompression,
3541
            zoneId,
3542
            enableRedirection,
3543
            connectionTimeoutInMs,
3544
            version,
3545
            thriftDefaultBufferSize,
3546
            thriftMaxFrameSize);
3547
      }
3548
    }
3549
  }
3550
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc