• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9703

pending completion
#9703

push

travis_ci

web-flow
[IOTDB-6067] Pipe: Improve the stability of iotdb-thrift-connector-v2 during fault tolerance (avoid OOM) (#10550) (#10719)

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit f280381be)

283 of 283 new or added lines in 14 files covered. (100.0%)

79217 of 165033 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.71
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.task;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
24
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
25
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
26
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
27
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
28
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
29
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
30
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
31
import org.apache.iotdb.db.conf.IoTDBConfig;
32
import org.apache.iotdb.db.conf.IoTDBDescriptor;
33
import org.apache.iotdb.db.pipe.agent.PipeAgent;
34
import org.apache.iotdb.db.pipe.task.PipeBuilder;
35
import org.apache.iotdb.db.pipe.task.PipeTask;
36
import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
37
import org.apache.iotdb.db.pipe.task.PipeTaskManager;
38
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
39
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
40
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
41
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
42
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
43

44
import org.apache.thrift.TException;
45
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
47

48
import javax.validation.constraints.NotNull;
49

50
import java.io.IOException;
51
import java.nio.ByteBuffer;
52
import java.util.ArrayList;
53
import java.util.Collections;
54
import java.util.List;
55
import java.util.Map;
56
import java.util.Objects;
57
import java.util.Set;
58
import java.util.concurrent.atomic.AtomicReference;
59
import java.util.stream.Collectors;
60

61
/**
62
 * State transition diagram of a pipe task:
63
 *
64
 * <p><code>
65
 * |----------------|                     |---------| --> start pipe --> |---------|                   |---------|
66
 * | initial status | --> create pipe --> | STOPPED |                    | RUNNING | --> drop pipe --> | DROPPED |
67
 * |----------------|                     |---------| <-- stop  pipe <-- |---------|                   |---------|
68
 *                                             |                                                            |
69
 *                                             | ----------------------> drop pipe -----------------------> |
70
 * </code>
71
 *
72
 * <p>Other transitions are not allowed, will be ignored when received in the pipe task agent.
73
 */
74
public class PipeTaskAgent {
75

76
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
1✔
77
  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
1✔
78

79
  private static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s";
80
  private static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: ";
81

82
  private final PipeMetaKeeper pipeMetaKeeper;
83
  private final PipeTaskManager pipeTaskManager;
84

85
  public PipeTaskAgent() {
1✔
86
    pipeMetaKeeper = new PipeMetaKeeper();
1✔
87
    pipeTaskManager = new PipeTaskManager();
1✔
88
  }
1✔
89

90
  ////////////////////////// PipeMeta Lock Control //////////////////////////
91

92
  private void acquireReadLock() {
93
    pipeMetaKeeper.acquireReadLock();
×
94
  }
×
95

96
  private void releaseReadLock() {
97
    pipeMetaKeeper.releaseReadLock();
×
98
  }
×
99

100
  private void acquireWriteLock() {
101
    pipeMetaKeeper.acquireWriteLock();
×
102
  }
×
103

104
  private void releaseWriteLock() {
105
    pipeMetaKeeper.releaseWriteLock();
×
106
  }
×
107

108
  ////////////////////////// Pipe Task Management Entry //////////////////////////
109

110
  public synchronized List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
111
      List<PipeMeta> pipeMetaListFromConfigNode) {
112
    acquireWriteLock();
×
113
    try {
114
      return handlePipeMetaChangesInternal(pipeMetaListFromConfigNode);
×
115
    } finally {
116
      releaseWriteLock();
×
117
    }
118
  }
119

120
  private List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(
121
      List<PipeMeta> pipeMetaListFromConfigNode) {
122
    // Do nothing if data node is removing or removed
123
    if (PipeAgent.runtime().isShutdown()) {
×
124
      return Collections.emptyList();
×
125
    }
126

127
    final List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
×
128

129
    // Iterate through pipe meta list from config node, check if pipe meta exists on data node
130
    // or has changed
131
    for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
×
132
      final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
×
133

134
      try {
135
        final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
×
136

137
        // If pipe meta does not exist on data node, create a new pipe
138
        if (metaOnDataNode == null) {
×
139
          if (createPipe(metaFromConfigNode)) {
×
140
            // If the status recorded in config node is RUNNING, start the pipe
141
            startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
×
142
          }
143
          // If the status recorded in config node is STOPPED or DROPPED, do nothing
144
          continue;
×
145
        }
146

147
        // If pipe meta exists on data node, check if it has changed
148
        final PipeStaticMeta staticMetaOnDataNode = metaOnDataNode.getStaticMeta();
×
149
        final PipeStaticMeta staticMetaFromConfigNode = metaFromConfigNode.getStaticMeta();
×
150

151
        // First check if pipe static meta has changed, if so, drop the pipe and create a new one
152
        if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
×
153
          dropPipe(pipeName);
×
154
          if (createPipe(metaFromConfigNode)) {
×
155
            startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
×
156
          }
157
          // If the status is STOPPED or DROPPED, do nothing
158
          continue;
×
159
        }
160

161
        // Then check if pipe runtime meta has changed, if so, update the pipe
162
        final PipeRuntimeMeta runtimeMetaOnDataNode = metaOnDataNode.getRuntimeMeta();
×
163
        final PipeRuntimeMeta runtimeMetaFromConfigNode = metaFromConfigNode.getRuntimeMeta();
×
164
        handlePipeRuntimeMetaChanges(
×
165
            staticMetaFromConfigNode, runtimeMetaFromConfigNode, runtimeMetaOnDataNode);
166
      } catch (Exception e) {
×
167
        final String errorMessage =
×
168
            String.format(
×
169
                "Failed to handle pipe meta changes for %s, because %s", pipeName, e.getMessage());
×
170
        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
×
171
        exceptionMessages.add(
×
172
            new TPushPipeMetaRespExceptionMessage(
173
                pipeName, errorMessage, System.currentTimeMillis()));
×
174
      }
×
175
    }
×
176

177
    // Check if there are pipes on data node that do not exist on config node, if so, drop them
178
    final Set<String> pipeNamesFromConfigNode =
×
179
        pipeMetaListFromConfigNode.stream()
×
180
            .map(meta -> meta.getStaticMeta().getPipeName())
×
181
            .collect(Collectors.toSet());
×
182
    for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
×
183
      final String pipeName = metaOnDataNode.getStaticMeta().getPipeName();
×
184

185
      try {
186
        if (!pipeNamesFromConfigNode.contains(pipeName)) {
×
187
          dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
×
188
        }
189
      } catch (Exception e) {
×
190
        // Do not record the error messages for the pipes don't exist on ConfigNode.
191
        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
×
192
      }
×
193
    }
×
194

195
    return exceptionMessages;
×
196
  }
197

198
  private void handlePipeRuntimeMetaChanges(
199
      @NotNull PipeStaticMeta pipeStaticMeta,
200
      @NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
201
      @NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
202
    // 1. Handle data region group leader changed first
203
    final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMapFromConfigNode =
×
204
        runtimeMetaFromConfigNode.getConsensusGroupId2TaskMetaMap();
×
205
    final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMapOnDataNode =
×
206
        runtimeMetaOnDataNode.getConsensusGroupId2TaskMetaMap();
×
207

208
    // 1.1 Iterate over all consensus group ids in config node's pipe runtime meta, decide if we
209
    // need to drop and create a new task for each consensus group id
210
    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryFromConfigNode :
211
        consensusGroupIdToTaskMetaMapFromConfigNode.entrySet()) {
×
212
      final TConsensusGroupId consensusGroupIdFromConfigNode = entryFromConfigNode.getKey();
×
213

214
      final PipeTaskMeta taskMetaFromConfigNode = entryFromConfigNode.getValue();
×
215
      final PipeTaskMeta taskMetaOnDataNode =
×
216
          consensusGroupIdToTaskMetaMapOnDataNode.get(consensusGroupIdFromConfigNode);
×
217

218
      // If task meta does not exist on data node, create a new task
219
      if (taskMetaOnDataNode == null) {
×
220
        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, taskMetaFromConfigNode);
×
221
        // We keep the new created task's status consistent with the status recorded in data node's
222
        // pipe runtime meta. please note that the status recorded in data node's pipe runtime meta
223
        // is not reliable, but we will have a check later to make sure the status is correct.
224
        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
×
225
          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
226
        }
227
        continue;
228
      }
229

230
      // If task meta exists on data node, check if it has changed
231
      final int dataNodeIdFromConfigNode = taskMetaFromConfigNode.getLeaderDataNodeId();
×
232
      final int dataNodeIdOnDataNode = taskMetaOnDataNode.getLeaderDataNodeId();
×
233

234
      if (dataNodeIdFromConfigNode != dataNodeIdOnDataNode) {
×
235
        dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
236
        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, taskMetaFromConfigNode);
×
237
        // We keep the new created task's status consistent with the status recorded in data node's
238
        // pipe runtime meta. please note that the status recorded in data node's pipe runtime meta
239
        // is not reliable, but we will have a check later to make sure the status is correct.
240
        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
×
241
          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
242
        }
243
      }
244
    }
×
245

246
    // 1.2 Iterate over all consensus group ids on data node's pipe runtime meta, decide if we need
247
    // to drop any task. we do not need to create any new task here because we have already done
248
    // that in 1.1.
249
    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryOnDataNode :
250
        consensusGroupIdToTaskMetaMapOnDataNode.entrySet()) {
×
251
      final TConsensusGroupId consensusGroupIdOnDataNode = entryOnDataNode.getKey();
×
252
      final PipeTaskMeta taskMetaFromConfigNode =
×
253
          consensusGroupIdToTaskMetaMapFromConfigNode.get(consensusGroupIdOnDataNode);
×
254
      if (taskMetaFromConfigNode == null) {
×
255
        dropPipeTask(consensusGroupIdOnDataNode, pipeStaticMeta);
×
256
      }
257
    }
×
258

259
    // 2. Handle pipe runtime meta status changes
260
    final PipeStatus statusFromConfigNode = runtimeMetaFromConfigNode.getStatus().get();
×
261
    final PipeStatus statusOnDataNode = runtimeMetaOnDataNode.getStatus().get();
×
262
    if (statusFromConfigNode == statusOnDataNode) {
×
263
      return;
×
264
    }
265

266
    switch (statusFromConfigNode) {
×
267
      case RUNNING:
268
        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.STOPPED) {
×
269
          startPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
270
        } else {
271
          throw new IllegalStateException(
×
272
              String.format(
×
273
                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName()));
×
274
        }
275
        break;
276
      case STOPPED:
277
        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.RUNNING) {
×
278
          stopPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
279
        } else {
280
          throw new IllegalStateException(
×
281
              String.format(
×
282
                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName()));
×
283
        }
284
        break;
285
      case DROPPED:
286
        // This should not happen, but we still handle it here
287
        dropPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
288
        break;
×
289
      default:
290
        throw new IllegalStateException(
×
291
            String.format(
×
292
                MESSAGE_UNKNOWN_PIPE_STATUS, statusFromConfigNode, pipeStaticMeta.getPipeName()));
×
293
    }
294
  }
×
295

296
  public synchronized void dropAllPipeTasks() {
297
    acquireWriteLock();
×
298
    try {
299
      dropAllPipeTasksInternal();
×
300
    } finally {
301
      releaseWriteLock();
×
302
    }
303
  }
×
304

305
  private void dropAllPipeTasksInternal() {
306
    for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
307
      try {
308
        dropPipe(
×
309
            pipeMeta.getStaticMeta().getPipeName(), pipeMeta.getStaticMeta().getCreationTime());
×
310
      } catch (final Exception e) {
×
311
        LOGGER.warn(
×
312
            "Failed to drop pipe {} with creation time {}",
313
            pipeMeta.getStaticMeta().getPipeName(),
×
314
            pipeMeta.getStaticMeta().getCreationTime(),
×
315
            e);
316
      }
×
317
    }
×
318
  }
×
319

320
  public synchronized void stopAllPipesWithCriticalException() {
321
    acquireWriteLock();
×
322
    try {
323
      stopAllPipesWithCriticalExceptionInternal();
×
324
    } finally {
325
      releaseWriteLock();
×
326
    }
327
  }
×
328

329
  private void stopAllPipesWithCriticalExceptionInternal() {
330
    pipeMetaKeeper
×
331
        .getPipeMetaList()
×
332
        .forEach(
×
333
            pipeMeta -> {
334
              final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
×
335
              final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
×
336

337
              if (runtimeMeta.getStatus().get() == PipeStatus.RUNNING) {
×
338
                runtimeMeta
×
339
                    .getConsensusGroupId2TaskMetaMap()
×
340
                    .values()
×
341
                    .forEach(
×
342
                        pipeTaskMeta -> {
343
                          for (final PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
×
344
                            if (e instanceof PipeRuntimeCriticalException) {
×
345
                              stopPipe(staticMeta.getPipeName(), staticMeta.getCreationTime());
×
346
                              return;
×
347
                            }
348
                          }
×
349
                        });
×
350
              }
351
            });
×
352
  }
×
353

354
  ////////////////////////// Manage by Pipe Name //////////////////////////
355

356
  /**
357
   * Create a new pipe. If the pipe already exists, do nothing and return false. Otherwise, create
358
   * the pipe and return true.
359
   *
360
   * @param pipeMetaFromConfigNode pipe meta from config node
361
   * @return true if the pipe is created successfully and should be started, false if the pipe
362
   *     already exists or is created but should not be started
363
   * @throws IllegalStateException if the status is illegal
364
   */
365
  private boolean createPipe(PipeMeta pipeMetaFromConfigNode) {
366
    final String pipeName = pipeMetaFromConfigNode.getStaticMeta().getPipeName();
×
367
    final long creationTime = pipeMetaFromConfigNode.getStaticMeta().getCreationTime();
×
368

369
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
370
    if (existedPipeMeta != null) {
×
371
      if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
×
372
        final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
373
        switch (status) {
×
374
          case STOPPED:
375
          case RUNNING:
376
            if (LOGGER.isInfoEnabled()) {
×
377
              LOGGER.info(
×
378
                  "Pipe {} (creation time = {}) has already been created. "
379
                      + "Current status = {}. Skip creating.",
380
                  pipeName,
381
                  creationTime,
×
382
                  status.name());
×
383
            }
384
            return false;
×
385
          case DROPPED:
386
            if (LOGGER.isInfoEnabled()) {
×
387
              LOGGER.info(
×
388
                  "Pipe {} (creation time = {}) has already been dropped, "
389
                      + "but the pipe task meta has not been cleaned up. "
390
                      + "Current status = {}. Try dropping the pipe and recreating it.",
391
                  pipeName,
392
                  creationTime,
×
393
                  status.name());
×
394
            }
395
            // Break to drop the pipe and recreate it
396
            break;
397
          default:
398
            throw new IllegalStateException(
×
399
                MESSAGE_UNEXPECTED_PIPE_STATUS
400
                    + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
×
401
        }
402
      }
403

404
      // Drop the pipe if
405
      // 1. The pipe with the same name but with different creation time has been created before
406
      // 2. The pipe with the same name and the same creation time has been dropped before, but the
407
      //  pipe task meta has not been cleaned up
408
      dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
×
409
    }
410

411
    // Create pipe tasks and trigger create() method for each pipe task
412
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
413
        new PipeBuilder(pipeMetaFromConfigNode).build();
×
414
    for (PipeTask pipeTask : pipeTasks.values()) {
×
415
      pipeTask.create();
×
416
    }
×
417
    pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(), pipeTasks);
×
418

419
    // No matter the pipe status from config node is RUNNING or STOPPED, we always set the status
420
    // of pipe meta to STOPPED when it is created. The STOPPED status should always be the initial
421
    // status of a pipe, which makes the status transition logic simpler.
422
    final AtomicReference<PipeStatus> pipeStatusFromConfigNode =
×
423
        pipeMetaFromConfigNode.getRuntimeMeta().getStatus();
×
424
    final boolean needToStartPipe = pipeStatusFromConfigNode.get() == PipeStatus.RUNNING;
×
425
    pipeStatusFromConfigNode.set(PipeStatus.STOPPED);
×
426

427
    pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode);
×
428

429
    // If the pipe status from config node is RUNNING, we will start the pipe later.
430
    return needToStartPipe;
×
431
  }
432

433
  private void dropPipe(String pipeName, long creationTime) {
434
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
435

436
    if (existedPipeMeta == null) {
×
437
      LOGGER.info(
×
438
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
439
              + "Skip dropping.",
440
          pipeName,
441
          creationTime);
×
442
      return;
×
443
    }
444
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
445
      LOGGER.info(
×
446
          "Pipe {} (creation time = {}) has been created but does not match "
447
              + "the creation time ({}) in dropPipe request. Skip dropping.",
448
          pipeName,
449
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
450
          creationTime);
×
451
      return;
×
452
    }
453

454
    // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
455
    // but the pipe task meta has not been cleaned up (in case of failure when executing
456
    // dropPipeTaskByConsensusGroup).
457
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
×
458

459
    // Drop pipe tasks and trigger drop() method for each pipe task
460
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
461
        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
×
462
    if (pipeTasks == null) {
×
463
      LOGGER.info(
×
464
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
465
              + "Skip dropping.",
466
          pipeName,
467
          creationTime);
×
468
      return;
×
469
    }
470
    for (PipeTask pipeTask : pipeTasks.values()) {
×
471
      pipeTask.drop();
×
472
    }
×
473

474
    // Remove pipe meta from pipe meta keeper
475
    pipeMetaKeeper.removePipeMeta(pipeName);
×
476
  }
×
477

478
  private void dropPipe(String pipeName) {
479
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
480

481
    if (existedPipeMeta == null) {
×
482
      LOGGER.info(
×
483
          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
484
      return;
×
485
    }
486

487
    // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
488
    // but the pipe task meta has not been cleaned up (in case of failure when executing
489
    // dropPipeTaskByConsensusGroup).
490
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
×
491

492
    // Drop pipe tasks and trigger drop() method for each pipe task
493
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
494
        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
×
495
    if (pipeTasks == null) {
×
496
      LOGGER.info(
×
497
          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
498
      return;
×
499
    }
500
    for (PipeTask pipeTask : pipeTasks.values()) {
×
501
      pipeTask.drop();
×
502
    }
×
503

504
    // Remove pipe meta from pipe meta keeper
505
    pipeMetaKeeper.removePipeMeta(pipeName);
×
506
  }
×
507

508
  private void startPipe(String pipeName, long creationTime) {
509
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
510

511
    if (existedPipeMeta == null) {
×
512
      LOGGER.info(
×
513
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
514
              + "Skip starting.",
515
          pipeName,
516
          creationTime);
×
517
      return;
×
518
    }
519
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
520
      LOGGER.info(
×
521
          "Pipe {} (creation time = {}) has been created but does not match "
522
              + "the creation time ({}) in startPipe request. Skip starting.",
523
          pipeName,
524
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
525
          creationTime);
×
526
      return;
×
527
    }
528

529
    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
530
    switch (status) {
×
531
      case STOPPED:
532
        if (LOGGER.isInfoEnabled()) {
×
533
          LOGGER.info(
×
534
              "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.",
535
              pipeName,
536
              creationTime,
×
537
              status.name());
×
538
        }
539
        break;
540
      case RUNNING:
541
        if (LOGGER.isInfoEnabled()) {
×
542
          LOGGER.info(
×
543
              "Pipe {} (creation time = {}) has already been started. Current status = {}. "
544
                  + "Skip starting.",
545
              pipeName,
546
              creationTime,
×
547
              status.name());
×
548
        }
549
        return;
×
550
      case DROPPED:
551
        if (LOGGER.isInfoEnabled()) {
×
552
          LOGGER.info(
×
553
              "Pipe {} (creation time = {}) has already been dropped. Current status = {}. "
554
                  + "Skip starting.",
555
              pipeName,
556
              creationTime,
×
557
              status.name());
×
558
        }
559
        return;
×
560
      default:
561
        throw new IllegalStateException(
×
562
            MESSAGE_UNEXPECTED_PIPE_STATUS
563
                + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
×
564
    }
565

566
    // Trigger start() method for each pipe task
567
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
568
        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
×
569
    if (pipeTasks == null) {
×
570
      LOGGER.info(
×
571
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
572
              + "Skip starting.",
573
          pipeName,
574
          creationTime);
×
575
      return;
×
576
    }
577
    for (PipeTask pipeTask : pipeTasks.values()) {
×
578
      pipeTask.start();
×
579
    }
×
580

581
    // Set pipe meta status to RUNNING
582
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
×
583
    // Clear exception messages if started successfully
584
    existedPipeMeta
×
585
        .getRuntimeMeta()
×
586
        .getConsensusGroupId2TaskMetaMap()
×
587
        .values()
×
588
        .forEach(PipeTaskMeta::clearExceptionMessages);
×
589
  }
×
590

591
  private void stopPipe(String pipeName, long creationTime) {
592
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
593

594
    if (existedPipeMeta == null) {
×
595
      LOGGER.info(
×
596
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
597
              + "Skip stopping.",
598
          pipeName,
599
          creationTime);
×
600
      return;
×
601
    }
602
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
603
      LOGGER.info(
×
604
          "Pipe {} (creation time = {}) has been created but does not match "
605
              + "the creation time ({}) in stopPipe request. Skip stopping.",
606
          pipeName,
607
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
608
          creationTime);
×
609
      return;
×
610
    }
611

612
    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
613
    switch (status) {
×
614
      case STOPPED:
615
        if (LOGGER.isInfoEnabled()) {
×
616
          LOGGER.info(
×
617
              "Pipe {} (creation time = {}) has already been stopped. Current status = {}. "
618
                  + "Skip stopping.",
619
              pipeName,
620
              creationTime,
×
621
              status.name());
×
622
        }
623
        return;
×
624
      case RUNNING:
625
        if (LOGGER.isInfoEnabled()) {
×
626
          LOGGER.info(
×
627
              "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.",
628
              pipeName,
629
              creationTime,
×
630
              status.name());
×
631
        }
632
        break;
633
      case DROPPED:
634
        if (LOGGER.isInfoEnabled()) {
×
635
          LOGGER.info(
×
636
              "Pipe {} (creation time = {}) has already been dropped. Current status = {}. "
637
                  + "Skip stopping.",
638
              pipeName,
639
              creationTime,
×
640
              status.name());
×
641
        }
642
        return;
×
643
      default:
644
        throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS + status.name());
×
645
    }
646

647
    // Trigger stop() method for each pipe task
648
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
649
        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
×
650
    if (pipeTasks == null) {
×
651
      LOGGER.info(
×
652
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
653
              + "Skip stopping.",
654
          pipeName,
655
          creationTime);
×
656
      return;
×
657
    }
658
    for (PipeTask pipeTask : pipeTasks.values()) {
×
659
      pipeTask.stop();
×
660
    }
×
661

662
    // Set pipe meta status to STOPPED
663
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
×
664
  }
×
665

666
  ///////////////////////// Manage by dataRegionGroupId /////////////////////////
667

668
  private void createPipeTask(
669
      TConsensusGroupId consensusGroupId,
670
      PipeStaticMeta pipeStaticMeta,
671
      PipeTaskMeta pipeTaskMeta) {
672
    if (pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
×
673
      final PipeTask pipeTask =
×
674
          new PipeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build();
×
675
      pipeTask.create();
×
676
      pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
×
677
    }
678
    pipeMetaKeeper
×
679
        .getPipeMeta(pipeStaticMeta.getPipeName())
×
680
        .getRuntimeMeta()
×
681
        .getConsensusGroupId2TaskMetaMap()
×
682
        .put(consensusGroupId, pipeTaskMeta);
×
683
  }
×
684

685
  private void dropPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
686
    pipeMetaKeeper
×
687
        .getPipeMeta(pipeStaticMeta.getPipeName())
×
688
        .getRuntimeMeta()
×
689
        .getConsensusGroupId2TaskMetaMap()
×
690
        .remove(dataRegionGroupId);
×
691
    final PipeTask pipeTask = pipeTaskManager.removePipeTask(pipeStaticMeta, dataRegionGroupId);
×
692
    if (pipeTask != null) {
×
693
      pipeTask.drop();
×
694
    }
695
  }
×
696

697
  private void startPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
698
    final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta, dataRegionGroupId);
×
699
    if (pipeTask != null) {
×
700
      pipeTask.start();
×
701
    }
702
  }
×
703

704
  ///////////////////////// Heartbeat /////////////////////////
705

706
  public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
707
      throws TException {
708
    acquireReadLock();
×
709
    try {
710
      collectPipeMetaListInternal(req, resp);
×
711
    } finally {
712
      releaseReadLock();
×
713
    }
714
  }
×
715

716
  private void collectPipeMetaListInternal(THeartbeatReq req, THeartbeatResp resp)
717
      throws TException {
718
    // Do nothing if data node is removing or removed, or request does not need pipe meta list
719
    if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
×
720
      return;
×
721
    }
722

723
    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
×
724
    try {
725
      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
726
        pipeMetaBinaryList.add(pipeMeta.serialize());
×
727
        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
×
728
      }
×
729
    } catch (IOException e) {
×
730
      throw new TException(e);
×
731
    }
×
732
    resp.setPipeMetaList(pipeMetaBinaryList);
×
733
  }
×
734

735
  public synchronized void collectPipeMetaList(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
736
      throws TException {
737
    acquireReadLock();
×
738
    try {
739
      collectPipeMetaListInternal(req, resp);
×
740
    } finally {
741
      releaseReadLock();
×
742
    }
743
  }
×
744

745
  private void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
746
      throws TException {
747
    // Do nothing if data node is removing or removed, or request does not need pipe meta list
748
    if (PipeAgent.runtime().isShutdown()) {
×
749
      return;
×
750
    }
751
    LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId);
×
752

753
    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
×
754
    try {
755
      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
756
        pipeMetaBinaryList.add(pipeMeta.serialize());
×
757
        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
×
758
      }
×
759
    } catch (IOException e) {
×
760
      throw new TException(e);
×
761
    }
×
762
    resp.setPipeMetaList(pipeMetaBinaryList);
×
763
  }
×
764
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc