• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9723

pending completion
#9723

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)

264 of 264 new or added lines in 11 files covered. (100.0%)

79280 of 165370 relevant lines covered (47.94%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.load;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
24
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
25
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
26
import org.apache.iotdb.commons.cluster.NodeStatus;
27
import org.apache.iotdb.commons.cluster.NodeType;
28
import org.apache.iotdb.commons.cluster.RegionStatus;
29
import org.apache.iotdb.commons.partition.DataPartitionTable;
30
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
31
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
32
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
33
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
34
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
35
import org.apache.iotdb.confignode.manager.IManager;
36
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
37
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
38
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
39
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
40
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
41
import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
42
import org.apache.iotdb.confignode.manager.load.service.HeartbeatService;
43
import org.apache.iotdb.confignode.manager.load.service.StatisticsService;
44
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
45
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
46

47
import com.google.common.eventbus.AsyncEventBus;
48
import com.google.common.eventbus.EventBus;
49

50
import java.util.List;
51
import java.util.Map;
52
import java.util.concurrent.Executors;
53
import java.util.concurrent.atomic.AtomicInteger;
54

55
/**
56
 * The LoadManager at ConfigNodeGroup-Leader is active. It proactively implements the cluster
57
 * dynamic load balancing policy and passively accepts the PartitionTable expansion request.
58
 */
59
public class LoadManager {
60

61
  private final IManager configManager;
62

63
  /** Balancers. */
64
  private final RegionBalancer regionBalancer;
65

66
  private final PartitionBalancer partitionBalancer;
67
  private final RouteBalancer routeBalancer;
68

69
  /** Cluster load services. */
70
  private final LoadCache loadCache;
71

72
  private final HeartbeatService heartbeatService;
73
  private final StatisticsService statisticsService;
74

75
  private final EventBus loadPublisher =
×
76
      new AsyncEventBus("Cluster-LoadPublisher-Thread", Executors.newFixedThreadPool(5));
×
77

78
  public LoadManager(IManager configManager) {
×
79
    this.configManager = configManager;
×
80

81
    this.regionBalancer = new RegionBalancer(configManager);
×
82
    this.partitionBalancer = new PartitionBalancer(configManager);
×
83
    this.routeBalancer = new RouteBalancer(configManager);
×
84

85
    this.loadCache = new LoadCache();
×
86
    this.heartbeatService = new HeartbeatService(configManager, loadCache);
×
87
    this.statisticsService =
×
88
        new StatisticsService(configManager, routeBalancer, loadCache, loadPublisher);
89

90
    loadPublisher.register(statisticsService);
×
91
    loadPublisher.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
×
92
  }
×
93

94
  /**
95
   * Generate an optimal CreateRegionGroupsPlan.
96
   *
97
   * @param allotmentMap Map<DatabaseName, Region allotment>
98
   * @param consensusGroupType TConsensusGroupType of RegionGroup to be allocated
99
   * @return CreateRegionGroupsPlan
100
   * @throws NotEnoughDataNodeException If there are not enough DataNodes
101
   * @throws DatabaseNotExistsException If some specific StorageGroups don't exist
102
   */
103
  public CreateRegionGroupsPlan allocateRegionGroups(
104
      Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
105
      throws NotEnoughDataNodeException, DatabaseNotExistsException {
106
    return regionBalancer.genRegionGroupsAllocationPlan(allotmentMap, consensusGroupType);
×
107
  }
108

109
  /**
110
   * Allocate SchemaPartitions.
111
   *
112
   * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned
113
   * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result
114
   */
115
  public Map<String, SchemaPartitionTable> allocateSchemaPartition(
116
      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
117
      throws NoAvailableRegionGroupException {
118
    return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
×
119
  }
120

121
  /**
122
   * Allocate DataPartitions.
123
   *
124
   * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned
125
   * @return Map<DatabaseName, DataPartitionTable>, the allocating result
126
   */
127
  public Map<String, DataPartitionTable> allocateDataPartition(
128
      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
129
      throws NoAvailableRegionGroupException {
130
    return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
×
131
  }
132

133
  public void reBalanceDataPartitionPolicy(String database) {
134
    partitionBalancer.reBalanceDataPartitionPolicy(database);
×
135
  }
×
136

137
  public void broadcastLatestRegionRouteMap() {
138
    statisticsService.broadcastLatestRegionRouteMap();
×
139
  }
×
140

141
  public void startLoadServices() {
142
    loadCache.initHeartbeatCache(configManager);
×
143
    heartbeatService.startHeartbeatService();
×
144
    statisticsService.startLoadStatisticsService();
×
145
    partitionBalancer.setupPartitionBalancer();
×
146
  }
×
147

148
  public void stopLoadServices() {
149
    heartbeatService.stopHeartbeatService();
×
150
    statisticsService.stopLoadStatisticsService();
×
151
    loadCache.clearHeartbeatCache();
×
152
    partitionBalancer.clearPartitionBalancer();
×
153
  }
×
154

155
  public void clearDataPartitionPolicyTable(String database) {
156
    partitionBalancer.clearDataPartitionPolicyTable(database);
×
157
  }
×
158

159
  /**
160
   * Safely get NodeStatus by NodeId.
161
   *
162
   * @param nodeId The specified NodeId
163
   * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
164
   */
165
  public NodeStatus getNodeStatus(int nodeId) {
166
    return loadCache.getNodeStatus(nodeId);
×
167
  }
168

169
  /**
170
   * Safely get the specified Node's current status with reason.
171
   *
172
   * @param nodeId The specified NodeId
173
   * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise
174
   */
175
  public String getNodeStatusWithReason(int nodeId) {
176
    return loadCache.getNodeStatusWithReason(nodeId);
×
177
  }
178

179
  /**
180
   * Get all Node's current status with reason.
181
   *
182
   * @return Map<NodeId, NodeStatus with reason>
183
   */
184
  public Map<Integer, String> getNodeStatusWithReason() {
185
    return loadCache.getNodeStatusWithReason();
×
186
  }
187

188
  /**
189
   * Filter ConfigNodes through the specified NodeStatus.
190
   *
191
   * @param status The specified NodeStatus
192
   * @return Filtered ConfigNodes with the specified NodeStatus
193
   */
194
  public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
195
    return loadCache.filterConfigNodeThroughStatus(status);
×
196
  }
197

198
  /**
199
   * Filter DataNodes through the specified NodeStatus.
200
   *
201
   * @param status The specified NodeStatus
202
   * @return Filtered DataNodes with the specified NodeStatus
203
   */
204
  public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
205
    return loadCache.filterDataNodeThroughStatus(status);
×
206
  }
207

208
  /**
209
   * Get the free disk space of the specified DataNode.
210
   *
211
   * @param dataNodeId The index of the specified DataNode
212
   * @return The free disk space that sample through heartbeat, 0 if no heartbeat received
213
   */
214
  public double getFreeDiskSpace(int dataNodeId) {
215
    return loadCache.getFreeDiskSpace(dataNodeId);
×
216
  }
217

218
  /**
219
   * Get the loadScore of each DataNode.
220
   *
221
   * @return Map<DataNodeId, loadScore>
222
   */
223
  public Map<Integer, Long> getAllDataNodeLoadScores() {
224
    return loadCache.getAllDataNodeLoadScores();
×
225
  }
226

227
  /**
228
   * Get the lowest loadScore DataNode.
229
   *
230
   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
231
   */
232
  public int getLowestLoadDataNode() {
233
    return loadCache.getLowestLoadDataNode();
×
234
  }
235

236
  /**
237
   * Get the lowest loadScore DataNode from the specified DataNodes.
238
   *
239
   * @param dataNodeIds The specified DataNodes
240
   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
241
   */
242
  public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
243
    return loadCache.getLowestLoadDataNode(dataNodeIds);
×
244
  }
245

246
  /**
247
   * Force update the specified Node's cache.
248
   *
249
   * @param nodeType Specified NodeType
250
   * @param nodeId Specified NodeId
251
   * @param heartbeatSample Specified NodeHeartbeatSample
252
   */
253
  public void forceUpdateNodeCache(
254
      NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
255
    loadCache.forceUpdateNodeCache(nodeType, nodeId, heartbeatSample);
×
256
  }
×
257

258
  /** Remove the specified Node's cache. */
259
  public void removeNodeCache(int nodeId) {
260
    loadCache.removeNodeCache(nodeId);
×
261
  }
×
262

263
  /**
264
   * Safely get RegionStatus.
265
   *
266
   * @param consensusGroupId Specified RegionGroupId
267
   * @param dataNodeId Specified RegionReplicaId
268
   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
269
   */
270
  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
271
    return loadCache.getRegionStatus(consensusGroupId, dataNodeId);
×
272
  }
273

274
  /**
275
   * Safely get RegionGroupStatus.
276
   *
277
   * @param consensusGroupId Specified RegionGroupId
278
   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
279
   */
280
  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
281
    return loadCache.getRegionGroupStatus(consensusGroupId);
×
282
  }
283

284
  /**
285
   * Safely get RegionGroupStatus.
286
   *
287
   * @param consensusGroupIds Specified RegionGroupIds
288
   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
289
   */
290
  public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
291
      List<TConsensusGroupId> consensusGroupIds) {
292
    return loadCache.getRegionGroupStatus(consensusGroupIds);
×
293
  }
294

295
  /**
296
   * Filter the RegionGroups through the RegionGroupStatus.
297
   *
298
   * @param status The specified RegionGroupStatus
299
   * @return Filtered RegionGroups with the specified RegionGroupStatus
300
   */
301
  public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) {
302
    return loadCache.filterRegionGroupThroughStatus(status);
×
303
  }
304

305
  /**
306
   * Count the number of cluster Regions with specified RegionStatus.
307
   *
308
   * @param type The specified RegionGroupType
309
   * @param status The specified statues
310
   * @return The number of cluster Regions with specified RegionStatus
311
   */
312
  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
313
    return loadCache.countRegionWithSpecifiedStatus(type, status);
×
314
  }
315

316
  /**
317
   * Force update the specified RegionGroup's cache.
318
   *
319
   * @param regionGroupId Specified RegionGroupId
320
   * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
321
   */
322
  public void forceUpdateRegionGroupCache(
323
      TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) {
324
    loadCache.forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
×
325
  }
×
326

327
  /** Remove the specified RegionGroup's cache. */
328
  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
329
    loadCache.removeRegionGroupCache(consensusGroupId);
×
330
  }
×
331

332
  /**
333
   * Get the latest RegionLeaderMap.
334
   *
335
   * @return Map<RegionGroupId, leaderId>
336
   */
337
  public Map<TConsensusGroupId, Integer> getRegionLeaderMap() {
338
    return loadCache.getRegionLeaderMap();
×
339
  }
340

341
  /**
342
   * Get the latest RegionPriorityMap.
343
   *
344
   * @return Map<RegionGroupId, RegionPriority>.
345
   */
346
  public Map<TConsensusGroupId, TRegionReplicaSet> getRegionPriorityMap() {
347
    return loadCache.getRegionPriorityMap();
×
348
  }
349

350
  /**
351
   * Get the number of RegionGroup-leaders in the specified DataNode.
352
   *
353
   * @param dataNodeId The specified DataNode
354
   * @param type SchemaRegion or DataRegion
355
   * @return The number of RegionGroup-leaders
356
   */
357
  public int getRegionGroupLeaderCount(int dataNodeId, TConsensusGroupType type) {
358
    AtomicInteger result = new AtomicInteger(0);
×
359
    getRegionLeaderMap()
×
360
        .forEach(
×
361
            ((consensusGroupId, leaderId) -> {
362
              if (dataNodeId == leaderId && type.equals(consensusGroupId.getType())) {
×
363
                result.getAndIncrement();
×
364
              }
365
            }));
×
366
    return result.get();
×
367
  }
368

369
  /**
370
   * Wait for the specified RegionGroups to finish leader election
371
   *
372
   * @param regionGroupIds Specified RegionGroupIds
373
   */
374
  public void waitForLeaderElection(List<TConsensusGroupId> regionGroupIds) {
375
    loadCache.waitForLeaderElection(regionGroupIds);
×
376
  }
×
377

378
  /**
379
   * Force update the specified RegionGroup's leader.
380
   *
381
   * @param regionGroupId Specified RegionGroupId
382
   * @param leaderId Leader DataNodeId
383
   */
384
  public void forceUpdateRegionLeader(TConsensusGroupId regionGroupId, int leaderId) {
385
    loadCache.forceUpdateRegionLeader(regionGroupId, leaderId);
×
386
  }
×
387

388
  /**
389
   * Force update the specified RegionGroup's priority.
390
   *
391
   * @param regionGroupId Specified RegionGroupId
392
   * @param regionPriority Region route priority
393
   */
394
  public void forceUpdateRegionPriority(
395
      TConsensusGroupId regionGroupId, TRegionReplicaSet regionPriority) {
396
    loadCache.forceUpdateRegionPriority(regionGroupId, regionPriority);
×
397
  }
×
398

399
  /**
400
   * Remove the specified RegionGroup's route cache.
401
   *
402
   * @param regionGroupId Specified RegionGroupId
403
   */
404
  public void removeRegionRouteCache(TConsensusGroupId regionGroupId) {
405
    loadCache.removeRegionRouteCache(regionGroupId);
×
406
  }
×
407
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc