• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9770

pending completion
#9770

push

travis_ci

web-flow
[IOTDB-6101] Pipe: Support tsfile cascade transport  (#10795) (#10796)

Support tsfile cascade transport. For example, there are 3 iotdb clusters A, B and C. Now we can use pipe to transport tsfile from A to C (via B, A -> B -> C).

(cherry picked from commit b3a4bdf81)

5 of 5 new or added lines in 4 files covered. (100.0%)

79456 of 165675 relevant lines covered (47.96%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.43
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.resource.wal;
21

22
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
23
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
24
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
25
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
26

27
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
29

30
import java.io.Closeable;
31
import java.util.concurrent.atomic.AtomicBoolean;
32
import java.util.concurrent.atomic.AtomicInteger;
33
import java.util.concurrent.atomic.AtomicLong;
34

35
public abstract class PipeWALResource implements Closeable {
36

37
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResource.class);
1✔
38

39
  protected final WALEntryHandler walEntryHandler;
40

41
  private final AtomicInteger referenceCount;
42

43
  public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 60;
44
  private final AtomicLong lastLogicalPinTime;
45
  private final AtomicBoolean isPhysicallyPinned;
46

47
  protected PipeWALResource(WALEntryHandler walEntryHandler) {
1✔
48
    this.walEntryHandler = walEntryHandler;
1✔
49

50
    referenceCount = new AtomicInteger(0);
1✔
51

52
    lastLogicalPinTime = new AtomicLong(0);
1✔
53
    isPhysicallyPinned = new AtomicBoolean(false);
1✔
54
  }
1✔
55

56
  public final void pin() throws PipeRuntimeNonCriticalException {
57
    if (referenceCount.get() == 0) {
1✔
58
      if (!isPhysicallyPinned.get()) {
1✔
59
        try {
60
          pinInternal();
1✔
61
        } catch (MemTablePinException e) {
×
62
          throw new PipeRuntimeNonCriticalException(
×
63
              String.format(
×
64
                  "failed to pin wal %d, because %s",
65
                  walEntryHandler.getMemTableId(), e.getMessage()));
×
66
        }
1✔
67
        isPhysicallyPinned.set(true);
1✔
68
        LOGGER.info("wal {} is pinned by pipe engine", walEntryHandler.getMemTableId());
1✔
69
      } // else means the wal is already pinned, do nothing
70

71
      // no matter the wal is pinned or not, update the last pin time
72
      lastLogicalPinTime.set(System.currentTimeMillis());
1✔
73
    }
74

75
    referenceCount.incrementAndGet();
1✔
76
  }
1✔
77

78
  protected abstract void pinInternal()
79
      throws MemTablePinException, PipeRuntimeNonCriticalException;
80

81
  public final void unpin() throws PipeRuntimeNonCriticalException {
82
    final int finalReferenceCount = referenceCount.get();
1✔
83

84
    if (finalReferenceCount == 1) {
1✔
85
      unpinPhysicallyIfOutOfTimeToLive();
×
86
    } else if (finalReferenceCount < 1) {
1✔
87
      throw new PipeRuntimeCriticalException(
×
88
          String.format(
×
89
              "wal %d is unpinned more than pinned, this should not happen",
90
              walEntryHandler.getMemTableId()));
×
91
    }
92

93
    referenceCount.decrementAndGet();
1✔
94
  }
1✔
95

96
  protected abstract void unpinInternal()
97
      throws MemTablePinException, PipeRuntimeNonCriticalException;
98

99
  /**
100
   * Invalidate the wal if it is unpinned and out of time to live.
101
   *
102
   * @return true if the wal is invalidated, false otherwise
103
   */
104
  public final boolean invalidateIfPossible() {
105
    if (referenceCount.get() > 0) {
×
106
      return false;
×
107
    }
108

109
    // referenceCount.get() == 0
110
    return unpinPhysicallyIfOutOfTimeToLive();
×
111
  }
112

113
  /**
114
   * Unpin the wal if it is out of time to live.
115
   *
116
   * @return true if the wal is unpinned physically (then it can be invalidated), false otherwise
117
   * @throws PipeRuntimeNonCriticalException if failed to unpin WAL of memtable.
118
   */
119
  private boolean unpinPhysicallyIfOutOfTimeToLive() {
120
    if (isPhysicallyPinned.get()) {
×
121
      if (System.currentTimeMillis() - lastLogicalPinTime.get() > MIN_TIME_TO_LIVE_IN_MS) {
×
122
        try {
123
          unpinInternal();
×
124
        } catch (MemTablePinException e) {
×
125
          throw new PipeRuntimeNonCriticalException(
×
126
              String.format(
×
127
                  "failed to unpin wal %d, because %s",
128
                  walEntryHandler.getMemTableId(), e.getMessage()));
×
129
        }
×
130
        isPhysicallyPinned.set(false);
×
131
        LOGGER.info(
×
132
            "wal {} is unpinned by pipe engine when checking time to live",
133
            walEntryHandler.getMemTableId());
×
134
        return true;
×
135
      } else {
136
        return false;
×
137
      }
138
    } else {
139
      LOGGER.info(
×
140
          "wal {} is not pinned physically when checking time to live",
141
          walEntryHandler.getMemTableId());
×
142
      return true;
×
143
    }
144
  }
145

146
  @Override
147
  public final void close() {
148
    if (isPhysicallyPinned.get()) {
×
149
      try {
150
        unpinInternal();
×
151
      } catch (MemTablePinException e) {
×
152
        LOGGER.error(
×
153
            "failed to unpin wal {} when closing pipe wal resource, because {}",
154
            walEntryHandler.getMemTableId(),
×
155
            e.getMessage());
×
156
      }
×
157
      isPhysicallyPinned.set(false);
×
158
      LOGGER.info(
×
159
          "wal {} is unpinned by pipe engine when closing pipe wal resource",
160
          walEntryHandler.getMemTableId());
×
161
    }
162

163
    referenceCount.set(0);
×
164
  }
×
165

166
  public int getReferenceCount() {
167
    return referenceCount.get();
×
168
  }
169
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc