• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9817

pending completion
#9817

push

travis_ci

web-flow
[To rel/1.2] Enhance the event notification mechanism of StatisticsService (#10830)

14 of 14 new or added lines in 4 files covered. (100.0%)

79676 of 165756 relevant lines covered (48.07%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.load;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
24
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
25
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
26
import org.apache.iotdb.commons.cluster.NodeStatus;
27
import org.apache.iotdb.commons.cluster.NodeType;
28
import org.apache.iotdb.commons.cluster.RegionStatus;
29
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
30
import org.apache.iotdb.commons.concurrent.ThreadName;
31
import org.apache.iotdb.commons.partition.DataPartitionTable;
32
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
33
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
34
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
35
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
36
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
37
import org.apache.iotdb.confignode.manager.IManager;
38
import org.apache.iotdb.confignode.manager.load.balancer.PartitionBalancer;
39
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
40
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
41
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
42
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
43
import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
44
import org.apache.iotdb.confignode.manager.load.service.HeartbeatService;
45
import org.apache.iotdb.confignode.manager.load.service.StatisticsService;
46
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
47
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
48

49
import com.google.common.eventbus.AsyncEventBus;
50
import com.google.common.eventbus.EventBus;
51

52
import java.util.List;
53
import java.util.Map;
54
import java.util.concurrent.atomic.AtomicInteger;
55

56
/**
57
 * The LoadManager at ConfigNodeGroup-Leader is active. It proactively implements the cluster
58
 * dynamic load balancing policy and passively accepts the PartitionTable expansion request.
59
 */
60
public class LoadManager {
61

62
  private final IManager configManager;
63

64
  /** Balancers. */
65
  private final RegionBalancer regionBalancer;
66

67
  private final PartitionBalancer partitionBalancer;
68
  private final RouteBalancer routeBalancer;
69

70
  /** Cluster load services. */
71
  private final LoadCache loadCache;
72

73
  private final HeartbeatService heartbeatService;
74
  private final StatisticsService statisticsService;
75

76
  private final EventBus loadPublisher =
×
77
      new AsyncEventBus(
78
          ThreadName.CONFIG_NODE_LOAD_PUBLISHER.getName(),
×
79
          IoTDBThreadPoolFactory.newFixedThreadPool(
×
80
              5, ThreadName.CONFIG_NODE_LOAD_PUBLISHER.getName()));
×
81

82
  public LoadManager(IManager configManager) {
×
83
    this.configManager = configManager;
×
84

85
    this.regionBalancer = new RegionBalancer(configManager);
×
86
    this.partitionBalancer = new PartitionBalancer(configManager);
×
87
    this.routeBalancer = new RouteBalancer(configManager);
×
88

89
    this.loadCache = new LoadCache();
×
90
    this.heartbeatService = new HeartbeatService(configManager, loadCache);
×
91
    this.statisticsService =
×
92
        new StatisticsService(configManager, routeBalancer, loadCache, loadPublisher);
93

94
    loadPublisher.register(statisticsService);
×
95
    loadPublisher.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
×
96
  }
×
97

98
  /**
99
   * Generate an optimal CreateRegionGroupsPlan.
100
   *
101
   * @param allotmentMap Map<DatabaseName, Region allotment>
102
   * @param consensusGroupType TConsensusGroupType of RegionGroup to be allocated
103
   * @return CreateRegionGroupsPlan
104
   * @throws NotEnoughDataNodeException If there are not enough DataNodes
105
   * @throws DatabaseNotExistsException If some specific StorageGroups don't exist
106
   */
107
  public CreateRegionGroupsPlan allocateRegionGroups(
108
      Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
109
      throws NotEnoughDataNodeException, DatabaseNotExistsException {
110
    return regionBalancer.genRegionGroupsAllocationPlan(allotmentMap, consensusGroupType);
×
111
  }
112

113
  /**
114
   * Allocate SchemaPartitions.
115
   *
116
   * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned
117
   * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result
118
   */
119
  public Map<String, SchemaPartitionTable> allocateSchemaPartition(
120
      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
121
      throws NoAvailableRegionGroupException {
122
    return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
×
123
  }
124

125
  /**
126
   * Allocate DataPartitions.
127
   *
128
   * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned
129
   * @return Map<DatabaseName, DataPartitionTable>, the allocating result
130
   */
131
  public Map<String, DataPartitionTable> allocateDataPartition(
132
      Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap)
133
      throws NoAvailableRegionGroupException {
134
    return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
×
135
  }
136

137
  /**
138
   * Re-balance the DataPartitionPolicyTable.
139
   *
140
   * @param database Database name
141
   */
142
  public void reBalanceDataPartitionPolicy(String database) {
143
    partitionBalancer.reBalanceDataPartitionPolicy(database);
×
144
  }
×
145

146
  public void broadcastLatestRegionRouteMap() {
147
    statisticsService.broadcastLatestRegionRouteMap();
×
148
  }
×
149

150
  public void startLoadServices() {
151
    loadCache.initHeartbeatCache(configManager);
×
152
    heartbeatService.startHeartbeatService();
×
153
    statisticsService.startLoadStatisticsService();
×
154
    partitionBalancer.setupPartitionBalancer();
×
155
  }
×
156

157
  public void stopLoadServices() {
158
    heartbeatService.stopHeartbeatService();
×
159
    statisticsService.stopLoadStatisticsService();
×
160
    loadCache.clearHeartbeatCache();
×
161
    partitionBalancer.clearPartitionBalancer();
×
162
  }
×
163

164
  public void clearDataPartitionPolicyTable(String database) {
165
    partitionBalancer.clearDataPartitionPolicyTable(database);
×
166
  }
×
167

168
  /**
169
   * Safely get NodeStatus by NodeId.
170
   *
171
   * @param nodeId The specified NodeId
172
   * @return NodeStatus of the specified Node. Unknown if cache doesn't exist.
173
   */
174
  public NodeStatus getNodeStatus(int nodeId) {
175
    return loadCache.getNodeStatus(nodeId);
×
176
  }
177

178
  /**
179
   * Safely get the specified Node's current status with reason.
180
   *
181
   * @param nodeId The specified NodeId
182
   * @return The specified Node's current status if the nodeCache contains it, Unknown otherwise
183
   */
184
  public String getNodeStatusWithReason(int nodeId) {
185
    return loadCache.getNodeStatusWithReason(nodeId);
×
186
  }
187

188
  /**
189
   * Get all Node's current status with reason.
190
   *
191
   * @return Map<NodeId, NodeStatus with reason>
192
   */
193
  public Map<Integer, String> getNodeStatusWithReason() {
194
    return loadCache.getNodeStatusWithReason();
×
195
  }
196

197
  /**
198
   * Filter ConfigNodes through the specified NodeStatus.
199
   *
200
   * @param status The specified NodeStatus
201
   * @return Filtered ConfigNodes with the specified NodeStatus
202
   */
203
  public List<Integer> filterConfigNodeThroughStatus(NodeStatus... status) {
204
    return loadCache.filterConfigNodeThroughStatus(status);
×
205
  }
206

207
  /**
208
   * Filter DataNodes through the specified NodeStatus.
209
   *
210
   * @param status The specified NodeStatus
211
   * @return Filtered DataNodes with the specified NodeStatus
212
   */
213
  public List<Integer> filterDataNodeThroughStatus(NodeStatus... status) {
214
    return loadCache.filterDataNodeThroughStatus(status);
×
215
  }
216

217
  /**
218
   * Get the free disk space of the specified DataNode.
219
   *
220
   * @param dataNodeId The index of the specified DataNode
221
   * @return The free disk space that sample through heartbeat, 0 if no heartbeat received
222
   */
223
  public double getFreeDiskSpace(int dataNodeId) {
224
    return loadCache.getFreeDiskSpace(dataNodeId);
×
225
  }
226

227
  /**
228
   * Get the loadScore of each DataNode.
229
   *
230
   * @return Map<DataNodeId, loadScore>
231
   */
232
  public Map<Integer, Long> getAllDataNodeLoadScores() {
233
    return loadCache.getAllDataNodeLoadScores();
×
234
  }
235

236
  /**
237
   * Get the lowest loadScore DataNode.
238
   *
239
   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
240
   */
241
  public int getLowestLoadDataNode() {
242
    return loadCache.getLowestLoadDataNode();
×
243
  }
244

245
  /**
246
   * Get the lowest loadScore DataNode from the specified DataNodes.
247
   *
248
   * @param dataNodeIds The specified DataNodes
249
   * @return The index of the lowest loadScore DataNode. -1 if no DataNode heartbeat received.
250
   */
251
  public int getLowestLoadDataNode(List<Integer> dataNodeIds) {
252
    return loadCache.getLowestLoadDataNode(dataNodeIds);
×
253
  }
254

255
  /**
256
   * Force update the specified Node's cache.
257
   *
258
   * @param nodeType Specified NodeType
259
   * @param nodeId Specified NodeId
260
   * @param heartbeatSample Specified NodeHeartbeatSample
261
   */
262
  public void forceUpdateNodeCache(
263
      NodeType nodeType, int nodeId, NodeHeartbeatSample heartbeatSample) {
264
    loadCache.forceUpdateNodeCache(nodeType, nodeId, heartbeatSample);
×
265
  }
×
266

267
  /** Remove the specified Node's cache. */
268
  public void removeNodeCache(int nodeId) {
269
    loadCache.removeNodeCache(nodeId);
×
270
  }
×
271

272
  /**
273
   * Safely get RegionStatus.
274
   *
275
   * @param consensusGroupId Specified RegionGroupId
276
   * @param dataNodeId Specified RegionReplicaId
277
   * @return Corresponding RegionStatus if cache exists, Unknown otherwise
278
   */
279
  public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
280
    return loadCache.getRegionStatus(consensusGroupId, dataNodeId);
×
281
  }
282

283
  /**
284
   * Safely get RegionGroupStatus.
285
   *
286
   * @param consensusGroupId Specified RegionGroupId
287
   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
288
   */
289
  public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
290
    return loadCache.getRegionGroupStatus(consensusGroupId);
×
291
  }
292

293
  /**
294
   * Safely get RegionGroupStatus.
295
   *
296
   * @param consensusGroupIds Specified RegionGroupIds
297
   * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
298
   */
299
  public Map<TConsensusGroupId, RegionGroupStatus> getRegionGroupStatus(
300
      List<TConsensusGroupId> consensusGroupIds) {
301
    return loadCache.getRegionGroupStatus(consensusGroupIds);
×
302
  }
303

304
  /**
305
   * Filter the RegionGroups through the RegionGroupStatus.
306
   *
307
   * @param status The specified RegionGroupStatus
308
   * @return Filtered RegionGroups with the specified RegionGroupStatus
309
   */
310
  public List<TConsensusGroupId> filterRegionGroupThroughStatus(RegionGroupStatus... status) {
311
    return loadCache.filterRegionGroupThroughStatus(status);
×
312
  }
313

314
  /**
315
   * Count the number of cluster Regions with specified RegionStatus.
316
   *
317
   * @param type The specified RegionGroupType
318
   * @param status The specified statues
319
   * @return The number of cluster Regions with specified RegionStatus
320
   */
321
  public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus... status) {
322
    return loadCache.countRegionWithSpecifiedStatus(type, status);
×
323
  }
324

325
  /**
326
   * Force update the specified RegionGroup's cache.
327
   *
328
   * @param regionGroupId Specified RegionGroupId
329
   * @param heartbeatSampleMap Specified RegionHeartbeatSampleMap
330
   */
331
  public void forceUpdateRegionGroupCache(
332
      TConsensusGroupId regionGroupId, Map<Integer, RegionHeartbeatSample> heartbeatSampleMap) {
333
    loadCache.forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap);
×
334
  }
×
335

336
  /** Remove the specified RegionGroup's cache. */
337
  public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
338
    loadCache.removeRegionGroupCache(consensusGroupId);
×
339
  }
×
340

341
  /**
342
   * Get the latest RegionLeaderMap.
343
   *
344
   * @return Map<RegionGroupId, leaderId>
345
   */
346
  public Map<TConsensusGroupId, Integer> getRegionLeaderMap() {
347
    return loadCache.getRegionLeaderMap();
×
348
  }
349

350
  /**
351
   * Get the latest RegionPriorityMap.
352
   *
353
   * @return Map<RegionGroupId, RegionPriority>.
354
   */
355
  public Map<TConsensusGroupId, TRegionReplicaSet> getRegionPriorityMap() {
356
    return loadCache.getRegionPriorityMap();
×
357
  }
358

359
  /**
360
   * Get the number of RegionGroup-leaders in the specified DataNode.
361
   *
362
   * @param dataNodeId The specified DataNode
363
   * @param type SchemaRegion or DataRegion
364
   * @return The number of RegionGroup-leaders
365
   */
366
  public int getRegionGroupLeaderCount(int dataNodeId, TConsensusGroupType type) {
367
    AtomicInteger result = new AtomicInteger(0);
×
368
    getRegionLeaderMap()
×
369
        .forEach(
×
370
            ((consensusGroupId, leaderId) -> {
371
              if (dataNodeId == leaderId && type.equals(consensusGroupId.getType())) {
×
372
                result.getAndIncrement();
×
373
              }
374
            }));
×
375
    return result.get();
×
376
  }
377

378
  /**
379
   * Wait for the specified RegionGroups to finish leader election
380
   *
381
   * @param regionGroupIds Specified RegionGroupIds
382
   */
383
  public void waitForLeaderElection(List<TConsensusGroupId> regionGroupIds) {
384
    loadCache.waitForLeaderElection(regionGroupIds);
×
385
  }
×
386

387
  /**
388
   * Force update the specified RegionGroup's leader.
389
   *
390
   * @param regionGroupId Specified RegionGroupId
391
   * @param leaderId Leader DataNodeId
392
   */
393
  public void forceUpdateRegionLeader(TConsensusGroupId regionGroupId, int leaderId) {
394
    loadCache.forceUpdateRegionLeader(regionGroupId, leaderId);
×
395
  }
×
396

397
  /**
398
   * Force update the specified RegionGroup's priority.
399
   *
400
   * @param regionGroupId Specified RegionGroupId
401
   * @param regionPriority Region route priority
402
   */
403
  public void forceUpdateRegionPriority(
404
      TConsensusGroupId regionGroupId, TRegionReplicaSet regionPriority) {
405
    loadCache.forceUpdateRegionPriority(regionGroupId, regionPriority);
×
406
  }
×
407

408
  /**
409
   * Remove the specified RegionGroup's route cache.
410
   *
411
   * @param regionGroupId Specified RegionGroupId
412
   */
413
  public void removeRegionRouteCache(TConsensusGroupId regionGroupId) {
414
    loadCache.removeRegionRouteCache(regionGroupId);
×
415
  }
×
416
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc