• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9695

pending completion
#9695

push

travis_ci

web-flow
[IOTDB-6085] NullPointerException in readAsync when Ratis leader is changing (#10706)

8 of 8 new or added lines in 1 file covered. (100.0%)

79237 of 165074 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.47
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.cq;
21

22
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
24
import org.apache.iotdb.commons.concurrent.ThreadName;
25
import org.apache.iotdb.commons.cq.CQState;
26
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
27
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
28
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
29
import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
30
import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
31
import org.apache.iotdb.confignode.manager.ConfigManager;
32
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
33
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
34
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
35
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
36
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
37
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
38
import org.apache.iotdb.rpc.TSStatusCode;
39

40
import org.slf4j.Logger;
41
import org.slf4j.LoggerFactory;
42

43
import java.util.Collections;
44
import java.util.List;
45
import java.util.concurrent.ScheduledExecutorService;
46
import java.util.concurrent.TimeUnit;
47
import java.util.concurrent.locks.ReadWriteLock;
48
import java.util.concurrent.locks.ReentrantReadWriteLock;
49

50
public class CQManager {
51

52
  private static final Logger LOGGER = LoggerFactory.getLogger(CQManager.class);
1✔
53

54
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
55

56
  private final ConfigManager configManager;
57

58
  private final ReadWriteLock lock;
59

60
  private ScheduledExecutorService executor;
61

62
  public CQManager(ConfigManager configManager) {
×
63
    this.configManager = configManager;
×
64
    this.lock = new ReentrantReadWriteLock();
×
65
    this.executor =
×
66
        IoTDBThreadPoolFactory.newScheduledThreadPool(
×
67
            CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
×
68
  }
×
69

70
  public TSStatus createCQ(TCreateCQReq req) {
71
    lock.readLock().lock();
×
72
    try {
73
      ScheduledExecutorService currentExecutor = executor;
×
74
      return configManager.getProcedureManager().createCQ(req, currentExecutor);
×
75
    } finally {
76
      lock.readLock().unlock();
×
77
    }
78
  }
79

80
  public TSStatus dropCQ(TDropCQReq req) {
81
    ConsensusWriteResponse response =
×
82
        configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
×
83
    if (response.getStatus() != null) {
×
84
      return response.getStatus();
×
85
    } else {
86
      LOGGER.warn(
×
87
          "Unexpected error happened while dropping cq {}: ", req.cqId, response.getException());
×
88
      // consensus layer related errors
89
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
90
      res.setMessage(response.getErrorMessage());
×
91
      return res;
×
92
    }
93
  }
94

95
  public TShowCQResp showCQ() {
96
    ConsensusReadResponse response = configManager.getConsensusManager().read(new ShowCQPlan());
×
97
    if (response.getDataset() != null) {
×
98
      return ((ShowCQResp) response.getDataset()).convertToRpcShowCQResp();
×
99
    } else {
100
      LOGGER.warn("Unexpected error happened while showing cq: ", response.getException());
×
101
      // consensus layer related errors
102
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
103
      res.setMessage(response.getException().toString());
×
104
      return new TShowCQResp(res, Collections.emptyList());
×
105
    }
106
  }
107

108
  public ScheduledExecutorService getExecutor() {
109
    ScheduledExecutorService res;
110
    lock.readLock().lock();
×
111
    try {
112
      res = executor;
×
113
    } finally {
114
      lock.readLock().unlock();
×
115
    }
116
    return res;
×
117
  }
118

119
  public void startCQScheduler() {
120
    try {
121
      /*
122
        TODO: remove this after fixing IOTDB-6085
123
        sleep here because IOTDB-6085: NullPointerException in readAsync when Ratis leader is changing
124
      */
125
      Thread.sleep(1000);
×
126
    } catch (InterruptedException e) {
×
127
      Thread.currentThread().interrupt();
×
128
    }
×
129
    lock.writeLock().lock();
×
130
    try {
131
      // 1. shutdown previous cq schedule thread pool
132
      try {
133
        if (executor != null) {
×
134
          executor.shutdown();
×
135
        }
136
      } catch (Exception t) {
×
137
        // just print the error log because we should make sure we can start a new cq schedule pool
138
        // successfully in the next steps
139
        LOGGER.error("Error happened while shutting down previous cq schedule thread pool.", t);
×
140
      }
×
141

142
      // 2. start a new schedule thread pool
143
      executor =
×
144
          IoTDBThreadPoolFactory.newScheduledThreadPool(
×
145
              CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
×
146

147
      // 3. get all CQs
148
      List<CQInfo.CQEntry> allCQs = null;
×
149
      // wait for consensus layer ready
150
      while (configManager.getConsensusManager() == null) {
×
151
        try {
152
          LOGGER.info("consensus layer is not ready, sleep 1s...");
×
153
          TimeUnit.SECONDS.sleep(1);
×
154
        } catch (InterruptedException e) {
×
155
          Thread.currentThread().interrupt();
×
156
          LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
×
157
        }
×
158
      }
159
      // keep fetching until we get all CQEntries if this node is still leader
160
      while (needFetch(allCQs)) {
×
161
        ConsensusReadResponse response = configManager.getConsensusManager().read(new ShowCQPlan());
×
162
        if (response.getDataset() != null) {
×
163
          allCQs = ((ShowCQResp) response.getDataset()).getCqList();
×
164
        } else {
165
          // consensus layer related errors
166
          LOGGER.warn(
×
167
              "Unexpected error happened while fetching cq list: ", response.getException());
×
168
          try {
169
            Thread.sleep(500);
×
170
          } catch (InterruptedException e) {
×
171
            Thread.currentThread().interrupt();
×
172
          }
×
173
        }
174
      }
×
175

176
      // 4. recover the scheduling of active CQs
177
      if (allCQs != null) {
×
178
        for (CQInfo.CQEntry entry : allCQs) {
×
179
          if (entry.getState() == CQState.ACTIVE) {
×
180
            CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager);
×
181
            cqScheduleTask.submitSelf();
×
182
          }
183
        }
×
184
      }
185

186
    } finally {
187
      lock.writeLock().unlock();
×
188
    }
189
  }
×
190

191
  private boolean needFetch(List<CQInfo.CQEntry> allCQs) {
192
    return allCQs == null && configManager.getConsensusManager().isLeader();
×
193
  }
194

195
  public void stopCQScheduler() {
196
    ScheduledExecutorService previous;
197
    lock.writeLock().lock();
×
198
    try {
199
      previous = executor;
×
200
      executor = null;
×
201
    } finally {
202
      lock.writeLock().unlock();
×
203
    }
204
    if (previous != null) {
×
205
      previous.shutdown();
×
206
    }
207
  }
×
208
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc