• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9867

18 Aug 2023 06:05AM UTC coverage: 48.003% (-0.08%) from 48.081%
#9867

push

travis_ci

web-flow
[To rel/1.2] [IOTDB-6115] Fix Limit & Offset push down doesn't take effect while there exist null value

103 of 103 new or added lines in 23 files covered. (100.0%)

79802 of 166243 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.receiver.airgap;
21

22
import org.apache.iotdb.commons.concurrent.WrappedRunnable;
23
import org.apache.iotdb.commons.pipe.config.PipeConfig;
24
import org.apache.iotdb.db.pipe.agent.PipeAgent;
25
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
26
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
27
import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent;
28
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
29
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
30
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
31
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
32
import org.apache.iotdb.rpc.TSStatusCode;
33
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
34
import org.apache.iotdb.tsfile.utils.BytesUtils;
35
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
36

37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39

40
import java.io.BufferedInputStream;
41
import java.io.IOException;
42
import java.io.InputStream;
43
import java.io.OutputStream;
44
import java.net.Socket;
45
import java.nio.ByteBuffer;
46
import java.util.Arrays;
47
import java.util.zip.CRC32;
48

49
import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
50

51
public class IoTDBAirGapReceiver extends WrappedRunnable {
52

53
  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapReceiver.class);
×
54

55
  private final Socket socket;
56
  private final long receiverId;
57

58
  private final IoTDBThriftReceiverAgent agent;
59
  private final IPartitionFetcher partitionFetcher;
60
  private final ISchemaFetcher schemaFetcher;
61

62
  public IoTDBAirGapReceiver(Socket socket, long receiverId) {
×
63
    this.socket = socket;
×
64
    this.receiverId = receiverId;
×
65

66
    agent = PipeAgent.receiver().thrift();
×
67
    partitionFetcher = ClusterPartitionFetcher.getInstance();
×
68
    schemaFetcher = ClusterSchemaFetcher.getInstance();
×
69
  }
×
70

71
  @Override
72
  public void runMayThrow() throws Throwable {
73
    socket.setSoTimeout((int) PipeConfig.getInstance().getPipeConnectorTimeoutMs());
×
74
    socket.setKeepAlive(true);
×
75

76
    LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket);
×
77

78
    try {
79
      while (!socket.isClosed()) {
×
80
        receive();
×
81
      }
82
      LOGGER.info(
×
83
          "Pipe air gap receiver {} closed because socket is closed. Socket: {}",
84
          receiverId,
×
85
          socket);
86
    } catch (Exception e) {
×
87
      LOGGER.warn(
×
88
          "Pipe air gap receiver {} closed because of exception. Socket: {}",
89
          receiverId,
×
90
          socket,
91
          e);
92
      throw e;
×
93
    } finally {
94
      PipeAgent.receiver().thrift().handleClientExit();
×
95
      socket.close();
×
96
    }
97
  }
×
98

99
  private void receive() throws IOException {
100
    final InputStream inputStream = new BufferedInputStream(socket.getInputStream());
×
101

102
    try {
103
      final byte[] data = readData(inputStream);
×
104

105
      if (!checkSum(data)) {
×
106
        LOGGER.warn("Checksum failed, receiverId: {}", receiverId);
×
107
        fail();
×
108
        return;
×
109
      }
110

111
      // Removed the used checksum
112
      final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, data.length - LONG_LEN);
×
113
      // Pseudo request, to reuse logic in IoTDBThriftReceiverAgent
114
      final AirGapPseudoTPipeTransferRequest req =
×
115
          (AirGapPseudoTPipeTransferRequest)
116
              new AirGapPseudoTPipeTransferRequest()
117
                  .setVersion(ReadWriteIOUtils.readByte(byteBuffer))
×
118
                  .setType(ReadWriteIOUtils.readShort(byteBuffer))
×
119
                  .setBody(byteBuffer.slice());
×
120
      final TPipeTransferResp resp = agent.receive(req, partitionFetcher, schemaFetcher);
×
121

122
      if (resp.getStatus().code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
123
        ok();
×
124
      } else {
125
        LOGGER.warn(
×
126
            "Handle data failed, receiverId: {}, status: {}, req: {}",
127
            receiverId,
×
128
            resp.getStatus(),
×
129
            req);
130
        fail();
×
131
      }
132
    } catch (Exception e) {
×
133
      LOGGER.warn("Exception during handling receiving, receiverId: {}", receiverId, e);
×
134
      fail();
×
135
    }
×
136
  }
×
137

138
  private void ok() throws IOException {
139
    final OutputStream outputStream = socket.getOutputStream();
×
140
    outputStream.write(AirGapOneByteResponse.OK);
×
141
    outputStream.flush();
×
142
  }
×
143

144
  private void fail() throws IOException {
145
    final OutputStream outputStream = socket.getOutputStream();
×
146
    outputStream.write(AirGapOneByteResponse.FAIL);
×
147
    outputStream.flush();
×
148
  }
×
149

150
  private boolean checkSum(byte[] bytes) {
151
    try {
152
      final CRC32 crc32 = new CRC32();
×
153
      crc32.update(bytes, LONG_LEN, bytes.length - LONG_LEN);
×
154
      return BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN)) == crc32.getValue();
×
155
    } catch (Exception e) {
×
156
      // ArrayIndexOutOfBoundsException when bytes.length < LONG_LEN
157
      return false;
×
158
    }
159
  }
160

161
  private byte[] readData(InputStream inputStream) throws IOException {
162
    final int length = readLength(inputStream);
×
163

164
    if (length == 0) {
×
165
      // Will fail() after checkSum()
166
      return new byte[0];
×
167
    }
168

169
    final ByteBuffer resultBuffer = ByteBuffer.allocate(length);
×
170
    final byte[] readBuffer = new byte[length];
×
171

172
    int alreadyReadBytes = 0;
×
173
    int currentReadBytes;
174
    while (alreadyReadBytes < length) {
×
175
      currentReadBytes = inputStream.read(readBuffer, 0, length - alreadyReadBytes);
×
176
      resultBuffer.put(readBuffer, 0, currentReadBytes);
×
177
      alreadyReadBytes += currentReadBytes;
×
178
    }
179
    return resultBuffer.array();
×
180
  }
181

182
  /**
183
   * Read the length of the following data. The thread may typically block here when there is no
184
   * data to read.
185
   */
186
  private int readLength(InputStream inputStream) throws IOException {
187
    byte[] lengthBytes0 = new byte[4];
×
188
    if (inputStream.read(lengthBytes0) < 4) {
×
189
      return 0;
×
190
    }
191

192
    // for double check
193
    byte[] lengthBytes1 = new byte[4];
×
194
    if (inputStream.read(lengthBytes1) < 4) {
×
195
      return 0;
×
196
    }
197

198
    return Arrays.equals(lengthBytes0, lengthBytes1) ? BytesUtils.bytesToInt(lengthBytes0) : 0;
×
199
  }
200
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc