• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9733

pending completion
#9733

push

travis_ci

web-flow
[To rel/1.2] Add compression and encoding type check for FastCompactionPerformer (#10712)

32 of 32 new or added lines in 4 files covered. (100.0%)

79232 of 165563 relevant lines covered (47.86%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.load;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
24
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
25
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
26
import org.apache.iotdb.commons.cluster.NodeStatus;
27
import org.apache.iotdb.commons.cluster.NodeType;
28
import org.apache.iotdb.commons.cluster.RegionStatus;
29
import org.apache.iotdb.commons.partition.DataPartitionTable;
30
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
31
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
32
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
33
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
34
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
35
import org.apache.iotdb.confignode.manager.IManager;
36
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
37
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
38
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
39
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
40
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
41
import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
42
import org.apache.iotdb.confignode.manager.load.service.HeartbeatService;
43
import org.apache.iotdb.confignode.manager.load.service.StatisticsService;
44
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
45
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
46

47
import com.google.common.eventbus.AsyncEventBus;
48
import com.google.common.eventbus.EventBus;
49

50
import java.util.List;
51
import java.util.Map;
52
import java.util.concurrent.Executors;
53
import java.util.concurrent.atomic.AtomicInteger;
54

55
/**
56
 * The LoadManager at ConfigNodeGroup-Leader is active. It proactively implements the cluster
57
 * dynamic load balancing policy and passively accepts the PartitionTable expansion request.
58
 */
59
public class LoadManager {
60

61
  private final IManager configManager;
62

63
  /** Balancers. */
64
  private final RegionBalancer regionBalancer;
65

66
  private final PartitionBalancer partitionBalancer;
67
  private final RouteBalancer routeBalancer;
68

69
  /** Cluster load services. */
70
  private final LoadCache loadCache;
71

72
  private final HeartbeatService heartbeatService;
73
  private final StatisticsService statisticsService;
74

75
  private final EventBus loadPublisher =
×
76
      new AsyncEventBus("Cluster-LoadPublisher-Thread", Executors.newFixedThreadPool(5));
×
77

78
  public LoadManager(IManager configManager) {
×
79
    this.configManager = configManager;
×
80

81
    this.regionBalancer = new RegionBalancer(configManager);
×
82
    this.partitionBalancer = new PartitionBalancer(configManager);
×
83
    this.routeBalancer = new RouteBalancer(configManager);
×
84

85
    this.loadCache = new LoadCache();
×
86
    this.heartbeatService = new HeartbeatService(configManager, loadCache);
×
87
    this.statisticsService =
×
88
        new StatisticsService(configManager, routeBalancer, loadCache, loadPublisher);
89

90
    loadPublisher.register(statisticsService);
×
91
    loadPublisher.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
×
92
  }
×
93

94
  /**
95
   * Generate an optimal CreateRegionGroupsPlan.
96
   *
97
   * @param allotmentMap Map<DatabaseName, Region allotment>
98
   * @param consensusGroupType TConsensusGroupType of RegionGroup to be allocated
99
   * @return CreateRegionGroupsPlan
100
   * @throws NotEnoughDataNodeException If there are not enough DataNodes
101
   * @throws DatabaseNotExistsException If some specific StorageGroups don't exist
102
   */
103
  public CreateRegionGroupsPlan allocateRegionGroups(
104
      Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
105
      throws NotEnoughDataNodeException, DatabaseNotExistsException {
106
    return regionBalancer.genRegionGroupsAllocationPlan(allotmentMap, consensusGroupType);
×
107
  }
108

109
  /**
110
   * Allocate SchemaPartitions.
111
   *
112
   * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned
113
   * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result
114
   */
115
  public Map<String, SchemaPartitionTable> allocateSchemaPartition(
116
      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
117
      throws NoAvailableRegionGroupException {
118
    return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
×
119
  }
120

121
  /**
122
   * Allocate DataPartitions.
123
   *
124
   * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned
125
   * @return Map<DatabaseName, DataPartitionTable>, the allocating result
126
   */
127
  public Map<String, DataPartitionTable> allocateDataPartition(
128
      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
129
      throws NoAvailableRegionGroupException {
130
    return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
×
131
  }
132

133
  /**
134
   * Re-balance the DataPartitionPolicyTable.
135
   *
136
   * @param database Database name
137
   */
138
  public void reBalanceDataPartitionPolicy(String database) {
139
    partitionBalancer.reBalanceDataPartitionPolicy(database);
×
140
  }
×
141

142
  public void broadcastLatestRegionRouteMap() {
143
    statisticsService.broadcastLatestRegionRouteMap();
×
144
  }
×
145

146
  public void startLoadServices() {
147
    loadCache.initHeartbeatCache(configManager);
×
148
    heartbeatService.startHeartbeatService();
×
149
    statisticsService.startLoadStatisticsService();
×
150
    partitionBalancer.setupPartitionBalancer();
×
151
  }
×
152

153
  public void stopLoadServices() {
154
    heartbeatService.stopHeartbeatService();
×
155
    statisticsService.stopLoadStatisticsService();
×
156
    loadCache.clearHeartbeatCache();
×
157
    partitionBalancer.clearPartitionBalancer();
×
158
  }
×
159

160
  public void clearDataPartitionPolicyTable(String database) {
161
    partitionBalancer.clearDataPartitionPolicyTable(database);
×
162
  }
×
163

164
  /**
165
   * Safely get NodeStatus by NodeId.
166
   *
167
   * @param nodeId The specified NodeId
168
   * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
169
   */
170
  public NodeStatus getNodeStatus(int nodeId) {
171
    return loadCache.getNodeStatus(nodeId);
×
172
  }
173

174
  /**
175
   * Safely get the specified Node's current status with reason.
176
   *
177
   * @param nodeId The specified NodeId
178
   * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise
179
   */
180
  public String getNodeStatusWithReason(int nodeId) {
181
    return loadCache.getNodeStatusWithReason(nodeId);
×
182
  }
183

184
  /**
185
   * Get all Node's current status with reason.
186
   *
187
   * @return Map<NodeId, NodeStatus with reason>
188
   */
189
  public Map<Integer, String> getNodeStatusWithReason() {
190
    return loadCache.getNodeStatusWithReason();
×
191
  }
192

193
  /**
194
   * Filter ConfigNodes through the specified NodeStatus.
195
   *
196
   * @param status The specified NodeStatus
197
   * @return Filtered ConfigNodes with the specified NodeStatus
198
   */
199
  public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
200
    return loadCache.filterConfigNodeThroughStatus(status);
×
201
  }
202

203
  /**
204
   * Filter DataNodes through the specified NodeStatus.
205
   *
206
   * @param status The specified NodeStatus
207
   * @return Filtered DataNodes with the specified NodeStatus
208
   */
209
  public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
210
    return loadCache.filterDataNodeThroughStatus(status);
×
211
  }
212

213
  /**
214
   * Get the free disk space of the specified DataNode.
215
   *
216
   * @param dataNodeId The index of the specified DataNode
217
   * @return The free disk space that sample through heartbeat, 0 if no heartbeat received
218
   */
219
  public double getFreeDiskSpace(int dataNodeId) {
220
    return loadCache.getFreeDiskSpace(dataNodeId);
×
221
  }
222

223
  /**
224
   * Get the loadScore of each DataNode.
225
   *
226
   * @return Map<DataNodeId, loadScore>
227
   */
228
  public Map<Integer, Long> getAllDataNodeLoadScores() {
229
    return loadCache.getAllDataNodeLoadScores();
×
230
  }
231

232
  /**
233
   * Get the lowest loadScore DataNode.
234
   *
235
   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
236
   */
237
  public int getLowestLoadDataNode() {
238
    return loadCache.getLowestLoadDataNode();
×
239
  }
240

241
  /**
242
   * Get the lowest loadScore DataNode from the specified DataNodes.
243
   *
244
   * @param dataNodeIds The specified DataNodes
245
   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
246
   */
247
  public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
248
    return loadCache.getLowestLoadDataNode(dataNodeIds);
×
249
  }
250

251
  /**
252
   * Force update the specified Node's cache.
253
   *
254
   * @param nodeType Specified NodeType
255
   * @param nodeId Specified NodeId
256
   * @param heartbeatSample Specified NodeHeartbeatSample
257
   */
258
  public void forceUpdateNodeCache(
259
      NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
260
    loadCache.forceUpdateNodeCache(nodeType, nodeId, heartbeatSample);
×
261
  }
×
262

263
  /** Remove the specified Node's cache. */
264
  public void removeNodeCache(int nodeId) {
265
    loadCache.removeNodeCache(nodeId);
×
266
  }
×
267

268
  /**
269
   * Safely get RegionStatus.
270
   *
271
   * @param consensusGroupId Specified RegionGroupId
272
   * @param dataNodeId Specified RegionReplicaId
273
   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
274
   */
275
  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
276
    return loadCache.getRegionStatus(consensusGroupId, dataNodeId);
×
277
  }
278

279
  /**
280
   * Safely get RegionGroupStatus.
281
   *
282
   * @param consensusGroupId Specified RegionGroupId
283
   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
284
   */
285
  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
286
    return loadCache.getRegionGroupStatus(consensusGroupId);
×
287
  }
288

289
  /**
290
   * Safely get RegionGroupStatus.
291
   *
292
   * @param consensusGroupIds Specified RegionGroupIds
293
   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
294
   */
295
  public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
296
      List<TConsensusGroupId> consensusGroupIds) {
297
    return loadCache.getRegionGroupStatus(consensusGroupIds);
×
298
  }
299

300
  /**
301
   * Filter the RegionGroups through the RegionGroupStatus.
302
   *
303
   * @param status The specified RegionGroupStatus
304
   * @return Filtered RegionGroups with the specified RegionGroupStatus
305
   */
306
  public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) {
307
    return loadCache.filterRegionGroupThroughStatus(status);
×
308
  }
309

310
  /**
311
   * Count the number of cluster Regions with specified RegionStatus.
312
   *
313
   * @param type The specified RegionGroupType
314
   * @param status The specified statues
315
   * @return The number of cluster Regions with specified RegionStatus
316
   */
317
  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
318
    return loadCache.countRegionWithSpecifiedStatus(type, status);
×
319
  }
320

321
  /**
322
   * Force update the specified RegionGroup's cache.
323
   *
324
   * @param regionGroupId Specified RegionGroupId
325
   * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
326
   */
327
  public void forceUpdateRegionGroupCache(
328
      TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) {
329
    loadCache.forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
×
330
  }
×
331

332
  /** Remove the specified RegionGroup's cache. */
333
  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
334
    loadCache.removeRegionGroupCache(consensusGroupId);
×
335
  }
×
336

337
  /**
338
   * Get the latest RegionLeaderMap.
339
   *
340
   * @return Map<RegionGroupId, leaderId>
341
   */
342
  public Map<TConsensusGroupId, Integer> getRegionLeaderMap() {
343
    return loadCache.getRegionLeaderMap();
×
344
  }
345

346
  /**
347
   * Get the latest RegionPriorityMap.
348
   *
349
   * @return Map<RegionGroupId, RegionPriority>.
350
   */
351
  public Map<TConsensusGroupId, TRegionReplicaSet> getRegionPriorityMap() {
352
    return loadCache.getRegionPriorityMap();
×
353
  }
354

355
  /**
356
   * Get the number of RegionGroup-leaders in the specified DataNode.
357
   *
358
   * @param dataNodeId The specified DataNode
359
   * @param type SchemaRegion or DataRegion
360
   * @return The number of RegionGroup-leaders
361
   */
362
  public int getRegionGroupLeaderCount(int dataNodeId, TConsensusGroupType type) {
363
    AtomicInteger result = new AtomicInteger(0);
×
364
    getRegionLeaderMap()
×
365
        .forEach(
×
366
            ((consensusGroupId, leaderId) -> {
367
              if (dataNodeId == leaderId && type.equals(consensusGroupId.getType())) {
×
368
                result.getAndIncrement();
×
369
              }
370
            }));
×
371
    return result.get();
×
372
  }
373

374
  /**
375
   * Wait for the specified RegionGroups to finish leader election
376
   *
377
   * @param regionGroupIds Specified RegionGroupIds
378
   */
379
  public void waitForLeaderElection(List<TConsensusGroupId> regionGroupIds) {
380
    loadCache.waitForLeaderElection(regionGroupIds);
×
381
  }
×
382

383
  /**
384
   * Force update the specified RegionGroup's leader.
385
   *
386
   * @param regionGroupId Specified RegionGroupId
387
   * @param leaderId Leader DataNodeId
388
   */
389
  public void forceUpdateRegionLeader(TConsensusGroupId regionGroupId, int leaderId) {
390
    loadCache.forceUpdateRegionLeader(regionGroupId, leaderId);
×
391
  }
×
392

393
  /**
394
   * Force update the specified RegionGroup's priority.
395
   *
396
   * @param regionGroupId Specified RegionGroupId
397
   * @param regionPriority Region route priority
398
   */
399
  public void forceUpdateRegionPriority(
400
      TConsensusGroupId regionGroupId, TRegionReplicaSet regionPriority) {
401
    loadCache.forceUpdateRegionPriority(regionGroupId, regionPriority);
×
402
  }
×
403

404
  /**
405
   * Remove the specified RegionGroup's route cache.
406
   *
407
   * @param regionGroupId Specified RegionGroupId
408
   */
409
  public void removeRegionRouteCache(TConsensusGroupId regionGroupId) {
410
    loadCache.removeRegionRouteCache(regionGroupId);
×
411
  }
×
412
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc