• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9770

pending completion
#9770

push

travis_ci

web-flow
[IOTDB-6101] Pipe: Support tsfile cascade transport  (#10795) (#10796)

Support tsfile cascade transport. For example, there are 3 iotdb clusters A, B and C. Now we can use pipe to transport tsfile from A to C (via B, A -> B -> C).

(cherry picked from commit b3a4bdf81)

5 of 5 new or added lines in 4 files covered. (100.0%)

79456 of 165675 relevant lines covered (47.96%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.53
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.resource.wal;
21

22
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23
import org.apache.iotdb.commons.concurrent.ThreadName;
24
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
25
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
26

27
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
29

30
import java.io.IOException;
31
import java.util.Iterator;
32
import java.util.Map;
33
import java.util.concurrent.ConcurrentHashMap;
34
import java.util.concurrent.ScheduledExecutorService;
35
import java.util.concurrent.TimeUnit;
36
import java.util.concurrent.locks.ReentrantLock;
37

38
public abstract class PipeWALResourceManager {
39

40
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResourceManager.class);
1✔
41

42
  protected final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
43

44
  private static final int SEGMENT_LOCK_COUNT = 32;
45
  private final ReentrantLock[] memtableIdSegmentLocks;
46

47
  private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
1✔
48
      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
1✔
49
          ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName());
1✔
50

51
  protected PipeWALResourceManager() {
1✔
52
    // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple threads
53
    memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>();
1✔
54

55
    memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
1✔
56
    for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) {
1✔
57
      memtableIdSegmentLocks[i] = new ReentrantLock();
1✔
58
    }
59

60
    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
1✔
61
        PIPE_WAL_RESOURCE_TTL_CHECKER,
62
        () -> {
63
          final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
×
64
              memtableIdToPipeWALResourceMap.entrySet().iterator();
×
65
          while (iterator.hasNext()) {
×
66
            final Map.Entry<Long, PipeWALResource> entry = iterator.next();
×
67
            final ReentrantLock lock =
×
68
                memtableIdSegmentLocks[(int) (entry.getKey() % SEGMENT_LOCK_COUNT)];
×
69

70
            lock.lock();
×
71
            try {
72
              if (entry.getValue().invalidateIfPossible()) {
×
73
                iterator.remove();
×
74
              } else {
75
                LOGGER.info(
×
76
                    "WAL (memtableId {}) is still referenced {} times",
77
                    entry.getKey(),
×
78
                    entry.getValue().getReferenceCount());
×
79
              }
80
            } finally {
81
              lock.unlock();
×
82
            }
83
          }
×
84
        },
×
85
        PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
86
        PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
87
        TimeUnit.MILLISECONDS);
88
  }
1✔
89

90
  public final void pin(final WALEntryHandler walEntryHandler) throws IOException {
91
    final long memtableId = walEntryHandler.getMemTableId();
1✔
92
    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
1✔
93

94
    lock.lock();
1✔
95
    try {
96
      pinInternal(memtableId, walEntryHandler);
1✔
97
    } finally {
98
      lock.unlock();
1✔
99
    }
100
  }
1✔
101

102
  protected abstract void pinInternal(long memtableId, WALEntryHandler walEntryHandler)
103
      throws IOException;
104

105
  public final void unpin(final WALEntryHandler walEntryHandler) throws IOException {
106
    final long memtableId = walEntryHandler.getMemTableId();
1✔
107
    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
1✔
108

109
    lock.lock();
1✔
110
    try {
111
      unpinInternal(memtableId, walEntryHandler);
1✔
112
    } finally {
113
      lock.unlock();
1✔
114
    }
115
  }
1✔
116

117
  protected abstract void unpinInternal(long memtableId, WALEntryHandler walEntryHandler)
118
      throws IOException;
119
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc