• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9817

pending completion
#9817

push

travis_ci

web-flow
[To rel/1.2] Enhance the event notification mechanism of StatisticsService (#10830)

14 of 14 new or added lines in 4 files covered. (100.0%)

79676 of 165756 relevant lines covered (48.07%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.98
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.receiver;
21

22
import org.apache.iotdb.db.conf.IoTDBDescriptor;
23
import org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
24
import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiver;
25
import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1;
26
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
27
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
28
import org.apache.iotdb.rpc.RpcUtils;
29
import org.apache.iotdb.rpc.TSStatusCode;
30
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
31
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
32

33
import org.apache.commons.io.FileUtils;
34
import org.slf4j.Logger;
35
import org.slf4j.LoggerFactory;
36

37
import java.io.File;
38
import java.io.IOException;
39

40
public class PipeReceiverAgent {
1✔
41

42
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverAgent.class);
1✔
43

44
  private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new ThreadLocal<>();
1✔
45

46
  public TPipeTransferResp receive(
47
      TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
48
    final byte reqVersion = req.getVersion();
×
49
    if (reqVersion == IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
×
50
      return getReceiver(reqVersion).receive(req, partitionFetcher, schemaFetcher);
×
51
    } else {
52
      return new TPipeTransferResp(
×
53
          RpcUtils.getStatus(
×
54
              TSStatusCode.PIPE_VERSION_ERROR,
55
              String.format("Unsupported pipe version %d", reqVersion)));
×
56
    }
57
  }
58

59
  private IoTDBThriftReceiver getReceiver(byte reqVersion) {
60
    if (receiverThreadLocal.get() == null) {
×
61
      return setAndGetReceiver(reqVersion);
×
62
    }
63

64
    final byte receiverThreadLocalVersion = receiverThreadLocal.get().getVersion().getVersion();
×
65
    if (receiverThreadLocalVersion != reqVersion) {
×
66
      LOGGER.warn(
×
67
          "The receiver version {} is different from the sender version {},"
68
              + " the receiver will be reset to the sender version.",
69
          receiverThreadLocalVersion,
×
70
          reqVersion);
×
71
      receiverThreadLocal.get().handleExit();
×
72
      receiverThreadLocal.remove();
×
73
      return setAndGetReceiver(reqVersion);
×
74
    }
75

76
    return receiverThreadLocal.get();
×
77
  }
78

79
  private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
80
    if (reqVersion == IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
×
81
      receiverThreadLocal.set(new IoTDBThriftReceiverV1());
×
82
    } else {
83
      throw new UnsupportedOperationException(
×
84
          String.format("Unsupported pipe version %d", reqVersion));
×
85
    }
86
    return receiverThreadLocal.get();
×
87
  }
88

89
  public void handleClientExit() {
90
    final IoTDBThriftReceiver receiver = receiverThreadLocal.get();
×
91
    if (receiver != null) {
×
92
      receiver.handleExit();
×
93
      receiverThreadLocal.remove();
×
94
    }
95
  }
×
96

97
  public void cleanPipeReceiverDir() {
98
    final File receiverFileDir =
×
99
        new File(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDir());
×
100

101
    try {
102
      FileUtils.deleteDirectory(receiverFileDir);
×
103
      LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
×
104
    } catch (Exception e) {
×
105
      LOGGER.warn("Clean pipe receiver dir {} failed.", receiverFileDir, e);
×
106
    }
×
107

108
    try {
109
      FileUtils.forceMkdir(receiverFileDir);
×
110
      LOGGER.info("Create pipe receiver dir {} successfully.", receiverFileDir);
×
111
    } catch (IOException e) {
×
112
      LOGGER.warn("Create pipe receiver dir {} failed.", receiverFileDir, e);
×
113
    }
×
114
  }
×
115
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc