• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9711

pending completion
#9711

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6089] Improved the lock behaviour of the pipe heartbeat to avoid causing DataNode unknown (#10714) (#10718)

12 of 12 new or added lines in 2 files covered. (100.0%)

79232 of 165180 relevant lines covered (47.97%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.67
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.task;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
24
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
25
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
26
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
27
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
28
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
29
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
30
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
31
import org.apache.iotdb.db.conf.IoTDBConfig;
32
import org.apache.iotdb.db.conf.IoTDBDescriptor;
33
import org.apache.iotdb.db.pipe.agent.PipeAgent;
34
import org.apache.iotdb.db.pipe.task.PipeBuilder;
35
import org.apache.iotdb.db.pipe.task.PipeTask;
36
import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
37
import org.apache.iotdb.db.pipe.task.PipeTaskManager;
38
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
39
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
40
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
41
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
42
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
43

44
import org.apache.thrift.TException;
45
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
47

48
import javax.validation.constraints.NotNull;
49

50
import java.io.IOException;
51
import java.nio.ByteBuffer;
52
import java.util.ArrayList;
53
import java.util.Collections;
54
import java.util.List;
55
import java.util.Map;
56
import java.util.Objects;
57
import java.util.Set;
58
import java.util.concurrent.atomic.AtomicReference;
59
import java.util.stream.Collectors;
60

61
/**
62
 * State transition diagram of a pipe task:
63
 *
64
 * <p><code>
65
 * |----------------|                     |---------| --> start pipe --> |---------|                   |---------|
66
 * | initial status | --> create pipe --> | STOPPED |                    | RUNNING | --> drop pipe --> | DROPPED |
67
 * |----------------|                     |---------| <-- stop  pipe <-- |---------|                   |---------|
68
 *                                             |                                                            |
69
 *                                             | ----------------------> drop pipe -----------------------> |
70
 * </code>
71
 *
72
 * <p>Other transitions are not allowed, will be ignored when received in the pipe task agent.
73
 */
74
public class PipeTaskAgent {
75

76
  /** Logger for this class. */
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
1✔
77
  /** IoTDB configuration obtained from the {@link IoTDBDescriptor} singleton. */
  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
1✔
78

79
  /** Format template with two {@code %s} placeholders: a pipe status and a pipe name. */
  private static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s";
80
  /** Message prefix for an unexpected pipe status; contains one {@code %s} placeholder. */
  private static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: ";
81

82
  /** Stores the pipe metas and provides the read/write lock used by this agent. */
  private final PipeMetaKeeper pipeMetaKeeper;
83
  /** Stores the pipe task instances that belong to each pipe. */
  private final PipeTaskManager pipeTaskManager;
84

85
  /** Creates an agent backed by a new {@link PipeMetaKeeper} and a new {@link PipeTaskManager}. */
  public PipeTaskAgent() {
    this.pipeMetaKeeper = new PipeMetaKeeper();
    this.pipeTaskManager = new PipeTaskManager();
  }
1✔
89

90
  ////////////////////////// PipeMeta Lock Control //////////////////////////
91

92
  private void acquireReadLock() {
93
    pipeMetaKeeper.acquireReadLock();
×
94
  }
×
95

96
  public boolean tryReadLockWithTimeOut(long timeOutInSeconds) {
97
    try {
98
      return pipeMetaKeeper.tryReadLock(timeOutInSeconds);
×
99
    } catch (InterruptedException e) {
×
100
      Thread.currentThread().interrupt();
×
101
      LOGGER.warn("Interruption during requiring pipeMetaKeeper lock.", e);
×
102
      return false;
×
103
    }
104
  }
105

106
  private void releaseReadLock() {
107
    pipeMetaKeeper.releaseReadLock();
×
108
  }
×
109

110
  private void acquireWriteLock() {
111
    pipeMetaKeeper.acquireWriteLock();
×
112
  }
×
113

114
  private void releaseWriteLock() {
115
    pipeMetaKeeper.releaseWriteLock();
×
116
  }
×
117

118
  ////////////////////////// Pipe Task Management Entry //////////////////////////
119

120
  public synchronized List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
121
      List<PipeMeta> pipeMetaListFromConfigNode) {
122
    acquireWriteLock();
×
123
    try {
124
      return handlePipeMetaChangesInternal(pipeMetaListFromConfigNode);
×
125
    } finally {
126
      releaseWriteLock();
×
127
    }
128
  }
129

130
  private List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(
131
      List<PipeMeta> pipeMetaListFromConfigNode) {
132
    // Do nothing if data node is removing or removed
133
    if (PipeAgent.runtime().isShutdown()) {
×
134
      return Collections.emptyList();
×
135
    }
136

137
    final List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
×
138

139
    // Iterate through pipe meta list from config node, check if pipe meta exists on data node
140
    // or has changed
141
    for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
×
142
      final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
×
143

144
      try {
145
        final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
×
146

147
        // If pipe meta does not exist on data node, create a new pipe
148
        if (metaOnDataNode == null) {
×
149
          if (createPipe(metaFromConfigNode)) {
×
150
            // If the status recorded in config node is RUNNING, start the pipe
151
            startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
×
152
          }
153
          // If the status recorded in config node is STOPPED or DROPPED, do nothing
154
          continue;
×
155
        }
156

157
        // If pipe meta exists on data node, check if it has changed
158
        final PipeStaticMeta staticMetaOnDataNode = metaOnDataNode.getStaticMeta();
×
159
        final PipeStaticMeta staticMetaFromConfigNode = metaFromConfigNode.getStaticMeta();
×
160

161
        // First check if pipe static meta has changed, if so, drop the pipe and create a new one
162
        if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
×
163
          dropPipe(pipeName);
×
164
          if (createPipe(metaFromConfigNode)) {
×
165
            startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
×
166
          }
167
          // If the status is STOPPED or DROPPED, do nothing
168
          continue;
×
169
        }
170

171
        // Then check if pipe runtime meta has changed, if so, update the pipe
172
        final PipeRuntimeMeta runtimeMetaOnDataNode = metaOnDataNode.getRuntimeMeta();
×
173
        final PipeRuntimeMeta runtimeMetaFromConfigNode = metaFromConfigNode.getRuntimeMeta();
×
174
        handlePipeRuntimeMetaChanges(
×
175
            staticMetaFromConfigNode, runtimeMetaFromConfigNode, runtimeMetaOnDataNode);
176
      } catch (Exception e) {
×
177
        final String errorMessage =
×
178
            String.format(
×
179
                "Failed to handle pipe meta changes for %s, because %s", pipeName, e.getMessage());
×
180
        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
×
181
        exceptionMessages.add(
×
182
            new TPushPipeMetaRespExceptionMessage(
183
                pipeName, errorMessage, System.currentTimeMillis()));
×
184
      }
×
185
    }
×
186

187
    // Check if there are pipes on data node that do not exist on config node, if so, drop them
188
    final Set<String> pipeNamesFromConfigNode =
×
189
        pipeMetaListFromConfigNode.stream()
×
190
            .map(meta -> meta.getStaticMeta().getPipeName())
×
191
            .collect(Collectors.toSet());
×
192
    for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
×
193
      final String pipeName = metaOnDataNode.getStaticMeta().getPipeName();
×
194

195
      try {
196
        if (!pipeNamesFromConfigNode.contains(pipeName)) {
×
197
          dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
×
198
        }
199
      } catch (Exception e) {
×
200
        // Do not record the error messages for the pipes don't exist on ConfigNode.
201
        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
×
202
      }
×
203
    }
×
204

205
    return exceptionMessages;
×
206
  }
207

208
  private void handlePipeRuntimeMetaChanges(
209
      @NotNull PipeStaticMeta pipeStaticMeta,
210
      @NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
211
      @NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
212
    // 1. Handle data region group leader changed first
213
    final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMapFromConfigNode =
×
214
        runtimeMetaFromConfigNode.getConsensusGroupId2TaskMetaMap();
×
215
    final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMapOnDataNode =
×
216
        runtimeMetaOnDataNode.getConsensusGroupId2TaskMetaMap();
×
217

218
    // 1.1 Iterate over all consensus group ids in config node's pipe runtime meta, decide if we
219
    // need to drop and create a new task for each consensus group id
220
    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryFromConfigNode :
221
        consensusGroupIdToTaskMetaMapFromConfigNode.entrySet()) {
×
222
      final TConsensusGroupId consensusGroupIdFromConfigNode = entryFromConfigNode.getKey();
×
223

224
      final PipeTaskMeta taskMetaFromConfigNode = entryFromConfigNode.getValue();
×
225
      final PipeTaskMeta taskMetaOnDataNode =
×
226
          consensusGroupIdToTaskMetaMapOnDataNode.get(consensusGroupIdFromConfigNode);
×
227

228
      // If task meta does not exist on data node, create a new task
229
      if (taskMetaOnDataNode == null) {
×
230
        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, taskMetaFromConfigNode);
×
231
        // We keep the new created task's status consistent with the status recorded in data node's
232
        // pipe runtime meta. please note that the status recorded in data node's pipe runtime meta
233
        // is not reliable, but we will have a check later to make sure the status is correct.
234
        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
×
235
          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
236
        }
237
        continue;
238
      }
239

240
      // If task meta exists on data node, check if it has changed
241
      final int dataNodeIdFromConfigNode = taskMetaFromConfigNode.getLeaderDataNodeId();
×
242
      final int dataNodeIdOnDataNode = taskMetaOnDataNode.getLeaderDataNodeId();
×
243

244
      if (dataNodeIdFromConfigNode != dataNodeIdOnDataNode) {
×
245
        dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
246
        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, taskMetaFromConfigNode);
×
247
        // We keep the new created task's status consistent with the status recorded in data node's
248
        // pipe runtime meta. please note that the status recorded in data node's pipe runtime meta
249
        // is not reliable, but we will have a check later to make sure the status is correct.
250
        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
×
251
          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
252
        }
253
      }
254
    }
×
255

256
    // 1.2 Iterate over all consensus group ids on data node's pipe runtime meta, decide if we need
257
    // to drop any task. we do not need to create any new task here because we have already done
258
    // that in 1.1.
259
    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryOnDataNode :
260
        consensusGroupIdToTaskMetaMapOnDataNode.entrySet()) {
×
261
      final TConsensusGroupId consensusGroupIdOnDataNode = entryOnDataNode.getKey();
×
262
      final PipeTaskMeta taskMetaFromConfigNode =
×
263
          consensusGroupIdToTaskMetaMapFromConfigNode.get(consensusGroupIdOnDataNode);
×
264
      if (taskMetaFromConfigNode == null) {
×
265
        dropPipeTask(consensusGroupIdOnDataNode, pipeStaticMeta);
×
266
      }
267
    }
×
268

269
    // 2. Handle pipe runtime meta status changes
270
    final PipeStatus statusFromConfigNode = runtimeMetaFromConfigNode.getStatus().get();
×
271
    final PipeStatus statusOnDataNode = runtimeMetaOnDataNode.getStatus().get();
×
272
    if (statusFromConfigNode == statusOnDataNode) {
×
273
      return;
×
274
    }
275

276
    switch (statusFromConfigNode) {
×
277
      case RUNNING:
278
        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.STOPPED) {
×
279
          startPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
280
        } else {
281
          throw new IllegalStateException(
×
282
              String.format(
×
283
                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName()));
×
284
        }
285
        break;
286
      case STOPPED:
287
        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.RUNNING) {
×
288
          stopPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
289
        } else {
290
          throw new IllegalStateException(
×
291
              String.format(
×
292
                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName()));
×
293
        }
294
        break;
295
      case DROPPED:
296
        // This should not happen, but we still handle it here
297
        dropPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
298
        break;
×
299
      default:
300
        throw new IllegalStateException(
×
301
            String.format(
×
302
                MESSAGE_UNKNOWN_PIPE_STATUS, statusFromConfigNode, pipeStaticMeta.getPipeName()));
×
303
    }
304
  }
×
305

306
  public synchronized void dropAllPipeTasks() {
307
    acquireWriteLock();
×
308
    try {
309
      dropAllPipeTasksInternal();
×
310
    } finally {
311
      releaseWriteLock();
×
312
    }
313
  }
×
314

315
  private void dropAllPipeTasksInternal() {
316
    for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
317
      try {
318
        dropPipe(
×
319
            pipeMeta.getStaticMeta().getPipeName(), pipeMeta.getStaticMeta().getCreationTime());
×
320
      } catch (final Exception e) {
×
321
        LOGGER.warn(
×
322
            "Failed to drop pipe {} with creation time {}",
323
            pipeMeta.getStaticMeta().getPipeName(),
×
324
            pipeMeta.getStaticMeta().getCreationTime(),
×
325
            e);
326
      }
×
327
    }
×
328
  }
×
329

330
  public synchronized void stopAllPipesWithCriticalException() {
331
    acquireWriteLock();
×
332
    try {
333
      stopAllPipesWithCriticalExceptionInternal();
×
334
    } finally {
335
      releaseWriteLock();
×
336
    }
337
  }
×
338

339
  private void stopAllPipesWithCriticalExceptionInternal() {
340
    pipeMetaKeeper
×
341
        .getPipeMetaList()
×
342
        .forEach(
×
343
            pipeMeta -> {
344
              final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
×
345
              final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
×
346

347
              if (runtimeMeta.getStatus().get() == PipeStatus.RUNNING) {
×
348
                runtimeMeta
×
349
                    .getConsensusGroupId2TaskMetaMap()
×
350
                    .values()
×
351
                    .forEach(
×
352
                        pipeTaskMeta -> {
353
                          for (final PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
×
354
                            if (e instanceof PipeRuntimeCriticalException) {
×
355
                              stopPipe(staticMeta.getPipeName(), staticMeta.getCreationTime());
×
356
                              return;
×
357
                            }
358
                          }
×
359
                        });
×
360
              }
361
            });
×
362
  }
×
363

364
  ////////////////////////// Manage by Pipe Name //////////////////////////
365

366
  /**
367
   * Create a new pipe. If the pipe already exists, do nothing and return false. Otherwise, create
368
   * the pipe and return true.
369
   *
370
   * @param pipeMetaFromConfigNode pipe meta from config node
371
   * @return true if the pipe is created successfully and should be started, false if the pipe
372
   *     already exists or is created but should not be started
373
   * @throws IllegalStateException if the status is illegal
374
   */
375
  private boolean createPipe(PipeMeta pipeMetaFromConfigNode) {
376
    final String pipeName = pipeMetaFromConfigNode.getStaticMeta().getPipeName();
×
377
    final long creationTime = pipeMetaFromConfigNode.getStaticMeta().getCreationTime();
×
378

379
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
380
    if (existedPipeMeta != null) {
×
381
      if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
×
382
        final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
383
        switch (status) {
×
384
          case STOPPED:
385
          case RUNNING:
386
            if (LOGGER.isInfoEnabled()) {
×
387
              LOGGER.info(
×
388
                  "Pipe {} (creation time = {}) has already been created. "
389
                      + "Current status = {}. Skip creating.",
390
                  pipeName,
391
                  creationTime,
×
392
                  status.name());
×
393
            }
394
            return false;
×
395
          case DROPPED:
396
            if (LOGGER.isInfoEnabled()) {
×
397
              LOGGER.info(
×
398
                  "Pipe {} (creation time = {}) has already been dropped, "
399
                      + "but the pipe task meta has not been cleaned up. "
400
                      + "Current status = {}. Try dropping the pipe and recreating it.",
401
                  pipeName,
402
                  creationTime,
×
403
                  status.name());
×
404
            }
405
            // Break to drop the pipe and recreate it
406
            break;
407
          default:
408
            throw new IllegalStateException(
×
409
                MESSAGE_UNEXPECTED_PIPE_STATUS
410
                    + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
×
411
        }
412
      }
413

414
      // Drop the pipe if
415
      // 1. The pipe with the same name but with different creation time has been created before
416
      // 2. The pipe with the same name and the same creation time has been dropped before, but the
417
      //  pipe task meta has not been cleaned up
418
      dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
×
419
    }
420

421
    // Create pipe tasks and trigger create() method for each pipe task
422
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
423
        new PipeBuilder(pipeMetaFromConfigNode).build();
×
424
    for (PipeTask pipeTask : pipeTasks.values()) {
×
425
      pipeTask.create();
×
426
    }
×
427
    pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(), pipeTasks);
×
428

429
    // No matter the pipe status from config node is RUNNING or STOPPED, we always set the status
430
    // of pipe meta to STOPPED when it is created. The STOPPED status should always be the initial
431
    // status of a pipe, which makes the status transition logic simpler.
432
    final AtomicReference<PipeStatus> pipeStatusFromConfigNode =
×
433
        pipeMetaFromConfigNode.getRuntimeMeta().getStatus();
×
434
    final boolean needToStartPipe = pipeStatusFromConfigNode.get() == PipeStatus.RUNNING;
×
435
    pipeStatusFromConfigNode.set(PipeStatus.STOPPED);
×
436

437
    pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode);
×
438

439
    // If the pipe status from config node is RUNNING, we will start the pipe later.
440
    return needToStartPipe;
×
441
  }
442

443
  private void dropPipe(String pipeName, long creationTime) {
444
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
445

446
    if (existedPipeMeta == null) {
×
447
      LOGGER.info(
×
448
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
449
              + "Skip dropping.",
450
          pipeName,
451
          creationTime);
×
452
      return;
×
453
    }
454
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
455
      LOGGER.info(
×
456
          "Pipe {} (creation time = {}) has been created but does not match "
457
              + "the creation time ({}) in dropPipe request. Skip dropping.",
458
          pipeName,
459
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
460
          creationTime);
×
461
      return;
×
462
    }
463

464
    // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
465
    // but the pipe task meta has not been cleaned up (in case of failure when executing
466
    // dropPipeTaskByConsensusGroup).
467
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
×
468

469
    // Drop pipe tasks and trigger drop() method for each pipe task
470
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
471
        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
×
472
    if (pipeTasks == null) {
×
473
      LOGGER.info(
×
474
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
475
              + "Skip dropping.",
476
          pipeName,
477
          creationTime);
×
478
      return;
×
479
    }
480
    for (PipeTask pipeTask : pipeTasks.values()) {
×
481
      pipeTask.drop();
×
482
    }
×
483

484
    // Remove pipe meta from pipe meta keeper
485
    pipeMetaKeeper.removePipeMeta(pipeName);
×
486
  }
×
487

488
  private void dropPipe(String pipeName) {
489
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
490

491
    if (existedPipeMeta == null) {
×
492
      LOGGER.info(
×
493
          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
494
      return;
×
495
    }
496

497
    // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
498
    // but the pipe task meta has not been cleaned up (in case of failure when executing
499
    // dropPipeTaskByConsensusGroup).
500
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
×
501

502
    // Drop pipe tasks and trigger drop() method for each pipe task
503
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
504
        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
×
505
    if (pipeTasks == null) {
×
506
      LOGGER.info(
×
507
          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
508
      return;
×
509
    }
510
    for (PipeTask pipeTask : pipeTasks.values()) {
×
511
      pipeTask.drop();
×
512
    }
×
513

514
    // Remove pipe meta from pipe meta keeper
515
    pipeMetaKeeper.removePipeMeta(pipeName);
×
516
  }
×
517

518
  private void startPipe(String pipeName, long creationTime) {
519
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
520

521
    if (existedPipeMeta == null) {
×
522
      LOGGER.info(
×
523
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
524
              + "Skip starting.",
525
          pipeName,
526
          creationTime);
×
527
      return;
×
528
    }
529
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
530
      LOGGER.info(
×
531
          "Pipe {} (creation time = {}) has been created but does not match "
532
              + "the creation time ({}) in startPipe request. Skip starting.",
533
          pipeName,
534
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
535
          creationTime);
×
536
      return;
×
537
    }
538

539
    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
540
    switch (status) {
×
541
      case STOPPED:
542
        if (LOGGER.isInfoEnabled()) {
×
543
          LOGGER.info(
×
544
              "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.",
545
              pipeName,
546
              creationTime,
×
547
              status.name());
×
548
        }
549
        break;
550
      case RUNNING:
551
        if (LOGGER.isInfoEnabled()) {
×
552
          LOGGER.info(
×
553
              "Pipe {} (creation time = {}) has already been started. Current status = {}. "
554
                  + "Skip starting.",
555
              pipeName,
556
              creationTime,
×
557
              status.name());
×
558
        }
559
        return;
×
560
      case DROPPED:
561
        if (LOGGER.isInfoEnabled()) {
×
562
          LOGGER.info(
×
563
              "Pipe {} (creation time = {}) has already been dropped. Current status = {}. "
564
                  + "Skip starting.",
565
              pipeName,
566
              creationTime,
×
567
              status.name());
×
568
        }
569
        return;
×
570
      default:
571
        throw new IllegalStateException(
×
572
            MESSAGE_UNEXPECTED_PIPE_STATUS
573
                + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
×
574
    }
575

576
    // Trigger start() method for each pipe task
577
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
578
        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
×
579
    if (pipeTasks == null) {
×
580
      LOGGER.info(
×
581
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
582
              + "Skip starting.",
583
          pipeName,
584
          creationTime);
×
585
      return;
×
586
    }
587
    for (PipeTask pipeTask : pipeTasks.values()) {
×
588
      pipeTask.start();
×
589
    }
×
590

591
    // Set pipe meta status to RUNNING
592
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
×
593
    // Clear exception messages if started successfully
594
    existedPipeMeta
×
595
        .getRuntimeMeta()
×
596
        .getConsensusGroupId2TaskMetaMap()
×
597
        .values()
×
598
        .forEach(PipeTaskMeta::clearExceptionMessages);
×
599
  }
×
600

601
  private void stopPipe(String pipeName, long creationTime) {
602
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
603

604
    if (existedPipeMeta == null) {
×
605
      LOGGER.info(
×
606
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
607
              + "Skip stopping.",
608
          pipeName,
609
          creationTime);
×
610
      return;
×
611
    }
612
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
613
      LOGGER.info(
×
614
          "Pipe {} (creation time = {}) has been created but does not match "
615
              + "the creation time ({}) in stopPipe request. Skip stopping.",
616
          pipeName,
617
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
618
          creationTime);
×
619
      return;
×
620
    }
621

622
    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
623
    switch (status) {
×
624
      case STOPPED:
625
        if (LOGGER.isInfoEnabled()) {
×
626
          LOGGER.info(
×
627
              "Pipe {} (creation time = {}) has already been stopped. Current status = {}. "
628
                  + "Skip stopping.",
629
              pipeName,
630
              creationTime,
×
631
              status.name());
×
632
        }
633
        return;
×
634
      case RUNNING:
635
        if (LOGGER.isInfoEnabled()) {
×
636
          LOGGER.info(
×
637
              "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.",
638
              pipeName,
639
              creationTime,
×
640
              status.name());
×
641
        }
642
        break;
643
      case DROPPED:
644
        if (LOGGER.isInfoEnabled()) {
×
645
          LOGGER.info(
×
646
              "Pipe {} (creation time = {}) has already been dropped. Current status = {}. "
647
                  + "Skip stopping.",
648
              pipeName,
649
              creationTime,
×
650
              status.name());
×
651
        }
652
        return;
×
653
      default:
654
        throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS + status.name());
×
655
    }
656

657
    // Trigger stop() method for each pipe task
658
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
659
        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
×
660
    if (pipeTasks == null) {
×
661
      LOGGER.info(
×
662
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
663
              + "Skip stopping.",
664
          pipeName,
665
          creationTime);
×
666
      return;
×
667
    }
668
    for (PipeTask pipeTask : pipeTasks.values()) {
×
669
      pipeTask.stop();
×
670
    }
×
671

672
    // Set pipe meta status to STOPPED
673
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
×
674
  }
×
675

676
  ///////////////////////// Manage by dataRegionGroupId /////////////////////////
677

678
  private void createPipeTask(
679
      TConsensusGroupId consensusGroupId,
680
      PipeStaticMeta pipeStaticMeta,
681
      PipeTaskMeta pipeTaskMeta) {
682
    if (pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
×
683
      final PipeTask pipeTask =
×
684
          new PipeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build();
×
685
      pipeTask.create();
×
686
      pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
×
687
    }
688
    pipeMetaKeeper
×
689
        .getPipeMeta(pipeStaticMeta.getPipeName())
×
690
        .getRuntimeMeta()
×
691
        .getConsensusGroupId2TaskMetaMap()
×
692
        .put(consensusGroupId, pipeTaskMeta);
×
693
  }
×
694

695
  private void dropPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
696
    pipeMetaKeeper
×
697
        .getPipeMeta(pipeStaticMeta.getPipeName())
×
698
        .getRuntimeMeta()
×
699
        .getConsensusGroupId2TaskMetaMap()
×
700
        .remove(dataRegionGroupId);
×
701
    final PipeTask pipeTask = pipeTaskManager.removePipeTask(pipeStaticMeta, dataRegionGroupId);
×
702
    if (pipeTask != null) {
×
703
      pipeTask.drop();
×
704
    }
705
  }
×
706

707
  private void startPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
708
    final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta, dataRegionGroupId);
×
709
    if (pipeTask != null) {
×
710
      pipeTask.start();
×
711
    }
712
  }
×
713

714
  ///////////////////////// Heartbeat /////////////////////////
715

716
  public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
717
      throws TException {
718
    // If the pipe heartbeat is separated from the cluster heartbeat, then the lock doesn't
719
    // need to be acquired
720
    if (!req.isNeedPipeMetaList()) {
×
721
      return;
×
722
    }
723
    // Try the lock instead of directly acquire it to prevent the block of the cluster heartbeat
724
    // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode
725
    if (!tryReadLockWithTimeOut(10)) {
×
726
      return;
×
727
    }
728
    try {
729
      collectPipeMetaListInternal(resp);
×
730
    } finally {
731
      releaseReadLock();
×
732
    }
733
  }
×
734

735
  private void collectPipeMetaListInternal(THeartbeatResp resp) throws TException {
736
    // Do nothing if data node is removing or removed, or request does not need pipe meta list
737
    if (PipeAgent.runtime().isShutdown()) {
×
738
      return;
×
739
    }
740

741
    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
×
742
    try {
743
      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
744
        pipeMetaBinaryList.add(pipeMeta.serialize());
×
745
        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
×
746
      }
×
747
    } catch (IOException e) {
×
748
      throw new TException(e);
×
749
    }
×
750
    resp.setPipeMetaList(pipeMetaBinaryList);
×
751
  }
×
752

753
  public synchronized void collectPipeMetaList(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
754
      throws TException {
755
    acquireReadLock();
×
756
    try {
757
      collectPipeMetaListInternal(req, resp);
×
758
    } finally {
759
      releaseReadLock();
×
760
    }
761
  }
×
762

763
  private void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
764
      throws TException {
765
    // Do nothing if data node is removing or removed, or request does not need pipe meta list
766
    if (PipeAgent.runtime().isShutdown()) {
×
767
      return;
×
768
    }
769
    LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId);
×
770

771
    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
×
772
    try {
773
      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
774
        pipeMetaBinaryList.add(pipeMeta.serialize());
×
775
        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
×
776
      }
×
777
    } catch (IOException e) {
×
778
      throw new TException(e);
×
779
    }
×
780
    resp.setPipeMetaList(pipeMetaBinaryList);
×
781
  }
×
782
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc