• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9940

28 Aug 2023 02:34PM CUT coverage: 47.667% (-0.02%) from 47.686%
#9940

Pull #10978

travis_ci

web-flow
Merge 64f220724 into ebd2a6f63
Pull Request #10978: [To rel/1.2] Pipe: Increase the injection frequency of HeartBeatEvent to reduce the delay in log transferring (#10970)

30 of 30 new or added lines in 7 files covered. (100.0%)

79985 of 167800 relevant lines covered (47.67%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.95
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.runtime;
21

22
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
23
import org.apache.iotdb.commons.exception.StartupException;
24
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
25
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
26
import org.apache.iotdb.commons.pipe.config.PipeConfig;
27
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
28
import org.apache.iotdb.commons.service.IService;
29
import org.apache.iotdb.commons.service.ServiceType;
30
import org.apache.iotdb.db.conf.IoTDBDescriptor;
31
import org.apache.iotdb.db.pipe.agent.PipeAgent;
32
import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
33
import org.apache.iotdb.db.service.ResourcesInformationHolder;
34
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
35

36
import org.slf4j.Logger;
37
import org.slf4j.LoggerFactory;
38

39
import java.util.concurrent.atomic.AtomicBoolean;
40

41
public class PipeRuntimeAgent implements IService {
1✔
42

43
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
1✔
44
  private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
1✔
45

46
  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
1✔
47

48
  private final PipeCronEventInjector pipeCronEventInjector = new PipeCronEventInjector();
1✔
49

50
  private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
1✔
51
      new SimpleConsensusProgressIndexAssigner();
52

53
  //////////////////////////// System Service Interface ////////////////////////////
54

55
  public synchronized void preparePipeResources(
56
      ResourcesInformationHolder resourcesInformationHolder) throws StartupException {
57
    // clean sender (connector) hardlink file dir
58
    PipeHardlinkFileDirStartupCleaner.clean();
×
59

60
    // clean receiver file dir
61
    PipeAgent.receiver().cleanPipeReceiverDir();
×
62

63
    PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
×
64
    simpleConsensusProgressIndexAssigner.start();
×
65
  }
×
66

67
  @Override
68
  public synchronized void start() throws StartupException {
69
    PipeConfig.getInstance().printAllConfigs();
×
70
    PipeAgentLauncher.launchPipeTaskAgent();
×
71
    pipeCronEventInjector.start();
×
72

73
    isShutdown.set(false);
×
74
  }
×
75

76
  @Override
77
  public synchronized void stop() {
78
    if (isShutdown.get()) {
×
79
      return;
×
80
    }
81
    isShutdown.set(true);
×
82

83
    pipeCronEventInjector.stop();
×
84
    PipeAgent.task().dropAllPipeTasks();
×
85
  }
×
86

87
  public boolean isShutdown() {
88
    return isShutdown.get();
×
89
  }
90

91
  @Override
92
  public ServiceType getID() {
93
    return ServiceType.PIPE_RUNTIME_AGENT;
×
94
  }
95

96
  ////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////
97

98
  public void assignSimpleProgressIndexIfNeeded(TsFileResource tsFileResource) {
99
    simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
1✔
100
  }
1✔
101

102
  ////////////////////// Recover ProgressIndex Assigner //////////////////////
103

104
  public void assignRecoverProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
105
    tsFileResource.recoverProgressIndex(
×
106
        new RecoverProgressIndex(
107
            DATA_NODE_ID,
108
            simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery()));
×
109
  }
×
110

111
  public void assignUpdateProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
112
    tsFileResource.updateProgressIndex(
1✔
113
        new RecoverProgressIndex(
114
            DATA_NODE_ID,
115
            simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery()));
1✔
116
  }
1✔
117

118
  //////////////////////////// Runtime Exception Handlers ////////////////////////////
119

120
  public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) {
121
    LOGGER.warn(
×
122
        "Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}",
123
        pipeTaskMeta,
124
        pipeRuntimeException.getMessage(),
×
125
        pipeRuntimeException);
126

127
    pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
×
128

129
    // Quick stop all pipes locally if critical exception occurs,
130
    // no need to wait for the next heartbeat cycle.
131
    if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
×
132
      PipeAgent.task().stopAllPipesWithCriticalException();
×
133
    }
134
  }
×
135
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc