• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9723

pending completion
#9723

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)

264 of 264 new or added lines in 11 files covered. (100.0%)

79280 of 165370 relevant lines covered (47.94%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.43
/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.commons.partition;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
24
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
25
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
26
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
27
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
28

29
import org.apache.thrift.TException;
30
import org.apache.thrift.protocol.TProtocol;
31

32
import java.io.IOException;
33
import java.io.InputStream;
34
import java.io.OutputStream;
35
import java.nio.ByteBuffer;
36
import java.util.ArrayList;
37
import java.util.List;
38
import java.util.Map;
39
import java.util.Objects;
40
import java.util.TreeMap;
41
import java.util.Vector;
42
import java.util.concurrent.ConcurrentHashMap;
43
import java.util.concurrent.atomic.AtomicBoolean;
44
import java.util.concurrent.atomic.AtomicLong;
45
import java.util.stream.Collectors;
46

47
public class SeriesPartitionTable {
48

49
  private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap;
50

51
  public SeriesPartitionTable() {
1✔
52
    this.seriesPartitionMap = new TreeMap<>();
1✔
53
  }
1✔
54

55
  public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap) {
1✔
56
    this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
1✔
57
  }
1✔
58

59
  public Map<TTimePartitionSlot, List<TConsensusGroupId>> getSeriesPartitionMap() {
60
    return seriesPartitionMap;
1✔
61
  }
62

63
  public void putDataPartition(TTimePartitionSlot timePartitionSlot, TConsensusGroupId groupId) {
64
    seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new ArrayList<>()).add(groupId);
×
65
  }
×
66

67
  /**
68
   * Thread-safely get DataPartition within the specific Database.
69
   *
70
   * @param partitionSlotList TimePartitionSlotList
71
   * @param seriesPartitionTable Store the matched SeriesPartitions
72
   * @return True if all the SeriesPartitionSlots are matched, false otherwise
73
   */
74
  public boolean getDataPartition(
75
      TTimeSlotList partitionSlotList, SeriesPartitionTable seriesPartitionTable) {
76
    AtomicBoolean result = new AtomicBoolean(true);
×
77
    List<TTimePartitionSlot> partitionSlots = partitionSlotList.getTimePartitionSlots();
×
78

79
    if (partitionSlots.isEmpty()) {
×
80
      // Return all DataPartitions in one SeriesPartitionSlot
81
      // when the queried TimePartitionSlots are empty
82
      seriesPartitionTable.getSeriesPartitionMap().putAll(seriesPartitionMap);
×
83
    } else {
84
      boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
×
85
          isNeedRightAll = partitionSlotList.isNeedRightAll();
×
86
      if (isNeedLeftAll || isNeedRightAll) {
×
87
        // we need to calculate the leftMargin which contains all the time partition on the unclosed
88
        // left side: (-oo, leftMargin)
89
        // and the rightMargin which contains all the time partition on the unclosed right side:
90
        // (rightMargin, +oo)
91
        // all the remaining closed time range which locates in [leftMargin, rightMargin] will be
92
        // calculated outside if block
93
        long leftMargin = isNeedLeftAll ? partitionSlots.get(0).getStartTime() : Long.MIN_VALUE,
×
94
            rightMargin =
95
                isNeedRightAll
×
96
                    ? partitionSlots.get(partitionSlots.size() - 1).getStartTime()
×
97
                    : Long.MAX_VALUE;
×
98
        seriesPartitionTable
×
99
            .getSeriesPartitionMap()
×
100
            .putAll(
×
101
                seriesPartitionMap.entrySet().stream()
×
102
                    .filter(
×
103
                        entry -> {
104
                          long startTime = entry.getKey().getStartTime();
×
105
                          return startTime < leftMargin || startTime > rightMargin;
×
106
                        })
107
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
×
108
      }
109

110
      // Return the DataPartition for each match TimePartitionSlot
111
      partitionSlots.forEach(
×
112
          timePartitionSlot -> {
113
            if (seriesPartitionMap.containsKey(timePartitionSlot)) {
×
114
              seriesPartitionTable
×
115
                  .getSeriesPartitionMap()
×
116
                  .put(timePartitionSlot, seriesPartitionMap.get(timePartitionSlot));
×
117
            } else {
118
              result.set(false);
×
119
            }
120
          });
×
121
    }
122

123
    return result.get();
×
124
  }
125

126
  /**
127
   * Check and return the specified DataPartition's successor.
128
   *
129
   * @param timePartitionSlot Corresponding TimePartitionSlot
130
   * @return The specified DataPartition's successor if exists, null otherwise
131
   */
132
  public TConsensusGroupId getSuccessorDataPartition(TTimePartitionSlot timePartitionSlot) {
133
    TTimePartitionSlot successorSlot = seriesPartitionMap.higherKey(timePartitionSlot);
×
134
    return successorSlot == null ? null : seriesPartitionMap.get(successorSlot).get(0);
×
135
  }
136

137
  /**
138
   * Check and return the specified DataPartition's predecessor.
139
   *
140
   * @param timePartitionSlot Corresponding TimePartitionSlot
141
   * @return The specified DataPartition's predecessor if exists, null otherwise
142
   */
143
  public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot timePartitionSlot) {
144
    TTimePartitionSlot predecessorSlot = seriesPartitionMap.lowerKey(timePartitionSlot);
×
145
    return predecessorSlot == null ? null : seriesPartitionMap.get(predecessorSlot).get(0);
×
146
  }
147

148
  /**
149
   * Query a timePartition's corresponding dataRegionIds.
150
   *
151
   * @param timeSlotId Time partition's timeSlotId
152
   * @return the timePartition's corresponding dataRegionIds
153
   */
154
  List<TConsensusGroupId> getRegionId(TTimePartitionSlot timeSlotId) {
155
    if (timeSlotId.getStartTime() != -1) {
×
156
      if (!seriesPartitionMap.containsKey(timeSlotId)) {
×
157
        return new ArrayList<>();
×
158
      }
159
      return seriesPartitionMap.get(timeSlotId);
×
160
    } else {
161
      return seriesPartitionMap.values().stream()
×
162
          .flatMap(List::stream)
×
163
          .collect(Collectors.toList());
×
164
    }
165
  }
166

167
  List<TTimePartitionSlot> getTimeSlotList(
168
      TConsensusGroupId regionId, long startTime, long endTime) {
169
    if (regionId.getId() == -1) {
×
170
      return seriesPartitionMap.keySet().stream()
×
171
          .filter(e -> e.getStartTime() >= startTime && e.getStartTime() < endTime)
×
172
          .collect(Collectors.toList());
×
173
    } else {
174
      return seriesPartitionMap.keySet().stream()
×
175
          .filter(e -> e.getStartTime() >= startTime && e.getStartTime() < endTime)
×
176
          .filter(e -> seriesPartitionMap.get(e).contains(regionId))
×
177
          .collect(Collectors.toList());
×
178
    }
179
  }
180

181
  /**
182
   * Create DataPartition within the specific SeriesPartitionSlot.
183
   *
184
   * @param assignedSeriesPartitionTable Assigned result
185
   * @param seriesPartitionSlot Corresponding TSeriesPartitionSlot
186
   * @param groupDeltaMap Map<TConsensusGroupId, Map<TSeriesPartitionSlot, Delta TTimePartitionSlot
187
   *     Count>>
188
   */
189
  public void createDataPartition(
190
      SeriesPartitionTable assignedSeriesPartitionTable,
191
      TSeriesPartitionSlot seriesPartitionSlot,
192
      Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>> groupDeltaMap) {
193
    assignedSeriesPartitionTable
1✔
194
        .getSeriesPartitionMap()
1✔
195
        .forEach(
1✔
196
            ((timePartitionSlot, consensusGroupIds) -> {
197
              seriesPartitionMap.put(timePartitionSlot, new Vector<>(consensusGroupIds));
1✔
198
              consensusGroupIds.forEach(
1✔
199
                  consensusGroupId ->
200
                      groupDeltaMap
201
                          .computeIfAbsent(consensusGroupId, empty -> new ConcurrentHashMap<>())
1✔
202
                          .computeIfAbsent(seriesPartitionSlot, empty -> new AtomicLong(0))
1✔
203
                          .getAndIncrement());
1✔
204
            }));
1✔
205
  }
1✔
206

207
  /**
208
   * Only Leader use this interface. And this interface is synchronized. Thread-safely filter no
209
   * assigned DataPartitionSlots within the specific SeriesPartitionSlot.
210
   *
211
   * @param partitionSlots TimePartitionSlots
212
   * @return Unassigned PartitionSlots
213
   */
214
  public synchronized List<TTimePartitionSlot> filterUnassignedDataPartitionSlots(
215
      List<TTimePartitionSlot> partitionSlots) {
216
    List<TTimePartitionSlot> result = new Vector<>();
×
217

218
    partitionSlots.forEach(
×
219
        timePartitionSlot -> {
220
          if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
×
221
            result.add(timePartitionSlot);
×
222
          }
223
        });
×
224

225
    return result;
×
226
  }
227

228
  /**
229
   * Get the last DataPartition's ConsensusGroupId.
230
   *
231
   * @return The last DataPartition's ConsensusGroupId, null if there are no DataPartitions yet
232
   */
233
  public TConsensusGroupId getLastConsensusGroupId() {
234
    Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
×
235
        seriesPartitionMap.lastEntry();
×
236
    if (lastEntry == null) {
×
237
      return null;
×
238
    }
239
    return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
×
240
  }
241

242
  public void serialize(OutputStream outputStream, TProtocol protocol)
243
      throws IOException, TException {
244
    ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream);
1✔
245
    for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionEntry :
246
        seriesPartitionMap.entrySet()) {
1✔
247
      seriesPartitionEntry.getKey().write(protocol);
1✔
248
      ReadWriteIOUtils.write(seriesPartitionEntry.getValue().size(), outputStream);
1✔
249
      for (TConsensusGroupId consensusGroupId : seriesPartitionEntry.getValue()) {
1✔
250
        consensusGroupId.write(protocol);
1✔
251
      }
1✔
252
    }
1✔
253
  }
1✔
254

255
  /** Only for ConsensusRequest. */
256
  public void deserialize(ByteBuffer buffer) {
257
    int timePartitionSlotNum = buffer.getInt();
1✔
258
    for (int i = 0; i < timePartitionSlotNum; i++) {
1✔
259
      TTimePartitionSlot timePartitionSlot =
1✔
260
          ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);
1✔
261

262
      int consensusGroupIdNum = buffer.getInt();
1✔
263
      List<TConsensusGroupId> consensusGroupIds = new Vector<>();
1✔
264
      for (int j = 0; j < consensusGroupIdNum; j++) {
1✔
265
        consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
1✔
266
      }
267

268
      seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
1✔
269
    }
270
  }
1✔
271

272
  /** Only for Snapshot. */
273
  public void deserialize(InputStream inputStream, TProtocol protocol)
274
      throws IOException, TException {
275
    int timePartitionSlotNum = ReadWriteIOUtils.readInt(inputStream);
1✔
276
    for (int i = 0; i < timePartitionSlotNum; i++) {
1✔
277
      TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
1✔
278
      timePartitionSlot.read(protocol);
1✔
279

280
      int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
1✔
281
      List<TConsensusGroupId> consensusGroupIds = new Vector<>();
1✔
282
      for (int j = 0; j < consensusGroupIdNum; j++) {
1✔
283
        TConsensusGroupId consensusGroupId = new TConsensusGroupId();
1✔
284
        consensusGroupId.read(protocol);
1✔
285
        consensusGroupIds.add(consensusGroupId);
1✔
286
      }
287

288
      seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
1✔
289
    }
290
  }
1✔
291

292
  @Override
293
  public boolean equals(Object o) {
294
    if (this == o) return true;
1✔
295
    if (o == null || getClass() != o.getClass()) return false;
1✔
296
    SeriesPartitionTable that = (SeriesPartitionTable) o;
1✔
297
    return seriesPartitionMap.equals(that.seriesPartitionMap);
1✔
298
  }
299

300
  @Override
301
  public int hashCode() {
302
    return Objects.hash(seriesPartitionMap);
×
303
  }
304
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc