• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9960

30 Aug 2023 04:02AM CUT coverage: 47.758% (+0.001%) from 47.757%
#9960

push

travis_ci

web-flow
[IOTDB-6119] Add ConfigNode leader service check (#10985)

43 of 43 new or added lines in 7 files covered. (100.0%)

80379 of 168305 relevant lines covered (47.76%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.74
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.cq;
21

22
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
24
import org.apache.iotdb.commons.concurrent.ThreadName;
25
import org.apache.iotdb.commons.cq.CQState;
26
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
27
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
28
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
29
import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
30
import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
31
import org.apache.iotdb.confignode.manager.ConfigManager;
32
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
33
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
34
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
35
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
36
import org.apache.iotdb.consensus.common.DataSet;
37
import org.apache.iotdb.consensus.exception.ConsensusException;
38
import org.apache.iotdb.rpc.TSStatusCode;
39

40
import org.slf4j.Logger;
41
import org.slf4j.LoggerFactory;
42

43
import java.util.Collections;
44
import java.util.List;
45
import java.util.concurrent.ScheduledExecutorService;
46
import java.util.concurrent.TimeUnit;
47
import java.util.concurrent.locks.ReadWriteLock;
48
import java.util.concurrent.locks.ReentrantReadWriteLock;
49

50
public class CQManager {
51

52
  private static final Logger LOGGER = LoggerFactory.getLogger(CQManager.class);
1✔
53

54
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
55

56
  private final ConfigManager configManager;
57

58
  private final ReadWriteLock lock;
59

60
  private ScheduledExecutorService executor;
61

62
  public CQManager(ConfigManager configManager) {
×
63
    this.configManager = configManager;
×
64
    this.lock = new ReentrantReadWriteLock();
×
65
    this.executor =
×
66
        IoTDBThreadPoolFactory.newScheduledThreadPool(
×
67
            CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
×
68
  }
×
69

70
  public TSStatus createCQ(TCreateCQReq req) {
71
    lock.readLock().lock();
×
72
    try {
73
      ScheduledExecutorService currentExecutor = executor;
×
74
      return configManager.getProcedureManager().createCQ(req, currentExecutor);
×
75
    } finally {
76
      lock.readLock().unlock();
×
77
    }
78
  }
79

80
  public TSStatus dropCQ(TDropCQReq req) {
81
    try {
82
      return configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
×
83
    } catch (ConsensusException e) {
×
84
      LOGGER.warn("Unexpected error happened while dropping cq {}: ", req.cqId, e);
×
85
      // consensus layer related errors
86
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
87
      res.setMessage(e.getMessage());
×
88
      return res;
×
89
    }
90
  }
91

92
  public TShowCQResp showCQ() {
93
    try {
94
      DataSet response = configManager.getConsensusManager().read(new ShowCQPlan());
×
95
      return ((ShowCQResp) response).convertToRpcShowCQResp();
×
96
    } catch (ConsensusException e) {
×
97
      LOGGER.warn("Unexpected error happened while showing cq: ", e);
×
98
      // consensus layer related errors
99
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
100
      res.setMessage(e.getMessage());
×
101
      return new TShowCQResp(res, Collections.emptyList());
×
102
    }
103
  }
104

105
  public ScheduledExecutorService getExecutor() {
106
    ScheduledExecutorService res;
107
    lock.readLock().lock();
×
108
    try {
109
      res = executor;
×
110
    } finally {
111
      lock.readLock().unlock();
×
112
    }
113
    return res;
×
114
  }
115

116
  public void startCQScheduler() {
117
    lock.writeLock().lock();
×
118
    try {
119
      // 1. shutdown previous cq schedule thread pool
120
      try {
121
        if (executor != null) {
×
122
          executor.shutdown();
×
123
        }
124
      } catch (Exception t) {
×
125
        // just print the error log because we should make sure we can start a new cq schedule pool
126
        // successfully in the next steps
127
        LOGGER.error("Error happened while shutting down previous cq schedule thread pool.", t);
×
128
      }
×
129

130
      // 2. start a new schedule thread pool
131
      executor =
×
132
          IoTDBThreadPoolFactory.newScheduledThreadPool(
×
133
              CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
×
134

135
      // 3. get all CQs
136
      List<CQInfo.CQEntry> allCQs = null;
×
137
      // wait for consensus layer ready
138
      while (configManager.getConsensusManager() == null) {
×
139
        try {
140
          LOGGER.info("consensus layer is not ready, sleep 1s...");
×
141
          TimeUnit.SECONDS.sleep(1);
×
142
        } catch (InterruptedException e) {
×
143
          Thread.currentThread().interrupt();
×
144
          LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
×
145
        }
×
146
      }
147
      // keep fetching until we get all CQEntries if this node is still leader
148
      while (needFetch(allCQs)) {
×
149
        try {
150
          DataSet response = configManager.getConsensusManager().read(new ShowCQPlan());
×
151
          allCQs = ((ShowCQResp) response).getCqList();
×
152
        } catch (ConsensusException e) {
×
153
          // consensus layer related errors
154
          LOGGER.warn("Unexpected error happened while fetching cq list: ", e);
×
155
          try {
156
            Thread.sleep(500);
×
157
          } catch (InterruptedException ie) {
×
158
            Thread.currentThread().interrupt();
×
159
          }
×
160
        }
×
161
      }
162

163
      // 4. recover the scheduling of active CQs
164
      if (allCQs != null) {
×
165
        for (CQInfo.CQEntry entry : allCQs) {
×
166
          if (entry.getState() == CQState.ACTIVE) {
×
167
            CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager);
×
168
            cqScheduleTask.submitSelf();
×
169
          }
170
        }
×
171
      }
172

173
    } finally {
174
      lock.writeLock().unlock();
×
175
    }
176
  }
×
177

178
  private boolean needFetch(List<CQInfo.CQEntry> allCQs) {
179
    return allCQs == null && configManager.getConsensusManager().isLeader();
×
180
  }
181

182
  public void stopCQScheduler() {
183
    ScheduledExecutorService previous;
184
    lock.writeLock().lock();
×
185
    try {
186
      previous = executor;
×
187
      executor = null;
×
188
    } finally {
189
      lock.writeLock().unlock();
×
190
    }
191
    if (previous != null) {
×
192
      previous.shutdown();
×
193
    }
194
  }
×
195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc