apache / iotdb / #9986

04 Sep 2023 12:14AM UTC · coverage: 47.705% (+0.003%) from 47.702%
Build #9986 · push · travis_ci · web-flow
Fix possible NPE while executing show cluster or show cluster details

42 of 42 new or added lines in 2 files covered. (100.0%)
80193 of 168102 relevant lines covered (47.7%)
0.48 hits per line

Source File

/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java · 70.85% covered
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.consensus.iot.logdispatcher;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.consensus.iot.log.GetConsensusReqReaderPlan;
import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesReq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/** Manage all asynchronous replication threads and corresponding async clients. */
public class LogDispatcher {

  private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
  private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
  private final IoTConsensusServerImpl impl;
  private final List<LogDispatcherThread> threads;
  private final int selfPeerId;
  private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
  private ExecutorService executorService;

  private boolean stopped = false;

  private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
  private final AtomicLong logEntriesFromQueue = new AtomicLong(0);

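  /**
   * Builds one LogDispatcherThread for every peer in the configuration except this node; the
   * shared thread pool is only created when at least one remote peer exists.
   */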
  public LogDispatcher(
      IoTConsensusServerImpl impl,
      IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
    this.impl = impl;
    this.selfPeerId = impl.getThisNode().getNodeId();
    this.clientManager = clientManager;
    this.threads =
        impl.getConfiguration().stream()
            .filter(x -> !Objects.equals(x, impl.getThisNode()))
            .map(x -> new LogDispatcherThread(x, impl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX))
            .collect(Collectors.toList());
    if (!threads.isEmpty()) {
      initLogSyncThreadPool();
    }
  }

  private void initLogSyncThreadPool() {
    // We use a cached thread pool here because each LogDispatcherThread occupies one thread and
    // never releases it back to the pool: a LogDispatcherThread does not stop unless the
    // LogDispatcher itself stops.
    // Thus, the size of this thread pool equals the number of LogDispatcherThreads.
    this.executorService =
        IoTDBThreadPoolFactory.newCachedThreadPool(
            ThreadName.LOG_DISPATCHER.getName() + "-" + impl.getThisNode().getGroupId());
  }

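  /** Starts replication by submitting every dispatcher thread to the shared thread pool. */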
  public synchronized void start() {
    if (!threads.isEmpty()) {
      threads.forEach(executorService::submit);
    }
  }

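  /**
   * Shuts down the thread pool, waits up to 10 seconds for termination, and then stops each
   * dispatcher thread so that its queued entries and reserved memory are released.
   */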
  public synchronized void stop() {
    if (!threads.isEmpty()) {
      executorService.shutdownNow();
      int timeout = 10;
      try {
        if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
          logger.error("Unable to shutdown LogDispatcher service after {} seconds", timeout);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.error("Unexpected interruption when closing LogDispatcher service");
      }
      threads.forEach(LogDispatcherThread::stop);
    }
    stopped = true;
  }

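  /**
   * Adds and starts a dispatcher thread for a newly joined peer, lazily creating the thread pool
   * if this node previously ran with a single replica.
   */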
  public synchronized void addLogDispatcherThread(Peer peer, long initialSyncIndex) {
    if (stopped) {
      return;
    }
    LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
    threads.add(thread);
    // If the initial replica number is 1, the executorService is not initialized, so it must be
    // initialized manually when the first dispatcher thread is added.
    if (this.executorService == null) {
      initLogSyncThreadPool();
    }
    executorService.submit(thread);
  }

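  /** Stops, cleans up, and removes the dispatcher thread serving the given peer, if one exists. */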
  public synchronized void removeLogDispatcherThread(Peer peer) throws IOException {
    if (stopped) {
      return;
    }
    int threadIndex = -1;
    for (int i = 0; i < threads.size(); i++) {
      if (threads.get(i).peer.equals(peer)) {
        threadIndex = i;
        break;
      }
    }
    if (threadIndex == -1) {
      return;
    }
    threads.get(threadIndex).stop();
    threads.get(threadIndex).cleanup();
    threads.remove(threadIndex);
  }

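  /**
   * Returns the minimum sync index across all dispatcher threads, i.e. the progress of the
   * slowest peer.
   */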
  public synchronized OptionalLong getMinSyncIndex() {
    return threads.stream().mapToLong(LogDispatcherThread::getCurrentSyncIndex).min();
  }

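  /**
   * Serializes the request once and offers it to every dispatcher thread's queue. If a queue is
   * full, the request is dropped for that peer and will be reloaded from the WAL later.
   */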
  public void offer(IndexedConsensusRequest request) {
    // We don't need to serialize and offer the request when the replica number is 1.
    if (!threads.isEmpty()) {
      request.buildSerializedRequests();
      synchronized (this) {
        threads.forEach(
            thread -> {
              logger.debug(
                  "{}->{}: Push a log to the queue, where the queue length is {}",
                  impl.getThisNode().getGroupId(),
                  thread.getPeer().getEndpoint().getIp(),
                  thread.getPendingEntriesSize());
              if (!thread.offer(request)) {
                logger.debug(
                    "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
                    impl.getThisNode().getGroupId(),
                    thread.getPeer(),
                    request.getSearchIndex());
              }
            });
      }
    }
  }

  public long getLogEntriesFromWAL() {
    return logEntriesFromWAL.get();
  }

  public long getLogEntriesFromQueue() {
    return logEntriesFromQueue.get();
  }

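  /**
   * A replication worker bound to a single remote peer: it accumulates pending requests into
   * batches, falls back to the WAL for entries missing from the queue, and ships each batch
   * asynchronously.
   */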
  public class LogDispatcherThread implements Runnable {

    private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
    private static final long START_INDEX = 1;
    private final IoTConsensusConfig config;
    private final Peer peer;
    private final IndexController controller;
    // A sliding window class that manages asynchronous pendingBatches
    private final SyncStatus syncStatus;
    // A queue used to receive asynchronous replication requests
    private final BlockingQueue<IndexedConsensusRequest> pendingEntries;
    // A container used to cache requests, whose size changes dynamically
    private final List<IndexedConsensusRequest> bufferedEntries = new LinkedList<>();
    // A reader management class that gets requests from the DataRegion
    private final ConsensusReqReader reader =
        (ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan());
    private final IoTConsensusMemoryManager iotConsensusMemoryManager =
        IoTConsensusMemoryManager.getInstance();
    private volatile boolean stopped = false;

    private final ConsensusReqReader.ReqIterator walEntryIterator;

    private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;

    public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
      this.peer = peer;
      this.config = config;
      this.pendingEntries = new ArrayBlockingQueue<>(config.getReplication().getMaxQueueLength());
      this.controller =
          new IndexController(
              impl.getStorageDir(),
              peer,
              initialSyncIndex,
              config.getReplication().getCheckpointGap());
      this.syncStatus = new SyncStatus(controller, config);
      this.walEntryIterator = reader.getReqIterator(START_INDEX);
      this.logDispatcherThreadMetrics = new LogDispatcherThreadMetrics(this);
      MetricService.getInstance().addMetricSet(logDispatcherThreadMetrics);
    }

    public IndexController getController() {
      return controller;
    }

    public long getCurrentSyncIndex() {
      return controller.getCurrentIndex();
    }

    public Peer getPeer() {
      return peer;
    }

    public IoTConsensusConfig getConfig() {
      return config;
    }

    public int getPendingEntriesSize() {
      return pendingEntries.size();
    }

    public int getBufferRequestSize() {
      return bufferedEntries.size();
    }

    /** Try to offer a request into the queue with memory control. */
    public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
        return false;
      }
      boolean success;
      try {
        success = pendingEntries.offer(indexedConsensusRequest);
      } catch (Throwable t) {
        // If an exception occurs while offering the request, the reserved memory should be
        // released
        iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize(), true);
        throw t;
      }
      if (!success) {
        // If the offer failed, the reserved memory should be released
        iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize(), true);
      }
      return success;
    }

    /** Release the memory reserved for a request that has left the queue. */
    private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
      iotConsensusMemoryManager.free(indexedConsensusRequest.getSerializedSize(), true);
    }

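    /**
     * Marks this thread as stopped, clears both the pending queue and the buffer, returns their
     * reserved bytes to the memory manager, and deregisters the metrics.
     */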
    public void stop() {
      stopped = true;
      long requestSize = 0;
      for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
        requestSize += indexedConsensusRequest.getSerializedSize();
      }
      pendingEntries.clear();
      iotConsensusMemoryManager.free(requestSize, true);
      requestSize = 0;
      for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
        requestSize += indexedConsensusRequest.getSerializedSize();
      }
      iotConsensusMemoryManager.free(requestSize, true);
      syncStatus.free();
      MetricService.getInstance().removeMetricSet(logDispatcherThreadMetrics);
    }

    public void cleanup() throws IOException {
      this.controller.cleanupVersionFiles();
    }

    public boolean isStopped() {
      return stopped;
    }

    public IoTConsensusServerImpl getImpl() {
      return impl;
    }

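    /**
     * Main loop: builds a batch (blocking on the queue while it is empty), hands it to the
     * sliding-window SyncStatus, and sends it asynchronously until the thread is interrupted.
     */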
    @Override
    public void run() {
      logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
      try {
        Batch batch;
        while (!Thread.interrupted()) {
          long startTime = System.nanoTime();
          while ((batch = getBatch()).isEmpty()) {
            // We may block here if there are no requests in the queue
            IndexedConsensusRequest request =
                pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
            if (request != null) {
              bufferedEntries.add(request);
              // If write pressure is low, we simply sleep a little to reduce the number of RPCs
              if (pendingEntries.size() <= config.getReplication().getMaxLogEntriesNumPerBatch()
                  && bufferedEntries.isEmpty()) {
                Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
              }
            }
          }
          logDispatcherThreadMetrics.recordConstructBatchTime(
              (System.nanoTime() - startTime) / batch.getLogEntries().size());
          // We may block here if the synchronization pipeline is full
          syncStatus.addNextBatch(batch);
          logEntriesFromWAL.addAndGet(batch.getLogEntriesNumFromWAL());
          logEntriesFromQueue.addAndGet(
              batch.getLogEntries().size() - batch.getLogEntriesNumFromWAL());
          // Sends the batch asynchronously; the retry logic lives in the callback handler
          sendBatchAsync(batch, new DispatchLogHandler(this, logDispatcherThreadMetrics, batch));
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        logger.error("Unexpected error in logDispatcher for peer {}", peer, e);
      }
      logger.info("{}: Dispatcher for {} exits", impl.getThisNode(), peer);
    }

    public void updateSafelyDeletedSearchIndex() {
      // Update the safely deleted search index to delete outdated info, indicating that insert
      // nodes whose search index is before this value can be deleted safely
      long currentSafelyDeletedSearchIndex = impl.getCurrentSafelyDeletedSearchIndex();
      reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
      // Notify waiters if writes can now be unblocked
      if (impl.unblockWrite()) {
        impl.signal();
      }
    }

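    /**
     * Assembles the next batch: drains pendingEntries into bufferedEntries, discards entries
     * older than the next sending index, and fills any index gaps from the WAL so the peer
     * receives a contiguous sequence of log entries.
     */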
    public Batch getBatch() {
      long startIndex = syncStatus.getNextSendingIndex();
      long maxIndex;
      synchronized (impl.getIndexObject()) {
        maxIndex = impl.getSearchIndex() + 1;
        logger.debug(
            "{}: startIndex: {}, maxIndex: {}, pendingEntries size: {}, bufferedEntries size: {}",
            impl.getThisNode().getGroupId(),
            startIndex,
            maxIndex,
            getPendingEntriesSize(),
            bufferedEntries.size());
        // Use drainTo instead of poll to reduce lock overhead
        pendingEntries.drainTo(
            bufferedEntries,
            config.getReplication().getMaxLogEntriesNumPerBatch() - bufferedEntries.size());
      }
      // Remove all requests whose searchIndex < startIndex
      Iterator<IndexedConsensusRequest> iterator = bufferedEntries.iterator();
      while (iterator.hasNext()) {
        IndexedConsensusRequest request = iterator.next();
        if (request.getSearchIndex() < startIndex) {
          iterator.remove();
          releaseReservedMemory(request);
        } else {
          break;
        }
      }

      Batch batches = new Batch(config);
      // This branch is executed in several scenarios:
      // 1. restart
      // 2. getBatch() is invoked at the exact moment the pendingEntries are consumed up. To
      // prevent inconsistency here, we use synchronized logic when calculating the value of
      // `maxIndex`
      if (bufferedEntries.isEmpty()) {
        constructBatchFromWAL(startIndex, maxIndex, batches);
        batches.buildIndex();
        logger.debug(
            "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batches);
      } else {
        // Notice that prev searchIndex >= startIndex
        iterator = bufferedEntries.iterator();
        IndexedConsensusRequest prev = iterator.next();

        // Prevents gaps between logs. For example, some requests are not written into the queue
        // when the queue is full. In this case, requests need to be loaded from the WAL
        if (startIndex != prev.getSearchIndex()) {
          constructBatchFromWAL(startIndex, prev.getSearchIndex(), batches);
          if (!batches.canAccumulate()) {
            batches.buildIndex();
            logger.debug(
                "{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batches);
            return batches;
          }
        }

        constructBatchIndexedFromConsensusRequest(prev, batches);
        iterator.remove();
        releaseReservedMemory(prev);
        if (!batches.canAccumulate()) {
          batches.buildIndex();
          logger.debug(
              "{} : accumulated a {} from queue", impl.getThisNode().getGroupId(), batches);
          return batches;
        }

        while (iterator.hasNext() && batches.canAccumulate()) {
          IndexedConsensusRequest current = iterator.next();
          // Prevents gaps between logs. For example, some logs are not written into the queue
          // when the queue is full. In this case, requests need to be loaded from the WAL
          if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
            constructBatchFromWAL(prev.getSearchIndex() + 1, current.getSearchIndex(), batches);
            if (!batches.canAccumulate()) {
              batches.buildIndex();
              logger.debug(
                  "gap {} : accumulated a {} from queue and wal when gap",
                  impl.getThisNode().getGroupId(),
                  batches);
              return batches;
            }
          }
          constructBatchIndexedFromConsensusRequest(current, batches);
          prev = current;
          // We might not be able to remove all the elements in bufferedEntries in the current
          // call, but that's fine: we'll continue processing these elements the next time we
          // enter this function, so they are never lost
          iterator.remove();
          releaseReservedMemory(current);
        }
        batches.buildIndex();
        logger.debug(
            "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batches);
      }
      return batches;
    }

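    /**
     * Sends a batch to the peer using a borrowed async client; on failure the error is forwarded
     * to the handler, which owns the retry logic.
     */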
    public void sendBatchAsync(Batch batch, DispatchLogHandler handler) {
      try {
        AsyncIoTConsensusServiceClient client = clientManager.borrowClient(peer.getEndpoint());
        TSyncLogEntriesReq req =
            new TSyncLogEntriesReq(
                selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getLogEntries());
        logger.debug(
            "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
            batch.getStartIndex(),
            batch.getEndIndex(),
            peer.getGroupId().convertToTConsensusGroupId());
        client.syncLogEntries(req, handler);
      } catch (Exception e) {
        logger.error("Cannot sync logs to peer {} because", peer, e);
        handler.onError(e);
      }
    }

    public SyncStatus getSyncStatus() {
      return syncStatus;
    }

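    /**
     * Reads entries whose search index lies in [currentIndex, maxIndex) from the WAL and appends
     * them to the batch until it can no longer accumulate.
     */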
    private void constructBatchFromWAL(long currentIndex, long maxIndex, Batch logBatches) {
      logger.debug(
          "DataRegion[{}]->{}: currentIndex: {}, maxIndex: {}",
          peer.getGroupId().getId(),
          peer.getEndpoint().getIp(),
          currentIndex,
          maxIndex);
      // targetIndex is the index of the request that we need to find
      long targetIndex = currentIndex;
      // Even if there are no WAL files, this code won't produce an error.
      walEntryIterator.skipTo(targetIndex);
      while (targetIndex < maxIndex && logBatches.canAccumulate()) {
        logger.debug("construct from WAL for one Entry, index : {}", targetIndex);
        try {
          walEntryIterator.waitForNextReady();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          logger.warn("wait for next WAL entry is interrupted");
        }
        IndexedConsensusRequest data = walEntryIterator.next();
        if (data.getSearchIndex() < targetIndex) {
          // If the index of the request is smaller than targetIndex, continue
          logger.warn(
              "search for one Entry which index is {}, but find a smaller one, index : {}",
              targetIndex,
              data.getSearchIndex());
          continue;
        } else if (data.getSearchIndex() > targetIndex) {
          logger.warn(
              "search for one Entry which index is {}, but find a larger one, index : {}",
              targetIndex,
              data.getSearchIndex());
          if (data.getSearchIndex() >= maxIndex) {
            // If the index of the request is larger than maxIndex, finish
            break;
          }
        }
        targetIndex = data.getSearchIndex() + 1;
        data.buildSerializedRequests();
        // Construct the request from the WAL
        logBatches.addTLogEntry(
            new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), true));
      }
    }

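    /**
     * Wraps a request taken from the in-memory queue as a TLogEntry; the final flag appears to
     * distinguish WAL-sourced entries (true) from queue-sourced ones (false).
     */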
    private void constructBatchIndexedFromConsensusRequest(
        IndexedConsensusRequest request, Batch logBatches) {
      logBatches.addTLogEntry(
          new TLogEntry(request.getSerializedRequests(), request.getSearchIndex(), false));
    }
  }
}