• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9871

18 Aug 2023 08:01AM UTC coverage: 47.982% (-0.02%) from 48.003%
#9871

push

travis_ci

web-flow
[IOTDB-6117] Pipe: Optimize RPC requests from CN to DN. CN send exactly one pipeMeta to each DN upon create/start/stop/drop pipe (#10875) (#10905)

Currently, CN sends pipeMeta of all existing pipes to each dn upon create/start/stop/drop pipe, which may be time-comsuming.

In this commit, CN will send exactly one pipeMeta to each DN upon create/start/stop/drop pipe.

---------

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit 38b36006b)

95 of 95 new or added lines in 12 files covered. (100.0%)

79801 of 166313 relevant lines covered (47.98%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.57
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.persistence.pipe;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
25
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
26
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
27
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
28
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
29
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
30
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
31
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
32
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
33
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
34
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
35
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
36
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
37
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
38
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
39
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
40
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
41
import org.apache.iotdb.consensus.common.DataSet;
42
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
43
import org.apache.iotdb.pipe.api.exception.PipeException;
44
import org.apache.iotdb.rpc.TSStatusCode;
45

46
import org.slf4j.Logger;
47
import org.slf4j.LoggerFactory;
48

49
import java.io.File;
50
import java.io.FileInputStream;
51
import java.io.FileOutputStream;
52
import java.io.IOException;
53
import java.util.LinkedList;
54
import java.util.List;
55
import java.util.Map;
56
import java.util.concurrent.atomic.AtomicBoolean;
57
import java.util.stream.Collectors;
58
import java.util.stream.StreamSupport;
59

60
public class PipeTaskInfo implements SnapshotProcessor {
61

62
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskInfo.class);
1✔
63
  private static final String SNAPSHOT_FILE_NAME = "pipe_task_info.bin";
64

65
  private final PipeMetaKeeper pipeMetaKeeper;
66

67
  public PipeTaskInfo() {
1✔
68
    this.pipeMetaKeeper = new PipeMetaKeeper();
1✔
69
  }
1✔
70

71
  /////////////////////////////// Lock ///////////////////////////////
72

73
  private void acquireReadLock() {
74
    pipeMetaKeeper.acquireReadLock();
1✔
75
  }
1✔
76

77
  private void releaseReadLock() {
78
    pipeMetaKeeper.releaseReadLock();
1✔
79
  }
1✔
80

81
  private void acquireWriteLock() {
82
    pipeMetaKeeper.acquireWriteLock();
1✔
83
  }
1✔
84

85
  private void releaseWriteLock() {
86
    pipeMetaKeeper.releaseWriteLock();
1✔
87
  }
1✔
88

89
  /////////////////////////////// Validator ///////////////////////////////
90

91
  public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) throws PipeException {
92
    acquireReadLock();
×
93
    try {
94
      checkBeforeCreatePipeInternal(createPipeRequest);
×
95
    } finally {
96
      releaseReadLock();
×
97
    }
98
  }
×
99

100
  private void checkBeforeCreatePipeInternal(TCreatePipeReq createPipeRequest)
101
      throws PipeException {
102
    if (!isPipeExisted(createPipeRequest.getPipeName())) {
×
103
      return;
×
104
    }
105

106
    final String exceptionMessage =
×
107
        String.format(
×
108
            "Failed to create pipe %s, the pipe with the same name has been created",
109
            createPipeRequest.getPipeName());
×
110
    LOGGER.info(exceptionMessage);
×
111
    throw new PipeException(exceptionMessage);
×
112
  }
113

114
  public void checkBeforeStartPipe(String pipeName) throws PipeException {
115
    acquireReadLock();
×
116
    try {
117
      checkBeforeStartPipeInternal(pipeName);
×
118
    } finally {
119
      releaseReadLock();
×
120
    }
121
  }
×
122

123
  private void checkBeforeStartPipeInternal(String pipeName) throws PipeException {
124
    if (!isPipeExisted(pipeName)) {
×
125
      final String exceptionMessage =
×
126
          String.format("Failed to start pipe %s, the pipe does not exist", pipeName);
×
127
      LOGGER.info(exceptionMessage);
×
128
      throw new PipeException(exceptionMessage);
×
129
    }
130

131
    final PipeStatus pipeStatus = getPipeStatus(pipeName);
×
132
    if (pipeStatus == PipeStatus.RUNNING) {
×
133
      final String exceptionMessage =
×
134
          String.format("Failed to start pipe %s, the pipe is already running", pipeName);
×
135
      LOGGER.info(exceptionMessage);
×
136
      throw new PipeException(exceptionMessage);
×
137
    }
138
    if (pipeStatus == PipeStatus.DROPPED) {
×
139
      final String exceptionMessage =
×
140
          String.format("Failed to start pipe %s, the pipe is already dropped", pipeName);
×
141
      LOGGER.info(exceptionMessage);
×
142
      throw new PipeException(exceptionMessage);
×
143
    }
144
  }
×
145

146
  public void checkBeforeStopPipe(String pipeName) throws PipeException {
147
    acquireReadLock();
×
148
    try {
149
      checkBeforeStopPipeInternal(pipeName);
×
150
    } finally {
151
      releaseReadLock();
×
152
    }
153
  }
×
154

155
  private void checkBeforeStopPipeInternal(String pipeName) throws PipeException {
156
    if (!isPipeExisted(pipeName)) {
×
157
      final String exceptionMessage =
×
158
          String.format("Failed to stop pipe %s, the pipe does not exist", pipeName);
×
159
      LOGGER.info(exceptionMessage);
×
160
      throw new PipeException(exceptionMessage);
×
161
    }
162

163
    final PipeStatus pipeStatus = getPipeStatus(pipeName);
×
164
    if (pipeStatus == PipeStatus.STOPPED) {
×
165
      final String exceptionMessage =
×
166
          String.format("Failed to stop pipe %s, the pipe is already stop", pipeName);
×
167
      LOGGER.info(exceptionMessage);
×
168
      throw new PipeException(exceptionMessage);
×
169
    }
170
    if (pipeStatus == PipeStatus.DROPPED) {
×
171
      final String exceptionMessage =
×
172
          String.format("Failed to stop pipe %s, the pipe is already dropped", pipeName);
×
173
      LOGGER.info(exceptionMessage);
×
174
      throw new PipeException(exceptionMessage);
×
175
    }
176
  }
×
177

178
  public void checkBeforeDropPipe(String pipeName) {
179
    acquireReadLock();
×
180
    try {
181
      checkBeforeDropPipeInternal(pipeName);
×
182
    } finally {
183
      releaseReadLock();
×
184
    }
185
  }
×
186

187
  private void checkBeforeDropPipeInternal(String pipeName) {
188
    if (LOGGER.isDebugEnabled()) {
×
189
      LOGGER.debug(
×
190
          "Check before drop pipe {}, pipe exists: {}.",
191
          pipeName,
192
          isPipeExisted(pipeName) ? "true" : "false");
×
193
    }
194
    // No matter whether the pipe exists, we allow the drop operation executed on all nodes to
195
    // ensure the consistency.
196
    // DO NOTHING HERE!
197
  }
×
198

199
  public boolean isPipeExisted(String pipeName) {
200
    acquireReadLock();
1✔
201
    try {
202
      return pipeMetaKeeper.containsPipeMeta(pipeName);
1✔
203
    } finally {
204
      releaseReadLock();
1✔
205
    }
206
  }
207

208
  private PipeStatus getPipeStatus(String pipeName) {
209
    acquireReadLock();
×
210
    try {
211
      return pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getStatus().get();
×
212
    } finally {
213
      releaseReadLock();
×
214
    }
215
  }
216

217
  /////////////////////////////// Pipe Task Management ///////////////////////////////
218

219
  public TSStatus createPipe(CreatePipePlanV2 plan) {
220
    acquireWriteLock();
1✔
221
    try {
222
      pipeMetaKeeper.addPipeMeta(
1✔
223
          plan.getPipeStaticMeta().getPipeName(),
1✔
224
          new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta()));
1✔
225
      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
1✔
226
    } finally {
227
      releaseWriteLock();
1✔
228
    }
229
  }
230

231
  public TSStatus setPipeStatus(SetPipeStatusPlanV2 plan) {
232
    acquireWriteLock();
1✔
233
    try {
234
      pipeMetaKeeper
1✔
235
          .getPipeMeta(plan.getPipeName())
1✔
236
          .getRuntimeMeta()
1✔
237
          .getStatus()
1✔
238
          .set(plan.getPipeStatus());
1✔
239
      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
1✔
240
    } finally {
241
      releaseWriteLock();
1✔
242
    }
243
  }
244

245
  public TSStatus dropPipe(DropPipePlanV2 plan) {
246
    acquireWriteLock();
1✔
247
    try {
248
      pipeMetaKeeper.removePipeMeta(plan.getPipeName());
1✔
249
      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
1✔
250
    } finally {
251
      releaseWriteLock();
1✔
252
    }
253
  }
254

255
  public DataSet showPipes() {
256
    acquireReadLock();
×
257
    try {
258
      return new PipeTableResp(
×
259
          new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
×
260
          StreamSupport.stream(getPipeMetaList().spliterator(), false)
×
261
              .collect(Collectors.toList()));
×
262
    } finally {
263
      releaseReadLock();
×
264
    }
265
  }
266

267
  public Iterable<PipeMeta> getPipeMetaList() {
268
    acquireReadLock();
×
269
    try {
270
      return pipeMetaKeeper.getPipeMetaList();
×
271
    } finally {
272
      releaseReadLock();
×
273
    }
274
  }
275

276
  public PipeMeta getPipeMetaByPipeName(String pipeName) {
277
    acquireReadLock();
×
278
    try {
279
      return pipeMetaKeeper.getPipeMetaByPipeName(pipeName);
×
280
    } finally {
281
      releaseReadLock();
×
282
    }
283
  }
284

285
  public boolean isEmpty() {
286
    acquireReadLock();
1✔
287
    try {
288
      return pipeMetaKeeper.isEmpty();
1✔
289
    } finally {
290
      releaseReadLock();
1✔
291
    }
292
  }
293

294
  /////////////////////////////// Pipe Runtime Management ///////////////////////////////
295

296
  /** Handle the data region leader change event and update the pipe task meta accordingly. */
297
  public TSStatus handleLeaderChange(PipeHandleLeaderChangePlan plan) {
298
    acquireWriteLock();
×
299
    try {
300
      return handleLeaderChangeInternal(plan);
×
301
    } finally {
302
      releaseWriteLock();
×
303
    }
304
  }
305

306
  private TSStatus handleLeaderChangeInternal(PipeHandleLeaderChangePlan plan) {
307
    plan.getConsensusGroupId2NewDataRegionLeaderIdMap()
×
308
        .forEach(
×
309
            (dataRegionGroupId, newDataRegionLeader) ->
310
                pipeMetaKeeper
×
311
                    .getPipeMetaList()
×
312
                    .forEach(
×
313
                        pipeMeta -> {
314
                          final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMap =
×
315
                              pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
×
316

317
                          if (consensusGroupIdToTaskMetaMap.containsKey(dataRegionGroupId)) {
×
318
                            // If the data region leader is -1, it means the data region is
319
                            // removed
320
                            if (newDataRegionLeader != -1) {
×
321
                              consensusGroupIdToTaskMetaMap
×
322
                                  .get(dataRegionGroupId)
×
323
                                  .setLeaderDataNodeId(newDataRegionLeader);
×
324
                            } else {
325
                              consensusGroupIdToTaskMetaMap.remove(dataRegionGroupId);
×
326
                            }
327
                          } else {
328
                            // If CN does not contain the data region group, it means the data
329
                            // region group is newly added.
330
                            if (newDataRegionLeader != -1) {
×
331
                              consensusGroupIdToTaskMetaMap.put(
×
332
                                  dataRegionGroupId,
333
                                  new PipeTaskMeta(
334
                                      new MinimumProgressIndex(), newDataRegionLeader));
×
335
                            }
336
                            // else:
337
                            // "The pipe task meta does not contain the data region group {} or
338
                            // the data region group has already been removed"
339
                          }
340
                        }));
×
341

342
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
343
  }
344

345
  /**
346
   * Replace the local pipeMetas by the pipeMetas from the leader ConfigNode.
347
   *
348
   * @param plan The plan containing all the pipeMetas from leader ConfigNode
349
   * @return {@link TSStatusCode#SUCCESS_STATUS}
350
   */
351
  public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
352
    acquireWriteLock();
×
353
    try {
354
      return handleMetaChangesInternal(plan);
×
355
    } finally {
356
      releaseWriteLock();
×
357
    }
358
  }
359

360
  private TSStatus handleMetaChangesInternal(PipeHandleMetaChangePlan plan) {
361
    LOGGER.info("Handling pipe meta changes ...");
×
362

363
    pipeMetaKeeper.clear();
×
364

365
    plan.getPipeMetaList()
×
366
        .forEach(
×
367
            pipeMeta -> {
368
              pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
×
369
              LOGGER.info("Recording pipe meta: {}", pipeMeta);
×
370
            });
×
371

372
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
373
  }
374

375
  public boolean hasExceptions(String pipeName) {
376
    acquireReadLock();
×
377
    try {
378
      return hasExceptionsInternal(pipeName);
×
379
    } finally {
380
      releaseReadLock();
×
381
    }
382
  }
383

384
  private boolean hasExceptionsInternal(String pipeName) {
385
    if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
×
386
      return false;
×
387
    }
388

389
    final PipeRuntimeMeta runtimeMeta = pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta();
×
390
    final Map<Integer, PipeRuntimeException> exceptionMap =
×
391
        runtimeMeta.getDataNodeId2PipeRuntimeExceptionMap();
×
392

393
    if (!exceptionMap.isEmpty()) {
×
394
      return true;
×
395
    }
396

397
    final AtomicBoolean hasException = new AtomicBoolean(false);
×
398
    runtimeMeta
×
399
        .getConsensusGroupId2TaskMetaMap()
×
400
        .values()
×
401
        .forEach(
×
402
            pipeTaskMeta -> {
403
              if (pipeTaskMeta.getExceptionMessages().iterator().hasNext()) {
×
404
                hasException.set(true);
×
405
              }
406
            });
×
407
    return hasException.get();
×
408
  }
409

410
  public boolean isStoppedByRuntimeException(String pipeName) {
411
    acquireReadLock();
×
412
    try {
413
      return isStoppedByRuntimeExceptionInternal(pipeName);
×
414
    } finally {
415
      releaseReadLock();
×
416
    }
417
  }
418

419
  private boolean isStoppedByRuntimeExceptionInternal(String pipeName) {
420
    return pipeMetaKeeper.containsPipeMeta(pipeName)
×
421
        && pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getIsStoppedByRuntimeException();
×
422
  }
423

424
  /**
425
   * Clear the exceptions of, and set the isAutoStopped flag to false for a pipe locally after it
426
   * starts successfully.
427
   *
428
   * <p>If there are exceptions cleared or flag changed, the messages will then be updated to all
429
   * the nodes through {@link PipeHandleMetaChangeProcedure}.
430
   *
431
   * @param pipeName The name of the pipe to be clear exception
432
   */
433
  public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(String pipeName) {
434
    acquireWriteLock();
×
435
    try {
436
      clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName);
×
437
    } finally {
438
      releaseWriteLock();
×
439
    }
440
  }
×
441

442
  private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(String pipeName) {
443
    if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
×
444
      return;
×
445
    }
446

447
    final PipeRuntimeMeta runtimeMeta = pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta();
×
448

449
    // To avoid unnecessary retries, we set the isStoppedByRuntimeException flag to false
450
    runtimeMeta.setIsStoppedByRuntimeException(false);
×
451

452
    runtimeMeta.setExceptionsClearTime(System.currentTimeMillis());
×
453

454
    final Map<Integer, PipeRuntimeException> exceptionMap =
×
455
        runtimeMeta.getDataNodeId2PipeRuntimeExceptionMap();
×
456
    if (!exceptionMap.isEmpty()) {
×
457
      exceptionMap.clear();
×
458
    }
459

460
    runtimeMeta
×
461
        .getConsensusGroupId2TaskMetaMap()
×
462
        .values()
×
463
        .forEach(
×
464
            pipeTaskMeta -> {
465
              if (pipeTaskMeta.getExceptionMessages().iterator().hasNext()) {
×
466
                pipeTaskMeta.clearExceptionMessages();
×
467
              }
468
            });
×
469
  }
×
470

471
  public void setIsStoppedByRuntimeExceptionToFalse(String pipeName) {
472
    acquireWriteLock();
×
473
    try {
474
      setIsStoppedByRuntimeExceptionToFalseInternal(pipeName);
×
475
    } finally {
476
      releaseWriteLock();
×
477
    }
478
  }
×
479

480
  private void setIsStoppedByRuntimeExceptionToFalseInternal(String pipeName) {
481
    if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
×
482
      return;
×
483
    }
484

485
    pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().setIsStoppedByRuntimeException(false);
×
486
  }
×
487

488
  /**
489
   * Record the exceptions of all pipes locally if they encountered failure when pushing pipe meta.
490
   *
491
   * <p>If there are exceptions recorded, the related pipes will be stopped, and the exception
492
   * messages will then be updated to all the nodes through {@link PipeHandleMetaChangeProcedure}.
493
   *
494
   * @param respMap The responseMap after pushing pipe meta
495
   * @return true if there are exceptions encountered
496
   */
497
  public boolean recordPushPipeMetaExceptions(Map<Integer, TPushPipeMetaResp> respMap) {
498
    acquireWriteLock();
×
499
    try {
500
      return recordPushPipeMetaExceptionsInternal(respMap);
×
501
    } finally {
502
      releaseWriteLock();
×
503
    }
504
  }
505

506
  private boolean recordPushPipeMetaExceptionsInternal(Map<Integer, TPushPipeMetaResp> respMap) {
507
    boolean hasException = false;
×
508

509
    for (final Map.Entry<Integer, TPushPipeMetaResp> respEntry : respMap.entrySet()) {
×
510
      final int dataNodeId = respEntry.getKey();
×
511
      final TPushPipeMetaResp resp = respEntry.getValue();
×
512

513
      if (resp.getStatus().getCode() == TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()) {
×
514
        hasException = true;
×
515

516
        if (!resp.isSetExceptionMessages()) {
×
517
          // The pushPipeMeta process on dataNode encountered internal errors
518
          continue;
×
519
        }
520

521
        resp.getExceptionMessages()
×
522
            .forEach(
×
523
                message -> {
524
                  if (pipeMetaKeeper.containsPipeMeta(message.getPipeName())) {
×
525
                    final PipeRuntimeMeta runtimeMeta =
×
526
                        pipeMetaKeeper.getPipeMeta(message.getPipeName()).getRuntimeMeta();
×
527

528
                    // Mark the status of the pipe with exception as stopped
529
                    runtimeMeta.getStatus().set(PipeStatus.STOPPED);
×
530
                    runtimeMeta.setIsStoppedByRuntimeException(true);
×
531

532
                    final Map<Integer, PipeRuntimeException> exceptionMap =
×
533
                        runtimeMeta.getDataNodeId2PipeRuntimeExceptionMap();
×
534
                    if (!exceptionMap.containsKey(dataNodeId)
×
535
                        || exceptionMap.get(dataNodeId).getTimeStamp() < message.getTimeStamp()) {
×
536
                      exceptionMap.put(
×
537
                          dataNodeId,
×
538
                          new PipeRuntimeCriticalException(
539
                              message.getMessage(), message.getTimeStamp()));
×
540
                    }
541
                  }
542
                });
×
543
      }
544
    }
×
545

546
    return hasException;
×
547
  }
548

549
  public boolean autoRestart() {
550
    acquireWriteLock();
×
551
    try {
552
      return autoRestartInternal();
×
553
    } finally {
554
      releaseWriteLock();
×
555
    }
556
  }
557

558
  /**
559
   * Set the statuses of all the pipes stopped automatically because of critical exceptions to
560
   * {@link PipeStatus#RUNNING} in order to restart them.
561
   *
562
   * @return true if there are pipes need restarting
563
   */
564
  private boolean autoRestartInternal() {
565
    final AtomicBoolean needRestart = new AtomicBoolean(false);
×
566
    final List<String> pipeToRestart = new LinkedList<>();
×
567

568
    pipeMetaKeeper
×
569
        .getPipeMetaList()
×
570
        .forEach(
×
571
            pipeMeta -> {
572
              if (pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException()) {
×
573
                pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
×
574

575
                needRestart.set(true);
×
576
                pipeToRestart.add(pipeMeta.getStaticMeta().getPipeName());
×
577
              }
578
            });
×
579

580
    if (needRestart.get()) {
×
581
      LOGGER.info("PipeMetaSyncer is trying to restart the pipes: {}", pipeToRestart);
×
582
    }
583
    return needRestart.get();
×
584
  }
585

586
  public void handleSuccessfulRestart() {
587
    acquireWriteLock();
×
588
    try {
589
      handleSuccessfulRestartInternal();
×
590
    } finally {
591
      releaseWriteLock();
×
592
    }
593
  }
×
594

595
  /**
596
   * Clear the exceptions of, and set the isAutoStopped flag to false for the successfully restarted
597
   * pipe.
598
   */
599
  private void handleSuccessfulRestartInternal() {
600
    pipeMetaKeeper
×
601
        .getPipeMetaList()
×
602
        .forEach(
×
603
            pipeMeta -> {
604
              if (pipeMeta.getRuntimeMeta().getStatus().get().equals(PipeStatus.RUNNING)) {
×
605
                clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(
×
606
                    pipeMeta.getStaticMeta().getPipeName());
×
607
              }
608
            });
×
609
  }
×
610

611
  /////////////////////////////// Snapshot ///////////////////////////////
612

613
  @Override
614
  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
615
    acquireReadLock();
1✔
616
    try {
617
      final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
1✔
618
      if (snapshotFile.exists() && snapshotFile.isFile()) {
1✔
619
        LOGGER.error(
×
620
            "Failed to take snapshot, because snapshot file [{}] is already exist.",
621
            snapshotFile.getAbsolutePath());
×
622
        return false;
×
623
      }
624

625
      try (final FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
1✔
626
        pipeMetaKeeper.processTakeSnapshot(fileOutputStream);
1✔
627
      }
628
      return true;
1✔
629
    } finally {
630
      releaseReadLock();
1✔
631
    }
632
  }
633

634
  @Override
635
  public void processLoadSnapshot(File snapshotDir) throws IOException {
636
    acquireWriteLock();
1✔
637
    try {
638
      final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
1✔
639
      if (!snapshotFile.exists() || !snapshotFile.isFile()) {
1✔
640
        LOGGER.error(
×
641
            "Failed to load snapshot,snapshot file [{}] is not exist.",
642
            snapshotFile.getAbsolutePath());
×
643
        return;
×
644
      }
645

646
      try (final FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
1✔
647
        pipeMetaKeeper.processLoadSnapshot(fileInputStream);
1✔
648
      }
649
    } finally {
650
      releaseWriteLock();
1✔
651
    }
652
  }
1✔
653

654
  /////////////////////////////// hashCode & equals ///////////////////////////////
655

656
  @Override
657
  public int hashCode() {
658
    return pipeMetaKeeper.hashCode();
×
659
  }
660

661
  @Override
662
  public boolean equals(Object obj) {
663
    if (this == obj) {
1✔
664
      return true;
×
665
    }
666
    if (obj == null || getClass() != obj.getClass()) {
1✔
667
      return false;
×
668
    }
669
    PipeTaskInfo other = (PipeTaskInfo) obj;
1✔
670
    return pipeMetaKeeper.equals(other.pipeMetaKeeper);
1✔
671
  }
672

673
  @Override
674
  public String toString() {
675
    return pipeMetaKeeper.toString();
1✔
676
  }
677
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc