• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9967

30 Aug 2023 04:22PM CUT coverage: 47.7% (+0.04%) from 47.658%
#9967

push

travis_ci

web-flow
Pipe: Fix start-time and end-time parameters not working when extracting history data (#11001) (#11002)

(cherry picked from commit 35736cc67)

12 of 12 new or added lines in 6 files covered. (100.0%)

80165 of 168062 relevant lines covered (47.7%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.24
/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.consensus.iot.client;
21

22
import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
23
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
24
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
25
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
26
import org.apache.iotdb.rpc.TSStatusCode;
27

28
import org.apache.thrift.async.AsyncMethodCallback;
29
import org.slf4j.Logger;
30
import org.slf4j.LoggerFactory;
31

32
import java.util.concurrent.TimeUnit;
33

34
public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRes> {
35

36
  private final Logger logger = LoggerFactory.getLogger(DispatchLogHandler.class);
1✔
37

38
  private final LogDispatcherThread thread;
39
  private final Batch batch;
40
  private final long createTime;
41
  private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
42
  private int retryCount;
43

44
  public DispatchLogHandler(
45
      LogDispatcherThread thread,
46
      LogDispatcherThreadMetrics logDispatcherThreadMetrics,
47
      Batch batch) {
1✔
48
    this.thread = thread;
1✔
49
    this.logDispatcherThreadMetrics = logDispatcherThreadMetrics;
1✔
50
    this.batch = batch;
1✔
51
    this.createTime = System.nanoTime();
1✔
52
  }
1✔
53

54
  @Override
55
  public void onComplete(TSyncLogEntriesRes response) {
56
    if (response.getStatuses().size() == 1 && needRetry(response.getStatuses().get(0).getCode())) {
1✔
57
      logger.warn(
1✔
58
          "Can not send {} to peer {} for {} times because {}",
59
          batch,
60
          thread.getPeer(),
1✔
61
          ++retryCount,
1✔
62
          response.getStatuses().get(0).getMessage());
1✔
63
      sleepCorrespondingTimeAndRetryAsynchronous();
×
64
    } else {
65
      thread.getSyncStatus().removeBatch(batch);
1✔
66
      // update safely deleted search index after current sync index is updated by removeBatch
67
      thread.updateSafelyDeletedSearchIndex();
1✔
68
    }
69
    logDispatcherThreadMetrics.recordSyncLogTimePerRequest(
1✔
70
        (System.nanoTime() - createTime) / batch.getLogEntries().size());
1✔
71
  }
1✔
72

73
  private boolean needRetry(int statusCode) {
74
    return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
1✔
75
        || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
1✔
76
        || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
1✔
77
  }
78

79
  @Override
80
  public void onError(Exception exception) {
81
    logger.warn(
1✔
82
        "Can not send {} to peer for {} times {} because {}",
83
        batch,
84
        thread.getPeer(),
1✔
85
        ++retryCount,
1✔
86
        exception);
87
    sleepCorrespondingTimeAndRetryAsynchronous();
1✔
88
  }
1✔
89

90
  private void sleepCorrespondingTimeAndRetryAsynchronous() {
91
    long sleepTime =
1✔
92
        Math.min(
1✔
93
            (long)
94
                (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
1✔
95
                    * Math.pow(2, retryCount)),
1✔
96
            thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
1✔
97
    thread
1✔
98
        .getImpl()
1✔
99
        .getRetryService()
1✔
100
        .schedule(
1✔
101
            () -> {
102
              if (thread.isStopped()) {
1✔
103
                logger.debug(
1✔
104
                    "LogDispatcherThread {} has been stopped, "
105
                        + "we will not retrying this Batch {} after {} times",
106
                    thread.getPeer(),
1✔
107
                    batch,
108
                    retryCount);
1✔
109
              } else {
110
                thread.sendBatchAsync(batch, this);
×
111
              }
112
            },
1✔
113
            sleepTime,
114
            TimeUnit.MILLISECONDS);
115
  }
1✔
116
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc