• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9749

pending completion
#9749

push

travis_ci

web-flow
[IOTDB-6097] Pipe: Avoid subscrption running with the pattern option causing OOM & Fix de/ser of RecoverProgressIndex (#10767) (#10775)

This commit fixes 2 issues:

* Subscrption running with the pattern option may cause OOM

  How to reproduce:

  1. execute sql:
  ```
  create pipe test1
  with extractor (
    'extractor.history.enable'='false',
    'extractor'='iotdb-extractor',
    'extractor.realtime.mode'='log',
    'extractor.pattern'='root'
  )
  with connector (
    'connector'='iotdb-thrift-connector-v1',
    'connector.node-urls'='127.0.0.1:6668'
  );

  start pipe test1;
  ```

  2. run benchmark: 1 database, 10 devices, 10 measurements.

* java.lang.UnsupportedOperationException: Unsupported PipeRuntimeException type 0 caused by de/ser issue of RecoverProgressIndex
  <img width="1194" alt="image" src="https://github.com/apache/iotdb/assets/30497621/d2d35ee7-293b-4594-92f3-fc10b2aa8313">

(cherry picked from commit f0f168249)

35 of 35 new or added lines in 7 files covered. (100.0%)

79409 of 165601 relevant lines covered (47.95%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.61
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.connector.v1;
21

22
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
24
import org.apache.iotdb.commons.conf.CommonDescriptor;
25
import org.apache.iotdb.commons.pipe.config.PipeConfig;
26
import org.apache.iotdb.db.pipe.connector.base.IoTDBThriftConnector;
27
import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
28
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
29
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
30
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
31
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
32
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
33
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
34
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
35
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
36
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
37
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
38
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
39
import org.apache.iotdb.pipe.api.event.Event;
40
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
41
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
42
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
43
import org.apache.iotdb.pipe.api.exception.PipeException;
44
import org.apache.iotdb.rpc.TSStatusCode;
45
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
46

47
import org.apache.thrift.TException;
48
import org.slf4j.Logger;
49
import org.slf4j.LoggerFactory;
50

51
import java.io.File;
52
import java.io.IOException;
53
import java.io.RandomAccessFile;
54
import java.util.ArrayList;
55
import java.util.Arrays;
56
import java.util.List;
57

58
public class IoTDBThriftConnectorV1 extends IoTDBThriftConnector {
59

60
  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV1.class);
1✔
61

62
  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
1✔
63

64
  private final List<IoTDBThriftConnectorClient> clients = new ArrayList<>();
1✔
65
  private final List<Boolean> isClientAlive = new ArrayList<>();
1✔
66

67
  private long currentClientIndex = 0;
1✔
68

69
  public IoTDBThriftConnectorV1() {
1✔
70
    // Do nothing
71
  }
1✔
72

73
  public IoTDBThriftConnectorV1(String ipAddress, int port) {
×
74
    nodeUrls.add(new TEndPoint(ipAddress, port));
×
75
  }
×
76

77
  @Override
78
  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
79
      throws Exception {
80
    super.customize(parameters, configuration);
×
81
    for (int i = 0; i < nodeUrls.size(); i++) {
×
82
      isClientAlive.add(false);
×
83
      clients.add(null);
×
84
    }
85
  }
×
86

87
  @Override
88
  public void handshake() throws Exception {
89
    for (int i = 0; i < clients.size(); i++) {
×
90
      if (isClientAlive.get(i)) {
×
91
        continue;
×
92
      }
93

94
      final String ip = nodeUrls.get(i).getIp();
×
95
      final int port = nodeUrls.get(i).getPort();
×
96

97
      // close the client if necessary
98
      if (clients.get(i) != null) {
×
99
        try {
100
          clients.set(i, null).close();
×
101
        } catch (Exception e) {
×
102
          LOGGER.warn(
×
103
              "Failed to close client with target server ip: {}, port: {}, because: {}. Ignore it.",
104
              ip,
105
              port,
×
106
              e.getMessage());
×
107
        }
×
108
      }
109

110
      clients.set(
×
111
          i,
112
          new IoTDBThriftConnectorClient(
113
              new ThriftClientProperty.Builder()
114
                  .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorTimeoutMs())
×
115
                  .setRpcThriftCompressionEnabled(
×
116
                      PIPE_CONFIG.isPipeAsyncConnectorRPCThriftCompressionEnabled())
×
117
                  .build(),
×
118
              ip,
119
              port));
120

121
      try {
122
        final TPipeTransferResp resp =
×
123
            clients
124
                .get(i)
×
125
                .pipeTransfer(
×
126
                    PipeTransferHandshakeReq.toTPipeTransferReq(
×
127
                        CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
×
128
        if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
129
          throw new PipeException(String.format("Handshake error, result status %s.", resp.status));
×
130
        } else {
131
          isClientAlive.set(i, true);
×
132
          LOGGER.info("Handshake success. Target server ip: {}, port: {}", ip, port);
×
133
        }
134
      } catch (TException e) {
×
135
        throw new PipeConnectionException(
×
136
            String.format(
×
137
                "Handshake error with target server ip: %s, port: %s, because: %s",
138
                ip, port, e.getMessage()),
×
139
            e);
140
      }
×
141
    }
142

143
    for (int i = 0; i < clients.size(); i++) {
×
144
      if (isClientAlive.get(i)) {
×
145
        return;
×
146
      }
147
    }
148
    throw new PipeConnectionException(
×
149
        String.format("All target servers %s are not available.", nodeUrls));
×
150
  }
151

152
  @Override
153
  public void heartbeat() {
154
    try {
155
      handshake();
×
156
    } catch (Exception e) {
×
157
      LOGGER.warn(
×
158
          "Failed to reconnect to target server, because: {}. Try to reconnect later.",
159
          e.getMessage(),
×
160
          e);
161
    }
×
162
  }
×
163

164
  @Override
165
  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
166
    // PipeProcessor can change the type of TabletInsertionEvent
167

168
    final int clientIndex = nextClientIndex();
×
169
    final IoTDBThriftConnectorClient client = clients.get(clientIndex);
×
170

171
    try {
172
      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
×
173
        doTransfer(client, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
×
174
      } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
×
175
        doTransfer(client, (PipeRawTabletInsertionEvent) tabletInsertionEvent);
×
176
      } else {
177
        LOGGER.warn(
×
178
            "IoTDBThriftConnectorV1 only support "
179
                + "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
180
                + "Ignore {}.",
181
            tabletInsertionEvent);
182
      }
183
    } catch (TException e) {
×
184
      isClientAlive.set(clientIndex, false);
×
185

186
      throw new PipeConnectionException(
×
187
          String.format(
×
188
              "Network error when transfer tablet insertion event %s, because %s.",
189
              tabletInsertionEvent, e.getMessage()),
×
190
          e);
191
    }
×
192
  }
×
193

194
  @Override
195
  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
196
    // PipeProcessor can change the type of TabletInsertionEvent
197
    if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
×
198
      LOGGER.warn(
×
199
          "IoTDBThriftConnectorV1 only support PipeTsFileInsertionEvent. Ignore {}.",
200
          tsFileInsertionEvent);
201
      return;
×
202
    }
203

204
    final int clientIndex = nextClientIndex();
×
205
    final IoTDBThriftConnectorClient client = clients.get(clientIndex);
×
206

207
    try {
208
      doTransfer(client, (PipeTsFileInsertionEvent) tsFileInsertionEvent);
×
209
    } catch (TException e) {
×
210
      isClientAlive.set(clientIndex, false);
×
211

212
      throw new PipeConnectionException(
×
213
          String.format(
×
214
              "Network error when transfer tsfile insertion event %s, because %s.",
215
              tsFileInsertionEvent, e.getMessage()),
×
216
          e);
217
    }
×
218
  }
×
219

220
  @Override
221
  public void transfer(Event event) {
222
    LOGGER.warn("IoTDBThriftConnectorV1 does not support transfer generic event: {}.", event);
×
223
  }
×
224

225
  private void doTransfer(
226
      IoTDBThriftConnectorClient client,
227
      PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
228
      throws PipeException, TException, WALPipeException {
229
    final TPipeTransferResp resp =
×
230
        client.pipeTransfer(
×
231
            PipeTransferInsertNodeReq.toTPipeTransferReq(
×
232
                pipeInsertNodeTabletInsertionEvent.getInsertNode()));
×
233

234
    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
235
      throw new PipeException(
×
236
          String.format(
×
237
              "Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s",
238
              pipeInsertNodeTabletInsertionEvent, resp.status));
239
    }
240
  }
×
241

242
  private void doTransfer(
243
      IoTDBThriftConnectorClient client, PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
244
      throws PipeException, TException, IOException {
245
    final TPipeTransferResp resp =
×
246
        client.pipeTransfer(
×
247
            PipeTransferTabletReq.toTPipeTransferReq(
×
248
                pipeRawTabletInsertionEvent.convertToTablet(),
×
249
                pipeRawTabletInsertionEvent.isAligned()));
×
250

251
    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
252
      throw new PipeException(
×
253
          String.format(
×
254
              "Transfer PipeRawTabletInsertionEvent %s error, result status %s",
255
              pipeRawTabletInsertionEvent, resp.status));
256
    }
257
  }
×
258

259
  private void doTransfer(
260
      IoTDBThriftConnectorClient client, PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
261
      throws PipeException, TException, InterruptedException, IOException {
262
    pipeTsFileInsertionEvent.waitForTsFileClose();
×
263

264
    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
×
265

266
    // 1. Transfer file piece by piece
267
    final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
×
268
    final byte[] readBuffer = new byte[readFileBufferSize];
×
269
    long position = 0;
×
270
    try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
×
271
      while (true) {
272
        final int readLength = reader.read(readBuffer);
×
273
        if (readLength == -1) {
×
274
          break;
×
275
        }
276

277
        final PipeTransferFilePieceResp resp =
×
278
            PipeTransferFilePieceResp.fromTPipeTransferResp(
×
279
                client.pipeTransfer(
×
280
                    PipeTransferFilePieceReq.toTPipeTransferReq(
×
281
                        tsFile.getName(),
×
282
                        position,
283
                        readLength == readFileBufferSize
×
284
                            ? readBuffer
×
285
                            : Arrays.copyOfRange(readBuffer, 0, readLength))));
×
286
        position += readLength;
×
287

288
        // This case only happens when the connection is broken, and the connector is reconnected
289
        // to the receiver, then the receiver will redirect the file position to the last position
290
        if (resp.getStatus().getCode()
×
291
            == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
×
292
          position = resp.getEndWritingOffset();
×
293
          reader.seek(position);
×
294
          LOGGER.info("Redirect file position to {}.", position);
×
295
          continue;
×
296
        }
297

298
        if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
299
          throw new PipeException(
×
300
              String.format("Transfer file %s error, result status %s.", tsFile, resp.getStatus()));
×
301
        }
302
      }
×
303
    }
304

305
    // 2. Transfer file seal signal, which means the file is transferred completely
306
    final TPipeTransferResp resp =
×
307
        client.pipeTransfer(
×
308
            PipeTransferFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()));
×
309
    if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
310
      throw new PipeException(
×
311
          String.format("Seal file %s error, result status %s.", tsFile, resp.getStatus()));
×
312
    } else {
313
      LOGGER.info("Successfully transferred file {}.", tsFile);
×
314
    }
315
  }
×
316

317
  private int nextClientIndex() {
318
    final int clientSize = clients.size();
×
319
    // Round-robin, find the next alive client
320
    for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
×
321
      final int clientIndex = (int) (currentClientIndex++ % clientSize);
×
322
      if (isClientAlive.get(clientIndex)) {
×
323
        return clientIndex;
×
324
      }
325
    }
326
    throw new PipeConnectionException(
×
327
        "All clients are dead, please check the connection to the receiver.");
328
  }
329

330
  @Override
331
  public void close() {
332
    for (int i = 0; i < clients.size(); ++i) {
×
333
      try {
334
        if (clients.get(i) != null) {
×
335
          clients.set(i, null).close();
×
336
        }
337
      } catch (Exception e) {
×
338
        LOGGER.warn("Failed to close client {}.", i, e);
×
339
      } finally {
340
        isClientAlive.set(i, false);
×
341
      }
342
    }
343
  }
×
344
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc