• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9723

pending completion
#9723

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)

264 of 264 new or added lines in 11 files covered. (100.0%)

79280 of 165370 relevant lines covered (47.94%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.46
/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
package org.apache.iotdb.commons.partition;
20

21
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
22
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
23
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
24
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
25
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
26
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
27

28
import org.apache.thrift.TException;
29
import org.apache.thrift.protocol.TProtocol;
30

31
import java.io.IOException;
32
import java.io.InputStream;
33
import java.io.OutputStream;
34
import java.nio.ByteBuffer;
35
import java.util.ArrayList;
36
import java.util.Comparator;
37
import java.util.HashMap;
38
import java.util.List;
39
import java.util.Map;
40
import java.util.Objects;
41
import java.util.concurrent.ConcurrentHashMap;
42
import java.util.concurrent.atomic.AtomicBoolean;
43
import java.util.concurrent.atomic.AtomicLong;
44
import java.util.stream.Collectors;
45

46
public class DataPartitionTable {
47

48
  private final Map<TSeriesPartitionSlot, SeriesPartitionTable> dataPartitionMap;
49

50
  public DataPartitionTable() {
1✔
51
    this.dataPartitionMap = new ConcurrentHashMap<>();
1✔
52
  }
1✔
53

54
  public DataPartitionTable(Map<TSeriesPartitionSlot, SeriesPartitionTable> dataPartitionMap) {
1✔
55
    this.dataPartitionMap = dataPartitionMap;
1✔
56
  }
1✔
57

58
  public Map<TSeriesPartitionSlot, SeriesPartitionTable> getDataPartitionMap() {
59
    return dataPartitionMap;
1✔
60
  }
61

62
  /**
63
   * Thread-safely get DataPartition within the specific StorageGroup
64
   *
65
   * @param partitionSlots SeriesPartitionSlots and TimePartitionSlots
66
   * @param dataPartitionTable Store the matched Partitions
67
   * @return True if all the PartitionSlots are matched, false otherwise
68
   */
69
  public boolean getDataPartition(
70
      Map<TSeriesPartitionSlot, TTimeSlotList> partitionSlots,
71
      DataPartitionTable dataPartitionTable) {
72
    AtomicBoolean result = new AtomicBoolean(true);
×
73
    if (partitionSlots.isEmpty()) {
×
74
      // Return all DataPartitions in one StorageGroup when the queried PartitionSlots are empty
75
      dataPartitionTable.getDataPartitionMap().putAll(dataPartitionMap);
×
76
    } else {
77
      // Return the DataPartition for each SeriesPartitionSlot
78
      partitionSlots.forEach(
×
79
          (seriesPartitionSlot, timePartitionSlotList) -> {
80
            if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
×
81
              SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
×
82
              if (!dataPartitionMap
×
83
                  .get(seriesPartitionSlot)
×
84
                  .getDataPartition(timePartitionSlotList, seriesPartitionTable)) {
×
85
                result.set(false);
×
86
              }
87

88
              if (!seriesPartitionTable.getSeriesPartitionMap().isEmpty()) {
×
89
                // Only return those non-empty DataPartitions
90
                dataPartitionTable
×
91
                    .getDataPartitionMap()
×
92
                    .put(seriesPartitionSlot, seriesPartitionTable);
×
93
              }
94
            } else {
×
95
              result.set(false);
×
96
            }
97
          });
×
98
    }
99
    return result.get();
×
100
  }
101

102
  /**
103
   * Checks whether the specified DataPartition has a successor and returns if it does.
104
   *
105
   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
106
   * @param timePartitionSlot Corresponding TimePartitionSlot
107
   * @return The specific DataPartition's successor if exists, null otherwise
108
   */
109
  public TConsensusGroupId getSuccessorDataPartition(
110
      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
111
    if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
×
112
      return dataPartitionMap.get(seriesPartitionSlot).getSuccessorDataPartition(timePartitionSlot);
×
113
    } else {
114
      return null;
×
115
    }
116
  }
117

118
  /**
119
   * Checks whether the specified DataPartition has a predecessor and returns if it does.
120
   *
121
   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
122
   * @param timePartitionSlot Corresponding TimePartitionSlot
123
   * @return The specific DataPartition's predecessor if exists, null otherwise
124
   */
125
  public TConsensusGroupId getPredecessorDataPartition(
126
      TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) {
127
    if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
×
128
      return dataPartitionMap
×
129
          .get(seriesPartitionSlot)
×
130
          .getPredecessorDataPartition(timePartitionSlot);
×
131
    } else {
132
      return null;
×
133
    }
134
  }
135

136
  /**
137
   * Create DataPartition within the specific StorageGroup
138
   *
139
   * @param assignedDataPartition Assigned result
140
   * @return Map<TConsensusGroupId, Map<TSeriesPartitionSlot, Delta TTimePartitionSlot Count>>
141
   */
142
  public Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>> createDataPartition(
143
      DataPartitionTable assignedDataPartition) {
144
    Map<TConsensusGroupId, Map<TSeriesPartitionSlot, AtomicLong>> groupDeltaMap =
1✔
145
        new ConcurrentHashMap<>();
146

147
    assignedDataPartition
1✔
148
        .getDataPartitionMap()
1✔
149
        .forEach(
1✔
150
            ((seriesPartitionSlot, seriesPartitionTable) ->
151
                dataPartitionMap
152
                    .computeIfAbsent(seriesPartitionSlot, empty -> new SeriesPartitionTable())
1✔
153
                    .createDataPartition(
1✔
154
                        seriesPartitionTable, seriesPartitionSlot, groupDeltaMap)));
155

156
    return groupDeltaMap;
1✔
157
  }
158

159
  /**
160
   * Only Leader use this interface. Filter unassigned DataPartitionSlots within the specific
161
   * StorageGroup
162
   *
163
   * @param partitionSlots SeriesPartitionSlots and TimePartitionSlots
164
   * @return Unassigned PartitionSlots
165
   */
166
  public Map<TSeriesPartitionSlot, TTimeSlotList> filterUnassignedDataPartitionSlots(
167
      Map<TSeriesPartitionSlot, TTimeSlotList> partitionSlots) {
168
    Map<TSeriesPartitionSlot, TTimeSlotList> result = new ConcurrentHashMap<>();
×
169

170
    partitionSlots.forEach(
×
171
        (seriesPartitionSlot, timePartitionSlots) ->
172
            result.put(
×
173
                seriesPartitionSlot,
174
                new TTimeSlotList(
175
                    dataPartitionMap
176
                        .computeIfAbsent(seriesPartitionSlot, empty -> new SeriesPartitionTable())
×
177
                        .filterUnassignedDataPartitionSlots(
×
178
                            timePartitionSlots.getTimePartitionSlots()),
×
179
                    false,
180
                    false)));
181

182
    return result;
×
183
  }
184

185
  /**
186
   * Query a timePartition's corresponding dataRegionIds
187
   *
188
   * @param seriesSlotId SeriesPartitionSlot
189
   * @param timeSlotId TimePartitionSlot
190
   * @return the timePartition's corresponding dataRegionIds, if seriesSlotId==-1, then return all
191
   *     seriesPartitionTable's dataRegionIds; if timeSlotId == -1, then return all the seriesSlot's
192
   *     dataRegionIds.
193
   */
194
  public List<TConsensusGroupId> getRegionId(
195
      TSeriesPartitionSlot seriesSlotId, TTimePartitionSlot timeSlotId) {
196
    if (seriesSlotId.getSlotId() == -1) {
×
197
      List<TConsensusGroupId> regionIds = new ArrayList<>();
×
198
      dataPartitionMap.forEach(
×
199
          (seriesPartitionSlot, seriesPartitionTable) ->
200
              regionIds.addAll(seriesPartitionTable.getRegionId(timeSlotId)));
×
201
      return regionIds;
×
202
    } else if (!dataPartitionMap.containsKey(seriesSlotId)) {
×
203
      return new ArrayList<>();
×
204
    } else {
205
      SeriesPartitionTable seriesPartitionTable = dataPartitionMap.get(seriesSlotId);
×
206
      return seriesPartitionTable.getRegionId(timeSlotId);
×
207
    }
208
  }
209

210
  /**
211
   * Query timePartition
212
   *
213
   * @param seriesSlotId SeriesPartitionSlot
214
   * @param regionId TConsensusGroupId
215
   * @param startTime startTime
216
   * @return the timePartition if seriesSlotId==-1 && regionId == -1, then return all timePartition.
217
   */
218
  public List<TTimePartitionSlot> getTimeSlotList(
219
      TSeriesPartitionSlot seriesSlotId, TConsensusGroupId regionId, long startTime, long endTime) {
220
    if (seriesSlotId.getSlotId() == -1) {
×
221
      // query timePartition of specific database or region
222
      List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
×
223
      dataPartitionMap.forEach(
×
224
          (seriesPartitionSlot, seriesPartitionTable) ->
225
              timePartitionSlots.addAll(
×
226
                  seriesPartitionTable.getTimeSlotList(regionId, startTime, endTime)));
×
227
      return timePartitionSlots;
×
228
    } else if (!dataPartitionMap.containsKey(seriesSlotId)) {
×
229
      return new ArrayList<>();
×
230
    } else {
231
      // query timePartition of specific seriesPartition
232
      SeriesPartitionTable seriesPartitionTable = dataPartitionMap.get(seriesSlotId);
×
233
      return seriesPartitionTable.getTimeSlotList(regionId, startTime, endTime);
×
234
    }
235
  }
236

237
  /** Get timePartitionSlot count. */
238
  public long getTimeSlotCount() {
239
    AtomicLong sum = new AtomicLong();
×
240
    dataPartitionMap.forEach(
×
241
        (seriesPartitionSlot, seriesPartitionTable) ->
242
            sum.addAndGet(seriesPartitionTable.getSeriesPartitionMap().size()));
×
243
    return sum.get();
×
244
  }
245

246
  public List<TSeriesPartitionSlot> getSeriesSlotList() {
247
    return dataPartitionMap.keySet().stream()
×
248
        .sorted(Comparator.comparing(TSeriesPartitionSlot::getSlotId))
×
249
        .collect(Collectors.toList());
×
250
  }
251

252
  /**
253
   * Get the last DataAllotTable.
254
   *
255
   * @return The last DataAllotTable
256
   */
257
  public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
258
    Map<TSeriesPartitionSlot, TConsensusGroupId> result = new HashMap<>();
×
259
    dataPartitionMap.forEach(
×
260
        (seriesPartitionSlot, seriesPartitionTable) ->
261
            result.put(seriesPartitionSlot, seriesPartitionTable.getLastConsensusGroupId()));
×
262
    return result;
×
263
  }
264

265
  public void serialize(OutputStream outputStream, TProtocol protocol)
266
      throws IOException, TException {
267
    ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream);
1✔
268
    for (Map.Entry<TSeriesPartitionSlot, SeriesPartitionTable> seriesPartitionTableEntry :
269
        dataPartitionMap.entrySet()) {
1✔
270
      seriesPartitionTableEntry.getKey().write(protocol);
1✔
271
      seriesPartitionTableEntry.getValue().serialize(outputStream, protocol);
1✔
272
    }
1✔
273
  }
1✔
274

275
  /** Only for ConsensusRequest */
276
  public void deserialize(ByteBuffer buffer) {
277
    int length = buffer.getInt();
1✔
278
    for (int i = 0; i < length; i++) {
1✔
279
      TSeriesPartitionSlot seriesPartitionSlot =
1✔
280
          ThriftCommonsSerDeUtils.deserializeTSeriesPartitionSlot(buffer);
1✔
281
      SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
1✔
282
      seriesPartitionTable.deserialize(buffer);
1✔
283
      dataPartitionMap.put(seriesPartitionSlot, seriesPartitionTable);
1✔
284
    }
285
  }
1✔
286

287
  /** Only for Snapshot */
288
  public void deserialize(InputStream inputStream, TProtocol protocol)
289
      throws IOException, TException {
290
    int length = ReadWriteIOUtils.readInt(inputStream);
1✔
291
    for (int i = 0; i < length; i++) {
1✔
292
      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot();
1✔
293
      seriesPartitionSlot.read(protocol);
1✔
294
      SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
1✔
295
      seriesPartitionTable.deserialize(inputStream, protocol);
1✔
296
      dataPartitionMap.put(seriesPartitionSlot, seriesPartitionTable);
1✔
297
    }
298
  }
1✔
299

300
  @Override
301
  public boolean equals(Object o) {
302
    if (this == o) return true;
1✔
303
    if (o == null || getClass() != o.getClass()) return false;
1✔
304
    DataPartitionTable that = (DataPartitionTable) o;
1✔
305
    return dataPartitionMap.equals(that.dataPartitionMap);
1✔
306
  }
307

308
  @Override
309
  public int hashCode() {
310
    return Objects.hash(dataPartitionMap);
×
311
  }
312
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc