• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9711

pending completion
#9711

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6089] Improved the lock behaviour of the pipe heartbeat to avoid causing DataNode unknown (#10714) (#10718)

12 of 12 new or added lines in 2 files covered. (100.0%)

79232 of 165180 relevant lines covered (47.97%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.37
/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.commons.pipe.task.meta;
21

22
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
23

24
import java.io.FileInputStream;
25
import java.io.FileOutputStream;
26
import java.io.IOException;
27
import java.util.Map;
28
import java.util.Objects;
29
import java.util.concurrent.ConcurrentHashMap;
30
import java.util.concurrent.TimeUnit;
31
import java.util.concurrent.locks.ReentrantReadWriteLock;
32

33
public class PipeMetaKeeper {
34

35
  protected final Map<String, PipeMeta> pipeNameToPipeMetaMap;
36

37
  private final ReentrantReadWriteLock pipeMetaKeeperLock;
38

39
  public PipeMetaKeeper() {
1✔
40
    pipeNameToPipeMetaMap = new ConcurrentHashMap<>();
1✔
41
    pipeMetaKeeperLock = new ReentrantReadWriteLock(true);
1✔
42
  }
1✔
43

44
  /////////////////////////////////  Lock  /////////////////////////////////
45

46
  public void acquireReadLock() {
47
    pipeMetaKeeperLock.readLock().lock();
1✔
48
  }
1✔
49

50
  public boolean tryReadLock(long timeOut) throws InterruptedException {
51
    return pipeMetaKeeperLock.readLock().tryLock(timeOut, TimeUnit.SECONDS);
×
52
  }
53

54
  public void releaseReadLock() {
55
    pipeMetaKeeperLock.readLock().unlock();
1✔
56
  }
1✔
57

58
  public void acquireWriteLock() {
59
    pipeMetaKeeperLock.writeLock().lock();
1✔
60
  }
1✔
61

62
  public void releaseWriteLock() {
63
    pipeMetaKeeperLock.writeLock().unlock();
1✔
64
  }
1✔
65

66
  /////////////////////////////////  PipeMeta  /////////////////////////////////
67

68
  public void addPipeMeta(String pipeName, PipeMeta pipeMeta) {
69
    pipeNameToPipeMetaMap.put(pipeName, pipeMeta);
1✔
70
  }
1✔
71

72
  public PipeMeta getPipeMeta(String pipeName) {
73
    return pipeNameToPipeMetaMap.get(pipeName);
1✔
74
  }
75

76
  public void removePipeMeta(String pipeName) {
77
    pipeNameToPipeMetaMap.remove(pipeName);
1✔
78
  }
1✔
79

80
  public boolean containsPipeMeta(String pipeName) {
81
    return pipeNameToPipeMetaMap.containsKey(pipeName);
1✔
82
  }
83

84
  public Iterable<PipeMeta> getPipeMetaList() {
85
    return pipeNameToPipeMetaMap.values();
×
86
  }
87

88
  public void clear() {
89
    this.pipeNameToPipeMetaMap.clear();
1✔
90
  }
1✔
91

92
  public boolean isEmpty() {
93
    return pipeNameToPipeMetaMap.isEmpty();
1✔
94
  }
95

96
  /////////////////////////////////  Snapshot  /////////////////////////////////
97

98
  public void processTakeSnapshot(FileOutputStream fileOutputStream) throws IOException {
99
    ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), fileOutputStream);
1✔
100
    for (Map.Entry<String, PipeMeta> entry : pipeNameToPipeMetaMap.entrySet()) {
1✔
101
      ReadWriteIOUtils.write(entry.getKey(), fileOutputStream);
1✔
102
      entry.getValue().serialize(fileOutputStream);
1✔
103
    }
1✔
104
  }
1✔
105

106
  public void processLoadSnapshot(FileInputStream fileInputStream) throws IOException {
107
    clear();
1✔
108

109
    final int size = ReadWriteIOUtils.readInt(fileInputStream);
1✔
110
    for (int i = 0; i < size; i++) {
1✔
111
      final String pipeName = ReadWriteIOUtils.readString(fileInputStream);
1✔
112
      pipeNameToPipeMetaMap.put(pipeName, PipeMeta.deserialize(fileInputStream));
1✔
113
    }
114
  }
1✔
115

116
  /////////////////////////////////  Override  /////////////////////////////////
117

118
  @Override
119
  public boolean equals(Object o) {
120
    if (this == o) {
1✔
121
      return true;
×
122
    }
123
    if (o == null || getClass() != o.getClass()) {
1✔
124
      return false;
×
125
    }
126
    PipeMetaKeeper that = (PipeMetaKeeper) o;
1✔
127
    return Objects.equals(pipeNameToPipeMetaMap, that.pipeNameToPipeMetaMap);
1✔
128
  }
129

130
  @Override
131
  public int hashCode() {
132
    return Objects.hash(pipeNameToPipeMetaMap);
×
133
  }
134

135
  @Override
136
  public String toString() {
137
    return "PipeMetaKeeper{" + "pipeNameToPipeMetaMap=" + pipeNameToPipeMetaMap + '}';
1✔
138
  }
139
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc