• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9875

21 Aug 2023 01:06AM UTC coverage: 47.992% (-0.001%) from 47.993%
#9875

push

travis_ci

web-flow
Implement intersect with prefix pattern for PartialPath and PathPatternTree (#10909)

71 of 71 new or added lines in 3 files covered. (100.0%)

79843 of 166368 relevant lines covered (47.99%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.57
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.runtime;
21

22
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
23
import org.apache.iotdb.commons.exception.StartupException;
24
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
25
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
26
import org.apache.iotdb.commons.pipe.config.PipeConfig;
27
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
28
import org.apache.iotdb.commons.service.IService;
29
import org.apache.iotdb.commons.service.ServiceType;
30
import org.apache.iotdb.db.conf.IoTDBDescriptor;
31
import org.apache.iotdb.db.pipe.agent.PipeAgent;
32
import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
33
import org.apache.iotdb.db.service.ResourcesInformationHolder;
34
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
35

36
import org.slf4j.Logger;
37
import org.slf4j.LoggerFactory;
38

39
import java.util.concurrent.atomic.AtomicBoolean;
40

41
public class PipeRuntimeAgent implements IService {
1✔
42

43
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
1✔
44
  private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
1✔
45

46
  private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
1✔
47

48
  private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
1✔
49
      new SimpleConsensusProgressIndexAssigner();
50

51
  //////////////////////////// System Service Interface ////////////////////////////
52

53
  public synchronized void preparePipeResources(
54
      ResourcesInformationHolder resourcesInformationHolder) throws StartupException {
55
    // clean sender (connector) hardlink file dir
56
    PipeHardlinkFileDirStartupCleaner.clean();
×
57

58
    // clean receiver file dir
59
    PipeAgent.receiver().cleanPipeReceiverDir();
×
60

61
    PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
×
62
    simpleConsensusProgressIndexAssigner.start();
×
63
  }
×
64

65
  @Override
66
  public synchronized void start() throws StartupException {
67
    PipeConfig.getInstance().printAllConfigs();
×
68
    PipeAgentLauncher.launchPipeTaskAgent();
×
69

70
    isShutdown.set(false);
×
71
  }
×
72

73
  @Override
74
  public synchronized void stop() {
75
    if (isShutdown.get()) {
×
76
      return;
×
77
    }
78
    isShutdown.set(true);
×
79

80
    PipeAgent.task().dropAllPipeTasks();
×
81
  }
×
82

83
  public boolean isShutdown() {
84
    return isShutdown.get();
×
85
  }
86

87
  @Override
88
  public ServiceType getID() {
89
    return ServiceType.PIPE_RUNTIME_AGENT;
×
90
  }
91

92
  ////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////
93

94
  public void assignSimpleProgressIndexIfNeeded(TsFileResource tsFileResource) {
95
    simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
1✔
96
  }
1✔
97

98
  ////////////////////// Recover ProgressIndex Assigner //////////////////////
99
  public void assignRecoverProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
100
    tsFileResource.recoverProgressIndex(
×
101
        new RecoverProgressIndex(
102
            DATA_NODE_ID,
103
            simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery()));
×
104
  }
×
105

106
  public void assignUpdateProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
107
    tsFileResource.updateProgressIndex(
1✔
108
        new RecoverProgressIndex(
109
            DATA_NODE_ID,
110
            simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery()));
1✔
111
  }
1✔
112

113
  //////////////////////////// Runtime Exception Handlers ////////////////////////////
114

115
  public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) {
116
    LOGGER.warn(
×
117
        "Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}",
118
        pipeTaskMeta,
119
        pipeRuntimeException.getMessage(),
×
120
        pipeRuntimeException);
121

122
    pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
×
123

124
    // Quick stop all pipes locally if critical exception occurs,
125
    // no need to wait for the next heartbeat cycle.
126
    if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
×
127
      PipeAgent.task().stopAllPipesWithCriticalException();
×
128
    }
129
  }
×
130
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc