• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9647

pending completion
#9647

push

travis_ci

web-flow
[IOTDB-6075] Pipe: File resource races when different tsfile load operations concurrently modify the same tsfile at receiver (#10629)

* Add a parameter in iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties to control the connection timeout.
* Set the pipe connection timeout between sender and receiver to 15 mins to allow long time-cost load operation.
* Redesign the pipe receiver's dir to avoid file resource races when different tsfile load operations concurrently modify the same tsfile.

184 of 184 new or added lines in 9 files covered. (100.0%)

79058 of 165585 relevant lines covered (47.74%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.08
/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.commons.pipe.config;
21

22
import org.apache.iotdb.commons.conf.CommonConfig;
23
import org.apache.iotdb.commons.conf.CommonDescriptor;
24

25
import org.slf4j.Logger;
26
import org.slf4j.LoggerFactory;
27

28
public class PipeConfig {
29

30
  private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
1✔
31

32
  /////////////////////////////// File ///////////////////////////////
33

34
  public String getPipeHardlinkTsFileDirName() {
35
    return COMMON_CONFIG.getPipeHardlinkTsFileDirName();
1✔
36
  }
37

38
  /////////////////////////////// Tablet ///////////////////////////////
39

40
  public int getPipeDataStructureTabletRowSize() {
41
    return COMMON_CONFIG.getPipeDataStructureTabletRowSize();
1✔
42
  }
43

44
  /////////////////////////////// Subtask Executor ///////////////////////////////
45

46
  public int getPipeSubtaskExecutorMaxThreadNum() {
47
    return COMMON_CONFIG.getPipeSubtaskExecutorMaxThreadNum();
1✔
48
  }
49

50
  public int getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
51
    return COMMON_CONFIG.getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount();
1✔
52
  }
53

54
  public long getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration() {
55
    return COMMON_CONFIG.getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration();
1✔
56
  }
57

58
  public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
59
    return COMMON_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
1✔
60
  }
61

62
  /////////////////////////////// Extractor ///////////////////////////////
63

64
  public int getPipeExtractorAssignerDisruptorRingBufferSize() {
65
    return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferSize();
1✔
66
  }
67

68
  public int getPipeExtractorMatcherCacheSize() {
69
    return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
1✔
70
  }
71

72
  public int getPipeExtractorPendingQueueCapacity() {
73
    return COMMON_CONFIG.getPipeExtractorPendingQueueCapacity();
×
74
  }
75

76
  public int getPipeExtractorPendingQueueTabletLimit() {
77
    return COMMON_CONFIG.getPipeExtractorPendingQueueTabletLimit();
1✔
78
  }
79

80
  /////////////////////////////// Connector ///////////////////////////////
81

82
  public long getPipeConnectorTimeoutMs() {
83
    return COMMON_CONFIG.getPipeConnectorTimeoutMs();
×
84
  }
85

86
  public int getPipeConnectorReadFileBufferSize() {
87
    return COMMON_CONFIG.getPipeConnectorReadFileBufferSize();
×
88
  }
89

90
  public long getPipeConnectorRetryIntervalMs() {
91
    return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
×
92
  }
93

94
  public int getPipeConnectorPendingQueueSize() {
95
    return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
×
96
  }
97

98
  /////////////////////////////// Meta Consistency ///////////////////////////////
99

100
  public boolean isSeperatedPipeHeartbeatEnabled() {
101
    return COMMON_CONFIG.isSeperatedPipeHeartbeatEnabled();
×
102
  }
103

104
  public int getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() {
105
    return COMMON_CONFIG.getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
×
106
  }
107

108
  public long getPipeMetaSyncerInitialSyncDelayMinutes() {
109
    return COMMON_CONFIG.getPipeMetaSyncerInitialSyncDelayMinutes();
×
110
  }
111

112
  public long getPipeMetaSyncerSyncIntervalMinutes() {
113
    return COMMON_CONFIG.getPipeMetaSyncerSyncIntervalMinutes();
×
114
  }
115

116
  public long getPipeMetaSyncerAutoRestartPipeCheckIntervalRound() {
117
    return COMMON_CONFIG.getPipeMetaSyncerAutoRestartPipeCheckIntervalRound();
×
118
  }
119

120
  public boolean getPipeAutoRestartEnabled() {
121
    return COMMON_CONFIG.getPipeAutoRestartEnabled();
×
122
  }
123

124
  /////////////////////////////// Utils ///////////////////////////////
125

126
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfig.class);
1✔
127

128
  public void printAllConfigs() {
129
    LOGGER.info("PipeHardlinkTsFileDirName: {}", getPipeHardlinkTsFileDirName());
×
130

131
    LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}", getPipeSubtaskExecutorMaxThreadNum());
×
132
    LOGGER.info(
×
133
        "PipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount: {}",
134
        getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount());
×
135
    LOGGER.info(
×
136
        "PipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration: {}",
137
        getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration());
×
138
    LOGGER.info(
×
139
        "PipeSubtaskExecutorPendingQueueMaxBlockingTimeMs: {}",
140
        getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs());
×
141

142
    LOGGER.info(
×
143
        "PipeExtractorAssignerDisruptorRingBufferSize: {}",
144
        getPipeExtractorAssignerDisruptorRingBufferSize());
×
145
    LOGGER.info("PipeExtractorMatcherCacheSize: {}", getPipeExtractorMatcherCacheSize());
×
146
    LOGGER.info("PipeExtractorPendingQueueCapacity: {}", getPipeExtractorPendingQueueCapacity());
×
147
    LOGGER.info(
×
148
        "PipeExtractorPendingQueueTabletLimit: {}", getPipeExtractorPendingQueueTabletLimit());
×
149

150
    LOGGER.info("PipeConnectorReadFileBufferSize: {}", getPipeConnectorReadFileBufferSize());
×
151
    LOGGER.info("PipeConnectorRetryIntervalMs: {}", getPipeConnectorRetryIntervalMs());
×
152
    LOGGER.info("PipeConnectorPendingQueueSize: {}", getPipeConnectorPendingQueueSize());
×
153

154
    LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", isSeperatedPipeHeartbeatEnabled());
×
155
    LOGGER.info(
×
156
        "PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}",
157
        getPipeHeartbeatIntervalSecondsForCollectingPipeMeta());
×
158
    LOGGER.info(
×
159
        "PipeMetaSyncerInitialSyncDelayMinutes: {}", getPipeMetaSyncerInitialSyncDelayMinutes());
×
160
    LOGGER.info("PipeMetaSyncerSyncIntervalMinutes: {}", getPipeMetaSyncerSyncIntervalMinutes());
×
161
  }
×
162

163
  /////////////////////////////// Singleton ///////////////////////////////
164

165
  private PipeConfig() {}
166

167
  public static PipeConfig getInstance() {
168
    return PipeConfigHolder.INSTANCE;
1✔
169
  }
170

171
  private static class PipeConfigHolder {
172
    private static final PipeConfig INSTANCE = new PipeConfig();
1✔
173
  }
174
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc