• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9453

pending completion
#9453

push

travis_ci

web-flow
Fix bugs & code smells identified by Sonar

446 of 446 new or added lines in 99 files covered. (100.0%)

72004 of 161570 relevant lines covered (44.57%)

0.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
package org.apache.iotdb.db.mpp.plan.execution;
20

21
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
22
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23
import org.apache.iotdb.commons.client.IClientManager;
24
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
25
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
26
import org.apache.iotdb.commons.conf.IoTDBConstant;
27
import org.apache.iotdb.commons.exception.IoTDBException;
28
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
29
import org.apache.iotdb.db.conf.IoTDBConfig;
30
import org.apache.iotdb.db.conf.IoTDBDescriptor;
31
import org.apache.iotdb.db.exception.query.KilledByOthersException;
32
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
33
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
34
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
35
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
36
import org.apache.iotdb.db.mpp.execution.QueryState;
37
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
38
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
39
import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
40
import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
41
import org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet;
42
import org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet;
43
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
44
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
45
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
46
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
47
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
48
import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
49
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
50
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
51
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceVisitor;
52
import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
53
import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
54
import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
55
import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
56
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
57
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
58
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
59
import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
60
import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
61
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
62
import org.apache.iotdb.db.mpp.plan.statement.Statement;
63
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
64
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
65
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
66
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
67
import org.apache.iotdb.db.utils.SetThreadName;
68
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
69
import org.apache.iotdb.rpc.RpcUtils;
70
import org.apache.iotdb.rpc.TSStatusCode;
71
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
72

73
import com.google.common.util.concurrent.ListenableFuture;
74
import com.google.common.util.concurrent.SettableFuture;
75
import org.slf4j.Logger;
76
import org.slf4j.LoggerFactory;
77

78
import java.nio.ByteBuffer;
79
import java.util.ArrayList;
80
import java.util.List;
81
import java.util.Optional;
82
import java.util.concurrent.CancellationException;
83
import java.util.concurrent.ExecutionException;
84
import java.util.concurrent.ExecutorService;
85
import java.util.concurrent.ScheduledExecutorService;
86
import java.util.concurrent.atomic.AtomicBoolean;
87

88
import static com.google.common.base.Preconditions.checkArgument;
89
import static com.google.common.base.Throwables.throwIfUnchecked;
90
import static org.apache.iotdb.db.mpp.common.DataNodeEndPoints.isSameNode;
91
import static org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.WAIT_FOR_RESULT;
92
import static org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
93

94
/**
95
 * QueryExecution stores all the status of a query which is being prepared or running inside the MPP
96
 * frame. It takes three main responsibilities: 1. Prepare a query. Transform a query from statement
97
 * to DistributedQueryPlan with fragment instances. 2. Dispatch all the fragment instances to
98
 * corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
99
 */
100
public class QueryExecution implements IQueryExecution {
101
  private static final Logger logger = LoggerFactory.getLogger(QueryExecution.class);
×
102

103
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
×
104
  private static final int MAX_RETRY_COUNT = 3;
105
  private static final long RETRY_INTERVAL_IN_MS = 2000;
106
  private int retryCount = 0;
×
107
  private final MPPQueryContext context;
108
  private IScheduler scheduler;
109
  private final QueryStateMachine stateMachine;
110

111
  private final List<PlanOptimizer> planOptimizers;
112

113
  private final Statement rawStatement;
114
  private Analysis analysis;
115
  private LogicalQueryPlan logicalPlan;
116
  private DistributedQueryPlan distributedPlan;
117

118
  private final ExecutorService executor;
119
  private final ExecutorService writeOperationExecutor;
120
  private final ScheduledExecutorService scheduledExecutor;
121
  // TODO need to use factory to decide standalone or cluster
122
  private final IPartitionFetcher partitionFetcher;
123
  // TODO need to use factory to decide standalone or cluster,
124
  private final ISchemaFetcher schemaFetcher;
125

126
  // The result of QueryExecution will be written to the MPPDataExchangeManager in current Node.
127
  // We use this SourceHandle to fetch the TsBlock from it.
128
  private ISourceHandle resultHandle;
129

130
  // used for cleaning resultHandle up exactly once
131
  private final AtomicBoolean resultHandleCleanUp;
132

133
  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
134
      syncInternalServiceClientManager;
135

136
  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
137
      asyncInternalServiceClientManager;
138

139
  private final AtomicBoolean stopped;
140

141
  // cost time in ns
142
  private long totalExecutionTime = 0;
×
143

144
  private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET =
145
      QueryExecutionMetricSet.getInstance();
×
146
  private static final QueryPlanCostMetricSet QUERY_PLAN_COST_METRIC_SET =
147
      QueryPlanCostMetricSet.getInstance();
×
148
  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
×
149
      PerformanceOverviewMetrics.getInstance();
×
150

151
  @SuppressWarnings("squid:S107")
152
  public QueryExecution(
153
      Statement statement,
154
      MPPQueryContext context,
155
      ExecutorService executor,
156
      ExecutorService writeOperationExecutor,
157
      ScheduledExecutorService scheduledExecutor,
158
      IPartitionFetcher partitionFetcher,
159
      ISchemaFetcher schemaFetcher,
160
      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager,
161
      IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
162
          asyncInternalServiceClientManager) {
×
163
    this.rawStatement = statement;
×
164
    this.executor = executor;
×
165
    this.writeOperationExecutor = writeOperationExecutor;
×
166
    this.scheduledExecutor = scheduledExecutor;
×
167
    this.context = context;
×
168
    this.planOptimizers = new ArrayList<>();
×
169
    this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher);
×
170
    this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
×
171
    this.partitionFetcher = partitionFetcher;
×
172
    this.schemaFetcher = schemaFetcher;
×
173
    this.syncInternalServiceClientManager = syncInternalServiceClientManager;
×
174
    this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
×
175

176
    // We add the abort logic inside the QueryExecution.
177
    // So that the other components can only focus on the state change.
178
    stateMachine.addStateChangeListener(
×
179
        state -> {
180
          try (SetThreadName queryName = new SetThreadName(context.getQueryId().getId())) {
×
181
            if (!state.isDone()) {
×
182
              return;
×
183
            }
184
            // TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be
185
            // invoked
186
            if (state == QueryState.FAILED
×
187
                || state == QueryState.ABORTED
188
                || state == QueryState.CANCELED) {
189
              logger.debug("[ReleaseQueryResource] state is: {}", state);
×
190
              Throwable cause = stateMachine.getFailureException();
×
191
              releaseResource(cause);
×
192
            }
193
            this.stop(null);
×
194
          }
×
195
        });
×
196
    this.stopped = new AtomicBoolean(false);
×
197
    this.resultHandleCleanUp = new AtomicBoolean(false);
×
198
  }
×
199

200
  @FunctionalInterface
201
  interface ISourceHandleSupplier<T> {
202
    T get() throws IoTDBException;
203
  }
204

205
  public void start() {
206
    final long startTime = System.nanoTime();
×
207
    if (skipExecute()) {
×
208
      logger.debug("[SkipExecute]");
×
209
      if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
×
210
        stateMachine.transitionToFailed(analysis.getFailStatus());
×
211
      } else {
212
        constructResultForMemorySource();
×
213
        stateMachine.transitionToRunning();
×
214
      }
215
      return;
×
216
    }
217

218
    // check timeout for query first
219
    checkTimeOutForQuery();
×
220
    doLogicalPlan();
×
221
    doDistributedPlan();
×
222
    // update timeout after finishing plan stage
223
    context.setTimeOut(
×
224
        context.getTimeOut() - (System.currentTimeMillis() - context.getStartTime()));
×
225

226
    stateMachine.transitionToPlanned();
×
227
    if (context.getQueryType() == QueryType.READ) {
×
228
      initResultHandle();
×
229
    }
230
    PERFORMANCE_OVERVIEW_METRICS.recordPlanCost(System.nanoTime() - startTime);
×
231
    schedule();
×
232

233
    // set partial insert error message
234
    // When some columns in one insert failed, other column will continue executing insertion.
235
    // The error message should be return to client, therefore we need to set it after the insertion
236
    // of other column finished.
237
    if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
×
238
      stateMachine.transitionToFailed(analysis.getFailStatus());
×
239
    }
240
  }
×
241

242
  private void checkTimeOutForQuery() {
243
    // only check query operation's timeout because we will never limit write operation's execution
244
    // time
245
    if (isQuery()) {
×
246
      long currentTime = System.currentTimeMillis();
×
247
      if (currentTime >= context.getTimeOut() + context.getStartTime()) {
×
248
        throw new QueryTimeoutRuntimeException(
×
249
            context.getStartTime(), currentTime, context.getTimeOut());
×
250
      }
251
    }
252
  }
×
253

254
  private ExecutionResult retry() {
255
    if (retryCount >= MAX_RETRY_COUNT) {
×
256
      logger.warn("[ReachMaxRetryCount]");
×
257
      stateMachine.transitionToFailed();
×
258
      return getStatus();
×
259
    }
260
    logger.warn("error when executing query. {}", stateMachine.getFailureMessage());
×
261
    // stop and clean up resources the QueryExecution used
262
    this.stopAndCleanup(stateMachine.getFailureException());
×
263
    logger.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS);
×
264
    try {
265
      Thread.sleep(RETRY_INTERVAL_IN_MS);
×
266
    } catch (InterruptedException e) {
×
267
      logger.warn("interrupted when waiting retry");
×
268
      Thread.currentThread().interrupt();
×
269
    }
×
270
    retryCount++;
×
271
    logger.info("[Retry] retry count is: {}", retryCount);
×
272
    stateMachine.transitionToQueued();
×
273
    // force invalid PartitionCache
274
    partitionFetcher.invalidAllCache();
×
275
    // clear runtime variables in MPPQueryContext
276
    context.prepareForRetry();
×
277
    // re-stop
278
    this.stopped.compareAndSet(true, false);
×
279
    this.resultHandleCleanUp.compareAndSet(true, false);
×
280
    // re-analyze the query
281
    this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
×
282
    // re-start the QueryExecution
283
    this.start();
×
284
    return getStatus();
×
285
  }
286

287
  private boolean skipExecute() {
288
    return analysis.isFinishQueryAfterAnalyze()
×
289
        || (context.getQueryType() == QueryType.READ && !analysis.hasDataSource());
×
290
  }
291

292
  private void constructResultForMemorySource() {
293
    StatementMemorySource memorySource =
×
294
        new StatementMemorySourceVisitor()
295
            .process(analysis.getStatement(), new StatementMemorySourceContext(context, analysis));
×
296
    this.resultHandle = new MemorySourceHandle(memorySource.getTsBlock());
×
297
    this.analysis.setRespDatasetHeader(memorySource.getDatasetHeader());
×
298
  }
×
299

300
  // Analyze the statement in QueryContext. Generate the analysis this query need
301
  private Analysis analyze(
302
      Statement statement,
303
      MPPQueryContext context,
304
      IPartitionFetcher partitionFetcher,
305
      ISchemaFetcher schemaFetcher) {
306
    final long startTime = System.nanoTime();
×
307
    Analysis result;
308
    try {
309
      result = new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
×
310
    } finally {
311
      PERFORMANCE_OVERVIEW_METRICS.recordAnalyzeCost(System.nanoTime() - startTime);
×
312
    }
313
    return result;
×
314
  }
315

316
  private void schedule() {
317
    final long startTime = System.nanoTime();
×
318
    if (rawStatement instanceof LoadTsFileStatement) {
×
319
      this.scheduler =
×
320
          new LoadTsFileScheduler(
321
              distributedPlan,
322
              context,
323
              stateMachine,
324
              syncInternalServiceClientManager,
325
              partitionFetcher);
326
      this.scheduler.start();
×
327
      return;
×
328
    }
329

330
    // TODO: (xingtanzjr) initialize the query scheduler according to configuration
331
    this.scheduler =
×
332
        new ClusterScheduler(
333
            context,
334
            stateMachine,
335
            distributedPlan.getInstances(),
×
336
            context.getQueryType(),
×
337
            executor,
338
            writeOperationExecutor,
339
            scheduledExecutor,
340
            syncInternalServiceClientManager,
341
            asyncInternalServiceClientManager);
342
    this.scheduler.start();
×
343
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(System.nanoTime() - startTime);
×
344
  }
×
345

346
  // Use LogicalPlanner to do the logical query plan and logical optimization
347
  public void doLogicalPlan() {
348
    LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
×
349
    this.logicalPlan = planner.plan(this.analysis);
×
350
    if (isQuery() && logger.isDebugEnabled()) {
×
351
      logger.debug(
×
352
          "logical plan is: \n {}", PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
×
353
    }
354
    // check timeout after building logical plan because it could be time-consuming in some cases.
355
    checkTimeOutForQuery();
×
356
  }
×
357

358
  // Generate the distributed plan and split it into fragments
359
  public void doDistributedPlan() {
360
    long startTime = System.nanoTime();
×
361
    DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
×
362
    this.distributedPlan = planner.planFragments();
×
363

364
    if (rawStatement.isQuery()) {
×
365
      QUERY_PLAN_COST_METRIC_SET.recordPlanCost(
×
366
          DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
×
367
    }
368
    if (isQuery() && logger.isDebugEnabled()) {
×
369
      logger.debug(
×
370
          "distribution plan done. Fragment instance count is {}, details is: \n {}",
371
          distributedPlan.getInstances().size(),
×
372
          printFragmentInstances(distributedPlan.getInstances()));
×
373
    }
374
    // check timeout after building distribution plan because it could be time-consuming in some
375
    // cases.
376
    checkTimeOutForQuery();
×
377
  }
×
378

379
  private String printFragmentInstances(List<FragmentInstance> instances) {
380
    StringBuilder ret = new StringBuilder();
×
381
    for (FragmentInstance instance : instances) {
×
382
      ret.append(System.lineSeparator()).append(instance);
×
383
    }
×
384
    return ret.toString();
×
385
  }
386

387
  // Stop the workers for this query
388
  public void stop(Throwable t) {
389
    // only stop once
390
    if (stopped.compareAndSet(false, true) && this.scheduler != null) {
×
391
      this.scheduler.stop(t);
×
392
    }
393
  }
×
394

395
  // Stop the query and clean up all the resources this query occupied
396
  public void stopAndCleanup() {
397
    stop(null);
×
398
    releaseResource();
×
399
  }
×
400

401
  @Override
402
  public void cancel() {
403
    stateMachine.transitionToCanceled(
×
404
        new KilledByOthersException(),
405
        new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode())
×
406
            .setMessage(KilledByOthersException.MESSAGE));
×
407
  }
×
408

409
  /** Release the resources that current QueryExecution hold. */
410
  private void releaseResource() {
411
    // close ResultHandle to unblock client's getResult request
412
    // Actually, we should not close the ResultHandle when the QueryExecution is Finished.
413
    // There are only two scenarios where the ResultHandle should be closed:
414
    //   1. The client fetch all the result and the ResultHandle is finished.
415
    //   2. The client's connection is closed that all owned QueryExecution should be cleaned up
416
    // If the QueryExecution's state is abnormal, we should also abort the resultHandle without
417
    // waiting it to be finished.
418
    if (resultHandle != null) {
×
419
      resultHandle.close();
×
420
      cleanUpResultHandle();
×
421
    }
422
  }
×
423

424
  private void cleanUpResultHandle() {
425
    // Result handle belongs to special fragment instance, so we need to deregister it alone
426
    // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool
427
    // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream
428
    // FragmentInstanceId to register
429
    if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) {
×
430
      TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
×
431
      MPPDataExchangeService.getInstance()
×
432
          .getMPPDataExchangeManager()
×
433
          .deRegisterFragmentInstanceFromMemoryPool(
×
434
              fragmentInstanceId.queryId,
435
              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
×
436
                  fragmentInstanceId));
437
    }
438
  }
×
439

440
  // Stop the query and clean up all the resources this query occupied
441
  public void stopAndCleanup(Throwable t) {
442
    stop(t);
×
443
    releaseResource(t);
×
444
  }
×
445

446
  /** Release the resources that current QueryExecution hold with a specified exception */
447
  private void releaseResource(Throwable t) {
448
    // close ResultHandle to unblock client's getResult request
449
    // Actually, we should not close the ResultHandle when the QueryExecution is Finished.
450
    // There are only two scenarios where the ResultHandle should be closed:
451
    //   1. The client fetch all the result and the ResultHandle is finished.
452
    //   2. The client's connection is closed that all owned QueryExecution should be cleaned up
453
    // If the QueryExecution's state is abnormal, we should also abort the resultHandle without
454
    // waiting it to be finished.
455
    if (resultHandle != null) {
×
456
      if (t != null) {
×
457
        resultHandle.abort(t);
×
458
      } else {
459
        resultHandle.close();
×
460
      }
461
      cleanUpResultHandle();
×
462
    }
463
  }
×
464

465
  /**
466
   * This method will be called by the request thread from client connection. This method will block
467
   * until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
468
   * 3. The query has been cancelled 4. The query is timeout This method will fetch the result from
469
   * DataStreamManager use the virtual ResultOperator's ID (This part will be designed and
470
   * implemented with DataStreamManager)
471
   */
472
  private <T> Optional<T> getResult(ISourceHandleSupplier<T> dataSupplier) throws IoTDBException {
473
    checkArgument(resultHandle != null, "ResultHandle in Coordinator should be init firstly.");
×
474
    // iterate until we get a non-nullable TsBlock or result is finished
475
    while (true) {
476
      try {
477
        if (resultHandle.isAborted()) {
×
478
          logger.warn("[ResultHandleAborted]");
×
479
          stateMachine.transitionToAborted();
×
480
          if (stateMachine.getFailureStatus() != null) {
×
481
            throw new IoTDBException(
×
482
                stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
×
483
          } else {
484
            throw new IoTDBException(
×
485
                stateMachine.getFailureMessage(),
×
486
                TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
487
          }
488
        } else if (resultHandle.isFinished()) {
×
489
          logger.debug("[ResultHandleFinished]");
×
490
          stateMachine.transitionToFinished();
×
491
          return Optional.empty();
×
492
        }
493

494
        long startTime = System.nanoTime();
×
495
        try {
496
          ListenableFuture<?> blocked = resultHandle.isBlocked();
×
497
          blocked.get();
×
498
        } finally {
499
          QUERY_EXECUTION_METRIC_SET.recordExecutionCost(
×
500
              WAIT_FOR_RESULT, System.nanoTime() - startTime);
×
501
        }
502

503
        if (!resultHandle.isFinished()) {
×
504
          // use the getSerializedTsBlock instead of receive to get ByteBuffer result
505
          T res = dataSupplier.get();
×
506
          if (res == null) {
×
507
            continue;
×
508
          }
509
          return Optional.of(res);
×
510
        } else {
511
          return Optional.empty();
×
512
        }
513
      } catch (ExecutionException | CancellationException e) {
×
514
        dealWithException(e.getCause() != null ? e.getCause() : e);
×
515
      } catch (InterruptedException e) {
×
516
        Thread.currentThread().interrupt();
×
517
        dealWithException(e);
×
518
      } catch (Throwable t) {
×
519
        dealWithException(t);
×
520
      }
×
521
    }
522
  }
523

524
  private void dealWithException(Throwable t) throws IoTDBException {
525
    stateMachine.transitionToFailed(t);
×
526
    if (stateMachine.getFailureStatus() != null) {
×
527
      throw new IoTDBException(
×
528
          stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
×
529
    } else if (stateMachine.getFailureException() != null) {
×
530
      Throwable rootCause = stateMachine.getFailureException();
×
531
      throw new IoTDBException(rootCause, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
532
    } else {
533
      throwIfUnchecked(t);
×
534
      throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
×
535
    }
536
  }
537

538
  @Override
539
  public Optional<TsBlock> getBatchResult() throws IoTDBException {
540
    return getResult(this::getDeserializedTsBlock);
×
541
  }
542

543
  private TsBlock getDeserializedTsBlock() {
544
    return resultHandle.receive();
×
545
  }
546

547
  @Override
548
  public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
549
    return getResult(this::getSerializedTsBlock);
×
550
  }
551

552
  private ByteBuffer getSerializedTsBlock() throws IoTDBException {
553
    return resultHandle.getSerializedTsBlock();
×
554
  }
555

556
  /** @return true if there is more tsblocks, otherwise false */
557
  @Override
558
  public boolean hasNextResult() {
559
    return resultHandle != null && !resultHandle.isFinished();
×
560
  }
561

562
  /** return the result column count without the time column */
563
  @Override
564
  public int getOutputValueColumnCount() {
565
    return analysis.getRespDatasetHeader().getOutputValueColumnCount();
×
566
  }
567

568
  @Override
569
  public DatasetHeader getDatasetHeader() {
570
    return analysis.getRespDatasetHeader();
×
571
  }
572

573
  /**
574
   * This method is a synchronized method. For READ, it will block until all the FragmentInstances
575
   * have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
576
   *
577
   * @return ExecutionStatus. Contains the QueryId and the TSStatus.
578
   */
579
  public ExecutionResult getStatus() {
580
    // Although we monitor the state to transition to RUNNING, the future will return if any
581
    // Terminated state is triggered
582
    try {
583
      if (stateMachine.getState() == QueryState.FINISHED) {
×
584
        return getExecutionResult(QueryState.FINISHED);
×
585
      }
586
      SettableFuture<QueryState> future = SettableFuture.create();
×
587
      stateMachine.addStateChangeListener(
×
588
          state -> {
589
            if (state == QueryState.RUNNING
×
590
                || state.isDone()
×
591
                || state == QueryState.PENDING_RETRY) {
592
              future.set(state);
×
593
            }
594
          });
×
595
      QueryState state = future.get();
×
596
      if (state == QueryState.PENDING_RETRY) {
×
597
        // That we put retry() here is aimed to leverage the ClientRPC thread rather than
598
        // create another new thread to do the retry() logic.
599
        // This way will lead to recursive call because retry() calls getStatus() inside.
600
        // The max depths of recursive call is equal to the max retry count.
601
        return retry();
×
602
      }
603
      // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
604
      return getExecutionResult(state);
×
605
    } catch (InterruptedException e) {
×
606
      // TODO: (xingtanzjr) use more accurate error handling
607
      Thread.currentThread().interrupt();
×
608
      return new ExecutionResult(
×
609
          context.getQueryId(),
×
610
          stateMachine.getFailureStatus() == null
×
611
              ? RpcUtils.getStatus(
×
612
                  TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())
×
613
              : stateMachine.getFailureStatus());
×
614
    } catch (ExecutionException e) {
×
615
      return new ExecutionResult(
×
616
          context.getQueryId(),
×
617
          stateMachine.getFailureStatus() == null
×
618
              ? RpcUtils.getStatus(
×
619
                  TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())
×
620
              : stateMachine.getFailureStatus());
×
621
    }
622
  }
623

624
  private void initResultHandle() {
625
    TEndPoint upstreamEndPoint = context.getResultNodeContext().getUpStreamEndpoint();
×
626

627
    this.resultHandle =
×
628
        isSameNode(upstreamEndPoint)
×
629
            ? MPPDataExchangeService.getInstance()
×
630
                .getMPPDataExchangeManager()
×
631
                .createLocalSourceHandleForFragment(
×
632
                    context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
×
633
                    context.getResultNodeContext().getVirtualResultNodeId().getId(),
×
634
                    context.getResultNodeContext().getUpStreamPlanNodeId().getId(),
×
635
                    context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
×
636
                    0, // Upstream of result ExchangeNode will only have one child.
637
                    stateMachine::transitionToFailed)
×
638
            : MPPDataExchangeService.getInstance()
×
639
                .getMPPDataExchangeManager()
×
640
                .createSourceHandle(
×
641
                    context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
×
642
                    context.getResultNodeContext().getVirtualResultNodeId().getId(),
×
643
                    0,
644
                    upstreamEndPoint,
645
                    context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
×
646
                    stateMachine::transitionToFailed);
×
647
  }
×
648

649
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
650
  private ExecutionResult getExecutionResult(QueryState state) {
651
    TSStatusCode statusCode;
652
    if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
×
653
      // For WRITE, the state should be FINISHED
654
      statusCode =
655
          state == QueryState.FINISHED
×
656
              ? TSStatusCode.SUCCESS_STATUS
×
657
              : TSStatusCode.WRITE_PROCESS_ERROR;
×
658
    } else {
659
      // For READ, the state could be FINISHED and RUNNING
660
      statusCode =
661
          state == QueryState.FINISHED || state == QueryState.RUNNING
×
662
              ? TSStatusCode.SUCCESS_STATUS
×
663
              : TSStatusCode.EXECUTE_STATEMENT_ERROR;
×
664
    }
665

666
    TSStatus tsstatus =
×
667
        RpcUtils.getStatus(
×
668
            statusCode,
669
            statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage());
×
670

671
    // If RETRYING is triggered by this QueryExecution, the stateMachine.getFailureStatus() is also
672
    // not null. We should only return the failure status when QueryExecution is in Done state.
673
    if (state.isDone() && stateMachine.getFailureStatus() != null) {
×
674
      tsstatus = stateMachine.getFailureStatus();
×
675
    }
676

677
    // collect redirect info to client for writing
678
    // if 0.13_data_insert_adapt is true and ClientVersion is NOT V_1_0, stop returning redirect
679
    // info to client
680
    if (analysis.getStatement() instanceof InsertBaseStatement
×
681
        && !analysis.isFinishQueryAfterAnalyze()
×
682
        && (!config.isEnable13DataInsertAdapt()
×
683
            || IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion()))) {
×
684
      InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
×
685
      List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
×
686
      if (insertStatement instanceof InsertRowsStatement
×
687
          || insertStatement instanceof InsertMultiTabletsStatement) {
688
        // multiple devices
689
        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
×
690
          boolean needRedirect = false;
×
691
          List<TSStatus> subStatus = new ArrayList<>();
×
692
          for (TEndPoint endPoint : redirectNodeList) {
×
693
            // redirect writing only if the redirectEndPoint is not the current node
694
            if (!config.getAddressAndPort().equals(endPoint)) {
×
695
              subStatus.add(
×
696
                  RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
×
697
              needRedirect = true;
×
698
            } else {
699
              subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
700
            }
701
          }
×
702
          if (needRedirect) {
×
703
            tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
×
704
            tsstatus.setSubStatus(subStatus);
×
705
          }
706
        }
×
707
      } else {
708
        // single device
709
        TEndPoint redirectEndPoint = redirectNodeList.get(0);
×
710
        // redirect writing only if the redirectEndPoint is not the current node
711
        if (!config.getAddressAndPort().equals(redirectEndPoint)) {
×
712
          tsstatus.setRedirectNode(redirectEndPoint);
×
713
        }
714
      }
715
    }
716

717
    return new ExecutionResult(context.getQueryId(), tsstatus);
×
718
  }
719

720
  public DistributedQueryPlan getDistributedPlan() {
721
    return distributedPlan;
×
722
  }
723

724
  public LogicalQueryPlan getLogicalPlan() {
725
    return logicalPlan;
×
726
  }
727

728
  @Override
729
  public boolean isQuery() {
730
    return context.getQueryType() == QueryType.READ;
×
731
  }
732

733
  @Override
734
  public String getQueryId() {
735
    return context.getQueryId().getId();
×
736
  }
737

738
  @Override
739
  public long getStartExecutionTime() {
740
    return context.getStartTime();
×
741
  }
742

743
  @Override
744
  public void recordExecutionTime(long executionTime) {
745
    totalExecutionTime += executionTime;
×
746
  }
×
747

748
  @Override
749
  public long getTotalExecutionTime() {
750
    return totalExecutionTime;
×
751
  }
752

753
  @Override
754
  public Optional<String> getExecuteSQL() {
755
    return Optional.ofNullable(context.getSql());
×
756
  }
757

758
  @Override
759
  public Statement getStatement() {
760
    return analysis.getStatement();
×
761
  }
762

763
  public String toString() {
764
    return String.format("QueryExecution[%s]", context.getQueryId());
×
765
  }
766
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc