• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9871

18 Aug 2023 08:01AM UTC coverage: 47.982% (-0.02%) from 48.003%
#9871

push

travis_ci

web-flow
[IOTDB-6117] Pipe: Optimize RPC requests from CN to DN. CN send exactly one pipeMeta to each DN upon create/start/stop/drop pipe (#10875) (#10905)

Currently, CN sends pipeMeta of all existing pipes to each DN upon create/start/stop/drop pipe, which may be time-consuming.

In this commit, CN will send exactly one pipeMeta to each DN upon create/start/stop/drop pipe.

---------

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit 38b36006b)

95 of 95 new or added lines in 12 files covered. (100.0%)

79801 of 166313 relevant lines covered (47.98%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.36
/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.commons.pipe.task.meta;
21

22
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
23

24
import java.io.FileInputStream;
25
import java.io.FileOutputStream;
26
import java.io.IOException;
27
import java.util.Map;
28
import java.util.Objects;
29
import java.util.concurrent.ConcurrentHashMap;
30
import java.util.concurrent.TimeUnit;
31
import java.util.concurrent.locks.ReentrantReadWriteLock;
32

33
public class PipeMetaKeeper {
34

35
  protected final Map<String, PipeMeta> pipeNameToPipeMetaMap;
36

37
  private final ReentrantReadWriteLock pipeMetaKeeperLock;
38

39
  public PipeMetaKeeper() {
1✔
40
    pipeNameToPipeMetaMap = new ConcurrentHashMap<>();
1✔
41
    pipeMetaKeeperLock = new ReentrantReadWriteLock(true);
1✔
42
  }
1✔
43

44
  /////////////////////////////////  Lock  /////////////////////////////////
45

46
  public void acquireReadLock() {
47
    pipeMetaKeeperLock.readLock().lock();
1✔
48
  }
1✔
49

50
  public boolean tryReadLock(long timeOut) throws InterruptedException {
51
    return pipeMetaKeeperLock.readLock().tryLock(timeOut, TimeUnit.SECONDS);
×
52
  }
53

54
  public void releaseReadLock() {
55
    pipeMetaKeeperLock.readLock().unlock();
1✔
56
  }
1✔
57

58
  public void acquireWriteLock() {
59
    pipeMetaKeeperLock.writeLock().lock();
1✔
60
  }
1✔
61

62
  public void releaseWriteLock() {
63
    pipeMetaKeeperLock.writeLock().unlock();
1✔
64
  }
1✔
65

66
  /////////////////////////////////  PipeMeta  /////////////////////////////////
67

68
  public void addPipeMeta(String pipeName, PipeMeta pipeMeta) {
69
    pipeNameToPipeMetaMap.put(pipeName, pipeMeta);
1✔
70
  }
1✔
71

72
  public PipeMeta getPipeMeta(String pipeName) {
73
    return pipeNameToPipeMetaMap.get(pipeName);
1✔
74
  }
75

76
  public void removePipeMeta(String pipeName) {
77
    pipeNameToPipeMetaMap.remove(pipeName);
1✔
78
  }
1✔
79

80
  public boolean containsPipeMeta(String pipeName) {
81
    return pipeNameToPipeMetaMap.containsKey(pipeName);
1✔
82
  }
83

84
  public Iterable<PipeMeta> getPipeMetaList() {
85
    return pipeNameToPipeMetaMap.values();
×
86
  }
87

88
  public PipeMeta getPipeMetaByPipeName(String pipeName) {
89
    return pipeNameToPipeMetaMap.get(pipeName);
×
90
  }
91

92
  public void clear() {
93
    this.pipeNameToPipeMetaMap.clear();
1✔
94
  }
1✔
95

96
  public boolean isEmpty() {
97
    return pipeNameToPipeMetaMap.isEmpty();
1✔
98
  }
99

100
  /////////////////////////////////  Snapshot  /////////////////////////////////
101

102
  public void processTakeSnapshot(FileOutputStream fileOutputStream) throws IOException {
103
    ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), fileOutputStream);
1✔
104
    for (Map.Entry<String, PipeMeta> entry : pipeNameToPipeMetaMap.entrySet()) {
1✔
105
      ReadWriteIOUtils.write(entry.getKey(), fileOutputStream);
1✔
106
      entry.getValue().serialize(fileOutputStream);
1✔
107
    }
1✔
108
  }
1✔
109

110
  public void processLoadSnapshot(FileInputStream fileInputStream) throws IOException {
111
    clear();
1✔
112

113
    final int size = ReadWriteIOUtils.readInt(fileInputStream);
1✔
114
    for (int i = 0; i < size; i++) {
1✔
115
      final String pipeName = ReadWriteIOUtils.readString(fileInputStream);
1✔
116
      pipeNameToPipeMetaMap.put(pipeName, PipeMeta.deserialize(fileInputStream));
1✔
117
    }
118
  }
1✔
119

120
  /////////////////////////////////  Override  /////////////////////////////////
121

122
  @Override
123
  public boolean equals(Object o) {
124
    if (this == o) {
1✔
125
      return true;
×
126
    }
127
    if (o == null || getClass() != o.getClass()) {
1✔
128
      return false;
×
129
    }
130
    PipeMetaKeeper that = (PipeMetaKeeper) o;
1✔
131
    return Objects.equals(pipeNameToPipeMetaMap, that.pipeNameToPipeMetaMap);
1✔
132
  }
133

134
  @Override
135
  public int hashCode() {
136
    return Objects.hash(pipeNameToPipeMetaMap);
×
137
  }
138

139
  @Override
140
  public String toString() {
141
    return "PipeMetaKeeper{" + "pipeNameToPipeMetaMap=" + pipeNameToPipeMetaMap + '}';
1✔
142
  }
143
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc