• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9871

18 Aug 2023 08:01AM UTC coverage: 47.982% (-0.02%) from 48.003%
#9871

push

travis_ci

web-flow
[IOTDB-6117] Pipe: Optimize RPC requests from CN to DN. CN send exactly one pipeMeta to each DN upon create/start/stop/drop pipe (#10875) (#10905)

Currently, CN sends pipeMeta of all existing pipes to each dn upon create/start/stop/drop pipe, which may be time-comsuming.

In this commit, CN will send exactly one pipeMeta to each DN upon create/start/stop/drop pipe.

---------

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit 38b36006b)

95 of 95 new or added lines in 12 files covered. (100.0%)

79801 of 166313 relevant lines covered (47.98%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.54
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.agent.task;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
24
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
25
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
26
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
27
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
28
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
29
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
30
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
31
import org.apache.iotdb.db.conf.IoTDBConfig;
32
import org.apache.iotdb.db.conf.IoTDBDescriptor;
33
import org.apache.iotdb.db.pipe.agent.PipeAgent;
34
import org.apache.iotdb.db.pipe.task.PipeBuilder;
35
import org.apache.iotdb.db.pipe.task.PipeTask;
36
import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
37
import org.apache.iotdb.db.pipe.task.PipeTaskManager;
38
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
39
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
40
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
41
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
42
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
43

44
import org.apache.thrift.TException;
45
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
47

48
import javax.validation.constraints.NotNull;
49

50
import java.io.IOException;
51
import java.nio.ByteBuffer;
52
import java.util.ArrayList;
53
import java.util.Collections;
54
import java.util.List;
55
import java.util.Map;
56
import java.util.Objects;
57
import java.util.Set;
58
import java.util.concurrent.atomic.AtomicReference;
59
import java.util.stream.Collectors;
60

61
/**
62
 * State transition diagram of a pipe task:
63
 *
64
 * <p><code>
65
 * |----------------|                     |---------| --> start pipe --> |---------|                   |---------|
66
 * | initial status | --> create pipe --> | STOPPED |                    | RUNNING | --> drop pipe --> | DROPPED |
67
 * |----------------|                     |---------| <-- stop  pipe <-- |---------|                   |---------|
68
 *                                             |                                                            |
69
 *                                             | ----------------------> drop pipe -----------------------> |
70
 * </code>
71
 *
72
 * <p>Other transitions are not allowed, will be ignored when received in the pipe task agent.
73
 */
74
public class PipeTaskAgent {
75

76
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
1✔
77
  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
1✔
78

79
  private static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s";
80
  private static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: ";
81

82
  private final PipeMetaKeeper pipeMetaKeeper;
83
  private final PipeTaskManager pipeTaskManager;
84

85
  public PipeTaskAgent() {
1✔
86
    pipeMetaKeeper = new PipeMetaKeeper();
1✔
87
    pipeTaskManager = new PipeTaskManager();
1✔
88
  }
1✔
89

90
  ////////////////////////// PipeMeta Lock Control //////////////////////////
91

92
  private void acquireReadLock() {
93
    pipeMetaKeeper.acquireReadLock();
×
94
  }
×
95

96
  public boolean tryReadLockWithTimeOut(long timeOutInSeconds) {
97
    try {
98
      return pipeMetaKeeper.tryReadLock(timeOutInSeconds);
×
99
    } catch (InterruptedException e) {
×
100
      Thread.currentThread().interrupt();
×
101
      LOGGER.warn("Interruption during requiring pipeMetaKeeper lock.", e);
×
102
      return false;
×
103
    }
104
  }
105

106
  private void releaseReadLock() {
107
    pipeMetaKeeper.releaseReadLock();
×
108
  }
×
109

110
  private void acquireWriteLock() {
111
    pipeMetaKeeper.acquireWriteLock();
×
112
  }
×
113

114
  private void releaseWriteLock() {
115
    pipeMetaKeeper.releaseWriteLock();
×
116
  }
×
117

118
  ////////////////////////// Pipe Task Management Entry //////////////////////////
119

120
  public synchronized TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
121
      PipeMeta pipeMetaFromConfigNode) {
122
    acquireWriteLock();
×
123
    try {
124
      return handleSinglePipeMetaChangesInternal(pipeMetaFromConfigNode);
×
125
    } finally {
126
      releaseWriteLock();
×
127
    }
128
  }
129

130
  private TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
131
      PipeMeta pipeMetaFromConfigNode) {
132
    // Do nothing if data node is removing or removed
133
    if (PipeAgent.runtime().isShutdown()) {
×
134
      return null;
×
135
    }
136

137
    try {
138
      executeSinglePipeMetaChanges(pipeMetaFromConfigNode);
×
139
      return null;
×
140
    } catch (Exception e) {
×
141
      final String pipeName = pipeMetaFromConfigNode.getStaticMeta().getPipeName();
×
142
      final String errorMessage =
×
143
          String.format(
×
144
              "Failed to handle single pipe meta changes for %s, because %s",
145
              pipeName, e.getMessage());
×
146
      LOGGER.warn("Failed to handle single pipe meta changes for {}", pipeName, e);
×
147
      return new TPushPipeMetaRespExceptionMessage(
×
148
          pipeName, errorMessage, System.currentTimeMillis());
×
149
    }
150
  }
151

152
  public synchronized TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) {
153
    acquireWriteLock();
×
154
    try {
155
      return handleDropPipeInternal(pipeName);
×
156
    } finally {
157
      releaseWriteLock();
×
158
    }
159
  }
160

161
  private TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String pipeName) {
162
    // Do nothing if data node is removing or removed
163
    if (PipeAgent.runtime().isShutdown()) {
×
164
      return null;
×
165
    }
166

167
    try {
168
      dropPipe(pipeName);
×
169
      return null;
×
170
    } catch (Exception e) {
×
171
      final String errorMessage =
×
172
          String.format("Failed to drop pipe %s, because %s", pipeName, e.getMessage());
×
173
      LOGGER.warn("Failed to drop pipe {}", pipeName, e);
×
174
      return new TPushPipeMetaRespExceptionMessage(
×
175
          pipeName, errorMessage, System.currentTimeMillis());
×
176
    }
177
  }
178

179
  public synchronized List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
180
      List<PipeMeta> pipeMetaListFromConfigNode) {
181
    acquireWriteLock();
×
182
    try {
183
      return handlePipeMetaChangesInternal(pipeMetaListFromConfigNode);
×
184
    } finally {
185
      releaseWriteLock();
×
186
    }
187
  }
188

189
  private List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(
190
      List<PipeMeta> pipeMetaListFromConfigNode) {
191
    // Do nothing if data node is removing or removed
192
    if (PipeAgent.runtime().isShutdown()) {
×
193
      return Collections.emptyList();
×
194
    }
195

196
    final List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
×
197

198
    // Iterate through pipe meta list from config node, check if pipe meta exists on data node
199
    // or has changed
200
    for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
×
201
      try {
202
        executeSinglePipeMetaChanges(metaFromConfigNode);
×
203
      } catch (Exception e) {
×
204
        final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
×
205
        final String errorMessage =
×
206
            String.format(
×
207
                "Failed to handle pipe meta changes for %s, because %s", pipeName, e.getMessage());
×
208
        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
×
209
        exceptionMessages.add(
×
210
            new TPushPipeMetaRespExceptionMessage(
211
                pipeName, errorMessage, System.currentTimeMillis()));
×
212
      }
×
213
    }
×
214

215
    // Check if there are pipes on data node that do not exist on config node, if so, drop them
216
    final Set<String> pipeNamesFromConfigNode =
×
217
        pipeMetaListFromConfigNode.stream()
×
218
            .map(meta -> meta.getStaticMeta().getPipeName())
×
219
            .collect(Collectors.toSet());
×
220
    for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
×
221
      final String pipeName = metaOnDataNode.getStaticMeta().getPipeName();
×
222

223
      try {
224
        if (!pipeNamesFromConfigNode.contains(pipeName)) {
×
225
          dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
×
226
        }
227
      } catch (Exception e) {
×
228
        // Do not record the error messages for the pipes don't exist on ConfigNode.
229
        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
×
230
      }
×
231
    }
×
232

233
    return exceptionMessages;
×
234
  }
235

236
  private void executeSinglePipeMetaChanges(final PipeMeta metaFromConfigNode) {
237
    final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
×
238
    final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
×
239

240
    // If pipe meta does not exist on data node, create a new pipe
241
    if (metaOnDataNode == null) {
×
242
      if (createPipe(metaFromConfigNode)) {
×
243
        // If the status recorded in config node is RUNNING, start the pipe
244
        startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
×
245
      }
246
      // If the status recorded in config node is STOPPED or DROPPED, do nothing
247
      return;
×
248
    }
249

250
    // If pipe meta exists on data node, check if it has changed
251
    final PipeStaticMeta staticMetaOnDataNode = metaOnDataNode.getStaticMeta();
×
252
    final PipeStaticMeta staticMetaFromConfigNode = metaFromConfigNode.getStaticMeta();
×
253

254
    // First check if pipe static meta has changed, if so, drop the pipe and create a new one
255
    if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
×
256
      dropPipe(pipeName);
×
257
      if (createPipe(metaFromConfigNode)) {
×
258
        startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
×
259
      }
260
      // If the status is STOPPED or DROPPED, do nothing
261
      return;
×
262
    }
263

264
    // Then check if pipe runtime meta has changed, if so, update the pipe
265
    final PipeRuntimeMeta runtimeMetaOnDataNode = metaOnDataNode.getRuntimeMeta();
×
266
    final PipeRuntimeMeta runtimeMetaFromConfigNode = metaFromConfigNode.getRuntimeMeta();
×
267
    executeSinglePipeRuntimeMetaChanges(
×
268
        staticMetaFromConfigNode, runtimeMetaFromConfigNode, runtimeMetaOnDataNode);
269
  }
×
270

271
  private void executeSinglePipeRuntimeMetaChanges(
272
      @NotNull PipeStaticMeta pipeStaticMeta,
273
      @NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
274
      @NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
275
    // 1. Handle data region group leader changed first
276
    final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMapFromConfigNode =
×
277
        runtimeMetaFromConfigNode.getConsensusGroupId2TaskMetaMap();
×
278
    final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMapOnDataNode =
×
279
        runtimeMetaOnDataNode.getConsensusGroupId2TaskMetaMap();
×
280

281
    // 1.1 Iterate over all consensus group ids in config node's pipe runtime meta, decide if we
282
    // need to drop and create a new task for each consensus group id
283
    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryFromConfigNode :
284
        consensusGroupIdToTaskMetaMapFromConfigNode.entrySet()) {
×
285
      final TConsensusGroupId consensusGroupIdFromConfigNode = entryFromConfigNode.getKey();
×
286

287
      final PipeTaskMeta taskMetaFromConfigNode = entryFromConfigNode.getValue();
×
288
      final PipeTaskMeta taskMetaOnDataNode =
×
289
          consensusGroupIdToTaskMetaMapOnDataNode.get(consensusGroupIdFromConfigNode);
×
290

291
      // If task meta does not exist on data node, create a new task
292
      if (taskMetaOnDataNode == null) {
×
293
        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, taskMetaFromConfigNode);
×
294
        // We keep the new created task's status consistent with the status recorded in data node's
295
        // pipe runtime meta. please note that the status recorded in data node's pipe runtime meta
296
        // is not reliable, but we will have a check later to make sure the status is correct.
297
        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
×
298
          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
299
        }
300
        continue;
301
      }
302

303
      // If task meta exists on data node, check if it has changed
304
      final int dataNodeIdFromConfigNode = taskMetaFromConfigNode.getLeaderDataNodeId();
×
305
      final int dataNodeIdOnDataNode = taskMetaOnDataNode.getLeaderDataNodeId();
×
306

307
      if (dataNodeIdFromConfigNode != dataNodeIdOnDataNode) {
×
308
        dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
309
        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, taskMetaFromConfigNode);
×
310
        // We keep the new created task's status consistent with the status recorded in data node's
311
        // pipe runtime meta. please note that the status recorded in data node's pipe runtime meta
312
        // is not reliable, but we will have a check later to make sure the status is correct.
313
        if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
×
314
          startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
×
315
        }
316
      }
317
    }
×
318

319
    // 1.2 Iterate over all consensus group ids on data node's pipe runtime meta, decide if we need
320
    // to drop any task. we do not need to create any new task here because we have already done
321
    // that in 1.1.
322
    for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryOnDataNode :
323
        consensusGroupIdToTaskMetaMapOnDataNode.entrySet()) {
×
324
      final TConsensusGroupId consensusGroupIdOnDataNode = entryOnDataNode.getKey();
×
325
      final PipeTaskMeta taskMetaFromConfigNode =
×
326
          consensusGroupIdToTaskMetaMapFromConfigNode.get(consensusGroupIdOnDataNode);
×
327
      if (taskMetaFromConfigNode == null) {
×
328
        dropPipeTask(consensusGroupIdOnDataNode, pipeStaticMeta);
×
329
      }
330
    }
×
331

332
    // 2. Handle pipe runtime meta status changes
333
    final PipeStatus statusFromConfigNode = runtimeMetaFromConfigNode.getStatus().get();
×
334
    final PipeStatus statusOnDataNode = runtimeMetaOnDataNode.getStatus().get();
×
335
    if (statusFromConfigNode == statusOnDataNode) {
×
336
      return;
×
337
    }
338

339
    switch (statusFromConfigNode) {
×
340
      case RUNNING:
341
        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.STOPPED) {
×
342
          startPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
343
        } else {
344
          throw new IllegalStateException(
×
345
              String.format(
×
346
                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName()));
×
347
        }
348
        break;
349
      case STOPPED:
350
        if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.RUNNING) {
×
351
          stopPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
352
        } else {
353
          throw new IllegalStateException(
×
354
              String.format(
×
355
                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, pipeStaticMeta.getPipeName()));
×
356
        }
357
        break;
358
      case DROPPED:
359
        // This should not happen, but we still handle it here
360
        dropPipe(pipeStaticMeta.getPipeName(), pipeStaticMeta.getCreationTime());
×
361
        break;
×
362
      default:
363
        throw new IllegalStateException(
×
364
            String.format(
×
365
                MESSAGE_UNKNOWN_PIPE_STATUS, statusFromConfigNode, pipeStaticMeta.getPipeName()));
×
366
    }
367
  }
×
368

369
  public synchronized void dropAllPipeTasks() {
370
    acquireWriteLock();
×
371
    try {
372
      dropAllPipeTasksInternal();
×
373
    } finally {
374
      releaseWriteLock();
×
375
    }
376
  }
×
377

378
  private void dropAllPipeTasksInternal() {
379
    for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
380
      try {
381
        dropPipe(
×
382
            pipeMeta.getStaticMeta().getPipeName(), pipeMeta.getStaticMeta().getCreationTime());
×
383
      } catch (final Exception e) {
×
384
        LOGGER.warn(
×
385
            "Failed to drop pipe {} with creation time {}",
386
            pipeMeta.getStaticMeta().getPipeName(),
×
387
            pipeMeta.getStaticMeta().getCreationTime(),
×
388
            e);
389
      }
×
390
    }
×
391
  }
×
392

393
  public synchronized void stopAllPipesWithCriticalException() {
394
    acquireWriteLock();
×
395
    try {
396
      stopAllPipesWithCriticalExceptionInternal();
×
397
    } finally {
398
      releaseWriteLock();
×
399
    }
400
  }
×
401

402
  private void stopAllPipesWithCriticalExceptionInternal() {
403
    pipeMetaKeeper
×
404
        .getPipeMetaList()
×
405
        .forEach(
×
406
            pipeMeta -> {
407
              final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
×
408
              final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
×
409

410
              if (runtimeMeta.getStatus().get() == PipeStatus.RUNNING) {
×
411
                runtimeMeta
×
412
                    .getConsensusGroupId2TaskMetaMap()
×
413
                    .values()
×
414
                    .forEach(
×
415
                        pipeTaskMeta -> {
416
                          for (final PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
×
417
                            if (e instanceof PipeRuntimeCriticalException) {
×
418
                              stopPipe(staticMeta.getPipeName(), staticMeta.getCreationTime());
×
419
                              return;
×
420
                            }
421
                          }
×
422
                        });
×
423
              }
424
            });
×
425
  }
×
426

427
  ////////////////////////// Manage by Pipe Name //////////////////////////
428

429
  /**
430
   * Create a new pipe. If the pipe already exists, do nothing and return false. Otherwise, create
431
   * the pipe and return true.
432
   *
433
   * @param pipeMetaFromConfigNode pipe meta from config node
434
   * @return true if the pipe is created successfully and should be started, false if the pipe
435
   *     already exists or is created but should not be started
436
   * @throws IllegalStateException if the status is illegal
437
   */
438
  private boolean createPipe(PipeMeta pipeMetaFromConfigNode) {
439
    final String pipeName = pipeMetaFromConfigNode.getStaticMeta().getPipeName();
×
440
    final long creationTime = pipeMetaFromConfigNode.getStaticMeta().getCreationTime();
×
441

442
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
443
    if (existedPipeMeta != null) {
×
444
      if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
×
445
        final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
446
        switch (status) {
×
447
          case STOPPED:
448
          case RUNNING:
449
            if (LOGGER.isInfoEnabled()) {
×
450
              LOGGER.info(
×
451
                  "Pipe {} (creation time = {}) has already been created. "
452
                      + "Current status = {}. Skip creating.",
453
                  pipeName,
454
                  creationTime,
×
455
                  status.name());
×
456
            }
457
            return false;
×
458
          case DROPPED:
459
            if (LOGGER.isInfoEnabled()) {
×
460
              LOGGER.info(
×
461
                  "Pipe {} (creation time = {}) has already been dropped, "
462
                      + "but the pipe task meta has not been cleaned up. "
463
                      + "Current status = {}. Try dropping the pipe and recreating it.",
464
                  pipeName,
465
                  creationTime,
×
466
                  status.name());
×
467
            }
468
            // Break to drop the pipe and recreate it
469
            break;
470
          default:
471
            throw new IllegalStateException(
×
472
                MESSAGE_UNEXPECTED_PIPE_STATUS
473
                    + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
×
474
        }
475
      }
476

477
      // Drop the pipe if
478
      // 1. The pipe with the same name but with different creation time has been created before
479
      // 2. The pipe with the same name and the same creation time has been dropped before, but the
480
      //  pipe task meta has not been cleaned up
481
      dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
×
482
    }
483

484
    // Create pipe tasks and trigger create() method for each pipe task
485
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
486
        new PipeBuilder(pipeMetaFromConfigNode).build();
×
487
    for (PipeTask pipeTask : pipeTasks.values()) {
×
488
      pipeTask.create();
×
489
    }
×
490
    pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(), pipeTasks);
×
491

492
    // No matter the pipe status from config node is RUNNING or STOPPED, we always set the status
493
    // of pipe meta to STOPPED when it is created. The STOPPED status should always be the initial
494
    // status of a pipe, which makes the status transition logic simpler.
495
    final AtomicReference<PipeStatus> pipeStatusFromConfigNode =
×
496
        pipeMetaFromConfigNode.getRuntimeMeta().getStatus();
×
497
    final boolean needToStartPipe = pipeStatusFromConfigNode.get() == PipeStatus.RUNNING;
×
498
    pipeStatusFromConfigNode.set(PipeStatus.STOPPED);
×
499

500
    pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode);
×
501

502
    // If the pipe status from config node is RUNNING, we will start the pipe later.
503
    return needToStartPipe;
×
504
  }
505

506
  private void dropPipe(String pipeName, long creationTime) {
507
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
508

509
    if (existedPipeMeta == null) {
×
510
      LOGGER.info(
×
511
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
512
              + "Skip dropping.",
513
          pipeName,
514
          creationTime);
×
515
      return;
×
516
    }
517
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
518
      LOGGER.info(
×
519
          "Pipe {} (creation time = {}) has been created but does not match "
520
              + "the creation time ({}) in dropPipe request. Skip dropping.",
521
          pipeName,
522
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
523
          creationTime);
×
524
      return;
×
525
    }
526

527
    // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
528
    // but the pipe task meta has not been cleaned up (in case of failure when executing
529
    // dropPipeTaskByConsensusGroup).
530
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
×
531

532
    // Drop pipe tasks and trigger drop() method for each pipe task
533
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
534
        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
×
535
    if (pipeTasks == null) {
×
536
      LOGGER.info(
×
537
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
538
              + "Skip dropping.",
539
          pipeName,
540
          creationTime);
×
541
      return;
×
542
    }
543
    for (PipeTask pipeTask : pipeTasks.values()) {
×
544
      pipeTask.drop();
×
545
    }
×
546

547
    // Remove pipe meta from pipe meta keeper
548
    pipeMetaKeeper.removePipeMeta(pipeName);
×
549
  }
×
550

551
  private void dropPipe(String pipeName) {
552
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
553

554
    if (existedPipeMeta == null) {
×
555
      LOGGER.info(
×
556
          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
557
      return;
×
558
    }
559

560
    // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
561
    // but the pipe task meta has not been cleaned up (in case of failure when executing
562
    // dropPipeTaskByConsensusGroup).
563
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
×
564

565
    // Drop pipe tasks and trigger drop() method for each pipe task
566
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
567
        pipeTaskManager.removePipeTasks(existedPipeMeta.getStaticMeta());
×
568
    if (pipeTasks == null) {
×
569
      LOGGER.info(
×
570
          "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
571
      return;
×
572
    }
573
    for (PipeTask pipeTask : pipeTasks.values()) {
×
574
      pipeTask.drop();
×
575
    }
×
576

577
    // Remove pipe meta from pipe meta keeper
578
    pipeMetaKeeper.removePipeMeta(pipeName);
×
579
  }
×
580

581
  private void startPipe(String pipeName, long creationTime) {
582
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
583

584
    if (existedPipeMeta == null) {
×
585
      LOGGER.info(
×
586
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
587
              + "Skip starting.",
588
          pipeName,
589
          creationTime);
×
590
      return;
×
591
    }
592
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
593
      LOGGER.info(
×
594
          "Pipe {} (creation time = {}) has been created but does not match "
595
              + "the creation time ({}) in startPipe request. Skip starting.",
596
          pipeName,
597
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
598
          creationTime);
×
599
      return;
×
600
    }
601

602
    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
603
    switch (status) {
×
604
      case STOPPED:
605
        if (LOGGER.isInfoEnabled()) {
×
606
          LOGGER.info(
×
607
              "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.",
608
              pipeName,
609
              creationTime,
×
610
              status.name());
×
611
        }
612
        break;
613
      case RUNNING:
614
        if (LOGGER.isInfoEnabled()) {
×
615
          LOGGER.info(
×
616
              "Pipe {} (creation time = {}) has already been started. Current status = {}. "
617
                  + "Skip starting.",
618
              pipeName,
619
              creationTime,
×
620
              status.name());
×
621
        }
622
        return;
×
623
      case DROPPED:
624
        if (LOGGER.isInfoEnabled()) {
×
625
          LOGGER.info(
×
626
              "Pipe {} (creation time = {}) has already been dropped. Current status = {}. "
627
                  + "Skip starting.",
628
              pipeName,
629
              creationTime,
×
630
              status.name());
×
631
        }
632
        return;
×
633
      default:
634
        throw new IllegalStateException(
×
635
            MESSAGE_UNEXPECTED_PIPE_STATUS
636
                + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
×
637
    }
638

639
    // Trigger start() method for each pipe task
640
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
641
        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
×
642
    if (pipeTasks == null) {
×
643
      LOGGER.info(
×
644
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
645
              + "Skip starting.",
646
          pipeName,
647
          creationTime);
×
648
      return;
×
649
    }
650
    for (PipeTask pipeTask : pipeTasks.values()) {
×
651
      pipeTask.start();
×
652
    }
×
653

654
    // Set pipe meta status to RUNNING
655
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
×
656
    // Clear exception messages if started successfully
657
    existedPipeMeta
×
658
        .getRuntimeMeta()
×
659
        .getConsensusGroupId2TaskMetaMap()
×
660
        .values()
×
661
        .forEach(PipeTaskMeta::clearExceptionMessages);
×
662
  }
×
663

664
  private void stopPipe(String pipeName, long creationTime) {
665
    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
×
666

667
    if (existedPipeMeta == null) {
×
668
      LOGGER.info(
×
669
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
670
              + "Skip stopping.",
671
          pipeName,
672
          creationTime);
×
673
      return;
×
674
    }
675
    if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
×
676
      LOGGER.info(
×
677
          "Pipe {} (creation time = {}) has been created but does not match "
678
              + "the creation time ({}) in stopPipe request. Skip stopping.",
679
          pipeName,
680
          existedPipeMeta.getStaticMeta().getCreationTime(),
×
681
          creationTime);
×
682
      return;
×
683
    }
684

685
    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
×
686
    switch (status) {
×
687
      case STOPPED:
688
        if (LOGGER.isInfoEnabled()) {
×
689
          LOGGER.info(
×
690
              "Pipe {} (creation time = {}) has already been stopped. Current status = {}. "
691
                  + "Skip stopping.",
692
              pipeName,
693
              creationTime,
×
694
              status.name());
×
695
        }
696
        return;
×
697
      case RUNNING:
698
        if (LOGGER.isInfoEnabled()) {
×
699
          LOGGER.info(
×
700
              "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.",
701
              pipeName,
702
              creationTime,
×
703
              status.name());
×
704
        }
705
        break;
706
      case DROPPED:
707
        if (LOGGER.isInfoEnabled()) {
×
708
          LOGGER.info(
×
709
              "Pipe {} (creation time = {}) has already been dropped. Current status = {}. "
710
                  + "Skip stopping.",
711
              pipeName,
712
              creationTime,
×
713
              status.name());
×
714
        }
715
        return;
×
716
      default:
717
        throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS + status.name());
×
718
    }
719

720
    // Trigger stop() method for each pipe task
721
    final Map<TConsensusGroupId, PipeTask> pipeTasks =
×
722
        pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
×
723
    if (pipeTasks == null) {
×
724
      LOGGER.info(
×
725
          "Pipe {} (creation time = {}) has already been dropped or has not been created. "
726
              + "Skip stopping.",
727
          pipeName,
728
          creationTime);
×
729
      return;
×
730
    }
731
    for (PipeTask pipeTask : pipeTasks.values()) {
×
732
      pipeTask.stop();
×
733
    }
×
734

735
    // Set pipe meta status to STOPPED
736
    existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
×
737
  }
×
738

739
  ///////////////////////// Manage by dataRegionGroupId /////////////////////////
740

741
  private void createPipeTask(
742
      TConsensusGroupId consensusGroupId,
743
      PipeStaticMeta pipeStaticMeta,
744
      PipeTaskMeta pipeTaskMeta) {
745
    if (pipeTaskMeta.getLeaderDataNodeId() == CONFIG.getDataNodeId()) {
×
746
      final PipeTask pipeTask =
×
747
          new PipeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build();
×
748
      pipeTask.create();
×
749
      pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
×
750
    }
751
    pipeMetaKeeper
×
752
        .getPipeMeta(pipeStaticMeta.getPipeName())
×
753
        .getRuntimeMeta()
×
754
        .getConsensusGroupId2TaskMetaMap()
×
755
        .put(consensusGroupId, pipeTaskMeta);
×
756
  }
×
757

758
  private void dropPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
759
    pipeMetaKeeper
×
760
        .getPipeMeta(pipeStaticMeta.getPipeName())
×
761
        .getRuntimeMeta()
×
762
        .getConsensusGroupId2TaskMetaMap()
×
763
        .remove(dataRegionGroupId);
×
764
    final PipeTask pipeTask = pipeTaskManager.removePipeTask(pipeStaticMeta, dataRegionGroupId);
×
765
    if (pipeTask != null) {
×
766
      pipeTask.drop();
×
767
    }
768
  }
×
769

770
  private void startPipeTask(TConsensusGroupId dataRegionGroupId, PipeStaticMeta pipeStaticMeta) {
771
    final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta, dataRegionGroupId);
×
772
    if (pipeTask != null) {
×
773
      pipeTask.start();
×
774
    }
775
  }
×
776

777
  ///////////////////////// Heartbeat /////////////////////////
778

779
  public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
780
      throws TException {
781
    // If the pipe heartbeat is separated from the cluster heartbeat, then the lock doesn't
782
    // need to be acquired
783
    if (!req.isNeedPipeMetaList()) {
×
784
      return;
×
785
    }
786
    // Try the lock instead of directly acquire it to prevent the block of the cluster heartbeat
787
    // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode
788
    if (!tryReadLockWithTimeOut(10)) {
×
789
      return;
×
790
    }
791
    try {
792
      collectPipeMetaListInternal(resp);
×
793
    } finally {
794
      releaseReadLock();
×
795
    }
796
  }
×
797

798
  private void collectPipeMetaListInternal(THeartbeatResp resp) throws TException {
799
    // Do nothing if data node is removing or removed, or request does not need pipe meta list
800
    if (PipeAgent.runtime().isShutdown()) {
×
801
      return;
×
802
    }
803

804
    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
×
805
    try {
806
      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
807
        pipeMetaBinaryList.add(pipeMeta.serialize());
×
808
        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
×
809
      }
×
810
    } catch (IOException e) {
×
811
      throw new TException(e);
×
812
    }
×
813
    resp.setPipeMetaList(pipeMetaBinaryList);
×
814
  }
×
815

816
  public synchronized void collectPipeMetaList(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
817
      throws TException {
818
    acquireReadLock();
×
819
    try {
820
      collectPipeMetaListInternal(req, resp);
×
821
    } finally {
822
      releaseReadLock();
×
823
    }
824
  }
×
825

826
  private void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
827
      throws TException {
828
    // Do nothing if data node is removing or removed, or request does not need pipe meta list
829
    if (PipeAgent.runtime().isShutdown()) {
×
830
      return;
×
831
    }
832
    LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId);
×
833

834
    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
×
835
    try {
836
      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
×
837
        pipeMetaBinaryList.add(pipeMeta.serialize());
×
838
        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
×
839
      }
×
840
    } catch (IOException e) {
×
841
      throw new TException(e);
×
842
    }
×
843
    resp.setPipeMetaList(pipeMetaBinaryList);
×
844
  }
×
845
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc