• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
25
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
26
import org.apache.iotdb.common.rpc.thrift.TSpaceQuota;
27
import org.apache.iotdb.common.rpc.thrift.TThrottleQuota;
28
import org.apache.iotdb.commons.conf.IoTDBConstant;
29
import org.apache.iotdb.confignode.client.DataNodeRequestType;
30
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
31
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
32
import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
33
import org.apache.iotdb.confignode.consensus.request.write.quota.SetThrottleQuotaPlan;
34
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
35
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
36
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
37
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
38
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
39
import org.apache.iotdb.consensus.exception.ConsensusException;
40
import org.apache.iotdb.rpc.RpcUtils;
41
import org.apache.iotdb.rpc.TSStatusCode;
42

43
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
45

46
import java.util.ArrayList;
47
import java.util.HashMap;
48
import java.util.List;
49
import java.util.Map;
50
import java.util.concurrent.ConcurrentHashMap;
51
import java.util.concurrent.atomic.AtomicLong;
52

53
public class ClusterQuotaManager {
54

55
  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterQuotaManager.class);

  /** Gives access to the consensus, node and partition managers used by this class. */
  private final IManager configManager;

  /** Source of the space quota limits, the space quota usage and the throttle quota limits. */
  private final QuotaInfo quotaInfo;

  /** Device count keyed by schema region id; summed per database in updateSpaceQuotaUsage. */
  private final Map<Integer, Long> deviceNum;

  /** Timeseries count keyed by schema region id; summed per database in updateSpaceQuotaUsage. */
  private final Map<Integer, Long> timeSeriesNum;

  /** Schema region ids keyed by database; handed to the partition manager in getSchemaRegionIds. */
  private final Map<String, List<Integer>> schemaRegionIdMap;

  /** Data region ids keyed by database; handed to the partition manager in getDataRegionIds. */
  private final Map<String, List<Integer>> dataRegionIdMap;

  /** Disk figure keyed by data region id; summed per database in updateSpaceQuotaUsage. */
  private final Map<Integer, Long> regionDisk;

  /**
   * Creates a manager with empty per-region statistics and empty region-id maps.
   *
   * @param configManager manager used to reach the consensus, node and partition managers
   * @param quotaInfo holder of the quota limits and usage this manager reads and updates
   */
  public ClusterQuotaManager(IManager configManager, QuotaInfo quotaInfo) {
    this.configManager = configManager;
    this.quotaInfo = quotaInfo;

    // Database -> region ids.
    this.schemaRegionIdMap = new HashMap<>();
    this.dataRegionIdMap = new HashMap<>();

    // Region id -> statistic.
    this.deviceNum = new ConcurrentHashMap<>();
    this.timeSeriesNum = new ConcurrentHashMap<>();
    this.regionDisk = new ConcurrentHashMap<>();
  }
×
74

75
  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
76
    if (!checkSpaceQuota(req)) {
×
77
      return RpcUtils.getStatus(
×
78
          TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
×
79
          "The used quota exceeds the preset quota. Please set a larger value.");
80
    }
81
    // TODO: Datanode failed to receive rpc
82
    try {
83
      TSStatus response =
×
84
          configManager
85
              .getConsensusManager()
×
86
              .write(new SetSpaceQuotaPlan(req.getDatabase(), req.getSpaceLimit()));
×
87
      if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
88
        Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
89
            configManager.getNodeManager().getRegisteredDataNodeLocations();
×
90
        AsyncClientHandler<TSetSpaceQuotaReq, TSStatus> clientHandler =
×
91
            new AsyncClientHandler<>(DataNodeRequestType.SET_SPACE_QUOTA, req, dataNodeLocationMap);
92
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
93
        return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
×
94
      }
95
      return response;
×
96
    } catch (ConsensusException e) {
×
97
      LOGGER.warn(
×
98
          String.format(
×
99
              "Unexpected error happened while setting space quota on database: %s ",
100
              req.getDatabase()),
×
101
          e);
102
      // consensus layer related errors
103
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
104
      res.setMessage(e.getMessage());
×
105
      return res;
×
106
    }
107
  }
108

109
  /** If the new quota is smaller than the quota already used, the setting fails. */
110
  private boolean checkSpaceQuota(TSetSpaceQuotaReq req) {
111
    for (String database : req.getDatabase()) {
×
112
      if (quotaInfo.getSpaceQuotaLimit().containsKey(database)) {
×
113
        TSpaceQuota spaceQuota = quotaInfo.getSpaceQuotaUsage().get(database);
×
114
        if (req.getSpaceLimit().getDeviceNum() != IoTDBConstant.UNLIMITED_VALUE
×
115
            && req.getSpaceLimit().getDeviceNum() != IoTDBConstant.DEFAULT_VALUE
×
116
            && spaceQuota.getDeviceNum() > req.getSpaceLimit().getDeviceNum()) {
×
117
          return false;
×
118
        }
119
        if (req.getSpaceLimit().getTimeserieNum() != IoTDBConstant.UNLIMITED_VALUE
×
120
            && req.getSpaceLimit().getTimeserieNum() != IoTDBConstant.DEFAULT_VALUE
×
121
            && spaceQuota.getTimeserieNum() > req.getSpaceLimit().getTimeserieNum()) {
×
122
          return false;
×
123
        }
124
        if (req.getSpaceLimit().getDiskSize() != IoTDBConstant.UNLIMITED_VALUE
×
125
            && req.getSpaceLimit().getDiskSize() != IoTDBConstant.DEFAULT_VALUE
×
126
            && spaceQuota.getDiskSize() > req.getSpaceLimit().getDiskSize()) {
×
127
          return false;
×
128
        }
129
      }
130
    }
×
131
    return true;
×
132
  }
133

134
  public TSpaceQuotaResp showSpaceQuota(List<String> databases) {
135
    TSpaceQuotaResp showSpaceQuotaResp = new TSpaceQuotaResp();
×
136
    if (databases.isEmpty()) {
×
137
      showSpaceQuotaResp.setSpaceQuota(quotaInfo.getSpaceQuotaLimit());
×
138
      showSpaceQuotaResp.setSpaceQuotaUsage(quotaInfo.getSpaceQuotaUsage());
×
139
    } else if (!quotaInfo.getSpaceQuotaLimit().isEmpty()) {
×
140
      Map<String, TSpaceQuota> spaceQuotaMap = new HashMap<>();
×
141
      Map<String, TSpaceQuota> spaceQuotaUsageMap = new HashMap<>();
×
142
      for (String database : databases) {
×
143
        if (quotaInfo.getSpaceQuotaLimit().containsKey(database)) {
×
144
          spaceQuotaMap.put(database, quotaInfo.getSpaceQuotaLimit().get(database));
×
145
          spaceQuotaUsageMap.put(database, quotaInfo.getSpaceQuotaUsage().get(database));
×
146
        }
147
      }
×
148
      showSpaceQuotaResp.setSpaceQuota(spaceQuotaMap);
×
149
      showSpaceQuotaResp.setSpaceQuotaUsage(spaceQuotaUsageMap);
×
150
    }
151
    showSpaceQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
152
    return showSpaceQuotaResp;
×
153
  }
154

155
  public TSpaceQuotaResp getSpaceQuota() {
156
    TSpaceQuotaResp spaceQuotaResp = new TSpaceQuotaResp();
×
157
    if (!quotaInfo.getSpaceQuotaLimit().isEmpty()) {
×
158
      spaceQuotaResp.setSpaceQuota(quotaInfo.getSpaceQuotaLimit());
×
159
    }
160
    spaceQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
161
    return spaceQuotaResp;
×
162
  }
163

164
  public boolean hasSpaceQuotaLimit() {
165
    return quotaInfo.getSpaceQuotaLimit().keySet().isEmpty();
×
166
  }
167

168
  public List<Integer> getSchemaRegionIds() {
169
    List<Integer> schemaRegionIds = new ArrayList<>();
×
170
    getPartitionManager()
×
171
        .getSchemaRegionIds(
×
172
            new ArrayList<>(quotaInfo.getSpaceQuotaLimit().keySet()), schemaRegionIdMap);
×
173
    schemaRegionIdMap.values().forEach(schemaRegionIds::addAll);
×
174
    return schemaRegionIds;
×
175
  }
176

177
  public List<Integer> getDataRegionIds() {
178
    List<Integer> dataRegionIds = new ArrayList<>();
×
179
    getPartitionManager()
×
180
        .getDataRegionIds(
×
181
            new ArrayList<>(quotaInfo.getSpaceQuotaLimit().keySet()), dataRegionIdMap);
×
182
    dataRegionIdMap.values().forEach(dataRegionIds::addAll);
×
183
    return dataRegionIds;
×
184
  }
185

186
  public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) {
187
    try {
188
      TSStatus response =
×
189
          configManager
190
              .getConsensusManager()
×
191
              .write(new SetThrottleQuotaPlan(req.getUserName(), req.getThrottleQuota()));
×
192
      if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
193
        Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
194
            configManager.getNodeManager().getRegisteredDataNodeLocations();
×
195
        AsyncClientHandler<TSetThrottleQuotaReq, TSStatus> clientHandler =
×
196
            new AsyncClientHandler<>(
197
                DataNodeRequestType.SET_THROTTLE_QUOTA, req, dataNodeLocationMap);
198
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
199
        return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
×
200
      }
201
      return response;
×
202
    } catch (ConsensusException e) {
×
203
      LOGGER.warn(
×
204
          String.format(
×
205
              "Unexpected error happened while setting throttle quota on user: %s ",
206
              req.getUserName()),
×
207
          e);
208
      // consensus layer related errors
209
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
210
      res.setMessage(e.getMessage());
×
211
      return res;
×
212
    }
213
  }
214

215
  public TThrottleQuotaResp showThrottleQuota(TShowThrottleReq req) {
216
    TThrottleQuotaResp throttleQuotaResp = new TThrottleQuotaResp();
×
217
    if (req.getUserName() == null) {
×
218
      throttleQuotaResp.setThrottleQuota(quotaInfo.getThrottleQuotaLimit());
×
219
    } else {
220
      Map<String, TThrottleQuota> throttleLimit = new HashMap<>();
×
221
      throttleLimit.put(
×
222
          req.getUserName(),
×
223
          quotaInfo.getThrottleQuotaLimit().get(req.getUserName()) == null
×
224
              ? new TThrottleQuota()
×
225
              : quotaInfo.getThrottleQuotaLimit().get(req.getUserName()));
×
226
      throttleQuotaResp.setThrottleQuota(throttleLimit);
×
227
    }
228
    throttleQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
229
    return throttleQuotaResp;
×
230
  }
231

232
  public TThrottleQuotaResp getThrottleQuota() {
233
    TThrottleQuotaResp throttleQuotaResp = new TThrottleQuotaResp();
×
234
    if (!quotaInfo.getThrottleQuotaLimit().isEmpty()) {
×
235
      throttleQuotaResp.setThrottleQuota(quotaInfo.getThrottleQuotaLimit());
×
236
    }
237
    throttleQuotaResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
238
    return throttleQuotaResp;
×
239
  }
240

241
  public Map<String, TSpaceQuota> getSpaceQuotaUsage() {
242
    return quotaInfo.getSpaceQuotaUsage();
×
243
  }
244

245
  public Map<Integer, Long> getDeviceNum() {
246
    return deviceNum;
×
247
  }
248

249
  public Map<Integer, Long> getTimeSeriesNum() {
250
    return timeSeriesNum;
×
251
  }
252

253
  public Map<Integer, Long> getRegionDisk() {
254
    return regionDisk;
×
255
  }
256

257
  public void updateSpaceQuotaUsage() {
258
    AtomicLong deviceCount = new AtomicLong();
×
259
    AtomicLong timeSeriesCount = new AtomicLong();
×
260
    for (Map.Entry<String, List<Integer>> entry : schemaRegionIdMap.entrySet()) {
×
261
      deviceCount.set(0);
×
262
      timeSeriesCount.set(0);
×
263
      entry
×
264
          .getValue()
×
265
          .forEach(
×
266
              schemaRegionId -> {
267
                if (deviceNum.containsKey(schemaRegionId)) {
×
268
                  deviceCount.addAndGet(deviceCount.get() + deviceNum.get(schemaRegionId));
×
269
                }
270
                if (timeSeriesNum.containsKey(schemaRegionId)) {
×
271
                  timeSeriesCount.addAndGet(
×
272
                      timeSeriesCount.get() + timeSeriesNum.get(schemaRegionId));
×
273
                }
274
              });
×
275
      quotaInfo.getSpaceQuotaUsage().get(entry.getKey()).setDeviceNum(deviceCount.get());
×
276
      quotaInfo.getSpaceQuotaUsage().get(entry.getKey()).setTimeserieNum(timeSeriesCount.get());
×
277
    }
×
278
    AtomicLong regionDiskCount = new AtomicLong();
×
279
    for (Map.Entry<String, List<Integer>> entry : dataRegionIdMap.entrySet()) {
×
280
      regionDiskCount.set(0);
×
281
      entry
×
282
          .getValue()
×
283
          .forEach(
×
284
              dataRegionId -> {
285
                if (regionDisk.containsKey(dataRegionId)) {
×
286
                  regionDiskCount.addAndGet(regionDisk.get(dataRegionId));
×
287
                }
288
              });
×
289
      quotaInfo.getSpaceQuotaUsage().get(entry.getKey()).setDiskSize(regionDiskCount.get());
×
290
    }
×
291
  }
×
292

293
  private PartitionManager getPartitionManager() {
294
    return configManager.getPartitionManager();
×
295
  }
296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc