• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.17
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.cq;
21

22
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
25
import org.apache.iotdb.commons.conf.CommonDescriptor;
26
import org.apache.iotdb.commons.cq.TimeoutPolicy;
27
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
28
import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
29
import org.apache.iotdb.confignode.manager.ConfigManager;
30
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
31
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
32
import org.apache.iotdb.consensus.exception.ConsensusException;
33
import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ;
34
import org.apache.iotdb.rpc.TSStatusCode;
35

36
import org.apache.thrift.async.AsyncMethodCallback;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39

40
import java.util.Optional;
41
import java.util.concurrent.ScheduledExecutorService;
42
import java.util.concurrent.TimeUnit;
43

44
public class CQScheduleTask implements Runnable {
45

46
  private static final Logger LOGGER = LoggerFactory.getLogger(CQScheduleTask.class);
1✔
47

48
  private static final long DEFAULT_RETRY_WAIT_TIME_IN_MS = 20L * 1_000;
49

50
  // ms is 1
51
  // us is 1_000
52
  // ns is 1_000_000
53
  private static final long FACTOR;
54

55
  static {
56
    String timestampPrecision = CommonDescriptor.getInstance().getConfig().getTimestampPrecision();
1✔
57
    if ("us".equals(timestampPrecision)) {
1✔
58
      FACTOR = 1_000;
×
59
    } else if ("ns".equals(timestampPrecision)) {
1✔
60
      FACTOR = 1_000_000;
×
61
    } else {
62
      FACTOR = 1;
1✔
63
    }
64
  }
1✔
65

66
  private final String cqId;
67
  private final long everyInterval;
68
  private final long startTimeOffset;
69
  private final long endTimeOffset;
70
  private final TimeoutPolicy timeoutPolicy;
71
  private final String queryBody;
72
  private final String md5;
73

74
  private final String zoneId;
75

76
  private final String username;
77

78
  private final ScheduledExecutorService executor;
79

80
  private final ConfigManager configManager;
81

82
  private final long retryWaitTimeInMS;
83

84
  private long executionTime;
85

86
  public CQScheduleTask(
87
      TCreateCQReq req,
88
      long firstExecutionTime,
89
      String md5,
90
      ScheduledExecutorService executor,
91
      ConfigManager configManager) {
92
    this(
×
93
        req.cqId,
94
        req.everyInterval,
95
        req.startTimeOffset,
96
        req.endTimeOffset,
97
        TimeoutPolicy.deserialize(req.timeoutPolicy),
×
98
        req.queryBody,
99
        md5,
100
        req.zoneId,
101
        req.username,
102
        executor,
103
        configManager,
104
        firstExecutionTime);
105
  }
×
106

107
  public CQScheduleTask(
108
      CQInfo.CQEntry entry, ScheduledExecutorService executor, ConfigManager configManager) {
109
    this(
×
110
        entry.getCqId(),
×
111
        entry.getEveryInterval(),
×
112
        entry.getStartTimeOffset(),
×
113
        entry.getEndTimeOffset(),
×
114
        entry.getTimeoutPolicy(),
×
115
        entry.getQueryBody(),
×
116
        entry.getMd5(),
×
117
        entry.getZoneId(),
×
118
        entry.getUsername(),
×
119
        executor,
120
        configManager,
121
        entry.getLastExecutionTime() + entry.getEveryInterval());
×
122
  }
×
123

124
  @SuppressWarnings("squid:S107")
125
  public CQScheduleTask(
126
      String cqId,
127
      long everyInterval,
128
      long startTimeOffset,
129
      long endTimeOffset,
130
      TimeoutPolicy timeoutPolicy,
131
      String queryBody,
132
      String md5,
133
      String zoneId,
134
      String username,
135
      ScheduledExecutorService executor,
136
      ConfigManager configManager,
137
      long executionTime) {
×
138
    this.cqId = cqId;
×
139
    this.everyInterval = everyInterval;
×
140
    this.startTimeOffset = startTimeOffset;
×
141
    this.endTimeOffset = endTimeOffset;
×
142
    this.timeoutPolicy = timeoutPolicy;
×
143
    this.queryBody = queryBody;
×
144
    this.md5 = md5;
×
145
    this.zoneId = zoneId;
×
146
    this.username = username;
×
147
    this.executor = executor;
×
148
    this.configManager = configManager;
×
149
    this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval / FACTOR);
×
150
    this.executionTime = executionTime;
×
151
  }
×
152

153
  public static long getFirstExecutionTime(long boundaryTime, long everyInterval) {
154
    long now = System.currentTimeMillis() * FACTOR;
1✔
155
    return getFirstExecutionTime(boundaryTime, everyInterval, now);
1✔
156
  }
157

158
  public static long getFirstExecutionTime(long boundaryTime, long everyInterval, long now) {
159
    if (now <= boundaryTime) {
1✔
160
      return boundaryTime;
1✔
161
    } else {
162
      return (((now - boundaryTime - 1) / everyInterval) + 1) * everyInterval + boundaryTime;
1✔
163
    }
164
  }
165

166
  @Override
167
  public void run() {
168
    long startTime = executionTime - startTimeOffset;
×
169
    long endTime = executionTime - endTimeOffset;
×
170

171
    Optional<TDataNodeLocation> targetDataNode =
×
172
        configManager.getNodeManager().getLowestLoadDataNode();
×
173
    // no usable DataNode to execute CQ
174
    if (!targetDataNode.isPresent()) {
×
175
      LOGGER.warn("There is no RUNNING DataNode to execute CQ {}", cqId);
×
176
      if (needSubmit()) {
×
177
        submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
×
178
      }
179
    } else {
180
      LOGGER.info(
×
181
          "[StartExecuteCQ] execute CQ {} on DataNode[{}], time range is [{}, {}), current time is {}",
182
          cqId,
183
          targetDataNode.get().dataNodeId,
×
184
          startTime,
×
185
          endTime,
×
186
          System.currentTimeMillis() * FACTOR);
×
187
      TExecuteCQ executeCQReq =
×
188
          new TExecuteCQ(queryBody, startTime, endTime, everyInterval, zoneId, cqId, username);
189
      try {
190
        AsyncDataNodeInternalServiceClient client =
191
            AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get());
×
192
        client.executeCQ(executeCQReq, new AsyncExecuteCQCallback(startTime, endTime));
×
193
      } catch (Exception t) {
×
194
        LOGGER.warn("Execute CQ {} failed", cqId, t);
×
195
        if (needSubmit()) {
×
196
          submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
×
197
        }
198
      }
×
199
    }
200
  }
×
201

202
  public void submitSelf() {
203
    submitSelf(
×
204
        Math.max(0, executionTime / FACTOR - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
×
205
  }
×
206

207
  private void submitSelf(long delay, TimeUnit unit) {
208
    executor.schedule(this, delay, unit);
×
209
  }
×
210

211
  private boolean needSubmit() {
212
    // current node is still leader and thread pool is not shut down.
213
    return configManager.getConsensusManager().isLeader() && !executor.isShutdown();
×
214
  }
215

216
  private class AsyncExecuteCQCallback implements AsyncMethodCallback<TSStatus> {
217

218
    private final long startTime;
219
    private final long endTime;
220

221
    public AsyncExecuteCQCallback(long startTime, long endTime) {
×
222
      this.startTime = startTime;
×
223
      this.endTime = endTime;
×
224
    }
×
225

226
    private void updateExecutionTime() {
227
      if (timeoutPolicy == TimeoutPolicy.BLOCKED) {
×
228
        executionTime = executionTime + everyInterval;
×
229
      } else if (timeoutPolicy == TimeoutPolicy.DISCARD) {
×
230
        long now = System.currentTimeMillis() * FACTOR;
×
231
        executionTime =
×
232
            executionTime + ((now - executionTime - 1) / everyInterval + 1) * everyInterval;
×
233
      } else {
×
234
        throw new IllegalArgumentException("Unknown TimeoutPolicy: " + timeoutPolicy);
×
235
      }
236
    }
×
237

238
    @Override
239
    public void onComplete(TSStatus response) {
240
      if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
241

242
        LOGGER.info(
×
243
            "[EndExecuteCQ] {}, time range is [{}, {}), current time is {}",
244
            cqId,
×
245
            startTime,
×
246
            endTime,
×
247
            System.currentTimeMillis() * FACTOR);
×
248
        TSStatus result;
249
        try {
250
          result =
×
251
              configManager
×
252
                  .getConsensusManager()
×
253
                  .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5));
×
254
        } catch (ConsensusException e) {
×
255
          result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
256
          result.setMessage(e.getMessage());
×
257
        }
×
258

259
        // while leadership changed, the update last exec time operation for CQTasks in new leader
260
        // may still update failed because stale CQTask in old leader may update it in advance
261
        if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
262
          LOGGER.warn(
×
263
              "Failed to update the last execution time {} of CQ {}, because {}",
264
              executionTime,
×
265
              cqId,
×
266
              result.getMessage());
×
267
          // no such cq, we don't need to submit it again
268
          if (result.getCode() == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
×
269
            LOGGER.info("Stop submitting CQ {} because {}", cqId, result.getMessage());
×
270
            return;
×
271
          }
272
        }
273

274
        if (needSubmit()) {
×
275
          updateExecutionTime();
×
276
          submitSelf();
×
277
        } else {
278
          LOGGER.info(
×
279
              "Stop submitting CQ {} because current node is not leader or current scheduled thread pool is shut down.",
280
              cqId);
×
281
        }
282

283
      } else {
×
284
        LOGGER.warn("Execute CQ {} failed, TSStatus is {}", cqId, response);
×
285
        if (needSubmit()) {
×
286
          submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
×
287
        }
288
      }
289
    }
×
290

291
    @Override
292
    public void onError(Exception exception) {
293
      LOGGER.warn("Execute CQ {} failed", cqId, exception);
×
294
      if (needSubmit()) {
×
295
        submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
×
296
      }
297
    }
×
298
  }
299
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc