• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9723

pending completion
#9723

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)

264 of 264 new or added lines in 11 files covered. (100.0%)

79280 of 165370 relevant lines covered (47.94%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.23
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.persistence.node;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
26
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
27
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
28
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
29
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
30
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
31
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
32
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
33
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
34
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
35
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
36
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
37
import org.apache.iotdb.rpc.TSStatusCode;
38
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
39

40
import org.apache.thrift.TException;
41
import org.apache.thrift.protocol.TBinaryProtocol;
42
import org.apache.thrift.protocol.TProtocol;
43
import org.apache.thrift.transport.TIOStreamTransport;
44
import org.slf4j.Logger;
45
import org.slf4j.LoggerFactory;
46

47
import java.io.File;
48
import java.io.FileInputStream;
49
import java.io.FileOutputStream;
50
import java.io.IOException;
51
import java.io.InputStream;
52
import java.io.OutputStream;
53
import java.util.ArrayList;
54
import java.util.Collections;
55
import java.util.HashMap;
56
import java.util.List;
57
import java.util.Map;
58
import java.util.Map.Entry;
59
import java.util.Objects;
60
import java.util.UUID;
61
import java.util.concurrent.ConcurrentHashMap;
62
import java.util.concurrent.atomic.AtomicInteger;
63
import java.util.concurrent.locks.ReentrantReadWriteLock;
64

65
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
66

67
/**
68
 * The {@link NodeInfo} stores cluster node information.
69
 *
70
 * <p>The cluster node information includes:
71
 *
72
 * <p>1. DataNode information
73
 *
74
 * <p>2. ConfigNode information
75
 */
76
public class NodeInfo implements SnapshotProcessor {
77

78
  private static final Logger LOGGER = LoggerFactory.getLogger(NodeInfo.class);
1✔
79

80
  private static final int MINIMUM_DATANODE =
1✔
81
      Math.max(
1✔
82
          ConfigNodeDescriptor.getInstance().getConf().getSchemaReplicationFactor(),
1✔
83
          ConfigNodeDescriptor.getInstance().getConf().getDataReplicationFactor());
1✔
84

85
  // Registered ConfigNodes
86
  private final ReentrantReadWriteLock configNodeInfoReadWriteLock;
87
  private final Map<Integer, TConfigNodeLocation> registeredConfigNodes;
88

89
  // Registered DataNodes
90
  private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
91

92
  private final ReentrantReadWriteLock versionInfoReadWriteLock;
93

94
  private final AtomicInteger nextNodeId = new AtomicInteger(-1);
1✔
95
  private final Map<Integer, TDataNodeConfiguration> registeredDataNodes;
96

97
  private final Map<Integer, TNodeVersionInfo> nodeVersionInfo;
98
  private static final String SNAPSHOT_FILENAME = "node_info.bin";
99

100
  /** Creates an empty {@link NodeInfo}: no registered nodes, no version info. */
  public NodeInfo() {
    // One read-write lock per guarded map
    this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
    this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
    this.versionInfoReadWriteLock = new ReentrantReadWriteLock();

    this.registeredConfigNodes = new ConcurrentHashMap<>();
    this.registeredDataNodes = new ConcurrentHashMap<>();
    this.nodeVersionInfo = new ConcurrentHashMap<>();
  }
1✔
110

111
  /**
112
   * Persist DataNode info.
113
   *
114
   * @param registerDataNodePlan RegisterDataNodePlan
115
   * @return {@link TSStatusCode#SUCCESS_STATUS}
116
   */
117
  public TSStatus registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
118
    TSStatus result;
119
    TDataNodeConfiguration info = registerDataNodePlan.getDataNodeConfiguration();
1✔
120
    dataNodeInfoReadWriteLock.writeLock().lock();
1✔
121
    try {
122

123
      // To ensure that the nextNodeId is updated correctly when
124
      // the ConfigNode-followers concurrently processes RegisterDataNodePlan,
125
      // we need to add a synchronization lock here
126
      synchronized (nextNodeId) {
1✔
127
        if (nextNodeId.get() < info.getLocation().getDataNodeId()) {
1✔
128
          nextNodeId.set(info.getLocation().getDataNodeId());
1✔
129
        }
130
      }
1✔
131
      registeredDataNodes.put(info.getLocation().getDataNodeId(), info);
1✔
132
      result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
1✔
133
      if (nextNodeId.get() < MINIMUM_DATANODE) {
1✔
134
        result.setMessage(
×
135
            String.format(
×
136
                "To enable IoTDB-Cluster's data service, please register %d more IoTDB-DataNode",
137
                MINIMUM_DATANODE - nextNodeId.get()));
×
138
      } else if (nextNodeId.get() == MINIMUM_DATANODE) {
1✔
139
        result.setMessage("IoTDB-Cluster could provide data service, now enjoy yourself!");
×
140
      }
141
    } finally {
142
      dataNodeInfoReadWriteLock.writeLock().unlock();
1✔
143
    }
144
    return result;
1✔
145
  }
146

147
  /**
148
   * Persist Information about remove dataNode.
149
   *
150
   * @param req RemoveDataNodePlan
151
   * @return {@link TSStatus}
152
   */
153
  public TSStatus removeDataNode(RemoveDataNodePlan req) {
154
    LOGGER.info(
×
155
        "{}, There are {} data node in cluster before executed RemoveDataNodePlan",
156
        REMOVE_DATANODE_PROCESS,
157
        registeredDataNodes.size());
×
158

159
    dataNodeInfoReadWriteLock.writeLock().lock();
×
160
    versionInfoReadWriteLock.writeLock().lock();
×
161
    try {
162
      req.getDataNodeLocations()
×
163
          .forEach(
×
164
              removeDataNodes -> {
165
                registeredDataNodes.remove(removeDataNodes.getDataNodeId());
×
166
                nodeVersionInfo.remove(removeDataNodes.getDataNodeId());
×
167
                LOGGER.info("Removed the datanode {} from cluster", removeDataNodes);
×
168
              });
×
169
    } finally {
170
      versionInfoReadWriteLock.writeLock().unlock();
×
171
      dataNodeInfoReadWriteLock.writeLock().unlock();
×
172
    }
173
    LOGGER.info(
×
174
        "{}, There are {} data node in cluster after executed RemoveDataNodePlan",
175
        REMOVE_DATANODE_PROCESS,
176
        registeredDataNodes.size());
×
177
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
178
  }
179

180
  /**
181
   * Update the specified DataNode‘s location.
182
   *
183
   * @param updateDataNodePlan UpdateDataNodePlan
184
   * @return {@link TSStatusCode#SUCCESS_STATUS} if update DataNode info successfully.
185
   */
186
  public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
187
    dataNodeInfoReadWriteLock.writeLock().lock();
×
188
    try {
189
      TDataNodeConfiguration newConfiguration = updateDataNodePlan.getDataNodeConfiguration();
×
190
      registeredDataNodes.replace(newConfiguration.getLocation().getDataNodeId(), newConfiguration);
×
191
    } finally {
192
      dataNodeInfoReadWriteLock.writeLock().unlock();
×
193
    }
194
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
195
  }
196

197
  /**
198
   * Get DataNodeConfiguration.
199
   *
200
   * @param getDataNodeConfigurationPlan GetDataNodeConfigurationPlan
201
   * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in
202
   *     GetDataNodeConfigurationPlan is -1
203
   */
204
  public DataNodeConfigurationResp getDataNodeConfiguration(
205
      GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
206
    DataNodeConfigurationResp result = new DataNodeConfigurationResp();
×
207
    result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
×
208

209
    int dataNodeId = getDataNodeConfigurationPlan.getDataNodeId();
×
210
    dataNodeInfoReadWriteLock.readLock().lock();
×
211
    try {
212
      if (dataNodeId == -1) {
×
213
        result.setDataNodeConfigurationMap(new HashMap<>(registeredDataNodes));
×
214
      } else {
215
        result.setDataNodeConfigurationMap(
×
216
            registeredDataNodes.get(dataNodeId) == null
×
217
                ? new HashMap<>(0)
×
218
                : Collections.singletonMap(dataNodeId, registeredDataNodes.get(dataNodeId)));
×
219
      }
220
    } finally {
221
      dataNodeInfoReadWriteLock.readLock().unlock();
×
222
    }
223

224
    return result;
×
225
  }
226

227
  /** Return the number of registered DataNodes. */
228
  public int getRegisteredDataNodeCount() {
229
    int result;
230
    dataNodeInfoReadWriteLock.readLock().lock();
×
231
    try {
232
      result = registeredDataNodes.size();
×
233
    } finally {
234
      dataNodeInfoReadWriteLock.readLock().unlock();
×
235
    }
236
    return result;
×
237
  }
238

239
  /** Return the number of total cpu cores in online DataNodes. */
240
  public int getTotalCpuCoreCount() {
241
    int result = 0;
×
242
    dataNodeInfoReadWriteLock.readLock().lock();
×
243
    try {
244
      for (TDataNodeConfiguration dataNodeConfiguration : registeredDataNodes.values()) {
×
245
        result += dataNodeConfiguration.getResource().getCpuCoreNum();
×
246
      }
×
247
    } finally {
248
      dataNodeInfoReadWriteLock.readLock().unlock();
×
249
    }
250
    return result;
×
251
  }
252

253
  /** @return All registered DataNodes. */
254
  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
255
    List<TDataNodeConfiguration> result;
256
    dataNodeInfoReadWriteLock.readLock().lock();
×
257
    try {
258
      result = new ArrayList<>(registeredDataNodes.values());
×
259
    } finally {
260
      dataNodeInfoReadWriteLock.readLock().unlock();
×
261
    }
262
    return result;
×
263
  }
264

265
  /** @return The specified registered DataNode. */
266
  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
267
    dataNodeInfoReadWriteLock.readLock().lock();
×
268
    try {
269
      return registeredDataNodes.getOrDefault(dataNodeId, new TDataNodeConfiguration()).deepCopy();
×
270
    } finally {
271
      dataNodeInfoReadWriteLock.readLock().unlock();
×
272
    }
273
  }
274

275
  /** @return The specified registered DataNodes. */
276
  public List<TDataNodeConfiguration> getRegisteredDataNodes(List<Integer> dataNodeIds) {
277
    List<TDataNodeConfiguration> result = new ArrayList<>();
×
278
    dataNodeInfoReadWriteLock.readLock().lock();
×
279
    try {
280
      dataNodeIds.forEach(
×
281
          dataNodeId -> {
282
            if (registeredDataNodes.containsKey(dataNodeId)) {
×
283
              result.add(registeredDataNodes.get(dataNodeId).deepCopy());
×
284
            }
285
          });
×
286
    } finally {
287
      dataNodeInfoReadWriteLock.readLock().unlock();
×
288
    }
289
    return result;
×
290
  }
291

292
  /**
293
   * Update ConfigNodeList both in memory and confignode-system{@literal .}properties file.
294
   *
295
   * @param applyConfigNodePlan ApplyConfigNodePlan
296
   * @return {@link TSStatusCode#ADD_CONFIGNODE_ERROR} if update online ConfigNode failed.
297
   */
298
  public TSStatus applyConfigNode(ApplyConfigNodePlan applyConfigNodePlan) {
299
    TSStatus status = new TSStatus();
1✔
300
    configNodeInfoReadWriteLock.writeLock().lock();
1✔
301
    try {
302
      // To ensure that the nextNodeId is updated correctly when
303
      // the ConfigNode-followers concurrently processes ApplyConfigNodePlan,
304
      // We need to add a synchronization lock here
305
      synchronized (nextNodeId) {
1✔
306
        if (nextNodeId.get() < applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId()) {
1✔
307
          nextNodeId.set(applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
1✔
308
        }
309
      }
1✔
310

311
      registeredConfigNodes.put(
1✔
312
          applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId(),
1✔
313
          applyConfigNodePlan.getConfigNodeLocation());
1✔
314
      SystemPropertiesUtils.storeConfigNodeList(new ArrayList<>(registeredConfigNodes.values()));
1✔
315
      LOGGER.info(
1✔
316
          "Successfully apply ConfigNode: {}. Current ConfigNodeGroup: {}",
317
          applyConfigNodePlan.getConfigNodeLocation(),
1✔
318
          registeredConfigNodes);
319
      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
1✔
320
    } catch (IOException e) {
×
321
      LOGGER.error("Update online ConfigNode failed.", e);
×
322
      status.setCode(TSStatusCode.ADD_CONFIGNODE_ERROR.getStatusCode());
×
323
      status.setMessage(
×
324
          "Apply new ConfigNode failed because current ConfigNode can't store ConfigNode information.");
325
    } finally {
326
      configNodeInfoReadWriteLock.writeLock().unlock();
1✔
327
    }
328
    return status;
1✔
329
  }
330

331
  /**
332
   * Update ConfigNodeList both in memory and confignode-system{@literal .}properties file.
333
   *
334
   * @param removeConfigNodePlan RemoveConfigNodePlan
335
   * @return {@link TSStatusCode#REMOVE_CONFIGNODE_ERROR} if remove online ConfigNode failed.
336
   */
337
  public TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
338
    TSStatus status = new TSStatus();
×
339
    configNodeInfoReadWriteLock.writeLock().lock();
×
340
    versionInfoReadWriteLock.writeLock().lock();
×
341
    try {
342
      registeredConfigNodes.remove(removeConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
×
343
      nodeVersionInfo.remove(removeConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
×
344
      SystemPropertiesUtils.storeConfigNodeList(new ArrayList<>(registeredConfigNodes.values()));
×
345
      LOGGER.info(
×
346
          "Successfully remove ConfigNode: {}. Current ConfigNodeGroup: {}",
347
          removeConfigNodePlan.getConfigNodeLocation(),
×
348
          registeredConfigNodes);
349
      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
350
    } catch (IOException e) {
×
351
      LOGGER.error("Remove online ConfigNode failed.", e);
×
352
      status.setCode(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode());
×
353
      status.setMessage(
×
354
          "Remove ConfigNode failed because current ConfigNode can't store ConfigNode information.");
355
    } finally {
356
      versionInfoReadWriteLock.writeLock().unlock();
×
357
      configNodeInfoReadWriteLock.writeLock().unlock();
×
358
    }
359
    return status;
×
360
  }
361

362
  /**
363
   * Update the specified Node‘s versionInfo.
364
   *
365
   * @param updateVersionInfoPlan UpdateVersionInfoPlan
366
   * @return {@link TSStatusCode#SUCCESS_STATUS} if update build info successfully.
367
   */
368
  public TSStatus updateVersionInfo(UpdateVersionInfoPlan updateVersionInfoPlan) {
369
    versionInfoReadWriteLock.writeLock().lock();
×
370
    try {
371
      nodeVersionInfo.put(
×
372
          updateVersionInfoPlan.getNodeId(), updateVersionInfoPlan.getVersionInfo());
×
373
    } finally {
374
      versionInfoReadWriteLock.writeLock().unlock();
×
375
    }
376
    LOGGER.info("Successfully update Node {} 's version.", updateVersionInfoPlan.getNodeId());
×
377
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
378
  }
379

380
  /** @return All registered ConfigNodes. */
381
  public List<TConfigNodeLocation> getRegisteredConfigNodes() {
382
    List<TConfigNodeLocation> result;
383
    configNodeInfoReadWriteLock.readLock().lock();
×
384
    try {
385
      result = new ArrayList<>(registeredConfigNodes.values());
×
386
    } finally {
387
      configNodeInfoReadWriteLock.readLock().unlock();
×
388
    }
389
    return result;
×
390
  }
391

392
  /** @return The specified registered ConfigNode. */
393
  public List<TConfigNodeLocation> getRegisteredConfigNodes(List<Integer> configNodeIds) {
394
    List<TConfigNodeLocation> result = new ArrayList<>();
×
395
    configNodeInfoReadWriteLock.readLock().lock();
×
396
    try {
397
      configNodeIds.forEach(
×
398
          configNodeId -> {
399
            if (registeredConfigNodes.containsKey(configNodeId)) {
×
400
              result.add(registeredConfigNodes.get(configNodeId).deepCopy());
×
401
            }
402
          });
×
403
    } finally {
404
      configNodeInfoReadWriteLock.readLock().unlock();
×
405
    }
406
    return result;
×
407
  }
408

409
  /** @return all nodes buildInfo */
410
  public Map<Integer, TNodeVersionInfo> getNodeVersionInfo() {
411
    Map<Integer, TNodeVersionInfo> result = new HashMap<>(nodeVersionInfo.size());
×
412
    versionInfoReadWriteLock.readLock().lock();
×
413
    try {
414
      result.putAll(nodeVersionInfo);
×
415
    } finally {
416
      versionInfoReadWriteLock.readLock().unlock();
×
417
    }
418
    return result;
×
419
  }
420

421
  public TNodeVersionInfo getVersionInfo(int nodeId) {
422
    versionInfoReadWriteLock.readLock().lock();
×
423
    try {
424
      return nodeVersionInfo.getOrDefault(nodeId, new TNodeVersionInfo("Unknown", "Unknown"));
×
425
    } finally {
426
      versionInfoReadWriteLock.readLock().unlock();
×
427
    }
428
  }
429

430
  public int generateNextNodeId() {
431
    return nextNodeId.incrementAndGet();
×
432
  }
433

434
  @Override
435
  public boolean processTakeSnapshot(File snapshotDir) throws IOException, TException {
436
    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
1✔
437
    if (snapshotFile.exists() && snapshotFile.isFile()) {
1✔
438
      LOGGER.error(
×
439
          "Failed to take snapshot, because snapshot file [{}] is already exist.",
440
          snapshotFile.getAbsolutePath());
×
441
      return false;
×
442
    }
443

444
    File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
1✔
445
    configNodeInfoReadWriteLock.readLock().lock();
1✔
446
    dataNodeInfoReadWriteLock.readLock().lock();
1✔
447
    versionInfoReadWriteLock.readLock().lock();
1✔
448
    try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
1✔
449
        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileOutputStream)) {
1✔
450

451
      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
1✔
452

453
      ReadWriteIOUtils.write(nextNodeId.get(), fileOutputStream);
1✔
454

455
      serializeRegisteredConfigNode(fileOutputStream, protocol);
1✔
456

457
      serializeRegisteredDataNode(fileOutputStream, protocol);
1✔
458

459
      serializeVersionInfo(fileOutputStream);
1✔
460

461
      fileOutputStream.flush();
1✔
462

463
      fileOutputStream.close();
1✔
464

465
      return tmpFile.renameTo(snapshotFile);
1✔
466

467
    } finally {
468
      versionInfoReadWriteLock.readLock().unlock();
1✔
469
      dataNodeInfoReadWriteLock.readLock().unlock();
1✔
470
      configNodeInfoReadWriteLock.readLock().unlock();
1✔
471
      for (int retry = 0; retry < 5; retry++) {
1✔
472
        if (!tmpFile.exists() || tmpFile.delete()) {
1✔
473
          break;
×
474
        } else {
475
          LOGGER.warn(
×
476
              "Can't delete temporary snapshot file: {}, retrying...", tmpFile.getAbsolutePath());
×
477
        }
478
      }
479
    }
480
  }
481

482
  private void serializeRegisteredConfigNode(OutputStream outputStream, TProtocol protocol)
483
      throws IOException, TException {
484
    ReadWriteIOUtils.write(registeredConfigNodes.size(), outputStream);
1✔
485
    for (Entry<Integer, TConfigNodeLocation> entry : registeredConfigNodes.entrySet()) {
1✔
486
      ReadWriteIOUtils.write(entry.getKey(), outputStream);
1✔
487
      entry.getValue().write(protocol);
1✔
488
    }
1✔
489
  }
1✔
490

491
  private void serializeRegisteredDataNode(OutputStream outputStream, TProtocol protocol)
492
      throws IOException, TException {
493
    ReadWriteIOUtils.write(registeredDataNodes.size(), outputStream);
1✔
494
    for (Entry<Integer, TDataNodeConfiguration> entry : registeredDataNodes.entrySet()) {
1✔
495
      ReadWriteIOUtils.write(entry.getKey(), outputStream);
1✔
496
      entry.getValue().write(protocol);
1✔
497
    }
1✔
498
  }
1✔
499

500
  private void serializeVersionInfo(OutputStream outputStream) throws IOException {
501
    ReadWriteIOUtils.write(nodeVersionInfo.size(), outputStream);
1✔
502
    for (Entry<Integer, TNodeVersionInfo> entry : nodeVersionInfo.entrySet()) {
1✔
503
      ReadWriteIOUtils.write(entry.getKey(), outputStream);
×
504
      ReadWriteIOUtils.write(entry.getValue().getVersion(), outputStream);
×
505
      ReadWriteIOUtils.write(entry.getValue().getBuildInfo(), outputStream);
×
506
    }
×
507
  }
1✔
508

509
  @Override
510
  public void processLoadSnapshot(File snapshotDir) throws IOException, TException {
511

512
    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
1✔
513
    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
1✔
514
      LOGGER.error(
×
515
          "Failed to load snapshot,snapshot file [{}] is not exist.",
516
          snapshotFile.getAbsolutePath());
×
517
      return;
×
518
    }
519

520
    configNodeInfoReadWriteLock.writeLock().lock();
1✔
521
    dataNodeInfoReadWriteLock.writeLock().lock();
1✔
522
    versionInfoReadWriteLock.writeLock().lock();
1✔
523

524
    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
1✔
525
        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileInputStream)) {
1✔
526
      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
1✔
527

528
      clear();
1✔
529

530
      nextNodeId.set(ReadWriteIOUtils.readInt(fileInputStream));
1✔
531

532
      deserializeRegisteredConfigNode(fileInputStream, protocol);
1✔
533

534
      deserializeRegisteredDataNode(fileInputStream, protocol);
1✔
535

536
      deserializeBuildInfo(fileInputStream);
1✔
537

538
    } finally {
539
      versionInfoReadWriteLock.writeLock().unlock();
1✔
540
      dataNodeInfoReadWriteLock.writeLock().unlock();
1✔
541
      configNodeInfoReadWriteLock.writeLock().unlock();
1✔
542
    }
543
  }
1✔
544

545
  private void deserializeRegisteredConfigNode(InputStream inputStream, TProtocol protocol)
546
      throws IOException, TException {
547
    int size = ReadWriteIOUtils.readInt(inputStream);
1✔
548
    while (size > 0) {
1✔
549
      int configNodeId = ReadWriteIOUtils.readInt(inputStream);
1✔
550
      TConfigNodeLocation configNodeLocation = new TConfigNodeLocation();
1✔
551
      configNodeLocation.read(protocol);
1✔
552
      registeredConfigNodes.put(configNodeId, configNodeLocation);
1✔
553
      size--;
1✔
554
    }
1✔
555
  }
1✔
556

557
  private void deserializeRegisteredDataNode(InputStream inputStream, TProtocol protocol)
558
      throws IOException, TException {
559
    int size = ReadWriteIOUtils.readInt(inputStream);
1✔
560
    while (size > 0) {
1✔
561
      int dataNodeId = ReadWriteIOUtils.readInt(inputStream);
1✔
562
      TDataNodeConfiguration dataNodeInfo = new TDataNodeConfiguration();
1✔
563
      dataNodeInfo.read(protocol);
1✔
564
      registeredDataNodes.put(dataNodeId, dataNodeInfo);
1✔
565
      size--;
1✔
566
    }
1✔
567
  }
1✔
568

569
  private void deserializeBuildInfo(InputStream inputStream) throws IOException {
570
    // old version may not have build info,
571
    // thus we need to check inputStream before deserialize.
572
    if (inputStream.available() != 0) {
1✔
573
      int size = ReadWriteIOUtils.readInt(inputStream);
1✔
574
      while (size > 0) {
1✔
575
        int nodeId = ReadWriteIOUtils.readInt(inputStream);
×
576
        String version = ReadWriteIOUtils.readString(inputStream);
×
577
        String buildInfo = ReadWriteIOUtils.readString(inputStream);
×
578
        nodeVersionInfo.put(nodeId, new TNodeVersionInfo(version, buildInfo));
×
579
        size--;
×
580
      }
×
581
    }
582
  }
1✔
583

584
  public static int getMinimumDataNode() {
585
    return MINIMUM_DATANODE;
×
586
  }
587

588
  public void clear() {
589
    nextNodeId.set(-1);
1✔
590
    registeredDataNodes.clear();
1✔
591
    registeredConfigNodes.clear();
1✔
592
    nodeVersionInfo.clear();
1✔
593
  }
1✔
594

595
  @Override
596
  public boolean equals(Object o) {
597
    if (this == o) {
1✔
598
      return true;
×
599
    }
600
    if (o == null || getClass() != o.getClass()) {
1✔
601
      return false;
×
602
    }
603
    NodeInfo nodeInfo = (NodeInfo) o;
1✔
604
    return registeredConfigNodes.equals(nodeInfo.registeredConfigNodes)
1✔
605
        && nextNodeId.get() == nodeInfo.nextNodeId.get()
1✔
606
        && registeredDataNodes.equals(nodeInfo.registeredDataNodes)
1✔
607
        && nodeVersionInfo.equals(nodeInfo.nodeVersionInfo);
1✔
608
  }
609

610
  @Override
611
  public int hashCode() {
612
    return Objects.hash(registeredConfigNodes, nextNodeId, registeredDataNodes, nodeVersionInfo);
×
613
  }
614
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc