• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9723

pending completion
#9723

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)

264 of 264 new or added lines in 11 files covered. (100.0%)

79280 of 165370 relevant lines covered (47.94%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.14
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.extractor;
21

22
import org.apache.iotdb.commons.consensus.DataRegionId;
23
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
24
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
25
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor;
26
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
27
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionFakeExtractor;
28
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionTsFileExtractor;
29
import org.apache.iotdb.db.storageengine.StorageEngine;
30
import org.apache.iotdb.pipe.api.PipeExtractor;
31
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
32
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
33
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
34
import org.apache.iotdb.pipe.api.event.Event;
35
import org.apache.iotdb.pipe.api.exception.PipeException;
36

37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39

40
import java.util.concurrent.atomic.AtomicBoolean;
41
import java.util.concurrent.atomic.AtomicReference;
42

43
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
44
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE;
45
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE;
46
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE;
47

48
public class IoTDBDataRegionExtractor implements PipeExtractor {
49

50
  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);
1✔
51

52
  private final AtomicBoolean hasBeenStarted;
53

54
  private PipeHistoricalDataRegionExtractor historicalExtractor;
55
  private PipeRealtimeDataRegionExtractor realtimeExtractor;
56

57
  private int dataRegionId;
58

59
  public IoTDBDataRegionExtractor() {
1✔
60
    this.hasBeenStarted = new AtomicBoolean(false);
1✔
61
  }
1✔
62

63
  @Override
64
  public void validate(PipeParameterValidator validator) throws Exception {
65
    // Validate extractor.history.enable and extractor.realtime.enable
66
    validator
1✔
67
        .validateAttributeValueRange(
1✔
68
            EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
1✔
69
        .validateAttributeValueRange(
1✔
70
            EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
1✔
71
        .validate(
1✔
72
            args -> (boolean) args[0] || (boolean) args[1],
1✔
73
            String.format(
1✔
74
                "Should not set both %s and %s to false.",
75
                EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE),
76
            validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true),
1✔
77
            validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true));
1✔
78

79
    // Validate extractor.realtime.mode
80
    if (validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
1✔
81
      validator.validateAttributeValueRange(
1✔
82
          EXTRACTOR_REALTIME_MODE, true, EXTRACTOR_REALTIME_MODE_FILE);
83
    }
84

85
    constructHistoricalExtractor();
1✔
86
    constructRealtimeExtractor(validator.getParameters());
1✔
87

88
    historicalExtractor.validate(validator);
1✔
89
    realtimeExtractor.validate(validator);
1✔
90
  }
1✔
91

92
  private void constructHistoricalExtractor() {
93
    // Enable historical extractor by default
94
    historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
1✔
95
  }
1✔
96

97
  private void constructRealtimeExtractor(PipeParameters parameters) {
98
    // Enable realtime extractor by default
99
    if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
1✔
100
      realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
×
101
      LOGGER.info("'{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE);
×
102
      return;
×
103
    }
104

105
    realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
1✔
106
  }
1✔
107

108
  @Override
109
  public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
110
      throws Exception {
111
    dataRegionId =
×
112
        ((PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment()).getRegionId();
×
113

114
    historicalExtractor.customize(parameters, configuration);
×
115
    realtimeExtractor.customize(parameters, configuration);
×
116
  }
×
117

118
  @Override
119
  public void start() throws Exception {
120
    if (hasBeenStarted.get()) {
×
121
      return;
×
122
    }
123
    hasBeenStarted.set(true);
×
124

125
    final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null);
×
126
    final DataRegionId dataRegionIdObject = new DataRegionId(this.dataRegionId);
×
127
    while (true) {
128
      // try to start extractors in the data region ...
129
      // first try to run if data region exists, then try to run if data region does not exist.
130
      // both conditions fail is not common, which means the data region is created during the
131
      // runIfPresent and runIfAbsent operations. in this case, we need to retry.
132
      if (StorageEngine.getInstance()
×
133
              .runIfPresent(
×
134
                  dataRegionIdObject,
135
                  (dataRegion -> {
136
                    dataRegion.writeLock(
×
137
                        String.format(
×
138
                            "Pipe: starting %s", IoTDBDataRegionExtractor.class.getName()));
×
139
                    try {
140
                      startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
×
141
                    } finally {
142
                      dataRegion.writeUnlock();
×
143
                    }
144
                  }))
×
145
          || StorageEngine.getInstance()
×
146
              .runIfAbsent(
×
147
                  dataRegionIdObject,
148
                  () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
×
149
        rethrowExceptionIfAny(exceptionHolder);
×
150
        return;
×
151
      }
152
      rethrowExceptionIfAny(exceptionHolder);
×
153
    }
154
  }
155

156
  private void startHistoricalExtractorAndRealtimeExtractor(
157
      AtomicReference<Exception> exceptionHolder) {
158
    try {
159
      historicalExtractor.start();
×
160
      realtimeExtractor.start();
×
161
    } catch (Exception e) {
×
162
      exceptionHolder.set(e);
×
163
      LOGGER.warn(
×
164
          String.format(
×
165
              "Start historical extractor %s and realtime extractor %s error.",
166
              historicalExtractor, realtimeExtractor),
167
          e);
168
    }
×
169
  }
×
170

171
  private void rethrowExceptionIfAny(AtomicReference<Exception> exceptionHolder) {
172
    if (exceptionHolder.get() != null) {
×
173
      throw new PipeException("failed to start extractors.", exceptionHolder.get());
×
174
    }
175
  }
×
176

177
  @Override
178
  public Event supply() throws Exception {
179
    return historicalExtractor.hasConsumedAll()
×
180
        ? realtimeExtractor.supply()
×
181
        : historicalExtractor.supply();
×
182
  }
183

184
  @Override
185
  public void close() throws Exception {
186
    historicalExtractor.close();
×
187
    realtimeExtractor.close();
×
188
  }
×
189
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc