• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9684

pending completion
#9684

push

travis_ci

web-flow
Fix potential memory leak in MemoryPool

16 of 16 new or added lines in 3 files covered. (100.0%)

79205 of 164952 relevant lines covered (48.02%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.51
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.queryengine.execution.memory;
21

22
import org.apache.iotdb.commons.utils.TestOnly;
23
import org.apache.iotdb.db.exception.runtime.MemoryLeakException;
24
import org.apache.iotdb.tsfile.utils.Pair;
25

26
import com.google.common.util.concurrent.AbstractFuture;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import org.apache.commons.lang3.Validate;
30
import org.slf4j.Logger;
31
import org.slf4j.LoggerFactory;
32

33
import javax.annotation.Nullable;
34

35
import java.util.Iterator;
36
import java.util.Map;
37
import java.util.Queue;
38
import java.util.concurrent.ConcurrentHashMap;
39
import java.util.concurrent.ConcurrentLinkedQueue;
40
import java.util.concurrent.atomic.AtomicLong;
41

42
/** A thread-safe memory pool. */
43
public class MemoryPool {
44

45
  private static final Logger LOGGER = LoggerFactory.getLogger(MemoryPool.class);
1✔
46

47
  public static class MemoryReservationFuture<V> extends AbstractFuture<V> {
48

49
    private final String queryId;
50
    private final String fragmentInstanceId;
51
    private final String planNodeId;
52
    private final long bytesToReserve;
53
    /**
54
     * MemoryReservationFuture is created when SinkHandle or SourceHandle tries to reserve memory
55
     * from pool. This field is max Bytes that SinkHandle or SourceHandle can reserve.
56
     */
57
    private final long maxBytesCanReserve;
58

59
    private MemoryReservationFuture(
60
        String queryId,
61
        String fragmentInstanceId,
62
        String planNodeId,
63
        long bytesToReserve,
64
        long maxBytesCanReserve) {
1✔
65
      this.queryId = Validate.notNull(queryId, "queryId cannot be null");
1✔
66
      this.fragmentInstanceId =
1✔
67
          Validate.notNull(fragmentInstanceId, "fragmentInstanceId cannot be null");
1✔
68
      this.planNodeId = Validate.notNull(planNodeId, "planNodeId cannot be null");
1✔
69
      Validate.isTrue(bytesToReserve > 0L, "bytesToReserve should be greater than zero.");
1✔
70
      Validate.isTrue(maxBytesCanReserve > 0L, "maxBytesCanReserve should be greater than zero.");
1✔
71
      this.bytesToReserve = bytesToReserve;
1✔
72
      this.maxBytesCanReserve = maxBytesCanReserve;
1✔
73
    }
1✔
74

75
    public String getQueryId() {
76
      return queryId;
1✔
77
    }
78

79
    public String getFragmentInstanceId() {
80
      return fragmentInstanceId;
1✔
81
    }
82

83
    public String getPlanNodeId() {
84
      return planNodeId;
1✔
85
    }
86

87
    public long getBytesToReserve() {
88
      return bytesToReserve;
1✔
89
    }
90

91
    public long getMaxBytesCanReserve() {
92
      return maxBytesCanReserve;
1✔
93
    }
94

95
    public static <V> MemoryReservationFuture<V> create(
96
        String queryId,
97
        String fragmentInstanceId,
98
        String planNodeId,
99
        long bytesToReserve,
100
        long maxBytesCanReserve) {
101
      return new MemoryReservationFuture<>(
1✔
102
          queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve);
103
    }
104

105
    @Override
106
    public boolean set(@Nullable V value) {
107
      return super.set(value);
1✔
108
    }
109
  }
110

111
  private final String id;
112
  private final long maxBytes;
113
  private final long maxBytesPerFragmentInstance;
114

115
  private final AtomicLong remainingBytes;
116
  /** queryId -> fragmentInstanceId -> planNodeId -> bytesReserved. */
117
  private final Map<String, Map<String, Map<String, Long>>> queryMemoryReservations =
1✔
118
      new ConcurrentHashMap<>();
119

120
  private final Queue<MemoryReservationFuture<Void>> memoryReservationFutures =
1✔
121
      new ConcurrentLinkedQueue<>();
122

123
  public MemoryPool(String id, long maxBytes, long maxBytesPerFragmentInstance) {
1✔
124
    this.id = Validate.notNull(id);
1✔
125
    Validate.isTrue(maxBytes > 0L, "max bytes should be greater than zero: %d", maxBytes);
1✔
126
    this.maxBytes = maxBytes;
1✔
127
    Validate.isTrue(
1✔
128
        maxBytesPerFragmentInstance > 0L && maxBytesPerFragmentInstance <= maxBytes,
129
        "max bytes per FI should be in (0,maxBytes]. maxBytesPerFI: %d, maxBytes: %d",
130
        maxBytesPerFragmentInstance,
1✔
131
        maxBytes);
1✔
132
    this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
1✔
133
    this.remainingBytes = new AtomicLong(maxBytes);
1✔
134
  }
1✔
135

136
  public String getId() {
137
    return id;
×
138
  }
139

140
  public long getMaxBytes() {
141
    return maxBytes;
×
142
  }
143

144
  public long getRemainingBytes() {
145
    return remainingBytes.get();
×
146
  }
147

148
  public int getQueryMemoryReservationSize() {
149
    return queryMemoryReservations.size();
×
150
  }
151

152
  public int getMemoryReservationSize() {
153
    return memoryReservationFutures.size();
×
154
  }
155

156
  /**
157
   * Before executing, we register memory map which is related to queryId, fragmentInstanceId, and
158
   * planNodeId to queryMemoryReservationsMap first.
159
   */
160
  public void registerPlanNodeIdToQueryMemoryMap(
161
      String queryId, String fragmentInstanceId, String planNodeId) {
162
    synchronized (queryMemoryReservations) {
1✔
163
      queryMemoryReservations
1✔
164
          .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>())
1✔
165
          .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>())
1✔
166
          .putIfAbsent(planNodeId, 0L);
1✔
167
    }
1✔
168
  }
1✔
169

170
  /**
171
   * If all fragmentInstanceIds related to one queryId have been registered, when the last fragment
172
   * instance is deregister, the queryId can be cleared.
173
   *
174
   * <p>If some fragmentInstanceIds have not been registered when queryId is cleared, they will
175
   * register queryId again with lock, so there is no concurrency problem.
176
   *
177
   * @throws MemoryLeakException throw {@link MemoryLeakException}
178
   */
179
  public void deRegisterFragmentInstanceFromQueryMemoryMap(
180
      String queryId, String fragmentInstanceId) {
181
    Map<String, Map<String, Long>> queryRelatedMemory = queryMemoryReservations.get(queryId);
×
182
    if (queryRelatedMemory != null) {
×
183
      Map<String, Long> fragmentRelatedMemory = queryRelatedMemory.get(fragmentInstanceId);
×
184
      boolean hasPotentialMemoryLeak = false;
×
185
      // fragmentRelatedMemory could be null if the FI has not reserved any memory(For example,
186
      // next() of root operator returns no data)
187
      if (fragmentRelatedMemory != null) {
×
188
        hasPotentialMemoryLeak =
×
189
            fragmentRelatedMemory.values().stream().anyMatch(value -> value != 0);
×
190
      }
191
      synchronized (queryMemoryReservations) {
×
192
        queryRelatedMemory.remove(fragmentInstanceId);
×
193
        if (queryRelatedMemory.isEmpty()) {
×
194
          queryMemoryReservations.remove(queryId);
×
195
        }
196
      }
×
197
      if (hasPotentialMemoryLeak) {
×
198
        throw new MemoryLeakException(
×
199
            "PlanNode related memory is not zero when trying to deregister FI from query memory pool. QueryId is : {}, FragmentInstanceId is : {}, PlanNode related memory is : {}.");
200
      }
201
    }
202
  }
×
203

204
  /**
205
   * Reserve memory with bytesToReserve.
206
   *
207
   * @return if reserve succeed, pair.right will be true, otherwise false
208
   * @throws IllegalArgumentException throw exception if current query requests more memory than can
209
   *     be allocated.
210
   */
211
  public Pair<ListenableFuture<Void>, Boolean> reserve(
212
      String queryId,
213
      String fragmentInstanceId,
214
      String planNodeId,
215
      long bytesToReserve,
216
      long maxBytesCanReserve) {
217
    Validate.notNull(queryId);
1✔
218
    Validate.notNull(fragmentInstanceId);
1✔
219
    Validate.notNull(planNodeId);
1✔
220
    Validate.isTrue(
1✔
221
        bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
222
        "bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d",
223
        bytesToReserve);
224
    if (bytesToReserve > maxBytesCanReserve) {
1✔
225
      LOGGER.warn(
×
226
          "Cannot reserve {}(Max: {}) bytes memory from MemoryPool for planNodeId{}",
227
          bytesToReserve,
×
228
          maxBytesCanReserve,
×
229
          planNodeId);
230
      throw new IllegalArgumentException(
×
231
          "Query is aborted since it requests more memory than can be allocated.");
232
    }
233

234
    ListenableFuture<Void> result;
235
    if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve)) {
1✔
236
      result = Futures.immediateFuture(null);
1✔
237
      return new Pair<>(result, Boolean.TRUE);
1✔
238
    } else {
239
      LOGGER.debug(
1✔
240
          "Blocked reserve request: {} bytes memory for planNodeId{}", bytesToReserve, planNodeId);
1✔
241
      rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
1✔
242
      result =
1✔
243
          MemoryReservationFuture.create(
1✔
244
              queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve);
245
      memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
1✔
246
      return new Pair<>(result, Boolean.FALSE);
1✔
247
    }
248
  }
249

250
  @TestOnly
251
  public boolean tryReserveForTest(
252
      String queryId,
253
      String fragmentInstanceId,
254
      String planNodeId,
255
      long bytesToReserve,
256
      long maxBytesCanReserve) {
257
    Validate.notNull(queryId);
1✔
258
    Validate.notNull(fragmentInstanceId);
1✔
259
    Validate.notNull(planNodeId);
1✔
260
    Validate.isTrue(
1✔
261
        bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
262
        "bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d",
263
        bytesToReserve);
264

265
    if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve)) {
1✔
266
      return true;
1✔
267
    } else {
268
      rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
1✔
269
      return false;
1✔
270
    }
271
  }
272

273
  /**
274
   * Cancel the specified memory reservation. If the reservation has finished, do nothing.
275
   *
276
   * @param future The future returned from {@link #reserve(String, String, String, long, long)}
277
   * @return If the future has not complete, return the number of bytes being reserved. Otherwise,
278
   *     return 0.
279
   */
280
  @SuppressWarnings("squid:S2445")
281
  public synchronized long tryCancel(ListenableFuture<Void> future) {
282
    // add synchronized on the future to avoid that the future is concurrently completed by
283
    // MemoryPool.free() which may lead to memory leak.
284
    synchronized (future) {
1✔
285
      Validate.notNull(future);
1✔
286
      // If the future is not a MemoryReservationFuture, it must have been completed.
287
      if (future.isDone()) {
1✔
288
        return 0L;
1✔
289
      }
290
      Validate.isTrue(
1✔
291
          future instanceof MemoryReservationFuture,
292
          "invalid future type " + future.getClass().getSimpleName());
1✔
293
      future.cancel(true);
1✔
294
    }
1✔
295
    return ((MemoryReservationFuture<Void>) future).getBytesToReserve();
1✔
296
  }
297

298
  public void free(String queryId, String fragmentInstanceId, String planNodeId, long bytes) {
299
    Validate.notNull(queryId);
1✔
300
    Validate.isTrue(bytes > 0L);
1✔
301

302
    try {
303
      queryMemoryReservations
1✔
304
          .get(queryId)
1✔
305
          .get(fragmentInstanceId)
1✔
306
          .computeIfPresent(
1✔
307
              planNodeId,
308
              (k, reservedMemory) -> {
309
                if (reservedMemory < bytes) {
1✔
310
                  throw new IllegalArgumentException("Free more memory than has been reserved.");
1✔
311
                }
312
                return reservedMemory - bytes;
1✔
313
              });
314
    } catch (NullPointerException e) {
×
315
      throw new IllegalArgumentException("RelatedMemoryReserved can't be null when freeing memory");
×
316
    }
1✔
317

318
    remainingBytes.addAndGet(bytes);
1✔
319

320
    if (memoryReservationFutures.isEmpty()) {
1✔
321
      return;
1✔
322
    }
323
    Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
1✔
324
    while (iterator.hasNext()) {
1✔
325
      MemoryReservationFuture<Void> future = iterator.next();
1✔
326
      synchronized (future) {
1✔
327
        if (future.isCancelled() || future.isDone()) {
1✔
328
          continue;
1✔
329
        }
330
        long bytesToReserve = future.getBytesToReserve();
1✔
331
        String curQueryId = future.getQueryId();
1✔
332
        String curFragmentInstanceId = future.getFragmentInstanceId();
1✔
333
        String curPlanNodeId = future.getPlanNodeId();
1✔
334
        long maxBytesCanReserve = future.getMaxBytesCanReserve();
1✔
335
        if (tryReserve(
1✔
336
            curQueryId, curFragmentInstanceId, curPlanNodeId, bytesToReserve, maxBytesCanReserve)) {
337
          future.set(null);
1✔
338
          iterator.remove();
1✔
339
        } else {
340
          rollbackReserve(curQueryId, curFragmentInstanceId, curPlanNodeId, bytesToReserve);
1✔
341
        }
342
      }
1✔
343
    }
1✔
344
  }
1✔
345

346
  public long getQueryMemoryReservedBytes(String queryId) {
347
    if (!queryMemoryReservations.containsKey(queryId)) {
1✔
348
      return 0L;
×
349
    }
350
    long sum = 0;
1✔
351
    for (Map<String, Long> map : queryMemoryReservations.get(queryId).values()) {
1✔
352
      sum = sum + map.values().stream().reduce(0L, Long::sum);
1✔
353
    }
1✔
354
    return sum;
1✔
355
  }
356

357
  public long getReservedBytes() {
358
    return maxBytes - remainingBytes.get();
1✔
359
  }
360

361
  public boolean tryReserve(
362
      String queryId,
363
      String fragmentInstanceId,
364
      String planNodeId,
365
      long bytesToReserve,
366
      long maxBytesCanReserve) {
367
    long tryRemainingBytes = remainingBytes.addAndGet(-bytesToReserve);
1✔
368
    long queryRemainingBytes =
1✔
369
        maxBytesCanReserve
370
            - queryMemoryReservations
371
                .get(queryId)
1✔
372
                .get(fragmentInstanceId)
1✔
373
                .merge(planNodeId, bytesToReserve, Long::sum);
1✔
374
    return tryRemainingBytes >= 0 && queryRemainingBytes >= 0;
1✔
375
  }
376

377
  private void rollbackReserve(
378
      String queryId, String fragmentInstanceId, String planNodeId, long bytesToReserve) {
379
    queryMemoryReservations
1✔
380
        .get(queryId)
1✔
381
        .get(fragmentInstanceId)
1✔
382
        .merge(planNodeId, -bytesToReserve, Long::sum);
1✔
383
    remainingBytes.addAndGet(bytesToReserve);
1✔
384
  }
1✔
385
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc