• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9647

pending completion
#9647

push

travis_ci

web-flow
[IOTDB-6075] Pipe: File resource races when different tsfile load operations concurrently modify the same tsfile at receiver (#10629)

* Add a parameter in iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties to control the connection timeout.
* Set the pipe connection timeout between sender and receiver to 15 mins to allow long time-cost load operation.
* Redesign the pipe receiver's dir to avoid file resource races when different tsfile load operations concurrently modify the same tsfile.

184 of 184 new or added lines in 9 files covered. (100.0%)

79058 of 165585 relevant lines covered (47.74%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.33
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.runtime;
21

22
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
23
import org.apache.iotdb.commons.exception.StartupException;
24
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
25
import org.apache.iotdb.commons.pipe.config.PipeConfig;
26
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
27
import org.apache.iotdb.commons.service.IService;
28
import org.apache.iotdb.commons.service.ServiceType;
29
import org.apache.iotdb.db.conf.IoTDBDescriptor;
30
import org.apache.iotdb.db.pipe.agent.PipeAgent;
31
import org.apache.iotdb.db.pipe.resource.file.PipeHardlinkFileDirStartupCleaner;
32
import org.apache.iotdb.db.service.ResourcesInformationHolder;
33
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
34

35
import org.slf4j.Logger;
36
import org.slf4j.LoggerFactory;
37

38
import java.util.concurrent.atomic.AtomicBoolean;
39

40
public class PipeRuntimeAgent implements IService {
1✔
41

42
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
1✔
43
  private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
1✔
44

45
  private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
1✔
46

47
  private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
1✔
48
      new SimpleConsensusProgressIndexAssigner();
49

50
  //////////////////////////// System Service Interface ////////////////////////////
51

52
  public synchronized void preparePipeResources(
53
      ResourcesInformationHolder resourcesInformationHolder) throws StartupException {
54
    PipeAgent.receiver().cleanPipeReceiverDir();
×
55
    PipeHardlinkFileDirStartupCleaner.clean();
×
56
    PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
×
57
    simpleConsensusProgressIndexAssigner.start();
×
58
  }
×
59

60
  @Override
61
  public synchronized void start() throws StartupException {
62
    PipeConfig.getInstance().printAllConfigs();
×
63
    PipeAgentLauncher.launchPipeTaskAgent();
×
64

65
    isShutdown.set(false);
×
66
  }
×
67

68
  @Override
69
  public synchronized void stop() {
70
    if (isShutdown.get()) {
×
71
      return;
×
72
    }
73
    isShutdown.set(true);
×
74

75
    PipeAgent.task().dropAllPipeTasks();
×
76
  }
×
77

78
  public boolean isShutdown() {
79
    return isShutdown.get();
×
80
  }
81

82
  @Override
83
  public ServiceType getID() {
84
    return ServiceType.PIPE_RUNTIME_AGENT;
×
85
  }
86

87
  ////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////
88

89
  public void assignSimpleProgressIndexIfNeeded(TsFileResource tsFileResource) {
90
    simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
1✔
91
  }
1✔
92

93
  ////////////////////// Recover ProgressIndex Assigner //////////////////////
94

95
  public void assignRecoverProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
96
    tsFileResource.updateProgressIndex(
1✔
97
        new RecoverProgressIndex(
98
            DATA_NODE_ID,
99
            simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery()));
1✔
100
  }
1✔
101

102
  //////////////////////////// Runtime Exception Handlers ////////////////////////////
103

104
  public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) {
105
    LOGGER.warn(
×
106
        "Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}",
107
        pipeTaskMeta,
108
        pipeRuntimeException.getMessage(),
×
109
        pipeRuntimeException);
110
    pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
×
111
  }
×
112
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc