• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10026

07 Sep 2023 11:55AM UTC coverage: 47.63% (-0.09%) from 47.718%
#10026

push

travis_ci

web-flow
[To rel/1.2] Fix negative requested permit of CompactionWriteRateLimiter (#11065)

14 of 14 new or added lines in 1 file covered. (100.0%)

80239 of 168462 relevant lines covered (47.63%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.task.stage;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
24
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
25
import org.apache.iotdb.db.pipe.agent.PipeAgent;
26
import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
27
import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
28
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
29
import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
30
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
31
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
32
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
33
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
34
import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
35
import org.apache.iotdb.pipe.api.PipeProcessor;
36
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
37
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
38
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
39
import org.apache.iotdb.pipe.api.event.Event;
40
import org.apache.iotdb.pipe.api.exception.PipeException;
41

42
public class PipeTaskProcessorStage extends PipeTaskStage {
43

44
  private final PipeProcessorSubtaskExecutor executor =
×
45
      PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
×
46

47
  private final PipeProcessorSubtask pipeProcessorSubtask;
48

49
  /**
50
   * @param pipeName pipe name
51
   * @param creationTime pipe creation time
52
   * @param pipeProcessorParameters used to create pipe processor
53
   * @param dataRegionId data region id
54
   * @param pipeExtractorInputEventSupplier used to input events from pipe extractor
55
   * @param pipeConnectorOutputPendingQueue used to output events to pipe connector
56
   * @throws PipeException if failed to validate or customize
57
   */
58
  public PipeTaskProcessorStage(
59
      String pipeName,
60
      long creationTime,
61
      PipeParameters pipeProcessorParameters,
62
      TConsensusGroupId dataRegionId,
63
      EventSupplier pipeExtractorInputEventSupplier,
64
      BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
×
65
    final PipeProcessor pipeProcessor =
66
        pipeProcessorParameters
67
                .getStringOrDefault(
×
68
                    PipeProcessorConstant.PROCESSOR_KEY,
69
                    BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
×
70
                .equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
×
71
            ? new DoNothingProcessor()
×
72
            : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
×
73

74
    // validate and customize should be called before createSubtask. this allows extractor exposing
75
    // exceptions in advance.
76
    try {
77
      // 1. validate processor parameters
78
      pipeProcessor.validate(new PipeParameterValidator(pipeProcessorParameters));
×
79

80
      // 2. customize processor
81
      final PipeProcessorRuntimeConfiguration runtimeConfiguration =
×
82
          new PipeTaskRuntimeConfiguration(new PipeTaskRuntimeEnvironment(pipeName, creationTime));
83
      pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
×
84
    } catch (Exception e) {
×
85
      throw new PipeException(e.getMessage(), e);
×
86
    }
×
87

88
    // Should add creation time in taskID, because subtasks are stored in the hashmap
89
    // PipeProcessorSubtaskWorker.subtasks, and deleted subtasks will be removed by
90
    // a timed thread. If a pipe is deleted and created again before its subtask is
91
    // removed, the new subtask will have the same pipeName and dataRegionId as the
92
    // old one, so we need creationTime to make their hash code different in the map.
93
    final String taskId = pipeName + "_" + dataRegionId + "_" + creationTime;
×
94
    final PipeEventCollector pipeConnectorOutputEventCollector =
×
95
        new PipeEventCollector(pipeConnectorOutputPendingQueue);
96
    this.pipeProcessorSubtask =
×
97
        new PipeProcessorSubtask(
98
            taskId,
99
            pipeExtractorInputEventSupplier,
100
            pipeProcessor,
101
            pipeConnectorOutputEventCollector);
102
  }
×
103

104
  @Override
105
  public void createSubtask() throws PipeException {
106
    executor.register(pipeProcessorSubtask);
×
107
  }
×
108

109
  @Override
110
  public void startSubtask() throws PipeException {
111
    executor.start(pipeProcessorSubtask.getTaskID());
×
112
  }
×
113

114
  @Override
115
  public void stopSubtask() throws PipeException {
116
    executor.stop(pipeProcessorSubtask.getTaskID());
×
117
  }
×
118

119
  @Override
120
  public void dropSubtask() throws PipeException {
121
    executor.deregister(pipeProcessorSubtask.getTaskID());
×
122
  }
×
123
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc