• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9999

05 Sep 2023 08:10AM CUT coverage: 47.669% (-0.03%) from 47.697%
#9999

push

travis_ci

web-flow
[IOTDB-6130] Delete data by specific pattern didn't work

80151 of 168139 relevant lines covered (47.67%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.82
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.storageengine.rescon.memory;
21

22
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23
import org.apache.iotdb.commons.concurrent.ThreadName;
24
import org.apache.iotdb.commons.utils.TestOnly;
25
import org.apache.iotdb.db.conf.IoTDBConfig;
26
import org.apache.iotdb.db.conf.IoTDBDescriptor;
27
import org.apache.iotdb.db.exception.WriteProcessRejectException;
28
import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
29
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
30
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
31
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
32
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
33

34
import org.slf4j.Logger;
35
import org.slf4j.LoggerFactory;
36

37
import java.util.HashMap;
38
import java.util.Map;
39
import java.util.PriorityQueue;
40
import java.util.concurrent.ExecutorService;
41
import java.util.concurrent.atomic.AtomicInteger;
42
import java.util.concurrent.atomic.AtomicLong;
43

44
public class SystemInfo {
45

46
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
47
  private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
1✔
48

49
  private long totalStorageGroupMemCost = 0L;
1✔
50
  private volatile boolean rejected = false;
1✔
51

52
  private long memorySizeForMemtable;
53
  private long memorySizeForCompaction;
54
  private Map<DataRegionInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
1✔
55

56
  private long flushingMemTablesCost = 0L;
1✔
57
  private AtomicLong compactionMemoryCost = new AtomicLong(0L);
1✔
58

59
  private AtomicInteger compactionFileNumCost = new AtomicInteger(0);
1✔
60

61
  private int totalFileLimitForCrossTask = config.getTotalFileLimitForCrossTask();
1✔
62

63
  private ExecutorService flushTaskSubmitThreadPool =
1✔
64
      IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.FLUSH_TASK_SUBMIT.getName());
1✔
65
  private double FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
1✔
66
  private double REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
1✔
67

68
  private volatile boolean isEncodingFasterThanIo = true;
1✔
69

70
  /** Creates the instance and computes the initial memory budgets and thresholds. */
  private SystemInfo() {
    this.allocateWriteMemory();
  }
1✔
73

74
  /**
75
   * Report current mem cost of database to system. Called when the memory of database newly
76
   * accumulates to IoTDBConfig.getStorageGroupSizeReportThreshold()
77
   *
78
   * @param dataRegionInfo database
79
   * @throws WriteProcessRejectException
80
   */
81
  public synchronized boolean reportStorageGroupStatus(
82
      DataRegionInfo dataRegionInfo, TsFileProcessor tsFileProcessor)
83
      throws WriteProcessRejectException {
84
    long currentDataRegionMemCost = dataRegionInfo.getMemCost();
1✔
85
    long delta =
1✔
86
        currentDataRegionMemCost - reportedStorageGroupMemCostMap.getOrDefault(dataRegionInfo, 0L);
1✔
87
    totalStorageGroupMemCost += delta;
1✔
88
    if (logger.isDebugEnabled()) {
1✔
89
      logger.debug(
×
90
          "Report database Status to the system. " + "After adding {}, current sg mem cost is {}.",
91
          delta,
×
92
          totalStorageGroupMemCost);
×
93
    }
94
    reportedStorageGroupMemCostMap.put(dataRegionInfo, currentDataRegionMemCost);
1✔
95
    dataRegionInfo.setLastReportedSize(currentDataRegionMemCost);
1✔
96
    if (totalStorageGroupMemCost < FLUSH_THERSHOLD) {
1✔
97
      return true;
1✔
98
    } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
×
99
        && totalStorageGroupMemCost < REJECT_THERSHOLD) {
100
      logger.debug(
×
101
          "The total database mem costs are too large, call for flushing. "
102
              + "Current sg cost is {}",
103
          totalStorageGroupMemCost);
×
104
      chooseMemTablesToMarkFlush(tsFileProcessor);
×
105
      return true;
×
106
    } else {
107
      logger.info(
×
108
          "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}), REJECT_THERSHOLD ({})",
109
          dataRegionInfo.getDataRegion().getDatabaseName(),
×
110
          delta,
×
111
          totalStorageGroupMemCost,
×
112
          REJECT_THERSHOLD);
×
113
      rejected = true;
×
114
      if (chooseMemTablesToMarkFlush(tsFileProcessor)) {
×
115
        if (totalStorageGroupMemCost < memorySizeForMemtable) {
×
116
          return true;
×
117
        } else {
118
          throw new WriteProcessRejectException(
×
119
              "Total database MemCost "
120
                  + totalStorageGroupMemCost
121
                  + " is over than memorySizeForWriting "
122
                  + memorySizeForMemtable);
123
        }
124
      } else {
125
        return false;
×
126
      }
127
    }
128
  }
129

130
  /**
131
   * Report resetting the mem cost of sg to system. It will be called after flushing, closing and
132
   * failed to insert
133
   *
134
   * @param dataRegionInfo database
135
   */
136
  public synchronized void resetStorageGroupStatus(DataRegionInfo dataRegionInfo) {
137
    long currentDataRegionMemCost = dataRegionInfo.getMemCost();
1✔
138
    long delta = 0;
1✔
139
    if (reportedStorageGroupMemCostMap.containsKey(dataRegionInfo)) {
1✔
140
      delta = reportedStorageGroupMemCostMap.get(dataRegionInfo) - currentDataRegionMemCost;
1✔
141
      this.totalStorageGroupMemCost -= delta;
1✔
142
      dataRegionInfo.setLastReportedSize(currentDataRegionMemCost);
1✔
143
      // report after reset sg status, because slow write may not reach the report threshold
144
      dataRegionInfo.setNeedToReportToSystem(true);
1✔
145
      reportedStorageGroupMemCostMap.put(dataRegionInfo, currentDataRegionMemCost);
1✔
146
    }
147

148
    if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
1✔
149
        && totalStorageGroupMemCost < REJECT_THERSHOLD) {
150
      logger.debug(
×
151
          "SG ({}) released memory (delta: {}) but still exceeding flush proportion (totalSgMemCost: {}), call flush.",
152
          dataRegionInfo.getDataRegion().getDatabaseName(),
×
153
          delta,
×
154
          totalStorageGroupMemCost);
×
155
      if (rejected) {
×
156
        logger.info(
×
157
            "SG ({}) released memory (delta: {}), set system to normal status (totalSgMemCost: {}).",
158
            dataRegionInfo.getDataRegion().getDatabaseName(),
×
159
            delta,
×
160
            totalStorageGroupMemCost);
×
161
      }
162
      logCurrentTotalSGMemory();
×
163
      rejected = false;
×
164
    } else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) {
1✔
165
      logger.warn(
×
166
          "SG ({}) released memory (delta: {}), but system is still in reject status (totalSgMemCost: {}).",
167
          dataRegionInfo.getDataRegion().getDatabaseName(),
×
168
          delta,
×
169
          totalStorageGroupMemCost);
×
170
      logCurrentTotalSGMemory();
×
171
      rejected = true;
×
172
    } else {
173
      logger.debug(
1✔
174
          "SG ({}) released memory (delta: {}), system is in normal status (totalSgMemCost: {}).",
175
          dataRegionInfo.getDataRegion().getDatabaseName(),
1✔
176
          delta,
1✔
177
          totalStorageGroupMemCost);
1✔
178
      logCurrentTotalSGMemory();
1✔
179
      rejected = false;
1✔
180
    }
181
  }
1✔
182

183
  public synchronized void addFlushingMemTableCost(long flushingMemTableCost) {
184
    this.flushingMemTablesCost += flushingMemTableCost;
1✔
185
  }
1✔
186

187
  public synchronized void resetFlushingMemTableCost(long flushingMemTableCost) {
188
    this.flushingMemTablesCost -= flushingMemTableCost;
1✔
189
  }
1✔
190

191
  public void addCompactionFileNum(int fileNum, long timeOutInSecond)
192
      throws InterruptedException, CompactionFileCountExceededException {
193
    if (fileNum > totalFileLimitForCrossTask) {
1✔
194
      // source file num is greater than the max file num for compaction
195
      throw new CompactionFileCountExceededException(
1✔
196
          String.format(
1✔
197
              "Required file num %d is greater than the max file num %d for compaction.",
198
              fileNum, totalFileLimitForCrossTask));
1✔
199
    }
200
    long startTime = System.currentTimeMillis();
1✔
201
    int originFileNum = this.compactionFileNumCost.get();
1✔
202
    while (originFileNum + fileNum > totalFileLimitForCrossTask
1✔
203
        || !compactionFileNumCost.compareAndSet(originFileNum, originFileNum + fileNum)) {
1✔
204
      if (System.currentTimeMillis() - startTime >= timeOutInSecond * 1000L) {
×
205
        throw new CompactionFileCountExceededException(
×
206
            String.format(
×
207
                "Failed to allocate %d files for compaction after %d seconds, max file num for compaction module is %d, %d files is used.",
208
                fileNum, timeOutInSecond, totalFileLimitForCrossTask, originFileNum));
×
209
      }
210
      Thread.sleep(100);
×
211
      originFileNum = this.compactionFileNumCost.get();
×
212
    }
213
  }
1✔
214

215
  public void addCompactionMemoryCost(long memoryCost, long timeOutInSecond)
216
      throws InterruptedException, CompactionMemoryNotEnoughException {
217
    if (!config.isEnableCompactionMemControl()) {
1✔
218
      return;
×
219
    }
220
    if (memoryCost > memorySizeForCompaction) {
1✔
221
      // required memory cost is greater than the total memory budget for compaction
222
      throw new CompactionMemoryNotEnoughException(
1✔
223
          String.format(
1✔
224
              "Required memory cost %d bytes is greater than "
225
                  + "the total memory budget for compaction %d bytes",
226
              memoryCost, memorySizeForCompaction));
1✔
227
    }
228
    long startTime = System.currentTimeMillis();
1✔
229
    long originSize = this.compactionMemoryCost.get();
1✔
230
    while (originSize + memoryCost > memorySizeForCompaction
1✔
231
        || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) {
1✔
232
      if (System.currentTimeMillis() - startTime >= timeOutInSecond * 1000L) {
×
233
        throw new CompactionMemoryNotEnoughException(
×
234
            String.format(
×
235
                "Failed to allocate %d bytes memory for compaction after %d seconds, "
236
                    + "total memory budget for compaction module is %d bytes, %d bytes is used",
237
                memoryCost, timeOutInSecond, memorySizeForCompaction, originSize));
×
238
      }
239
      Thread.sleep(100);
×
240
      originSize = this.compactionMemoryCost.get();
×
241
    }
242
  }
1✔
243

244
  public synchronized void resetCompactionMemoryCost(long compactionMemoryCost) {
245
    if (!config.isEnableCompactionMemControl()) {
1✔
246
      return;
×
247
    }
248
    this.compactionMemoryCost.addAndGet(-compactionMemoryCost);
1✔
249
  }
1✔
250

251
  public synchronized void decreaseCompactionFileNumCost(int fileNum) {
252
    this.compactionFileNumCost.addAndGet(-fileNum);
1✔
253
  }
1✔
254

255
  public long getMemorySizeForCompaction() {
256
    if (config.isEnableMemControl()) {
1✔
257
      return memorySizeForCompaction;
1✔
258
    } else {
259
      return Long.MAX_VALUE;
×
260
    }
261
  }
262

263
  public void allocateWriteMemory() {
264
    memorySizeForMemtable =
1✔
265
        (long)
266
            (config.getAllocateMemoryForStorageEngine() * config.getWriteProportionForMemtable());
1✔
267
    memorySizeForCompaction =
1✔
268
        (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion());
1✔
269
    FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
1✔
270
    REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
1✔
271
  }
1✔
272

273
  @TestOnly
274
  public void setMemorySizeForCompaction(long size) {
275
    memorySizeForCompaction = size;
1✔
276
  }
1✔
277

278
  @TestOnly
279
  public void setTotalFileLimitForCrossTask(int totalFileLimitForCrossTask) {
280
    this.totalFileLimitForCrossTask = totalFileLimitForCrossTask;
1✔
281
  }
1✔
282

283
  @TestOnly
284
  public int getTotalFileLimitForCrossTask() {
285
    return totalFileLimitForCrossTask;
1✔
286
  }
287

288
  @TestOnly
289
  public AtomicLong getCompactionMemoryCost() {
290
    return compactionMemoryCost;
1✔
291
  }
292

293
  @TestOnly
294
  public AtomicInteger getCompactionFileNumCost() {
295
    return compactionFileNumCost;
1✔
296
  }
297

298
  private void logCurrentTotalSGMemory() {
299
    logger.debug("Current Sg cost is {}", totalStorageGroupMemCost);
1✔
300
  }
1✔
301

302
  /**
303
   * Order all working memtables in system by memory cost of actual data points in memtable. Mark
304
   * the top K TSPs as to be flushed, so that after flushing the K TSPs, the memory cost should be
305
   * less than FLUSH_THRESHOLD
306
   */
307
  private boolean chooseMemTablesToMarkFlush(TsFileProcessor currentTsFileProcessor) {
308
    // If invoke flush by replaying logs, do not flush now!
309
    if (reportedStorageGroupMemCostMap.size() == 0) {
×
310
      return false;
×
311
    }
312
    PriorityQueue<TsFileProcessor> allTsFileProcessors =
×
313
        new PriorityQueue<>(
314
            (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), o1.getWorkMemTableRamCost()));
×
315
    for (DataRegionInfo dataRegionInfo : reportedStorageGroupMemCostMap.keySet()) {
×
316
      allTsFileProcessors.addAll(dataRegionInfo.getAllReportedTsp());
×
317
    }
×
318
    boolean isCurrentTsFileProcessorSelected = false;
×
319
    long memCost = 0;
×
320
    long activeMemSize = totalStorageGroupMemCost - flushingMemTablesCost;
×
321
    while (activeMemSize - memCost > FLUSH_THERSHOLD) {
×
322
      if (allTsFileProcessors.isEmpty()
×
323
          || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
×
324
        return false;
×
325
      }
326
      TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
×
327
      memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
×
328
      selectedTsFileProcessor.setWorkMemTableShouldFlush();
×
329
      flushTaskSubmitThreadPool.submit(
×
330
          () -> {
331
            selectedTsFileProcessor.submitAFlushTask();
×
332
          });
×
333
      if (selectedTsFileProcessor == currentTsFileProcessor) {
×
334
        isCurrentTsFileProcessorSelected = true;
×
335
      }
336
      allTsFileProcessors.poll();
×
337
    }
×
338
    return isCurrentTsFileProcessorSelected;
×
339
  }
340

341
  public boolean isRejected() {
342
    return rejected;
1✔
343
  }
344

345
  public void setEncodingFasterThanIo(boolean isEncodingFasterThanIo) {
346
    this.isEncodingFasterThanIo = isEncodingFasterThanIo;
1✔
347
  }
1✔
348

349
  public boolean isEncodingFasterThanIo() {
350
    return isEncodingFasterThanIo;
1✔
351
  }
352

353
  public void close() {
354
    reportedStorageGroupMemCostMap.clear();
1✔
355
    totalStorageGroupMemCost = 0;
1✔
356
    rejected = false;
1✔
357
  }
1✔
358

359
  public static SystemInfo getInstance() {
360
    return InstanceHolder.instance;
1✔
361
  }
362

363
  private static class InstanceHolder {
364

365
    private InstanceHolder() {}
366

367
    private static SystemInfo instance = new SystemInfo();
1✔
368
  }
369

370
  public synchronized void applyTemporaryMemoryForFlushing(long estimatedTemporaryMemSize) {
371
    memorySizeForMemtable -= estimatedTemporaryMemSize;
1✔
372
    FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
1✔
373
    REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
1✔
374
  }
1✔
375

376
  public synchronized void releaseTemporaryMemoryForFlushing(long estimatedTemporaryMemSize) {
377
    memorySizeForMemtable += estimatedTemporaryMemSize;
1✔
378
    FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
1✔
379
    REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
1✔
380
  }
1✔
381

382
  public long getTotalMemTableSize() {
383
    return totalStorageGroupMemCost;
1✔
384
  }
385

386
  public double getFlushThershold() {
387
    return FLUSH_THERSHOLD;
1✔
388
  }
389

390
  public double getRejectThershold() {
391
    return REJECT_THERSHOLD;
1✔
392
  }
393

394
  public int flushingMemTableNum() {
395
    return FlushManager.getInstance().getNumberOfWorkingTasks();
×
396
  }
397
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc