• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
package org.apache.iotdb.db.service;
20

21
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
22
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
25
import org.apache.iotdb.common.rpc.thrift.TSStatus;
26
import org.apache.iotdb.commons.client.exception.ClientManagerException;
27
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
28
import org.apache.iotdb.commons.concurrent.ThreadName;
29
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
30
import org.apache.iotdb.commons.consensus.DataRegionId;
31
import org.apache.iotdb.commons.consensus.SchemaRegionId;
32
import org.apache.iotdb.commons.exception.StartupException;
33
import org.apache.iotdb.commons.service.IService;
34
import org.apache.iotdb.commons.service.ServiceType;
35
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
36
import org.apache.iotdb.consensus.common.Peer;
37
import org.apache.iotdb.consensus.exception.ConsensusException;
38
import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
39
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
40
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
41
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
42
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
43
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
44
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
45
import org.apache.iotdb.db.schemaengine.SchemaEngine;
46
import org.apache.iotdb.db.storageengine.StorageEngine;
47
import org.apache.iotdb.db.storageengine.rescon.memory.AbstractPoolManager;
48
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
49
import org.apache.iotdb.rpc.TSStatusCode;
50

51
import org.apache.thrift.TException;
52
import org.slf4j.Logger;
53
import org.slf4j.LoggerFactory;
54

55
import java.util.HashMap;
56
import java.util.Map;
57

58
public class RegionMigrateService implements IService {
59

60
  private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
×
61

62
  public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]";
63

64
  private static final int MAX_RETRY_NUM = 5;
65

66
  private static final int SLEEP_MILLIS = 5000;
67

68
  private RegionMigratePool regionMigratePool;
69

70
  private RegionMigrateService() {}
71

72
  public static RegionMigrateService getInstance() {
73
    return Holder.INSTANCE;
×
74
  }
75

76
  /**
77
   * Submit AddRegionPeerTask
78
   *
79
   * @param req TMaintainPeerReq
80
   * @return if the submit task succeed
81
   */
82
  public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) {
83

84
    boolean submitSucceed = true;
×
85
    try {
86
      regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), req.getDestNode()));
×
87
    } catch (Exception e) {
×
88
      LOGGER.error(
×
89
          "{}, Submit AddRegionPeerTask error for Region: {}",
90
          REGION_MIGRATE_PROCESS,
91
          req.getRegionId(),
×
92
          e);
93
      submitSucceed = false;
×
94
    }
×
95
    return submitSucceed;
×
96
  }
97

98
  /**
99
   * Submit RemoveRegionPeerTask
100
   *
101
   * @param req TMaintainPeerReq
102
   * @return if the submit task succeed
103
   */
104
  public synchronized boolean submitRemoveRegionPeerTask(TMaintainPeerReq req) {
105

106
    boolean submitSucceed = true;
×
107
    try {
108
      regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(), req.getDestNode()));
×
109
    } catch (Exception e) {
×
110
      LOGGER.error(
×
111
          "{}, Submit RemoveRegionPeer task error for Region: {}",
112
          REGION_MIGRATE_PROCESS,
113
          req.getRegionId(),
×
114
          e);
115
      submitSucceed = false;
×
116
    }
×
117
    return submitSucceed;
×
118
  }
119

120
  /**
121
   * Submit DeleteOldRegionPeerTask
122
   *
123
   * @param req TMigrateRegionReq
124
   * @return if the submit task succeed
125
   */
126
  public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq req) {
127

128
    boolean submitSucceed = true;
×
129
    try {
130
      regionMigratePool.submit(new DeleteOldRegionPeerTask(req.getRegionId(), req.getDestNode()));
×
131
    } catch (Exception e) {
×
132
      LOGGER.error(
×
133
          "{}, Submit DeleteOldRegionPeerTask error for Region: {}",
134
          REGION_MIGRATE_PROCESS,
135
          req.getRegionId(),
×
136
          e);
137
      submitSucceed = false;
×
138
    }
×
139
    return submitSucceed;
×
140
  }
141

142
  @Override
143
  public void start() throws StartupException {
144
    regionMigratePool = new RegionMigratePool();
×
145
    regionMigratePool.start();
×
146
    LOGGER.info("Region migrate service start");
×
147
  }
×
148

149
  @Override
150
  public void stop() {
151
    if (regionMigratePool != null) {
×
152
      regionMigratePool.stop();
×
153
    }
154
    LOGGER.info("Region migrate service stop");
×
155
  }
×
156

157
  @Override
158
  public ServiceType getID() {
159
    return ServiceType.DATA_NODE_REGION_MIGRATE_SERVICE;
×
160
  }
161

162
  private static class RegionMigratePool extends AbstractPoolManager {
163

164
    private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);
×
165

166
    private RegionMigratePool() {
×
167
      this.pool =
×
168
          IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.REGION_MIGRATE.getName());
×
169
    }
×
170

171
    @Override
172
    public Logger getLogger() {
173
      return poolLogger;
×
174
    }
175

176
    @Override
177
    public void start() {
178
      if (this.pool != null) {
×
179
        poolLogger.info("DataNode region migrate pool start");
×
180
      }
181
    }
×
182

183
    @Override
184
    public String getName() {
185
      return "migrate region";
×
186
    }
187
  }
188

189
  private static class AddRegionPeerTask implements Runnable {
190

191
    private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
×
192

193
    // The RegionGroup that shall perform the add peer process
194
    private final TConsensusGroupId tRegionId;
195

196
    // The new DataNode to be added in the RegionGroup
197
    private final TDataNodeLocation destDataNode;
198

199
    public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) {
×
200
      this.tRegionId = tRegionId;
×
201
      this.destDataNode = destDataNode;
×
202
    }
×
203

204
    @Override
205
    public void run() {
206
      TSStatus runResult = addPeer();
×
207
      if (isFailed(runResult)) {
×
208
        reportFailed(tRegionId, destDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
×
209
        return;
×
210
      }
211

212
      reportSucceed(tRegionId, "AddPeer");
×
213
    }
×
214

215
    private TSStatus addPeer() {
216
      taskLogger.info(
×
217
          "{}, Start to addPeer {} for region {}", REGION_MIGRATE_PROCESS, destDataNode, tRegionId);
218
      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
×
219
      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
220
      TEndPoint destEndpoint = getConsensusEndPoint(destDataNode, regionId);
×
221
      boolean addPeerSucceed = true;
×
222
      Throwable throwable = null;
×
223
      for (int i = 0; i < MAX_RETRY_NUM; i++) {
×
224
        try {
225
          if (!addPeerSucceed) {
×
226
            Thread.sleep(SLEEP_MILLIS);
×
227
          }
228
          addRegionPeer(regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndpoint));
×
229
          addPeerSucceed = true;
×
230
        } catch (PeerAlreadyInConsensusGroupException e) {
×
231
          addPeerSucceed = true;
×
232
        } catch (InterruptedException e) {
×
233
          throwable = e;
×
234
          Thread.currentThread().interrupt();
×
235
        } catch (ConsensusException e) {
×
236
          addPeerSucceed = false;
×
237
          throwable = e;
×
238
          taskLogger.error(
×
239
              "{}, executed addPeer {} for region {} error, retry times: {}",
240
              REGION_MIGRATE_PROCESS,
241
              destEndpoint,
242
              regionId,
243
              i,
×
244
              e);
245
        }
×
246
        if (addPeerSucceed || throwable instanceof InterruptedException) {
×
247
          break;
×
248
        }
249
      }
250

251
      if (!addPeerSucceed) {
×
252
        String errorMsg =
×
253
            String.format(
×
254
                "%s, AddPeer for region error after max retry times, peerId: %s, regionId: %s",
255
                REGION_MIGRATE_PROCESS, destEndpoint, regionId);
256
        taskLogger.error(errorMsg, throwable);
×
257
        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
258
        status.setMessage(errorMsg);
×
259
        return status;
×
260
      }
261

262
      taskLogger.info(
×
263
          "{}, Succeed to addPeer {} for region {}",
264
          REGION_MIGRATE_PROCESS,
265
          destEndpoint,
266
          regionId);
267
      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
268
      status.setMessage("addPeer " + destEndpoint + " for region " + regionId + " succeed");
×
269
      return status;
×
270
    }
271

272
    private void addRegionPeer(ConsensusGroupId regionId, Peer newPeer) throws ConsensusException {
273
      if (regionId instanceof DataRegionId) {
×
274
        DataRegionConsensusImpl.getInstance().addRemotePeer(regionId, newPeer);
×
275
      } else {
276
        SchemaRegionConsensusImpl.getInstance().addRemotePeer(regionId, newPeer);
×
277
      }
278
    }
×
279
  }
280

281
  private static class RemoveRegionPeerTask implements Runnable {
282

283
    private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class);
×
284

285
    private final TConsensusGroupId tRegionId;
286

287
    private final TDataNodeLocation destDataNode;
288

289
    public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) {
×
290
      this.tRegionId = tRegionId;
×
291
      this.destDataNode = destDataNode;
×
292
    }
×
293

294
    @Override
295
    public void run() {
296
      TSStatus runResult = removePeer();
×
297
      if (isSucceed(runResult)) {
×
298
        reportSucceed(tRegionId, "RemovePeer");
×
299
      } else {
300
        reportFailed(tRegionId, destDataNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
×
301
      }
302
    }
×
303

304
    private TSStatus removePeer() {
305
      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
×
306
      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
307
      TEndPoint destEndPoint = getConsensusEndPoint(destDataNode, regionId);
×
308
      taskLogger.info(
×
309
          "{}, Start to removePeer {} for region {}",
310
          REGION_MIGRATE_PROCESS,
311
          destEndPoint,
312
          regionId);
313
      Throwable throwable = null;
×
314
      boolean removePeerSucceed = true;
×
315
      for (int i = 0; i < MAX_RETRY_NUM; i++) {
×
316
        try {
317
          if (!removePeerSucceed) {
×
318
            Thread.sleep(SLEEP_MILLIS);
×
319
          }
320
          removeRegionPeer(
×
321
              regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndPoint));
×
322
          removePeerSucceed = true;
×
323
        } catch (PeerNotInConsensusGroupException e) {
×
324
          removePeerSucceed = true;
×
325
        } catch (InterruptedException e) {
×
326
          throwable = e;
×
327
          Thread.currentThread().interrupt();
×
328
        } catch (ConsensusException e) {
×
329
          removePeerSucceed = false;
×
330
          throwable = e;
×
331
          taskLogger.error(
×
332
              "{}, executed removePeer {} for region {} error, retry times: {}",
333
              REGION_MIGRATE_PROCESS,
334
              destEndPoint,
335
              regionId,
336
              i,
×
337
              e);
338
        }
×
339
        if (removePeerSucceed || throwable instanceof InterruptedException) {
×
340
          break;
×
341
        }
342
      }
343

344
      if (!removePeerSucceed) {
×
345
        String errorMsg =
×
346
            String.format(
×
347
                "%s, RemovePeer for region error after max retry times, peerId: %s, regionId: %s",
348
                REGION_MIGRATE_PROCESS, destEndPoint, regionId);
349
        taskLogger.error(errorMsg, throwable);
×
350
        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
351
        status.setMessage(errorMsg);
×
352
        return status;
×
353
      }
354

355
      taskLogger.info(
×
356
          "{}, Succeed to removePeer {} for region {}",
357
          REGION_MIGRATE_PROCESS,
358
          destEndPoint,
359
          regionId);
360
      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
361
      status.setMessage("removePeer " + destEndPoint + " for region " + regionId + " succeed");
×
362
      return status;
×
363
    }
364

365
    private void removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer)
366
        throws ConsensusException {
367
      if (regionId instanceof DataRegionId) {
×
368
        DataRegionConsensusImpl.getInstance().removeRemotePeer(regionId, oldPeer);
×
369
      } else {
370
        SchemaRegionConsensusImpl.getInstance().removeRemotePeer(regionId, oldPeer);
×
371
      }
372
    }
×
373
  }
374

375
  private static class DeleteOldRegionPeerTask implements Runnable {
376

377
    private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class);
×
378

379
    private final TConsensusGroupId tRegionId;
380

381
    private final TDataNodeLocation originalDataNode;
382

383
    public DeleteOldRegionPeerTask(
384
        TConsensusGroupId tRegionId, TDataNodeLocation originalDataNode) {
×
385
      this.tRegionId = tRegionId;
×
386
      this.originalDataNode = originalDataNode;
×
387
    }
×
388

389
    @Override
390
    public void run() {
391
      // deletePeer: remove the peer from the consensus group
392
      TSStatus runResult = deletePeer();
×
393
      if (isFailed(runResult)) {
×
394
        reportFailed(
×
395
            tRegionId,
396
            originalDataNode,
397
            TRegionMigrateFailedType.RemoveConsensusGroupFailed,
398
            runResult);
399
      }
400

401
      // deleteRegion: delete region data
402
      runResult = deleteRegion();
×
403
      if (isFailed(runResult)) {
×
404
        reportFailed(
×
405
            tRegionId, originalDataNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
406
      }
407

408
      reportSucceed(tRegionId, "DeletePeer");
×
409
    }
×
410

411
    private TSStatus deletePeer() {
412
      taskLogger.info(
×
413
          "{}, Start to deletePeer {} for region {}",
414
          REGION_MIGRATE_PROCESS,
415
          originalDataNode,
416
          tRegionId);
417
      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
×
418
      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
419
      try {
420
        if (regionId instanceof DataRegionId) {
×
421
          DataRegionConsensusImpl.getInstance().deleteLocalPeer(regionId);
×
422
        } else {
423
          SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(regionId);
×
424
        }
425
      } catch (ConsensusException e) {
×
426
        String errorMsg =
×
427
            String.format(
×
428
                "deletePeer error, regionId: %s, errorMessage: %s", regionId, e.getMessage());
×
429
        taskLogger.error(errorMsg);
×
430
        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
431
        status.setMessage(errorMsg);
×
432
        return status;
×
433
      } catch (Exception e) {
×
434
        taskLogger.error("{}, deletePeer error, regionId: {}", REGION_MIGRATE_PROCESS, regionId, e);
×
435
        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
436
        status.setMessage(
×
437
            "deletePeer for region: " + regionId + " error. exception: " + e.getMessage());
×
438
        return status;
×
439
      }
×
440
      taskLogger.info(
×
441
          "{}, Succeed to deletePeer {} from consensus group", REGION_MIGRATE_PROCESS, regionId);
442
      status.setMessage("deletePeer from consensus group " + regionId + "succeed");
×
443
      return status;
×
444
    }
445

446
    private TSStatus deleteRegion() {
447
      taskLogger.info(
×
448
          "{}, Start to deleteRegion {} for datanode {}",
449
          REGION_MIGRATE_PROCESS,
450
          tRegionId,
451
          originalDataNode);
452
      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
453
      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
×
454
      try {
455
        if (regionId instanceof DataRegionId) {
×
456
          StorageEngine.getInstance().deleteDataRegion((DataRegionId) regionId);
×
457
        } else {
458
          SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) regionId);
×
459
        }
460
      } catch (Exception e) {
×
461
        taskLogger.error("{}, deleteRegion {} error", REGION_MIGRATE_PROCESS, regionId, e);
×
462
        status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
×
463
        status.setMessage("deleteRegion " + regionId + " error, " + e.getMessage());
×
464
        return status;
×
465
      }
×
466
      status.setMessage("deleteRegion " + regionId + " succeed");
×
467
      taskLogger.info("{}, Succeed to deleteRegion {}", REGION_MIGRATE_PROCESS, regionId);
×
468
      return status;
×
469
    }
470
  }
471

472
  private static class Holder {
473

474
    private static final RegionMigrateService INSTANCE = new RegionMigrateService();
×
475

476
    private Holder() {}
477
  }
478

479
  private static void reportSucceed(TConsensusGroupId tRegionId, String migrateState) {
480
    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
481
    status.setMessage(
×
482
        String.format("Region: %s, state: %s, executed succeed", tRegionId, migrateState));
×
483
    TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
×
484
    try {
485
      reportRegionMigrateResultToConfigNode(req);
×
486
    } catch (Exception e) {
×
487
      LOGGER.error(
×
488
          "{}, Report region {} migrate result error in reportSucceed, result: {}",
489
          REGION_MIGRATE_PROCESS,
490
          tRegionId,
491
          req,
492
          e);
493
    }
×
494
  }
×
495

496
  private static void reportFailed(
497
      TConsensusGroupId tRegionId,
498
      TDataNodeLocation failedNode,
499
      TRegionMigrateFailedType failedType,
500
      TSStatus status) {
501
    Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
×
502
    failedNodeAndReason.put(failedNode, failedType);
×
503
    TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
×
504
    req.setFailedNodeAndReason(failedNodeAndReason);
×
505
    try {
506
      reportRegionMigrateResultToConfigNode(req);
×
507
    } catch (Exception e) {
×
508
      LOGGER.error(
×
509
          "{}, Report region {} migrate error in reportFailed, result:{}",
510
          REGION_MIGRATE_PROCESS,
511
          tRegionId,
512
          req,
513
          e);
514
    }
×
515
  }
×
516

517
  private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
518
      throws TException, ClientManagerException {
519
    TSStatus status;
520
    try (ConfigNodeClient client =
521
        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
522
      status = client.reportRegionMigrateResult(req);
×
523
      LOGGER.info(
×
524
          "{}, Report region {} migrate result {} to Config node succeed, result: {}",
525
          REGION_MIGRATE_PROCESS,
526
          req.getRegionId(),
×
527
          req,
528
          status);
529
    }
530
  }
×
531

532
  private static boolean isSucceed(TSStatus status) {
533
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
×
534
  }
535

536
  private static boolean isFailed(TSStatus status) {
537
    return !isSucceed(status);
×
538
  }
539

540
  private static TEndPoint getConsensusEndPoint(
541
      TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
542
    if (regionId instanceof DataRegionId) {
×
543
      return nodeLocation.getDataRegionConsensusEndPoint();
×
544
    }
545
    return nodeLocation.getSchemaRegionConsensusEndPoint();
×
546
  }
547
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc