• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9703

pending completion
#9703

push

travis_ci

web-flow
[IOTDB-6067] Pipe: Improve the stability of iotdb-thrift-connector-v2 during fault tolerance (avoid OOM) (#10550) (#10719)

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit f280381be)

283 of 283 new or added lines in 14 files covered. (100.0%)

79217 of 165033 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.25
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.runtime;
21

22
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
23
import org.apache.iotdb.commons.exception.StartupException;
24
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
25
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
26
import org.apache.iotdb.commons.pipe.config.PipeConfig;
27
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
28
import org.apache.iotdb.commons.service.IService;
29
import org.apache.iotdb.commons.service.ServiceType;
30
import org.apache.iotdb.db.conf.IoTDBDescriptor;
31
import org.apache.iotdb.db.pipe.agent.PipeAgent;
32
import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
33
import org.apache.iotdb.db.service.ResourcesInformationHolder;
34
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
35

36
import org.slf4j.Logger;
37
import org.slf4j.LoggerFactory;
38

39
import java.util.concurrent.atomic.AtomicBoolean;
40

41
public class PipeRuntimeAgent implements IService {
1✔
42

43
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
1✔
44
  private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
1✔
45

46
  private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
1✔
47

48
  private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
1✔
49
      new SimpleConsensusProgressIndexAssigner();
50

51
  //////////////////////////// System Service Interface ////////////////////////////
52

53
  public synchronized void preparePipeResources(
54
      ResourcesInformationHolder resourcesInformationHolder) throws StartupException {
55
    PipeAgent.receiver().cleanPipeReceiverDir();
×
56
    PipeHardlinkFileDirStartupCleaner.clean();
×
57
    PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
×
58
    simpleConsensusProgressIndexAssigner.start();
×
59
  }
×
60

61
  @Override
62
  public synchronized void start() throws StartupException {
63
    PipeConfig.getInstance().printAllConfigs();
×
64
    PipeAgentLauncher.launchPipeTaskAgent();
×
65

66
    isShutdown.set(false);
×
67
  }
×
68

69
  @Override
70
  public synchronized void stop() {
71
    if (isShutdown.get()) {
×
72
      return;
×
73
    }
74
    isShutdown.set(true);
×
75

76
    PipeAgent.task().dropAllPipeTasks();
×
77
  }
×
78

79
  public boolean isShutdown() {
80
    return isShutdown.get();
×
81
  }
82

83
  @Override
84
  public ServiceType getID() {
85
    return ServiceType.PIPE_RUNTIME_AGENT;
×
86
  }
87

88
  ////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////
89

90
  public void assignSimpleProgressIndexIfNeeded(TsFileResource tsFileResource) {
91
    simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
1✔
92
  }
1✔
93

94
  ////////////////////// Recover ProgressIndex Assigner //////////////////////
95

96
  public void assignRecoverProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
97
    tsFileResource.updateProgressIndex(
1✔
98
        new RecoverProgressIndex(
99
            DATA_NODE_ID,
100
            simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery()));
1✔
101
  }
1✔
102

103
  //////////////////////////// Runtime Exception Handlers ////////////////////////////
104

105
  public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) {
106
    LOGGER.warn(
×
107
        "Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}",
108
        pipeTaskMeta,
109
        pipeRuntimeException.getMessage(),
×
110
        pipeRuntimeException);
111

112
    pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
×
113

114
    // Quick stop all pipes locally if critical exception occurs,
115
    // no need to wait for the next heartbeat cycle.
116
    if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
×
117
      PipeAgent.task().stopAllPipesWithCriticalException();
×
118
    }
119
  }
×
120
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc