• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9647

pending completion
#9647

push

travis_ci

web-flow
[IOTDB-6075] Pipe: File resource races when different tsfile load operations concurrently modify the same tsfile at receiver (#10629)

* Add a parameter in iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties to control the connection timeout.
* Set the pipe connection timeout between sender and receiver to 15 minutes to allow for long-running load operations.
* Redesign the pipe receiver's dir to avoid file resource races when different tsfile load operations concurrently modify the same tsfile.

184 of 184 new or added lines in 9 files covered. (100.0%)

79058 of 165585 relevant lines covered (47.74%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.98
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.receiver;
21

22
import org.apache.iotdb.db.conf.IoTDBDescriptor;
23
import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
24
import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftReceiverV1;
25
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
26
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
27
import org.apache.iotdb.rpc.RpcUtils;
28
import org.apache.iotdb.rpc.TSStatusCode;
29
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
30
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
31

32
import org.apache.commons.io.FileUtils;
33
import org.slf4j.Logger;
34
import org.slf4j.LoggerFactory;
35

36
import java.io.File;
37
import java.io.IOException;
38

39
public class PipeReceiverAgent {
1✔
40

41
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverAgent.class);
1✔
42

43
  private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new ThreadLocal<>();
1✔
44

45
  public TPipeTransferResp receive(
46
      TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
47
    final byte reqVersion = req.getVersion();
×
48
    if (reqVersion == IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
×
49
      return getReceiver(reqVersion).receive(req, partitionFetcher, schemaFetcher);
×
50
    } else {
51
      return new TPipeTransferResp(
×
52
          RpcUtils.getStatus(
×
53
              TSStatusCode.PIPE_VERSION_ERROR,
54
              String.format("Unsupported pipe version %d", reqVersion)));
×
55
    }
56
  }
57

58
  private IoTDBThriftReceiver getReceiver(byte reqVersion) {
59
    if (receiverThreadLocal.get() == null) {
×
60
      return setAndGetReceiver(reqVersion);
×
61
    }
62

63
    final byte receiverThreadLocalVersion = receiverThreadLocal.get().getVersion().getVersion();
×
64
    if (receiverThreadLocalVersion != reqVersion) {
×
65
      LOGGER.warn(
×
66
          "The receiver version {} is different from the sender version {},"
67
              + " the receiver will be reset to the sender version.",
68
          receiverThreadLocalVersion,
×
69
          reqVersion);
×
70
      receiverThreadLocal.get().handleExit();
×
71
      receiverThreadLocal.remove();
×
72
      return setAndGetReceiver(reqVersion);
×
73
    }
74

75
    return receiverThreadLocal.get();
×
76
  }
77

78
  private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
79
    if (reqVersion == IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
×
80
      receiverThreadLocal.set(new IoTDBThriftReceiverV1());
×
81
    } else {
82
      throw new UnsupportedOperationException(
×
83
          String.format("Unsupported pipe version %d", reqVersion));
×
84
    }
85
    return receiverThreadLocal.get();
×
86
  }
87

88
  public void handleClientExit() {
89
    final IoTDBThriftReceiver receiver = receiverThreadLocal.get();
×
90
    if (receiver != null) {
×
91
      receiver.handleExit();
×
92
      receiverThreadLocal.remove();
×
93
    }
94
  }
×
95

96
  public void cleanPipeReceiverDir() {
97
    final File receiverFileDir =
×
98
        new File(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDir());
×
99

100
    try {
101
      FileUtils.deleteDirectory(receiverFileDir);
×
102
      LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
×
103
    } catch (Exception e) {
×
104
      LOGGER.warn("Clean pipe receiver dir {} failed.", receiverFileDir, e);
×
105
    }
×
106

107
    try {
108
      FileUtils.forceMkdir(receiverFileDir);
×
109
      LOGGER.info("Create pipe receiver dir {} successfully.", receiverFileDir);
×
110
    } catch (IOException e) {
×
111
      LOGGER.warn("Create pipe receiver dir {} failed.", receiverFileDir, e);
×
112
    }
×
113
  }
×
114
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc