• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9940

28 Aug 2023 02:34PM CUT coverage: 47.667% (-0.02%) from 47.686%
#9940

Pull #10978

travis_ci

web-flow
Merge 64f220724 into ebd2a6f63
Pull Request #10978: [To rel/1.2] Pipe: Increase the injection frequency of HeartBeatEvent to reduce the delay in log transferring (#10970)

30 of 30 new or added lines in 7 files covered. (100.0%)

79985 of 167800 relevant lines covered (47.67%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.53
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.task;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
24
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
25
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
26
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
27
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
28
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
29
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
30
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
31
import org.apache.iotdb.db.conf.IoTDBConfig;
32
import org.apache.iotdb.db.conf.IoTDBDescriptor;
33
import org.apache.iotdb.db.pipe.agent.PipeAgent;
34
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
35
import org.apache.iotdb.db.pipe.task.PipeBuilder;
36
import org.apache.iotdb.db.pipe.task.PipeTask;
37
import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
38
import org.apache.iotdb.db.pipe.task.PipeTaskManager;
39
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
40
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
41
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
42
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
43
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
44

45
import org.apache.thrift.TException;
46
import org.slf4j.Logger;
47
import org.slf4j.LoggerFactory;
48

49
import javax.validation.constraints.NotNull;
50

51
import java.io.IOException;
52
import java.nio.ByteBuffer;
53
import java.util.ArrayList;
54
import java.util.Collections;
55
import java.util.List;
56
import java.util.Map;
57
import java.util.Objects;
58
import java.util.Set;
59
import java.util.concurrent.atomic.AtomicReference;
60
import java.util.stream.Collectors;
61

62
/**
63
 * State transition diagram of a pipe task:
64
 *
65
 * <p><code>
66
 * |----------------|                     |---------| --> start pipe --> |---------|                   |---------|
67
 * | initial status | --> create pipe --> | STOPPED |                    | RUNNING | --> drop pipe --> | DROPPED |
68
 * |----------------|                     |---------| <-- stop  pipe <-- |---------|                   |---------|
69
 *                                             |                                                            |
70
 *                                             | ----------------------> drop pipe -----------------------> |
71
 * </code>
72
 *
73
 * <p>Other transitions are not allowed, will be ignored when received in the pipe task agent.
74
 */
75
public class PipeTaskAgent {
76

77
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
1✔
78
  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
1✔
79

80
  private static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s";
81
  private static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: ";
82

83
  private final PipeMetaKeeper pipeMetaKeeper;
84
  private final PipeTaskManager pipeTaskManager;
85

86
  /** Builds an agent that owns a fresh pipe meta keeper and a fresh pipe task manager. */
  public PipeTaskAgent() {
    this.pipeMetaKeeper = new PipeMetaKeeper();
    this.pipeTaskManager = new PipeTaskManager();
  }
1✔
90

91
  ////////////////////////// PipeMeta Lock Control //////////////////////////
92

93
  private void acquireReadLock() {
94
    pipeMetaKeeper.acquireReadLock();
×
95
  }
×
96

97
  public boolean tryReadLockWithTimeOut(long timeOutInSeconds) {
98
    try {
99
      return pipeMetaKeeper.tryReadLock(timeOutInSeconds);
×
100
    } catch (InterruptedException e) {
×
101
      Thread.currentThread().interrupt();
×
102
      LOGGER.warn("Interruption during requiring pipeMetaKeeper lock.", e);
×
103
      return false;
×
104
    }
105
  }
106

107
  private void releaseReadLock() {
108
    pipeMetaKeeper.releaseReadLock();
×
109
  }
×
110

111
  private void acquireWriteLock() {
112
    pipeMetaKeeper.acquireWriteLock();
×
113
  }
×
114

115
  private void releaseWriteLock() {
116
    pipeMetaKeeper.releaseWriteLock();
×
117
  }
×
118

119
  ////////////////////////// Pipe Task Management Entry //////////////////////////
120

121
  public synchronized TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
122
      PipeMeta pipeMetaFromConfigNode) {
123
    acquireWriteLock();
×
124
    try {
125
      return handleSinglePipeMetaChangesInternal(pipeMetaFromConfigNode);
×
126
    } finally {
127
      releaseWriteLock();
×
128
    }
129
  }
130

131
  private TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
132
      PipeMeta pipeMetaFromConfigNode) {
133
    // Do nothing if data node is removing or removed
134
    if (PipeAgent.runtime().isShutdown()) {
×
135
      return null;
×
136
    }
137

138
    try {
139
      executeSinglePipeMetaChanges(pipeMetaFromConfigNode);
×
140
      return null;
×
141
    } catch (Exception e) {
×
142
      final String pipeName = pipeMetaFromConfigNode.getStaticMeta().getPipeName();
×
143
      final String errorMessage =
×
144
          String.format(
×
145
              "Failed to handle single pipe meta changes for %s, because %s",
146
              pipeName, e.getMessage());
×
147
      LOGGER.warn("Failed to handle single pipe meta changes for {}", pipeName, e);
×
148
      return new TPushPipeMetaRespExceptionMessage(
×
149
          pipeName, errorMessage, System.currentTimeMillis());
×
150
    }
151
  }
152

153
  public synchronized TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) {
154
    acquireWriteLock();
×
155
    try {
156
      return handleDropPipeInternal(pipeName);
×
157
    } finally {
158
      releaseWriteLock();
×
159
    }
160
  }
161

162
  private TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String pipeName) {
163
    // Do nothing if data node is removing or removed
164
    if (PipeAgent.runtime().isShutdown()) {
×
165
      return null;
×
166
    }
167

168
    try {
169
      dropPipe(pipeName);
×
170
      return null;
×
171
    } catch (Exception e) {
×
172
      final String errorMessage =
×
173
          String.format("Failed to drop pipe %s, because %s", pipeName, e.getMessage());
×
174
      LOGGER.warn("Failed to drop pipe {}", pipeName, e);
×
175
      return new TPushPipeMetaRespExceptionMessage(
×
176
          pipeName, errorMessage, System.currentTimeMillis());
×
177
    }
178
  }
179

180
  public synchronized List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
181
      List<PipeMeta> pipeMetaListFromConfigNode) {
182
    acquireWriteLock();
×
183
    try {
184
      return handlePipeMetaChangesInternal(pipeMetaListFromConfigNode);
×
185
    } finally {
186
      releaseWriteLock();
×
187
    }
188
  }
189

190
  private List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(
191
      List<PipeMeta> pipeMetaListFromConfigNode) {
192
    // Do nothing if data node is removing or removed
193
    if (PipeAgent.runtime().isShutdown()) {
×
194
      return Collections.emptyList();
×
195
    }
196

197
    final List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
×
198

199
    // Iterate through pipe meta list from config node, check if pipe meta exists on data node
200
    // or has changed
201
    for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
×
202
      try {
203
        executeSinglePipeMetaChanges(metaFromConfigNode);
×
204
      } catch (Exception e) {
×
205
        final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
×
206
        final String errorMessage =
×
207
            String.format(
×
208
                "Failed to handle pipe meta changes for %s, because %s", pipeName, e.getMessage());
×
209
        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
×
210
        exceptionMessages.add(
×
211
            new TPushPipeMetaRespExceptionMessage(
212
                pipeName, errorMessage, System.currentTimeMillis()));
×
213
      }
×
214
    }
×
215

216
    // Check if there are pipes on data node that do not exist on config node, if so, drop them
217
    final Set<String> pipeNamesFromConfigNode =
×
218
        pipeMetaListFromConfigNode.stream()
×
219
            .map(meta -> meta.getStaticMeta().getPipeName())
×
220
            .collect(Collectors.toSet());
×
221
    for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
×
222
      final String pipeName = metaOnDataNode.getStaticMeta().getPipeName();
×
223

224
      try {
225
        if (!pipeNamesFromConfigNode.contains(pipeName)) {
×
226
          dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
×
227
        }
228
      } catch (Exception e) {
×
229
        // Do not record the error messages for the pipes don't exist on ConfigNode.
230
        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
×
231
      }
×
232
    }
×
233

234
    return exceptionMessages;
×
235
  }
236

237
  private void executeSinglePipeMetaChanges(final PipeMeta metaFromConfigNode) {
238
    final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
×
239
    final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
×
240

241
    // If pipe meta does not exist on data node, create a new pipe
242
    if (metaOnDataNode == null) {
×
243
      if (createPipe(metaFromConfigNode)) {
×
244
        // If the status recorded in config node is RUNNING, start the pipe
245
        startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
×
246
      }
247
      // If the status recorded in config node is STOPPED or DROPPED, do nothing
248
      return;
×
249
    }
250

251
    // If pipe meta exists on data node, check if it has changed
252
    final PipeStaticMeta staticMetaOnDataNode = metaOnDataNode.getStaticMeta();
×
253
    final PipeStaticMeta staticMetaFromConfigNode = metaFromConfigNode.getStaticMeta();
×
254

255
    // First check if pipe static meta has changed, if so, drop the pipe and create a new one
256
    if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
×
257
      dropPipe(pipeName);
×
258
      if (createPipe(metaFromConfigNode)) {
×
259
        startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
×
260
      }
261
      // If the status is STOPPED or DROPPED, do nothing
262
      return;
×
263
    }
264

265
    // Then check if pipe runtime meta has changed, if so, update the pipe
266
    final PipeRuntimeMeta runtimeMetaOnDataNode = metaOnDataNode.getRuntimeMeta();
×
267
    final PipeRuntimeMeta runtimeMetaFromConfigNode = metaFromConfigNode.getRuntimeMeta();
×
268
    executeSinglePipeRuntimeMetaChanges(
×
269
        staticMetaFromConfigNode, runtimeMetaFromConfigNode, runtimeMetaOnDataNode);
270
  }
×
271

272
  private void executeSinglePipeRuntimeMetaChanges(
273
      @NotNull PipeStaticMeta pipeStaticMeta,
274
      @NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
275
      @NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
276
    // 1. Handle data region group leader changed first
277
    final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMapFromConfigNode =
×
278
        runtimeMetaFromConfigNode.getConsensusGroupId2TaskMetaMap();
×
279
    final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMapOnDataNode =
×
280
        runtimeMetaOnDataNode.getConsensusGroupId2TaskMetaMap();
×
281

282
    // 1.1 Iterate over all consensus group ids in config node's pipe runtime meta, decide if we
283
    // need to drop and create a new task for each consensus group id
284
    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryFromConfigNode :
285
        consensusGroupIdToTaskMetaMapFromConfigNode.entrySet()) {
×
286
      final TConsensusGroupId consensusGroupIdFromConfigNode = entryFromConfigNode.getKey();
×
287

288
      final PipeTaskMeta taskMetaFromConfigNode = entryFromConfigNode.getValue();
×
289
      final PipeTaskMeta taskMetaOnDataNode =
×
290
          consensusGroupIdToTaskMetaMapOnDataNode.get(consensusGroupIdFromConfigNode);
×
291

292
      // If task meta does not exist on data node, create a new task
293
      if (taskMetaOnDataNode == null) {
×
294
        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, taskMetaFromConfigNode);
×
295
        // We keep the new created task's status consistent with the status recorded in data node's
296
        // pipe runtime meta. please note that the status recorded in data node's pipe runtime meta
297
        // is not reliable, but we will have a check later to make sure the status is correct.
298
        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
×
299
          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
300
        }
301
        continue;
302
      }
303

304
      // If task meta exists on data node, check if it has changed
305
      final int dataNodeIdFromConfigNode = taskMetaFromConfigNode.getLeaderDataNodeId();
×
306
      final int dataNodeIdOnDataNode = taskMetaOnDataNode.getLeaderDataNodeId();
×
307

308
      if (dataNodeIdFromConfigNode != dataNodeIdOnDataNode) {
×
309
        dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
310
        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, taskMetaFromConfigNode);
×
311
        // We keep the new created task's status consistent with the status recorded in data node's
312
        // pipe runtime meta. please note that the status recorded in data node's pipe runtime meta
313
        // is not reliable, but we will have a check later to make sure the status is correct.
314
        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
×
315
          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
316
        }
317
      }
318
    }
×
319

320
    // 1.2 Iterate over all consensus group ids on data node's pipe runtime meta, decide if we need
321
    // to drop any task. we do not need to create any new task here because we have already done
322
    // that in 1.1.
323
    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryOnDataNode :
324
        consensusGroupIdToTaskMetaMapOnDataNode.entrySet()) {
×
325
      final TConsensusGroupId consensusGroupIdOnDataNode = entryOnDataNode.getKey();
×
326
      final PipeTaskMeta taskMetaFromConfigNode =
×
327
          consensusGroupIdToTaskMetaMapFromConfigNode.get(consensusGroupIdOnDataNode);
×
328
      if (taskMetaFromConfigNode == null) {
×
329
        dropPipeTask(consensusGroupIdOnDataNode, pipeStaticMeta);
×
330
      }
331
    }
×
332

333
    // 2. Handle pipe runtime meta status changes
334
    final PipeStatus statusFromConfigNode = runtimeMetaFromConfigNode.getStatus().get();
×
335
    final PipeStatus statusOnDataNode = runtimeMetaOnDataNode.getStatus().get();
×
336
    if (statusFromConfigNode == statusOnDataNode) {
×
337
      return;
×
338
    }
339

340
    switch (statusFromConfigNode) {
×
341
      case RUNNING:
342
        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.STOPPED) {
×
343
          startPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
344
        } else {
345
          throw new IllegalStateException(
×
346
              String.format(
×
347
                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName()));
×
348
        }
349
        break;
350
      case STOPPED:
351
        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.RUNNING) {
×
352
          stopPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
353
        } else {
354
          throw new IllegalStateException(
×
355
              String.format(
×
356
                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName()));
×
357
        }
358
        break;
359
      case DROPPED:
360
        // This should not happen, but we still handle it here
361
        dropPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
362
        break;
×
363
      default:
364
        throw new IllegalStateException(
×
365
            String.format(
×
366
                MESSAGE_UNKNOWN_PIPE_STATUS, statusFromConfigNode, pipeStaticMeta.getPipeName()));
×
367
    }
368
  }
×
369

370
  public synchronized void dropAllPipeTasks() {
371
    acquireWriteLock();
×
372
    try {
373
      dropAllPipeTasksInternal();
×
374
    } finally {
375
      releaseWriteLock();
×
376
    }
377
  }
×
378

379
  private void dropAllPipeTasksInternal() {
380
    for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
381
      try {
382
        dropPipe(
×
383
            pipeMeta.getStaticMeta().getPipeName(), pipeMeta.getStaticMeta().getCreationTime());
×
384
      } catch (final Exception e) {
×
385
        LOGGER.warn(
×
386
            "Failed to drop pipe {} with creation time {}",
387
            pipeMeta.getStaticMeta().getPipeName(),
×
388
            pipeMeta.getStaticMeta().getCreationTime(),
×
389
            e);
390
      }
×
391
    }
×
392
  }
×
393

394
  public synchronized void stopAllPipesWithCriticalException() {
395
    acquireWriteLock();
×
396
    try {
397
      stopAllPipesWithCriticalExceptionInternal();
×
398
    } finally {
399
      releaseWriteLock();
×
400
    }
401
  }
×
402

403
  private void stopAllPipesWithCriticalExceptionInternal() {
404
    pipeMetaKeeper
×
405
        .getPipeMetaList()
×
406
        .forEach(
×
407
            pipeMeta -> {
408
              final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
×
409
              final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
×
410

411
              if (runtimeMeta.getStatus().get() == PipeStatus.RUNNING) {
×
412
                runtimeMeta
×
413
                    .getConsensusGroupId2TaskMetaMap()
×
414
                    .values()
×
415
                    .forEach(
×
416
                        pipeTaskMeta -> {
417
                          for (final PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
×
418
                            if (e instanceof PipeRuntimeCriticalException) {
×
419
                              stopPipe(staticMeta.getPipeName(), staticMeta.getCreationTime());
×
420
                              return;
×
421
                            }
422
                          }
×
423
                        });
×
424
              }
425
            });
×
426
  }
×
427

428
  ////////////////////////// Manage by Pipe Name //////////////////////////
429

430
  /**
431
   * Create a new pipe. If the pipe already exists, do nothing and return false. Otherwise, create
432
   * the pipe and return true.
433
   *
434
   * @param pipeMetaFromConfigNode pipe meta from config node
435
   * @return true if the pipe is created successfully and should be started, false if the pipe
436
   *     already exists or is created but should not be started
437
   * @throws IllegalStateException if the status is illegal
438
   */
439
  private boolean createPipe(PipeMeta pipeMetaFromConfigNode) {
440
    final String pipeName = pipeMetaFromConfigNode.getStaticMeta().getPipeName();
×
441
    final long creationTime = pipeMetaFromConfigNode.getStaticMeta().getCreationTime();
×
442

443
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
444
    if (existedPipeMeta != null) {
×
445
      if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
×
446
        final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
447
        switch (status) {
×
448
          case STOPPED:
449
          case RUNNING:
450
            if (LOGGER.isInfoEnabled()) {
×
451
              LOGGER.info(
×
452
                  "Pipe {} (creation time = {}) has already been created. "
453
                      + "Current status = {}. Skip creating.",
454
                  pipeName,
455
                  creationTime,
×
456
                  status.name());
×
457
            }
458
            return false;
×
459
          case DROPPED:
460
            if (LOGGER.isInfoEnabled()) {
×
461
              LOGGER.info(
×
462
                  "Pipe {} (creation time = {}) has already been dropped, "
463
                      + "but the pipe task meta has not been cleaned up. "
464
                      + "Current status = {}. Try dropping the pipe and recreating it.",
465
                  pipeName,
466
                  creationTime,
×
467
                  status.name());
×
468
            }
469
            // Break to drop the pipe and recreate it
470
            break;
471
          default:
472
            throw new IllegalStateException(
×
473
                MESSAGE_UNEXPECTED_PIPE_STATUS
474
                    + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
×
475
        }
476
      }
477

478
      // Drop the pipe if
479
      // 1. The pipe with the same name but with different creation time has been created before
480
      // 2. The pipe with the same name and the same creation time has been dropped before, but the
481
      //  pipe task meta has not been cleaned up
482
      dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
×
483
    }
484

485
    // Create pipe tasks and trigger create() method for each pipe task
486
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
487
        new PipeBuilder(pipeMetaFromConfigNode).build();
×
488
    for (PipeTask pipeTask : pipeTasks.values()) {
×
489
      pipeTask.create();
×
490
    }
×
491
    pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(), pipeTasks);
×
492

493
    // No matter the pipe status from config node is RUNNING or STOPPED, we always set the status
494
    // of pipe meta to STOPPED when it is created. The STOPPED status should always be the initial
495
    // status of a pipe, which makes the status transition logic simpler.
496
    final AtomicReference<PipeStatus> pipeStatusFromConfigNode =
×
497
        pipeMetaFromConfigNode.getRuntimeMeta().getStatus();
×
498
    final boolean needToStartPipe = pipeStatusFromConfigNode.get() == PipeStatus.RUNNING;
×
499
    pipeStatusFromConfigNode.set(PipeStatus.STOPPED);
×
500

501
    pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode);
×
502

503
    // If the pipe status from config node is RUNNING, we will start the pipe later.
504
    return needToStartPipe;
×
505
  }
506

507
  private void dropPipe(String pipeName, long creationTime) {
508
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
509

510
    if (existedPipeMeta == null) {
×
511
      LOGGER.info(
×
512
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
513
              + "Skip dropping.",
514
          pipeName,
515
          creationTime);
×
516
      return;
×
517
    }
518
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
519
      LOGGER.info(
×
520
          "Pipe {} (creation time = {}) has been created but does not match "
521
              + "the creation time ({}) in dropPipe request. Skip dropping.",
522
          pipeName,
523
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
524
          creationTime);
×
525
      return;
×
526
    }
527

528
    // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
529
    // but the pipe task meta has not been cleaned up (in case of failure when executing
530
    // dropPipeTaskByConsensusGroup).
531
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
×
532

533
    // Drop pipe tasks and trigger drop() method for each pipe task
534
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
535
        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
×
536
    if (pipeTasks == null) {
×
537
      LOGGER.info(
×
538
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
539
              + "Skip dropping.",
540
          pipeName,
541
          creationTime);
×
542
      return;
×
543
    }
544
    for (PipeTask pipeTask : pipeTasks.values()) {
×
545
      pipeTask.drop();
×
546
    }
×
547

548
    // Remove pipe meta from pipe meta keeper
549
    pipeMetaKeeper.removePipeMeta(pipeName);
×
550
  }
×
551

552
  private void dropPipe(String pipeName) {
553
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
554

555
    if (existedPipeMeta == null) {
×
556
      LOGGER.info(
×
557
          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
558
      return;
×
559
    }
560

561
    // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
562
    // but the pipe task meta has not been cleaned up (in case of failure when executing
563
    // dropPipeTaskByConsensusGroup).
564
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
×
565

566
    // Drop pipe tasks and trigger drop() method for each pipe task
567
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
568
        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
×
569
    if (pipeTasks == null) {
×
570
      LOGGER.info(
×
571
          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
572
      return;
×
573
    }
574
    for (PipeTask pipeTask : pipeTasks.values()) {
×
575
      pipeTask.drop();
×
576
    }
×
577

578
    // Remove pipe meta from pipe meta keeper
579
    pipeMetaKeeper.removePipeMeta(pipeName);
×
580
  }
×
581

582
  private void startPipe(String pipeName, long creationTime) {
583
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
584

585
    if (existedPipeMeta == null) {
×
586
      LOGGER.info(
×
587
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
588
              + "Skip starting.",
589
          pipeName,
590
          creationTime);
×
591
      return;
×
592
    }
593
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
594
      LOGGER.info(
×
595
          "Pipe {} (creation time = {}) has been created but does not match "
596
              + "the creation time ({}) in startPipe request. Skip starting.",
597
          pipeName,
598
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
599
          creationTime);
×
600
      return;
×
601
    }
602

603
    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
604
    switch (status) {
×
605
      case STOPPED:
606
        if (LOGGER.isInfoEnabled()) {
×
607
          LOGGER.info(
×
608
              "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.",
609
              pipeName,
610
              creationTime,
×
611
              status.name());
×
612
        }
613
        break;
614
      case RUNNING:
615
        if (LOGGER.isInfoEnabled()) {
×
616
          LOGGER.info(
×
617
              "Pipe {} (creation time = {}) has already been started. Current status = {}. "
618
                  + "Skip starting.",
619
              pipeName,
620
              creationTime,
×
621
              status.name());
×
622
        }
623
        return;
×
624
      case DROPPED:
625
        if (LOGGER.isInfoEnabled()) {
×
626
          LOGGER.info(
×
627
              "Pipe {} (creation time = {}) has already been dropped. Current status = {}. "
628
                  + "Skip starting.",
629
              pipeName,
630
              creationTime,
×
631
              status.name());
×
632
        }
633
        return;
×
634
      default:
635
        throw new IllegalStateException(
×
636
            MESSAGE_UNEXPECTED_PIPE_STATUS
637
                + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
×
638
    }
639

640
    // Trigger start() method for each pipe task
641
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
642
        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
×
643
    if (pipeTasks == null) {
×
644
      LOGGER.info(
×
645
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
646
              + "Skip starting.",
647
          pipeName,
648
          creationTime);
×
649
      return;
×
650
    }
651
    for (PipeTask pipeTask : pipeTasks.values()) {
×
652
      pipeTask.start();
×
653
    }
×
654

655
    // Set pipe meta status to RUNNING
656
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
×
657
    // Clear exception messages if started successfully
658
    existedPipeMeta
×
659
        .getRuntimeMeta()
×
660
        .getConsensusGroupId2TaskMetaMap()
×
661
        .values()
×
662
        .forEach(PipeTaskMeta::clearExceptionMessages);
×
663
  }
×
664

665
  private void stopPipe(String pipeName, long creationTime) {
666
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
667

668
    if (existedPipeMeta == null) {
×
669
      LOGGER.info(
×
670
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
671
              + "Skip stopping.",
672
          pipeName,
673
          creationTime);
×
674
      return;
×
675
    }
676
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
677
      LOGGER.info(
×
678
          "Pipe {} (creation time = {}) has been created but does not match "
679
              + "the creation time ({}) in stopPipe request. Skip stopping.",
680
          pipeName,
681
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
682
          creationTime);
×
683
      return;
×
684
    }
685

686
    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
687
    switch (status) {
×
688
      case STOPPED:
689
        if (LOGGER.isInfoEnabled()) {
×
690
          LOGGER.info(
×
691
              "Pipe {} (creation time = {}) has already been stopped. Current status = {}. "
692
                  + "Skip stopping.",
693
              pipeName,
694
              creationTime,
×
695
              status.name());
×
696
        }
697
        return;
×
698
      case RUNNING:
699
        if (LOGGER.isInfoEnabled()) {
×
700
          LOGGER.info(
×
701
              "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.",
702
              pipeName,
703
              creationTime,
×
704
              status.name());
×
705
        }
706
        break;
707
      case DROPPED:
708
        if (LOGGER.isInfoEnabled()) {
×
709
          LOGGER.info(
×
710
              "Pipe {} (creation time = {}) has already been dropped. Current status = {}. "
711
                  + "Skip stopping.",
712
              pipeName,
713
              creationTime,
×
714
              status.name());
×
715
        }
716
        return;
×
717
      default:
718
        throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS + status.name());
×
719
    }
720

721
    // Trigger stop() method for each pipe task
722
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
723
        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
×
724
    if (pipeTasks == null) {
×
725
      LOGGER.info(
×
726
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
727
              + "Skip stopping.",
728
          pipeName,
729
          creationTime);
×
730
      return;
×
731
    }
732
    for (PipeTask pipeTask : pipeTasks.values()) {
×
733
      pipeTask.stop();
×
734
    }
×
735

736
    // Set pipe meta status to STOPPED
737
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
×
738
  }
×
739

740
  ///////////////////////// Manage by dataRegionGroupId /////////////////////////
741

742
  private void createPipeTask(
743
      TConsensusGroupId consensusGroupId,
744
      PipeStaticMeta pipeStaticMeta,
745
      PipeTaskMeta pipeTaskMeta) {
746
    if (pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
×
747
      final PipeTask pipeTask =
×
748
          new PipeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build();
×
749
      pipeTask.create();
×
750
      pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
×
751
    }
752
    pipeMetaKeeper
×
753
        .getPipeMeta(pipeStaticMeta.getPipeName())
×
754
        .getRuntimeMeta()
×
755
        .getConsensusGroupId2TaskMetaMap()
×
756
        .put(consensusGroupId, pipeTaskMeta);
×
757
  }
×
758

759
  private void dropPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
760
    pipeMetaKeeper
×
761
        .getPipeMeta(pipeStaticMeta.getPipeName())
×
762
        .getRuntimeMeta()
×
763
        .getConsensusGroupId2TaskMetaMap()
×
764
        .remove(dataRegionGroupId);
×
765
    final PipeTask pipeTask = pipeTaskManager.removePipeTask(pipeStaticMeta, dataRegionGroupId);
×
766
    if (pipeTask != null) {
×
767
      pipeTask.drop();
×
768
    }
769
  }
×
770

771
  private void startPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
772
    final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta, dataRegionGroupId);
×
773
    if (pipeTask != null) {
×
774
      pipeTask.start();
×
775
    }
776
  }
×
777

778
  ///////////////////////// Heartbeat /////////////////////////
779

780
  public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
781
      throws TException {
782
    // If the pipe heartbeat is separated from the cluster heartbeat, then the lock doesn't
783
    // need to be acquired
784
    if (!req.isNeedPipeMetaList()) {
×
785
      return;
×
786
    }
787
    // Try the lock instead of directly acquire it to prevent the block of the cluster heartbeat
788
    // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode
789
    if (!tryReadLockWithTimeOut(10)) {
×
790
      return;
×
791
    }
792
    try {
793
      collectPipeMetaListInternal(resp);
×
794
    } finally {
795
      releaseReadLock();
×
796
    }
797
  }
×
798

799
  private void collectPipeMetaListInternal(THeartbeatResp resp) throws TException {
800
    // Do nothing if data node is removing or removed, or request does not need pipe meta list
801
    if (PipeAgent.runtime().isShutdown()) {
×
802
      return;
×
803
    }
804

805
    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
×
806
    try {
807
      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
808
        pipeMetaBinaryList.add(pipeMeta.serialize());
×
809
        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
×
810
      }
×
811
    } catch (IOException e) {
×
812
      throw new TException(e);
×
813
    }
×
814
    resp.setPipeMetaList(pipeMetaBinaryList);
×
815
  }
×
816

817
  public synchronized void collectPipeMetaList(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
818
      throws TException {
819
    acquireReadLock();
×
820
    try {
821
      collectPipeMetaListInternal(req, resp);
×
822
    } finally {
823
      releaseReadLock();
×
824
    }
825
  }
×
826

827
  private void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
828
      throws TException {
829
    // Do nothing if data node is removing or removed, or request does not need pipe meta list
830
    if (PipeAgent.runtime().isShutdown()) {
×
831
      return;
×
832
    }
833
    LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId);
×
834

835
    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
×
836
    try {
837
      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
838
        pipeMetaBinaryList.add(pipeMeta.serialize());
×
839
        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
×
840
      }
×
841
    } catch (IOException e) {
×
842
      throw new TException(e);
×
843
    }
×
844
    resp.setPipeMetaList(pipeMetaBinaryList);
×
845

846
    PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
×
847
  }
×
848
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc