• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.6
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.cq;
21

22
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
24
import org.apache.iotdb.commons.concurrent.ThreadName;
25
import org.apache.iotdb.commons.cq.CQState;
26
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
27
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
28
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
29
import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
30
import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
31
import org.apache.iotdb.confignode.manager.ConfigManager;
32
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
33
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
34
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
35
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
36
import org.apache.iotdb.consensus.common.DataSet;
37
import org.apache.iotdb.consensus.exception.ConsensusException;
38
import org.apache.iotdb.rpc.TSStatusCode;
39

40
import org.slf4j.Logger;
41
import org.slf4j.LoggerFactory;
42

43
import java.util.Collections;
44
import java.util.List;
45
import java.util.concurrent.ScheduledExecutorService;
46
import java.util.concurrent.TimeUnit;
47
import java.util.concurrent.locks.ReadWriteLock;
48
import java.util.concurrent.locks.ReentrantReadWriteLock;
49

50
public class CQManager {
51

52
  private static final Logger LOGGER = LoggerFactory.getLogger(CQManager.class);
1✔
53

54
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
55

56
  private final ConfigManager configManager;
57

58
  private final ReadWriteLock lock;
59

60
  private ScheduledExecutorService executor;
61

62
  public CQManager(ConfigManager configManager) {
×
63
    this.configManager = configManager;
×
64
    this.lock = new ReentrantReadWriteLock();
×
65
    this.executor =
×
66
        IoTDBThreadPoolFactory.newScheduledThreadPool(
×
67
            CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
×
68
  }
×
69

70
  public TSStatus createCQ(TCreateCQReq req) {
71
    lock.readLock().lock();
×
72
    try {
73
      ScheduledExecutorService currentExecutor = executor;
×
74
      return configManager.getProcedureManager().createCQ(req, currentExecutor);
×
75
    } finally {
76
      lock.readLock().unlock();
×
77
    }
78
  }
79

80
  public TSStatus dropCQ(TDropCQReq req) {
81
    try {
82
      return configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
×
83
    } catch (ConsensusException e) {
×
84
      LOGGER.warn("Unexpected error happened while dropping cq {}: ", req.cqId, e);
×
85
      // consensus layer related errors
86
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
87
      res.setMessage(e.getMessage());
×
88
      return res;
×
89
    }
90
  }
91

92
  public TShowCQResp showCQ() {
93
    try {
94
      DataSet response = configManager.getConsensusManager().read(new ShowCQPlan());
×
95
      return ((ShowCQResp) response).convertToRpcShowCQResp();
×
96
    } catch (ConsensusException e) {
×
97
      LOGGER.warn("Unexpected error happened while showing cq: ", e);
×
98
      // consensus layer related errors
99
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
100
      res.setMessage(e.getMessage());
×
101
      return new TShowCQResp(res, Collections.emptyList());
×
102
    }
103
  }
104

105
  public ScheduledExecutorService getExecutor() {
106
    ScheduledExecutorService res;
107
    lock.readLock().lock();
×
108
    try {
109
      res = executor;
×
110
    } finally {
111
      lock.readLock().unlock();
×
112
    }
113
    return res;
×
114
  }
115

116
  public void startCQScheduler() {
117
    try {
118
      /*
119
        TODO: remove this after fixing IOTDB-6085
120
        sleep here because IOTDB-6085: NullPointerException in readAsync when Ratis leader is changing
121
      */
122
      Thread.sleep(1000);
×
123
    } catch (InterruptedException e) {
×
124
      Thread.currentThread().interrupt();
×
125
    }
×
126
    lock.writeLock().lock();
×
127
    try {
128
      // 1. shutdown previous cq schedule thread pool
129
      try {
130
        if (executor != null) {
×
131
          executor.shutdown();
×
132
        }
133
      } catch (Exception t) {
×
134
        // just print the error log because we should make sure we can start a new cq schedule pool
135
        // successfully in the next steps
136
        LOGGER.error("Error happened while shutting down previous cq schedule thread pool.", t);
×
137
      }
×
138

139
      // 2. start a new schedule thread pool
140
      executor =
×
141
          IoTDBThreadPoolFactory.newScheduledThreadPool(
×
142
              CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
×
143

144
      // 3. get all CQs
145
      List<CQInfo.CQEntry> allCQs = null;
×
146
      // wait for consensus layer ready
147
      while (configManager.getConsensusManager() == null) {
×
148
        try {
149
          LOGGER.info("consensus layer is not ready, sleep 1s...");
×
150
          TimeUnit.SECONDS.sleep(1);
×
151
        } catch (InterruptedException e) {
×
152
          Thread.currentThread().interrupt();
×
153
          LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
×
154
        }
×
155
      }
156
      // keep fetching until we get all CQEntries if this node is still leader
157
      while (needFetch(allCQs)) {
×
158
        try {
159
          DataSet response = configManager.getConsensusManager().read(new ShowCQPlan());
×
160
          allCQs = ((ShowCQResp) response).getCqList();
×
161
        } catch (ConsensusException e) {
×
162
          // consensus layer related errors
163
          LOGGER.warn("Unexpected error happened while fetching cq list: ", e);
×
164
          try {
165
            Thread.sleep(500);
×
166
          } catch (InterruptedException ie) {
×
167
            Thread.currentThread().interrupt();
×
168
          }
×
169
        }
×
170
      }
171

172
      // 4. recover the scheduling of active CQs
173
      if (allCQs != null) {
×
174
        for (CQInfo.CQEntry entry : allCQs) {
×
175
          if (entry.getState() == CQState.ACTIVE) {
×
176
            CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager);
×
177
            cqScheduleTask.submitSelf();
×
178
          }
179
        }
×
180
      }
181

182
    } finally {
183
      lock.writeLock().unlock();
×
184
    }
185
  }
×
186

187
  private boolean needFetch(List<CQInfo.CQEntry> allCQs) {
188
    return allCQs == null && configManager.getConsensusManager().isLeader();
×
189
  }
190

191
  public void stopCQScheduler() {
192
    ScheduledExecutorService previous;
193
    lock.writeLock().lock();
×
194
    try {
195
      previous = executor;
×
196
      executor = null;
×
197
    } finally {
198
      lock.writeLock().unlock();
×
199
    }
200
    if (previous != null) {
×
201
      previous.shutdown();
×
202
    }
203
  }
×
204
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc