• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10018

07 Sep 2023 05:00AM UTC coverage: 47.717% (+0.03%) from 47.691%
#10018

push

travis_ci

web-flow
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when a PipeHeartbeatEvent is being assigned (#11074) (#11075)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>
(cherry picked from commit ac0dd9d31)

1 of 1 new or added line in 1 file covered. (100.0%)

80262 of 168204 relevant lines covered (47.72%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.76
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.storageengine.dataregion.wal;
21

22
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.dataregion.wal.allocation.ElasticStrategy;
import org.apache.iotdb.db.storageengine.dataregion.wal.allocation.FirstCreateStrategy;
import org.apache.iotdb.db.storageengine.dataregion.wal.allocation.NodeAllocationStrategy;
import org.apache.iotdb.db.storageengine.dataregion.wal.allocation.RoundRobinStrategy;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALFakeNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
53

54
/** This class is used to manage and allocate wal nodes. */
55
public class WALManager implements IService {
56
  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
1✔
57
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
58

59
  // manage all wal nodes and decide how to allocate them
60
  private final NodeAllocationStrategy walNodesManager;
61
  // single thread to delete old .wal files
62
  private ScheduledExecutorService walDeleteThread;
63
  // total disk usage of wal files
64
  private final AtomicLong totalDiskUsage = new AtomicLong();
1✔
65
  // total number of wal files
66
  private final AtomicLong totalFileNum = new AtomicLong();
1✔
67

68
  private WALManager() {
1✔
69
    if (config.isClusterMode()
1✔
70
        && config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
1✔
71
      walNodesManager = new FirstCreateStrategy();
×
72
    } else if (config.getMaxWalNodesNum() == 0) {
1✔
73
      walNodesManager = new ElasticStrategy();
1✔
74
    } else {
75
      walNodesManager = new RoundRobinStrategy(config.getMaxWalNodesNum());
×
76
    }
77
  }
1✔
78

79
  public static String getApplicantUniqueId(String storageGroupName, boolean sequence) {
80
    return config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
1✔
81
        ? storageGroupName
×
82
        : storageGroupName
83
            + IoTDBConstant.FILE_NAME_SEPARATOR
84
            + (sequence ? "sequence" : "unsequence");
1✔
85
  }
86

87
  /** Apply for a wal node. */
88
  public IWALNode applyForWALNode(String applicantUniqueId) {
89
    if (config.getWalMode() == WALMode.DISABLE) {
1✔
90
      return WALFakeNode.getSuccessInstance();
1✔
91
    }
92

93
    return walNodesManager.applyForWALNode(applicantUniqueId);
1✔
94
  }
95

96
  /** WAL node will be registered only when using iot consensus protocol. */
97
  public void registerWALNode(
98
      String applicantUniqueId, String logDirectory, long startFileVersion, long startSearchIndex) {
99
    if (config.getWalMode() == WALMode.DISABLE
×
100
        || !config.isClusterMode()
×
101
        || !config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
×
102
      return;
×
103
    }
104

105
    ((FirstCreateStrategy) walNodesManager)
×
106
        .registerWALNode(applicantUniqueId, logDirectory, startFileVersion, startSearchIndex);
×
107
    WritingMetrics.getInstance().createWALNodeInfoMetrics(applicantUniqueId);
×
108
  }
×
109

110
  /** WAL node will be deleted only when using iot consensus protocol. */
111
  public void deleteWALNode(String applicantUniqueId) {
112
    if (config.getWalMode() == WALMode.DISABLE
×
113
        || !config.isClusterMode()
×
114
        || !config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
×
115
      return;
×
116
    }
117

118
    ((FirstCreateStrategy) walNodesManager).deleteWALNode(applicantUniqueId);
×
119
    WritingMetrics.getInstance().removeWALNodeInfoMetrics(applicantUniqueId);
×
120
  }
×
121

122
  @Override
123
  public void start() throws StartupException {
124
    if (config.getWalMode() == WALMode.DISABLE) {
1✔
125
      return;
×
126
    }
127

128
    try {
129
      registerScheduleTask(
1✔
130
          config.getDeleteWalFilesPeriodInMs(), config.getDeleteWalFilesPeriodInMs());
1✔
131
    } catch (Exception e) {
×
132
      throw new StartupException(this.getID().getName(), e.getMessage());
×
133
    }
1✔
134
  }
1✔
135

136
  /** Reboot wal delete thread to hot modify delete wal period. */
137
  public void rebootWALDeleteThread() {
138
    if (config.getWalMode() == WALMode.DISABLE) {
×
139
      return;
×
140
    }
141

142
    logger.info("Start rebooting wal delete thread.");
×
143
    if (walDeleteThread != null) {
×
144
      shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
×
145
    }
146
    logger.info("Stop wal delete thread successfully, and now restart it.");
×
147
    registerScheduleTask(0, config.getDeleteWalFilesPeriodInMs());
×
148
    logger.info(
×
149
        "Reboot wal delete thread successfully, current period is {} ms",
150
        config.getDeleteWalFilesPeriodInMs());
×
151
  }
×
152

153
  /** Submit delete outdated wal files task and wait for result. */
154
  public void deleteOutdatedWALFiles() {
155
    if (config.getWalMode() == WALMode.DISABLE) {
1✔
156
      return;
×
157
    }
158

159
    if (walDeleteThread == null) {
1✔
160
      return;
×
161
    }
162

163
    Future<?> future = walDeleteThread.submit(this::deleteOutdatedFiles);
1✔
164
    try {
165
      future.get();
1✔
166
    } catch (ExecutionException e) {
×
167
      logger.warn("Exception occurs when deleting wal files", e);
×
168
    } catch (InterruptedException e) {
×
169
      logger.warn("Interrupted when deleting wal files", e);
×
170
      Thread.currentThread().interrupt();
×
171
    }
1✔
172
  }
1✔
173

174
  private void deleteOutdatedFiles() {
175
    // Normally, only need to delete the expired file once. When the WAL disk file size exceeds the
176
    // threshold, the system continues to delete expired files until the disk size is smaller than
177
    // the threshold.
178
    boolean firstLoop = true;
1✔
179
    while (firstLoop || shouldThrottle()) {
1✔
180
      List<WALNode> walNodes = walNodesManager.getNodesSnapshot();
1✔
181
      walNodes.sort((node1, node2) -> Long.compare(node2.getDiskUsage(), node1.getDiskUsage()));
1✔
182
      for (WALNode walNode : walNodes) {
1✔
183
        walNode.deleteOutdatedFiles();
1✔
184
      }
1✔
185
      if (firstLoop && shouldThrottle()) {
1✔
186
        logger.warn(
×
187
            "WAL disk usage {} is larger than the iot_consensus_throttle_threshold_in_byte {}, please check your write load, iot consensus and the pipe module. It's better to allocate more disk for WAL.",
188
            getTotalDiskUsage(),
×
189
            getThrottleThreshold());
×
190
      }
191
      firstLoop = false;
1✔
192
    }
1✔
193
  }
1✔
194

195
  /** Wait until all write-ahead logs are flushed. */
196
  public void waitAllWALFlushed() {
197
    if (config.getWalMode() == WALMode.DISABLE) {
×
198
      return;
×
199
    }
200

201
    for (WALNode walNode : walNodesManager.getNodesSnapshot()) {
×
202
      while (!walNode.isAllWALEntriesConsumed()) {
×
203
        try {
204
          Thread.sleep(50);
×
205
        } catch (InterruptedException e) {
×
206
          logger.error("Interrupted when waiting for all write-ahead logs flushed.");
×
207
          Thread.currentThread().interrupt();
×
208
        }
×
209
      }
210
    }
×
211
  }
×
212

213
  public boolean shouldThrottle() {
214
    return getTotalDiskUsage() >= getThrottleThreshold();
1✔
215
  }
216

217
  public long getThrottleThreshold() {
218
    return (long) (config.getThrottleThreshold() * 0.8);
1✔
219
  }
220

221
  public long getTotalDiskUsage() {
222
    return totalDiskUsage.get();
1✔
223
  }
224

225
  public long getWALNodesNum() {
226
    return walNodesManager.getNodesNum();
×
227
  }
228

229
  public void addTotalDiskUsage(long size) {
230
    totalDiskUsage.accumulateAndGet(size, Long::sum);
1✔
231
  }
1✔
232

233
  public void subtractTotalDiskUsage(long size) {
234
    totalDiskUsage.accumulateAndGet(size, (x, y) -> x - y);
1✔
235
  }
1✔
236

237
  public long getTotalFileNum() {
238
    return totalFileNum.get();
×
239
  }
240

241
  public void addTotalFileNum(long num) {
242
    totalFileNum.accumulateAndGet(num, Long::sum);
1✔
243
  }
1✔
244

245
  public void subtractTotalFileNum(long num) {
246
    totalFileNum.accumulateAndGet(num, (x, y) -> x - y);
1✔
247
  }
1✔
248

249
  @Override
250
  public void stop() {
251
    if (config.getWalMode() == WALMode.DISABLE) {
1✔
252
      return;
×
253
    }
254

255
    if (walDeleteThread != null) {
1✔
256
      shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
1✔
257
      walDeleteThread = null;
1✔
258
    }
259
    clear();
1✔
260
  }
1✔
261

262
  private void shutdownThread(ExecutorService thread, ThreadName threadName) {
263
    thread.shutdown();
1✔
264
    try {
265
      if (!thread.awaitTermination(30, TimeUnit.SECONDS)) {
1✔
266
        logger.warn("Waiting thread {} to be terminated is timeout", threadName.getName());
×
267
      }
268
    } catch (InterruptedException e) {
×
269
      logger.warn("Thread {} still doesn't exit after 30s", threadName.getName());
×
270
      Thread.currentThread().interrupt();
×
271
    }
1✔
272
  }
1✔
273

274
  private void registerScheduleTask(long initDelayMs, long periodMs) {
275
    walDeleteThread =
1✔
276
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.WAL_DELETE.getName());
1✔
277
    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
1✔
278
        walDeleteThread, this::deleteOutdatedFiles, initDelayMs, periodMs, TimeUnit.MILLISECONDS);
279
  }
1✔
280

281
  @TestOnly
282
  public void clear() {
283
    totalDiskUsage.set(0);
1✔
284
    walNodesManager.clear();
1✔
285
  }
1✔
286

287
  @Override
288
  public ServiceType getID() {
289
    return ServiceType.WAL_SERVICE;
×
290
  }
291

292
  public static WALManager getInstance() {
293
    return InstanceHolder.INSTANCE;
1✔
294
  }
295

296
  private static class InstanceHolder {
297
    private InstanceHolder() {}
298

299
    private static final WALManager INSTANCE = new WALManager();
1✔
300
  }
301
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc