• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9782

pending completion
#9782

push

travis_ci

web-flow
[To rel/1.2] Enhance cn_confignode_target_list comments (#10813)

Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org>

79675 of 165692 relevant lines covered (48.09%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.81
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.queryengine.execution.memory;
21

22
import org.apache.iotdb.commons.utils.TestOnly;
23
import org.apache.iotdb.db.exception.runtime.MemoryLeakException;
24
import org.apache.iotdb.tsfile.utils.Pair;
25

26
import com.google.common.util.concurrent.AbstractFuture;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import org.apache.commons.lang3.Validate;
30
import org.slf4j.Logger;
31
import org.slf4j.LoggerFactory;
32

33
import javax.annotation.Nullable;
34

35
import java.util.Iterator;
36
import java.util.List;
37
import java.util.Map;
38
import java.util.Queue;
39
import java.util.concurrent.ConcurrentHashMap;
40
import java.util.concurrent.ConcurrentLinkedQueue;
41
import java.util.concurrent.atomic.AtomicLong;
42
import java.util.stream.Collectors;
43

44
/** A thread-safe memory pool. */
45
public class MemoryPool {
46

47
  private static final Logger LOGGER = LoggerFactory.getLogger(MemoryPool.class);
1✔
48

49
  public static class MemoryReservationFuture<V> extends AbstractFuture<V> {
50

51
    private final String queryId;
52
    private final String fragmentInstanceId;
53
    private final String planNodeId;
54
    private final long bytesToReserve;
55
    /**
56
     * MemoryReservationFuture is created when SinkHandle or SourceHandle tries to reserve memory
57
     * from pool. This field is max Bytes that SinkHandle or SourceHandle can reserve.
58
     */
59
    private final long maxBytesCanReserve;
60

61
    private MemoryReservationFuture(
62
        String queryId,
63
        String fragmentInstanceId,
64
        String planNodeId,
65
        long bytesToReserve,
66
        long maxBytesCanReserve) {
1✔
67
      this.queryId = Validate.notNull(queryId, "queryId cannot be null");
1✔
68
      this.fragmentInstanceId =
1✔
69
          Validate.notNull(fragmentInstanceId, "fragmentInstanceId cannot be null");
1✔
70
      this.planNodeId = Validate.notNull(planNodeId, "planNodeId cannot be null");
1✔
71
      Validate.isTrue(bytesToReserve > 0L, "bytesToReserve should be greater than zero.");
1✔
72
      Validate.isTrue(maxBytesCanReserve > 0L, "maxBytesCanReserve should be greater than zero.");
1✔
73
      this.bytesToReserve = bytesToReserve;
1✔
74
      this.maxBytesCanReserve = maxBytesCanReserve;
1✔
75
    }
1✔
76

77
    public String getQueryId() {
78
      return queryId;
1✔
79
    }
80

81
    public String getFragmentInstanceId() {
82
      return fragmentInstanceId;
1✔
83
    }
84

85
    public String getPlanNodeId() {
86
      return planNodeId;
1✔
87
    }
88

89
    public long getBytesToReserve() {
90
      return bytesToReserve;
1✔
91
    }
92

93
    public long getMaxBytesCanReserve() {
94
      return maxBytesCanReserve;
1✔
95
    }
96

97
    public static <V> MemoryReservationFuture<V> create(
98
        String queryId,
99
        String fragmentInstanceId,
100
        String planNodeId,
101
        long bytesToReserve,
102
        long maxBytesCanReserve) {
103
      return new MemoryReservationFuture<>(
1✔
104
          queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve);
105
    }
106

107
    @Override
108
    public boolean set(@Nullable V value) {
109
      return super.set(value);
1✔
110
    }
111
  }
112

113
  private final String id;
114
  private final long maxBytes;
115
  private final long maxBytesPerFragmentInstance;
116

117
  private final AtomicLong remainingBytes;
118
  /** queryId -> fragmentInstanceId -> planNodeId -> bytesReserved. */
119
  private final Map<String, Map<String, Map<String, Long>>> queryMemoryReservations =
1✔
120
      new ConcurrentHashMap<>();
121

122
  private final Queue<MemoryReservationFuture<Void>> memoryReservationFutures =
1✔
123
      new ConcurrentLinkedQueue<>();
124

125
  public MemoryPool(String id, long maxBytes, long maxBytesPerFragmentInstance) {
1✔
126
    this.id = Validate.notNull(id);
1✔
127
    Validate.isTrue(maxBytes > 0L, "max bytes should be greater than zero: %d", maxBytes);
1✔
128
    this.maxBytes = maxBytes;
1✔
129
    Validate.isTrue(
1✔
130
        maxBytesPerFragmentInstance > 0L && maxBytesPerFragmentInstance <= maxBytes,
131
        "max bytes per FI should be in (0,maxBytes]. maxBytesPerFI: %d, maxBytes: %d",
132
        maxBytesPerFragmentInstance,
1✔
133
        maxBytes);
1✔
134
    this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
1✔
135
    this.remainingBytes = new AtomicLong(maxBytes);
1✔
136
  }
1✔
137

138
  public String getId() {
139
    return id;
×
140
  }
141

142
  public long getMaxBytes() {
143
    return maxBytes;
×
144
  }
145

146
  public long getRemainingBytes() {
147
    return remainingBytes.get();
×
148
  }
149

150
  public int getQueryMemoryReservationSize() {
151
    return queryMemoryReservations.size();
×
152
  }
153

154
  public int getMemoryReservationSize() {
155
    return memoryReservationFutures.size();
×
156
  }
157

158
  /**
159
   * Before executing, we register memory map which is related to queryId, fragmentInstanceId, and
160
   * planNodeId to queryMemoryReservationsMap first.
161
   */
162
  public void registerPlanNodeIdToQueryMemoryMap(
163
      String queryId, String fragmentInstanceId, String planNodeId) {
164
    synchronized (queryMemoryReservations) {
1✔
165
      queryMemoryReservations
1✔
166
          .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>())
1✔
167
          .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>())
1✔
168
          .putIfAbsent(planNodeId, 0L);
1✔
169
    }
1✔
170
  }
1✔
171

172
  /**
173
   * If all fragmentInstanceIds related to one queryId have been registered, when the last fragment
174
   * instance is deregister, the queryId can be cleared.
175
   *
176
   * <p>If some fragmentInstanceIds have not been registered when queryId is cleared, they will
177
   * register queryId again with lock, so there is no concurrency problem.
178
   *
179
   * @throws MemoryLeakException throw {@link MemoryLeakException}
180
   */
181
  public void deRegisterFragmentInstanceFromQueryMemoryMap(
182
      String queryId, String fragmentInstanceId) {
183
    Map<String, Map<String, Long>> queryRelatedMemory = queryMemoryReservations.get(queryId);
×
184
    if (queryRelatedMemory != null) {
×
185
      Map<String, Long> fragmentRelatedMemory = queryRelatedMemory.get(fragmentInstanceId);
×
186
      boolean hasPotentialMemoryLeak = false;
×
187
      // fragmentRelatedMemory could be null if the FI has not reserved any memory(For example,
188
      // next() of root operator returns no data)
189
      if (fragmentRelatedMemory != null) {
×
190
        hasPotentialMemoryLeak =
×
191
            fragmentRelatedMemory.values().stream().anyMatch(value -> value != 0L);
×
192
      }
193
      synchronized (queryMemoryReservations) {
×
194
        queryRelatedMemory.remove(fragmentInstanceId);
×
195
        if (queryRelatedMemory.isEmpty()) {
×
196
          queryMemoryReservations.remove(queryId);
×
197
        }
198
      }
×
199
      if (hasPotentialMemoryLeak) {
×
200
        // hasPotentialMemoryLeak means that fragmentRelatedMemory is not null
201
        List<Map.Entry<String, Long>> invalidEntryList =
×
202
            fragmentRelatedMemory.entrySet().stream()
×
203
                .filter(entry -> entry.getValue() != 0L)
×
204
                .collect(Collectors.toList());
×
205
        throw new MemoryLeakException(
×
206
            String.format(
×
207
                "PlanNode related memory is not zero when trying to deregister FI from query memory pool. QueryId is : %s, FragmentInstanceId is : %s, Non-zero PlanNode related memory is : %s.",
208
                queryId, fragmentInstanceId, invalidEntryList));
209
      }
210
    }
211
  }
×
212

213
  /**
214
   * Reserve memory with bytesToReserve.
215
   *
216
   * @return if reserve succeed, pair.right will be true, otherwise false
217
   * @throws IllegalArgumentException throw exception if current query requests more memory than can
218
   *     be allocated.
219
   */
220
  public Pair<ListenableFuture<Void>, Boolean> reserve(
221
      String queryId,
222
      String fragmentInstanceId,
223
      String planNodeId,
224
      long bytesToReserve,
225
      long maxBytesCanReserve) {
226
    Validate.notNull(queryId);
1✔
227
    Validate.notNull(fragmentInstanceId);
1✔
228
    Validate.notNull(planNodeId);
1✔
229
    Validate.isTrue(
1✔
230
        bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
231
        "bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d",
232
        bytesToReserve);
233
    if (bytesToReserve > maxBytesCanReserve) {
1✔
234
      LOGGER.warn(
×
235
          "Cannot reserve {}(Max: {}) bytes memory from MemoryPool for planNodeId{}",
236
          bytesToReserve,
×
237
          maxBytesCanReserve,
×
238
          planNodeId);
239
      throw new IllegalArgumentException(
×
240
          "Query is aborted since it requests more memory than can be allocated.");
241
    }
242

243
    ListenableFuture<Void> result;
244
    if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve)) {
1✔
245
      result = Futures.immediateFuture(null);
1✔
246
      return new Pair<>(result, Boolean.TRUE);
1✔
247
    } else {
248
      LOGGER.debug(
1✔
249
          "Blocked reserve request: {} bytes memory for planNodeId{}", bytesToReserve, planNodeId);
1✔
250
      rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
1✔
251
      result =
1✔
252
          MemoryReservationFuture.create(
1✔
253
              queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve);
254
      memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
1✔
255
      return new Pair<>(result, Boolean.FALSE);
1✔
256
    }
257
  }
258

259
  @TestOnly
260
  public boolean tryReserveForTest(
261
      String queryId,
262
      String fragmentInstanceId,
263
      String planNodeId,
264
      long bytesToReserve,
265
      long maxBytesCanReserve) {
266
    Validate.notNull(queryId);
1✔
267
    Validate.notNull(fragmentInstanceId);
1✔
268
    Validate.notNull(planNodeId);
1✔
269
    Validate.isTrue(
1✔
270
        bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
271
        "bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d",
272
        bytesToReserve);
273

274
    if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve)) {
1✔
275
      return true;
1✔
276
    } else {
277
      rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
1✔
278
      return false;
1✔
279
    }
280
  }
281

282
  /**
283
   * Cancel the specified memory reservation. If the reservation has finished, do nothing.
284
   *
285
   * @param future The future returned from {@link #reserve(String, String, String, long, long)}
286
   * @return If the future has not complete, return the number of bytes being reserved. Otherwise,
287
   *     return 0.
288
   */
289
  @SuppressWarnings("squid:S2445")
290
  public synchronized long tryCancel(ListenableFuture<Void> future) {
291
    // add synchronized on the future to avoid that the future is concurrently completed by
292
    // MemoryPool.free() which may lead to memory leak.
293
    synchronized (future) {
1✔
294
      Validate.notNull(future);
1✔
295
      // If the future is not a MemoryReservationFuture, it must have been completed.
296
      if (future.isDone()) {
1✔
297
        return 0L;
1✔
298
      }
299
      Validate.isTrue(
1✔
300
          future instanceof MemoryReservationFuture,
301
          "invalid future type " + future.getClass().getSimpleName());
1✔
302
      future.cancel(true);
1✔
303
    }
1✔
304
    return ((MemoryReservationFuture<Void>) future).getBytesToReserve();
1✔
305
  }
306

307
  public void free(String queryId, String fragmentInstanceId, String planNodeId, long bytes) {
308
    Validate.notNull(queryId);
1✔
309
    Validate.isTrue(bytes > 0L);
1✔
310

311
    try {
312
      queryMemoryReservations
1✔
313
          .get(queryId)
1✔
314
          .get(fragmentInstanceId)
1✔
315
          .computeIfPresent(
1✔
316
              planNodeId,
317
              (k, reservedMemory) -> {
318
                if (reservedMemory < bytes) {
1✔
319
                  throw new IllegalArgumentException("Free more memory than has been reserved.");
1✔
320
                }
321
                return reservedMemory - bytes;
1✔
322
              });
323
    } catch (NullPointerException e) {
×
324
      throw new IllegalArgumentException("RelatedMemoryReserved can't be null when freeing memory");
×
325
    }
1✔
326

327
    remainingBytes.addAndGet(bytes);
1✔
328

329
    if (memoryReservationFutures.isEmpty()) {
1✔
330
      return;
1✔
331
    }
332
    Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
1✔
333
    while (iterator.hasNext()) {
1✔
334
      MemoryReservationFuture<Void> future = iterator.next();
1✔
335
      synchronized (future) {
1✔
336
        if (future.isCancelled() || future.isDone()) {
1✔
337
          continue;
1✔
338
        }
339
        long bytesToReserve = future.getBytesToReserve();
1✔
340
        String curQueryId = future.getQueryId();
1✔
341
        String curFragmentInstanceId = future.getFragmentInstanceId();
1✔
342
        String curPlanNodeId = future.getPlanNodeId();
1✔
343
        long maxBytesCanReserve = future.getMaxBytesCanReserve();
1✔
344
        if (tryReserve(
1✔
345
            curQueryId, curFragmentInstanceId, curPlanNodeId, bytesToReserve, maxBytesCanReserve)) {
346
          future.set(null);
1✔
347
          iterator.remove();
1✔
348
        } else {
349
          rollbackReserve(curQueryId, curFragmentInstanceId, curPlanNodeId, bytesToReserve);
1✔
350
        }
351
      }
1✔
352
    }
1✔
353
  }
1✔
354

355
  public long getQueryMemoryReservedBytes(String queryId) {
356
    if (!queryMemoryReservations.containsKey(queryId)) {
1✔
357
      return 0L;
×
358
    }
359
    long sum = 0;
1✔
360
    for (Map<String, Long> map : queryMemoryReservations.get(queryId).values()) {
1✔
361
      sum = sum + map.values().stream().reduce(0L, Long::sum);
1✔
362
    }
1✔
363
    return sum;
1✔
364
  }
365

366
  public long getReservedBytes() {
367
    return maxBytes - remainingBytes.get();
1✔
368
  }
369

370
  public boolean tryReserve(
371
      String queryId,
372
      String fragmentInstanceId,
373
      String planNodeId,
374
      long bytesToReserve,
375
      long maxBytesCanReserve) {
376
    long tryRemainingBytes = remainingBytes.addAndGet(-bytesToReserve);
1✔
377
    long queryRemainingBytes =
1✔
378
        maxBytesCanReserve
379
            - queryMemoryReservations
380
                .get(queryId)
1✔
381
                .get(fragmentInstanceId)
1✔
382
                .merge(planNodeId, bytesToReserve, Long::sum);
1✔
383
    return tryRemainingBytes >= 0 && queryRemainingBytes >= 0;
1✔
384
  }
385

386
  private void rollbackReserve(
387
      String queryId, String fragmentInstanceId, String planNodeId, long bytesToReserve) {
388
    queryMemoryReservations
1✔
389
        .get(queryId)
1✔
390
        .get(fragmentInstanceId)
1✔
391
        .merge(planNodeId, -bytesToReserve, Long::sum);
1✔
392
    remainingBytes.addAndGet(bytesToReserve);
1✔
393
  }
1✔
394
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc