• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9647

pending completion
#9647

push

travis_ci

web-flow
[IOTDB-6075] Pipe: File resource races when different tsfile load operations concurrently modify the same tsfile at receiver (#10629)

* Add a parameter in iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties to control the connection timeout.
* Set the pipe connection timeout between sender and receiver to 15 minutes to accommodate long-running load operations.
* Redesign the pipe receiver's dir to avoid file resource races when different tsfile load operations concurrently modify the same tsfile.

184 of 184 new or added lines in 9 files covered. (100.0%)

79058 of 165585 relevant lines covered (47.74%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.31
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.connector.v1;
21

22
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
23
import org.apache.iotdb.commons.conf.CommonConfig;
24
import org.apache.iotdb.commons.conf.CommonDescriptor;
25
import org.apache.iotdb.commons.pipe.config.PipeConfig;
26
import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
27
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
28
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
29
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
30
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
31
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
32
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
33
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
34
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
35
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
36
import org.apache.iotdb.pipe.api.PipeConnector;
37
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
38
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
39
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
40
import org.apache.iotdb.pipe.api.event.Event;
41
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
42
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
43
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
44
import org.apache.iotdb.pipe.api.exception.PipeException;
45
import org.apache.iotdb.rpc.TSStatusCode;
46
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
47

48
import org.apache.commons.lang.NotImplementedException;
49
import org.apache.thrift.TException;
50
import org.slf4j.Logger;
51
import org.slf4j.LoggerFactory;
52

53
import java.io.File;
54
import java.io.IOException;
55
import java.io.RandomAccessFile;
56
import java.util.Arrays;
57

58
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
59
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
60

61
public class IoTDBThriftConnectorV1 implements PipeConnector {
1✔
62

63
  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV1.class);
1✔
64

65
  private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
1✔
66

67
  private String ipAddress;
68
  private int port;
69

70
  private IoTDBThriftConnectorClient client;
71

72
  @Override
73
  public void validate(PipeParameterValidator validator) throws Exception {
74
    validator
1✔
75
        .validateRequiredAttribute(CONNECTOR_IOTDB_IP_KEY)
1✔
76
        .validateRequiredAttribute(CONNECTOR_IOTDB_PORT_KEY);
1✔
77
  }
1✔
78

79
  @Override
80
  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
81
      throws Exception {
82
    this.ipAddress = parameters.getString(CONNECTOR_IOTDB_IP_KEY);
×
83
    this.port = parameters.getInt(CONNECTOR_IOTDB_PORT_KEY);
×
84
  }
×
85

86
  @Override
87
  public void handshake() throws Exception {
88
    if (client != null) {
×
89
      try {
90
        client.close();
×
91
      } catch (Exception e) {
×
92
        LOGGER.warn("Close client error, because: {}", e.getMessage(), e);
×
93
      }
×
94
    }
95

96
    client =
×
97
        new IoTDBThriftConnectorClient(
98
            new ThriftClientProperty.Builder()
99
                .setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
×
100
                .setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
×
101
                .build(),
×
102
            ipAddress,
103
            port);
104

105
    try {
106
      final TPipeTransferResp resp =
×
107
          client.pipeTransfer(
×
108
              PipeTransferHandshakeReq.toTPipeTransferReq(
×
109
                  CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
×
110
      if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
111
        throw new PipeException(String.format("Handshake error, result status %s.", resp.status));
×
112
      } else {
113
        LOGGER.info("Handshake success. Target server ip: {}, port: {}", ipAddress, port);
×
114
      }
115
    } catch (TException e) {
×
116
      throw new PipeConnectionException(
×
117
          String.format(
×
118
              "Connect to receiver %s:%s error, because: %s", ipAddress, port, e.getMessage()),
×
119
          e);
120
    }
×
121
  }
×
122

123
  @Override
124
  public void heartbeat() throws Exception {
125
    // Do nothing
126
  }
×
127

128
  @Override
129
  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
130
    // PipeProcessor can change the type of TabletInsertionEvent
131
    try {
132
      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
×
133
        doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
×
134
      } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
×
135
        doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
×
136
      } else {
137
        throw new NotImplementedException(
×
138
            "IoTDBThriftConnectorV1 only support "
139
                + "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
140
      }
141
    } catch (TException e) {
×
142
      throw new PipeConnectionException(
×
143
          String.format(
×
144
              "Network error when transfer tablet insertion event %s, because %s.",
145
              tabletInsertionEvent, e.getMessage()),
×
146
          e);
147
    }
×
148
  }
×
149

150
  @Override
151
  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
152
    // PipeProcessor can change the type of TabletInsertionEvent
153
    if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
×
154
      throw new NotImplementedException(
×
155
          "IoTDBThriftConnectorV1 only support PipeTsFileInsertionEvent.");
156
    }
157

158
    try {
159
      doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
×
160
    } catch (TException e) {
×
161
      throw new PipeConnectionException(
×
162
          String.format(
×
163
              "Network error when transfer tsfile insertion event %s, because %s.",
164
              tsFileInsertionEvent, e.getMessage()),
×
165
          e);
166
    }
×
167
  }
×
168

169
  @Override
170
  public void transfer(Event event) {
171
    LOGGER.warn("IoTDBThriftConnectorV1 does not support transfer generic event: {}.", event);
×
172
  }
×
173

174
  private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
175
      throws PipeException, TException, WALPipeException {
176
    final TPipeTransferResp resp =
×
177
        client.pipeTransfer(
×
178
            PipeTransferInsertNodeReq.toTPipeTransferReq(
×
179
                pipeInsertNodeTabletInsertionEvent.getInsertNode()));
×
180

181
    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
182
      throw new PipeException(
×
183
          String.format(
×
184
              "Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s",
185
              pipeInsertNodeTabletInsertionEvent, resp.status));
186
    }
187
  }
×
188

189
  private void doTransfer(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
190
      throws PipeException, TException, IOException {
191
    final TPipeTransferResp resp =
×
192
        client.pipeTransfer(
×
193
            PipeTransferTabletReq.toTPipeTransferReq(
×
194
                pipeRawTabletInsertionEvent.convertToTablet(),
×
195
                pipeRawTabletInsertionEvent.isAligned()));
×
196

197
    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
198
      throw new PipeException(
×
199
          String.format(
×
200
              "Transfer PipeRawTabletInsertionEvent %s error, result status %s",
201
              pipeRawTabletInsertionEvent, resp.status));
202
    }
203
  }
×
204

205
  private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
206
      throws PipeException, TException, InterruptedException, IOException {
207
    pipeTsFileInsertionEvent.waitForTsFileClose();
×
208

209
    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
×
210

211
    // 1. Transfer file piece by piece
212
    final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
×
213
    final byte[] readBuffer = new byte[readFileBufferSize];
×
214
    long position = 0;
×
215
    try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
×
216
      while (true) {
217
        final int readLength = reader.read(readBuffer);
×
218
        if (readLength == -1) {
×
219
          break;
×
220
        }
221

222
        final PipeTransferFilePieceResp resp =
×
223
            PipeTransferFilePieceResp.fromTPipeTransferResp(
×
224
                client.pipeTransfer(
×
225
                    PipeTransferFilePieceReq.toTPipeTransferReq(
×
226
                        tsFile.getName(),
×
227
                        position,
228
                        readLength == readFileBufferSize
×
229
                            ? readBuffer
×
230
                            : Arrays.copyOfRange(readBuffer, 0, readLength))));
×
231
        position += readLength;
×
232

233
        // This case only happens when the connection is broken, and the connector is reconnected
234
        // to the receiver, then the receiver will redirect the file position to the last position
235
        if (resp.getStatus().getCode()
×
236
            == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
×
237
          position = resp.getEndWritingOffset();
×
238
          reader.seek(position);
×
239
          LOGGER.info("Redirect file position to {}.", position);
×
240
          continue;
×
241
        }
242

243
        if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
244
          throw new PipeException(
×
245
              String.format("Transfer file %s error, result status %s.", tsFile, resp.getStatus()));
×
246
        }
247
      }
×
248
    }
249

250
    // 2. Transfer file seal signal, which means the file is transferred completely
251
    final TPipeTransferResp resp =
×
252
        client.pipeTransfer(
×
253
            PipeTransferFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()));
×
254
    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
255
      throw new PipeException(
×
256
          String.format("Seal file %s error, result status %s.", tsFile, resp.getStatus()));
×
257
    } else {
258
      LOGGER.info("Successfully transferred file {}.", tsFile);
×
259
    }
260
  }
×
261

262
  @Override
263
  public void close() throws Exception {
264
    if (client != null) {
×
265
      client.close();
×
266
    }
267
  }
×
268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc