• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9686

pending completion
#9686

push

travis_ci

web-flow
add build info in show cluster (#10595)

146 of 146 new or added lines in 13 files covered. (100.0%)

79232 of 165062 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.81
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.persistence.node;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
26
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
27
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
28
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
29
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
30
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
31
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateBuildInfoPlan;
32
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
33
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
34
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
35
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
36
import org.apache.iotdb.rpc.TSStatusCode;
37
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
38

39
import org.apache.thrift.TException;
40
import org.apache.thrift.protocol.TBinaryProtocol;
41
import org.apache.thrift.protocol.TProtocol;
42
import org.apache.thrift.transport.TIOStreamTransport;
43
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
45

46
import java.io.File;
47
import java.io.FileInputStream;
48
import java.io.FileOutputStream;
49
import java.io.IOException;
50
import java.io.InputStream;
51
import java.io.OutputStream;
52
import java.util.ArrayList;
53
import java.util.Collections;
54
import java.util.HashMap;
55
import java.util.List;
56
import java.util.Map;
57
import java.util.Map.Entry;
58
import java.util.Objects;
59
import java.util.UUID;
60
import java.util.concurrent.ConcurrentHashMap;
61
import java.util.concurrent.atomic.AtomicInteger;
62
import java.util.concurrent.locks.ReentrantReadWriteLock;
63

64
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
65

66
/**
67
 * The {@link NodeInfo} stores cluster node information.
68
 *
69
 * <p>The cluster node information includes:
70
 *
71
 * <p>1. DataNode information
72
 *
73
 * <p>2. ConfigNode information
74
 */
75
public class NodeInfo implements SnapshotProcessor {
76

77
  private static final Logger LOGGER = LoggerFactory.getLogger(NodeInfo.class);
1✔
78

79
  private static final int MINIMUM_DATANODE =
1✔
80
      Math.max(
1✔
81
          ConfigNodeDescriptor.getInstance().getConf().getSchemaReplicationFactor(),
1✔
82
          ConfigNodeDescriptor.getInstance().getConf().getDataReplicationFactor());
1✔
83

84
  // Registered ConfigNodes
85
  private final ReentrantReadWriteLock configNodeInfoReadWriteLock;
86
  private final Map<Integer, TConfigNodeLocation> registeredConfigNodes;
87

88
  // Registered DataNodes
89
  private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
90

91
  private final ReentrantReadWriteLock buildInfoReadWriteLock;
92

93
  private final AtomicInteger nextNodeId = new AtomicInteger(-1);
1✔
94
  private final Map<Integer, TDataNodeConfiguration> registeredDataNodes;
95

96
  private final Map<Integer, String> nodeBuildInfo;
97
  private static final String SNAPSHOT_FILENAME = "node_info.bin";
98

99
  public NodeInfo() {
1✔
100
    this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
1✔
101
    this.registeredConfigNodes = new ConcurrentHashMap<>();
1✔
102

103
    this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
1✔
104
    this.registeredDataNodes = new ConcurrentHashMap<>();
1✔
105

106
    this.nodeBuildInfo = new ConcurrentHashMap<>();
1✔
107
    this.buildInfoReadWriteLock = new ReentrantReadWriteLock();
1✔
108
  }
1✔
109

110
  /**
111
   * Persist DataNode info.
112
   *
113
   * @param registerDataNodePlan RegisterDataNodePlan
114
   * @return {@link TSStatusCode#SUCCESS_STATUS}
115
   */
116
  public TSStatus registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
117
    TSStatus result;
118
    TDataNodeConfiguration info = registerDataNodePlan.getDataNodeConfiguration();
1✔
119
    dataNodeInfoReadWriteLock.writeLock().lock();
1✔
120
    try {
121

122
      // To ensure that the nextNodeId is updated correctly when
123
      // the ConfigNode-followers concurrently processes RegisterDataNodePlan,
124
      // we need to add a synchronization lock here
125
      synchronized (nextNodeId) {
1✔
126
        if (nextNodeId.get() < info.getLocation().getDataNodeId()) {
1✔
127
          nextNodeId.set(info.getLocation().getDataNodeId());
1✔
128
        }
129
      }
1✔
130
      registeredDataNodes.put(info.getLocation().getDataNodeId(), info);
1✔
131
      result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
1✔
132
      if (nextNodeId.get() < MINIMUM_DATANODE) {
1✔
133
        result.setMessage(
×
134
            String.format(
×
135
                "To enable IoTDB-Cluster's data service, please register %d more IoTDB-DataNode",
136
                MINIMUM_DATANODE - nextNodeId.get()));
×
137
      } else if (nextNodeId.get() == MINIMUM_DATANODE) {
1✔
138
        result.setMessage("IoTDB-Cluster could provide data service, now enjoy yourself!");
×
139
      }
140
    } finally {
141
      dataNodeInfoReadWriteLock.writeLock().unlock();
1✔
142
    }
143
    return result;
1✔
144
  }
145

146
  /**
147
   * Persist Information about remove dataNode.
148
   *
149
   * @param req RemoveDataNodePlan
150
   * @return {@link TSStatus}
151
   */
152
  public TSStatus removeDataNode(RemoveDataNodePlan req) {
153
    LOGGER.info(
×
154
        "{}, There are {} data node in cluster before executed RemoveDataNodePlan",
155
        REMOVE_DATANODE_PROCESS,
156
        registeredDataNodes.size());
×
157

158
    dataNodeInfoReadWriteLock.writeLock().lock();
×
159
    buildInfoReadWriteLock.writeLock().lock();
×
160
    try {
161
      req.getDataNodeLocations()
×
162
          .forEach(
×
163
              removeDataNodes -> {
164
                registeredDataNodes.remove(removeDataNodes.getDataNodeId());
×
165
                nodeBuildInfo.remove(removeDataNodes.getDataNodeId());
×
166
                LOGGER.info("Removed the datanode {} from cluster", removeDataNodes);
×
167
              });
×
168
    } finally {
169
      buildInfoReadWriteLock.writeLock().unlock();
×
170
      dataNodeInfoReadWriteLock.writeLock().unlock();
×
171
    }
172
    LOGGER.info(
×
173
        "{}, There are {} data node in cluster after executed RemoveDataNodePlan",
174
        REMOVE_DATANODE_PROCESS,
175
        registeredDataNodes.size());
×
176
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
177
  }
178

179
  /**
180
   * Update the specified DataNode‘s location.
181
   *
182
   * @param updateDataNodePlan UpdateDataNodePlan
183
   * @return {@link TSStatusCode#SUCCESS_STATUS} if update DataNode info successfully.
184
   */
185
  public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
186
    dataNodeInfoReadWriteLock.writeLock().lock();
×
187
    try {
188
      TDataNodeConfiguration newConfiguration = updateDataNodePlan.getDataNodeConfiguration();
×
189
      registeredDataNodes.replace(newConfiguration.getLocation().getDataNodeId(), newConfiguration);
×
190
    } finally {
191
      dataNodeInfoReadWriteLock.writeLock().unlock();
×
192
    }
193
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
194
  }
195

196
  /**
197
   * Get DataNodeConfiguration.
198
   *
199
   * @param getDataNodeConfigurationPlan GetDataNodeConfigurationPlan
200
   * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in
201
   *     GetDataNodeConfigurationPlan is -1
202
   */
203
  public DataNodeConfigurationResp getDataNodeConfiguration(
204
      GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
205
    DataNodeConfigurationResp result = new DataNodeConfigurationResp();
×
206
    result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
×
207

208
    int dataNodeId = getDataNodeConfigurationPlan.getDataNodeId();
×
209
    dataNodeInfoReadWriteLock.readLock().lock();
×
210
    try {
211
      if (dataNodeId == -1) {
×
212
        result.setDataNodeConfigurationMap(new HashMap<>(registeredDataNodes));
×
213
      } else {
214
        result.setDataNodeConfigurationMap(
×
215
            registeredDataNodes.get(dataNodeId) == null
×
216
                ? new HashMap<>(0)
×
217
                : Collections.singletonMap(dataNodeId, registeredDataNodes.get(dataNodeId)));
×
218
      }
219
    } finally {
220
      dataNodeInfoReadWriteLock.readLock().unlock();
×
221
    }
222

223
    return result;
×
224
  }
225

226
  /** Return the number of registered DataNodes. */
227
  public int getRegisteredDataNodeCount() {
228
    int result;
229
    dataNodeInfoReadWriteLock.readLock().lock();
×
230
    try {
231
      result = registeredDataNodes.size();
×
232
    } finally {
233
      dataNodeInfoReadWriteLock.readLock().unlock();
×
234
    }
235
    return result;
×
236
  }
237

238
  /** Return the number of total cpu cores in online DataNodes. */
239
  public int getTotalCpuCoreCount() {
240
    int result = 0;
×
241
    dataNodeInfoReadWriteLock.readLock().lock();
×
242
    try {
243
      for (TDataNodeConfiguration dataNodeConfiguration : registeredDataNodes.values()) {
×
244
        result += dataNodeConfiguration.getResource().getCpuCoreNum();
×
245
      }
×
246
    } finally {
247
      dataNodeInfoReadWriteLock.readLock().unlock();
×
248
    }
249
    return result;
×
250
  }
251

252
  /** @return All registered DataNodes. */
253
  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
254
    List<TDataNodeConfiguration> result;
255
    dataNodeInfoReadWriteLock.readLock().lock();
×
256
    try {
257
      result = new ArrayList<>(registeredDataNodes.values());
×
258
    } finally {
259
      dataNodeInfoReadWriteLock.readLock().unlock();
×
260
    }
261
    return result;
×
262
  }
263

264
  /** @return The specified registered DataNode. */
265
  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
266
    dataNodeInfoReadWriteLock.readLock().lock();
×
267
    try {
268
      return registeredDataNodes.getOrDefault(dataNodeId, new TDataNodeConfiguration()).deepCopy();
×
269
    } finally {
270
      dataNodeInfoReadWriteLock.readLock().unlock();
×
271
    }
272
  }
273

274
  /** @return The specified registered DataNodes. */
275
  public List<TDataNodeConfiguration> getRegisteredDataNodes(List<Integer> dataNodeIds) {
276
    List<TDataNodeConfiguration> result = new ArrayList<>();
×
277
    dataNodeInfoReadWriteLock.readLock().lock();
×
278
    try {
279
      dataNodeIds.forEach(
×
280
          dataNodeId -> {
281
            if (registeredDataNodes.containsKey(dataNodeId)) {
×
282
              result.add(registeredDataNodes.get(dataNodeId).deepCopy());
×
283
            }
284
          });
×
285
    } finally {
286
      dataNodeInfoReadWriteLock.readLock().unlock();
×
287
    }
288
    return result;
×
289
  }
290

291
  /**
292
   * Update ConfigNodeList both in memory and confignode-system{@literal .}properties file.
293
   *
294
   * @param applyConfigNodePlan ApplyConfigNodePlan
295
   * @return {@link TSStatusCode#ADD_CONFIGNODE_ERROR} if update online ConfigNode failed.
296
   */
297
  public TSStatus applyConfigNode(ApplyConfigNodePlan applyConfigNodePlan) {
298
    TSStatus status = new TSStatus();
1✔
299
    configNodeInfoReadWriteLock.writeLock().lock();
1✔
300
    try {
301
      // To ensure that the nextNodeId is updated correctly when
302
      // the ConfigNode-followers concurrently processes ApplyConfigNodePlan,
303
      // We need to add a synchronization lock here
304
      synchronized (nextNodeId) {
1✔
305
        if (nextNodeId.get() < applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId()) {
1✔
306
          nextNodeId.set(applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
1✔
307
        }
308
      }
1✔
309

310
      registeredConfigNodes.put(
1✔
311
          applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId(),
1✔
312
          applyConfigNodePlan.getConfigNodeLocation());
1✔
313
      SystemPropertiesUtils.storeConfigNodeList(new ArrayList<>(registeredConfigNodes.values()));
1✔
314
      LOGGER.info(
1✔
315
          "Successfully apply ConfigNode: {}. Current ConfigNodeGroup: {}",
316
          applyConfigNodePlan.getConfigNodeLocation(),
1✔
317
          registeredConfigNodes);
318
      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
1✔
319
    } catch (IOException e) {
×
320
      LOGGER.error("Update online ConfigNode failed.", e);
×
321
      status.setCode(TSStatusCode.ADD_CONFIGNODE_ERROR.getStatusCode());
×
322
      status.setMessage(
×
323
          "Apply new ConfigNode failed because current ConfigNode can't store ConfigNode information.");
324
    } finally {
325
      configNodeInfoReadWriteLock.writeLock().unlock();
1✔
326
    }
327
    return status;
1✔
328
  }
329

330
  /**
331
   * Update ConfigNodeList both in memory and confignode-system{@literal .}properties file.
332
   *
333
   * @param removeConfigNodePlan RemoveConfigNodePlan
334
   * @return {@link TSStatusCode#REMOVE_CONFIGNODE_ERROR} if remove online ConfigNode failed.
335
   */
336
  public TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
337
    TSStatus status = new TSStatus();
×
338
    configNodeInfoReadWriteLock.writeLock().lock();
×
339
    buildInfoReadWriteLock.writeLock().lock();
×
340
    try {
341
      registeredConfigNodes.remove(removeConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
×
342
      nodeBuildInfo.remove(removeConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
×
343
      SystemPropertiesUtils.storeConfigNodeList(new ArrayList<>(registeredConfigNodes.values()));
×
344
      LOGGER.info(
×
345
          "Successfully remove ConfigNode: {}. Current ConfigNodeGroup: {}",
346
          removeConfigNodePlan.getConfigNodeLocation(),
×
347
          registeredConfigNodes);
348
      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
349
    } catch (IOException e) {
×
350
      LOGGER.error("Remove online ConfigNode failed.", e);
×
351
      status.setCode(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode());
×
352
      status.setMessage(
×
353
          "Remove ConfigNode failed because current ConfigNode can't store ConfigNode information.");
354
    } finally {
355
      buildInfoReadWriteLock.writeLock().unlock();
×
356
      configNodeInfoReadWriteLock.writeLock().unlock();
×
357
    }
358
    return status;
×
359
  }
360

361
  /**
362
   * Update the specified Node‘s buildInfo.
363
   *
364
   * @param updateBuildInfoPlan UpdateBuildInfoPlan
365
   * @return {@link TSStatusCode#SUCCESS_STATUS} if update build info successfully.
366
   */
367
  public TSStatus updateBuildInfo(UpdateBuildInfoPlan updateBuildInfoPlan) {
368
    buildInfoReadWriteLock.writeLock().lock();
×
369
    try {
370
      nodeBuildInfo.put(updateBuildInfoPlan.getNodeId(), updateBuildInfoPlan.getBuildInfo());
×
371
    } finally {
372
      buildInfoReadWriteLock.writeLock().unlock();
×
373
    }
374
    LOGGER.info("Successfully update Node {} 's buildInfo.", updateBuildInfoPlan.getNodeId());
×
375
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
376
  }
377

378
  /** @return All registered ConfigNodes. */
379
  public List<TConfigNodeLocation> getRegisteredConfigNodes() {
380
    List<TConfigNodeLocation> result;
381
    configNodeInfoReadWriteLock.readLock().lock();
×
382
    try {
383
      result = new ArrayList<>(registeredConfigNodes.values());
×
384
    } finally {
385
      configNodeInfoReadWriteLock.readLock().unlock();
×
386
    }
387
    return result;
×
388
  }
389

390
  /** @return The specified registered ConfigNode. */
391
  public List<TConfigNodeLocation> getRegisteredConfigNodes(List<Integer> configNodeIds) {
392
    List<TConfigNodeLocation> result = new ArrayList<>();
×
393
    configNodeInfoReadWriteLock.readLock().lock();
×
394
    try {
395
      configNodeIds.forEach(
×
396
          configNodeId -> {
397
            if (registeredConfigNodes.containsKey(configNodeId)) {
×
398
              result.add(registeredConfigNodes.get(configNodeId).deepCopy());
×
399
            }
400
          });
×
401
    } finally {
402
      configNodeInfoReadWriteLock.readLock().unlock();
×
403
    }
404
    return result;
×
405
  }
406

407
  /** @return all nodes buildInfo */
408
  public Map<Integer, String> getNodeBuildInfo() {
409
    Map<Integer, String> result = new HashMap<>();
×
410
    buildInfoReadWriteLock.readLock().lock();
×
411
    try {
412
      result.putAll(nodeBuildInfo);
×
413
    } finally {
414
      buildInfoReadWriteLock.readLock().unlock();
×
415
    }
416
    return result;
×
417
  }
418

419
  public String getBuildInfo(int nodeId) {
420
    buildInfoReadWriteLock.readLock().lock();
×
421
    try {
422
      return nodeBuildInfo.getOrDefault(nodeId, "Unknown");
×
423
    } finally {
424
      buildInfoReadWriteLock.readLock().unlock();
×
425
    }
426
  }
427

428
  public int generateNextNodeId() {
429
    return nextNodeId.incrementAndGet();
×
430
  }
431

432
  @Override
433
  public boolean processTakeSnapshot(File snapshotDir) throws IOException, TException {
434
    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
1✔
435
    if (snapshotFile.exists() && snapshotFile.isFile()) {
1✔
436
      LOGGER.error(
×
437
          "Failed to take snapshot, because snapshot file [{}] is already exist.",
438
          snapshotFile.getAbsolutePath());
×
439
      return false;
×
440
    }
441

442
    File tmpFile = new File(snapshotFile.getAbsolutePath() + "-" + UUID.randomUUID());
1✔
443
    configNodeInfoReadWriteLock.readLock().lock();
1✔
444
    dataNodeInfoReadWriteLock.readLock().lock();
1✔
445
    buildInfoReadWriteLock.readLock().lock();
1✔
446
    try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile);
1✔
447
        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileOutputStream)) {
1✔
448

449
      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
1✔
450

451
      ReadWriteIOUtils.write(nextNodeId.get(), fileOutputStream);
1✔
452

453
      serializeRegisteredConfigNode(fileOutputStream, protocol);
1✔
454

455
      serializeRegisteredDataNode(fileOutputStream, protocol);
1✔
456

457
      serializeBuildInfo(fileOutputStream);
1✔
458

459
      fileOutputStream.flush();
1✔
460

461
      fileOutputStream.close();
1✔
462

463
      return tmpFile.renameTo(snapshotFile);
1✔
464

465
    } finally {
466
      buildInfoReadWriteLock.readLock().unlock();
1✔
467
      dataNodeInfoReadWriteLock.readLock().unlock();
1✔
468
      configNodeInfoReadWriteLock.readLock().unlock();
1✔
469
      for (int retry = 0; retry < 5; retry++) {
1✔
470
        if (!tmpFile.exists() || tmpFile.delete()) {
1✔
471
          break;
×
472
        } else {
473
          LOGGER.warn(
×
474
              "Can't delete temporary snapshot file: {}, retrying...", tmpFile.getAbsolutePath());
×
475
        }
476
      }
477
    }
478
  }
479

480
  private void serializeRegisteredConfigNode(OutputStream outputStream, TProtocol protocol)
481
      throws IOException, TException {
482
    ReadWriteIOUtils.write(registeredConfigNodes.size(), outputStream);
1✔
483
    for (Entry<Integer, TConfigNodeLocation> entry : registeredConfigNodes.entrySet()) {
1✔
484
      ReadWriteIOUtils.write(entry.getKey(), outputStream);
1✔
485
      entry.getValue().write(protocol);
1✔
486
    }
1✔
487
  }
1✔
488

489
  private void serializeRegisteredDataNode(OutputStream outputStream, TProtocol protocol)
490
      throws IOException, TException {
491
    ReadWriteIOUtils.write(registeredDataNodes.size(), outputStream);
1✔
492
    for (Entry<Integer, TDataNodeConfiguration> entry : registeredDataNodes.entrySet()) {
1✔
493
      ReadWriteIOUtils.write(entry.getKey(), outputStream);
1✔
494
      entry.getValue().write(protocol);
1✔
495
    }
1✔
496
  }
1✔
497

498
  private void serializeBuildInfo(OutputStream outputStream) throws IOException {
499
    ReadWriteIOUtils.write(nodeBuildInfo.size(), outputStream);
1✔
500
    for (Entry<Integer, String> entry : nodeBuildInfo.entrySet()) {
1✔
501
      ReadWriteIOUtils.write(entry.getKey(), outputStream);
×
502
      ReadWriteIOUtils.write(entry.getValue(), outputStream);
×
503
    }
×
504
  }
1✔
505

506
  @Override
507
  public void processLoadSnapshot(File snapshotDir) throws IOException, TException {
508

509
    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
1✔
510
    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
1✔
511
      LOGGER.error(
×
512
          "Failed to load snapshot,snapshot file [{}] is not exist.",
513
          snapshotFile.getAbsolutePath());
×
514
      return;
×
515
    }
516

517
    configNodeInfoReadWriteLock.writeLock().lock();
1✔
518
    dataNodeInfoReadWriteLock.writeLock().lock();
1✔
519
    buildInfoReadWriteLock.writeLock().lock();
1✔
520

521
    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
1✔
522
        TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileInputStream)) {
1✔
523
      TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
1✔
524

525
      clear();
1✔
526

527
      nextNodeId.set(ReadWriteIOUtils.readInt(fileInputStream));
1✔
528

529
      deserializeRegisteredConfigNode(fileInputStream, protocol);
1✔
530

531
      deserializeRegisteredDataNode(fileInputStream, protocol);
1✔
532

533
      deserializeBuildInfo(fileInputStream);
1✔
534

535
    } finally {
536
      buildInfoReadWriteLock.writeLock().unlock();
1✔
537
      dataNodeInfoReadWriteLock.writeLock().unlock();
1✔
538
      configNodeInfoReadWriteLock.writeLock().unlock();
1✔
539
    }
540
  }
1✔
541

542
  private void deserializeRegisteredConfigNode(InputStream inputStream, TProtocol protocol)
543
      throws IOException, TException {
544
    int size = ReadWriteIOUtils.readInt(inputStream);
1✔
545
    while (size > 0) {
1✔
546
      int configNodeId = ReadWriteIOUtils.readInt(inputStream);
1✔
547
      TConfigNodeLocation configNodeLocation = new TConfigNodeLocation();
1✔
548
      configNodeLocation.read(protocol);
1✔
549
      registeredConfigNodes.put(configNodeId, configNodeLocation);
1✔
550
      size--;
1✔
551
    }
1✔
552
  }
1✔
553

554
  private void deserializeRegisteredDataNode(InputStream inputStream, TProtocol protocol)
555
      throws IOException, TException {
556
    int size = ReadWriteIOUtils.readInt(inputStream);
1✔
557
    while (size > 0) {
1✔
558
      int dataNodeId = ReadWriteIOUtils.readInt(inputStream);
1✔
559
      TDataNodeConfiguration dataNodeInfo = new TDataNodeConfiguration();
1✔
560
      dataNodeInfo.read(protocol);
1✔
561
      registeredDataNodes.put(dataNodeId, dataNodeInfo);
1✔
562
      size--;
1✔
563
    }
1✔
564
  }
1✔
565

566
  private void deserializeBuildInfo(InputStream inputStream) throws IOException {
567
    // old version may not have build info,
568
    // thus we need to check inputStream before deserialize.
569
    if (inputStream.available() != 0) {
1✔
570
      int size = ReadWriteIOUtils.readInt(inputStream);
1✔
571
      while (size > 0) {
1✔
572
        int nodeId = ReadWriteIOUtils.readInt(inputStream);
×
573
        String buildInfo = ReadWriteIOUtils.readString(inputStream);
×
574
        nodeBuildInfo.put(nodeId, buildInfo);
×
575
        size--;
×
576
      }
×
577
    }
578
  }
1✔
579

580
  public static int getMinimumDataNode() {
581
    return MINIMUM_DATANODE;
×
582
  }
583

584
  public void clear() {
585
    nextNodeId.set(-1);
1✔
586
    registeredDataNodes.clear();
1✔
587
    registeredConfigNodes.clear();
1✔
588
    nodeBuildInfo.clear();
1✔
589
  }
1✔
590

591
  @Override
592
  public boolean equals(Object o) {
593
    if (this == o) {
1✔
594
      return true;
×
595
    }
596
    if (o == null || getClass() != o.getClass()) {
1✔
597
      return false;
×
598
    }
599
    NodeInfo nodeInfo = (NodeInfo) o;
1✔
600
    return registeredConfigNodes.equals(nodeInfo.registeredConfigNodes)
1✔
601
        && nextNodeId.get() == nodeInfo.nextNodeId.get()
1✔
602
        && registeredDataNodes.equals(nodeInfo.registeredDataNodes)
1✔
603
        && nodeBuildInfo.equals(nodeInfo.nodeBuildInfo);
1✔
604
  }
605

606
  @Override
607
  public int hashCode() {
608
    return Objects.hash(registeredConfigNodes, nextNodeId, registeredDataNodes, nodeBuildInfo);
×
609
  }
610
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc