• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9666

pending completion
#9666

push

travis_ci

web-flow
[IOTDB-5557] [RatisConsensus] Support linearizable read during recovery (#10597)

53 of 53 new or added lines in 4 files covered. (100.0%)

79130 of 165784 relevant lines covered (47.73%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.52
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.extractor;
21

22
import org.apache.iotdb.commons.consensus.DataRegionId;
23
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
24
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
25
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor;
26
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
27
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionFakeExtractor;
28
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
29
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionLogExtractor;
30
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionTsFileExtractor;
31
import org.apache.iotdb.db.storageengine.StorageEngine;
32
import org.apache.iotdb.pipe.api.PipeExtractor;
33
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
34
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
35
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
36
import org.apache.iotdb.pipe.api.event.Event;
37
import org.apache.iotdb.pipe.api.exception.PipeException;
38

39
import org.slf4j.Logger;
40
import org.slf4j.LoggerFactory;
41

42
import java.util.concurrent.atomic.AtomicBoolean;
43
import java.util.concurrent.atomic.AtomicReference;
44

45
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
46
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE;
47
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE;
48
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE;
49
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG;
50
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID;
51
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG;
52

53
public class IoTDBDataRegionExtractor implements PipeExtractor {
54

55
  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);
1✔
56

57
  private final AtomicBoolean hasBeenStarted;
58

59
  private PipeHistoricalDataRegionExtractor historicalExtractor;
60
  private PipeRealtimeDataRegionExtractor realtimeExtractor;
61

62
  private int dataRegionId;
63

64
  public IoTDBDataRegionExtractor() {
1✔
65
    this.hasBeenStarted = new AtomicBoolean(false);
1✔
66
  }
1✔
67

68
  @Override
69
  public void validate(PipeParameterValidator validator) throws Exception {
70
    // Validate extractor.history.enable and extractor.realtime.enable
71
    validator
1✔
72
        .validateAttributeValueRange(
1✔
73
            EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
1✔
74
        .validateAttributeValueRange(
1✔
75
            EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
1✔
76
        .validate(
1✔
77
            args -> (boolean) args[0] || (boolean) args[1],
1✔
78
            String.format(
1✔
79
                "Should not set both %s and %s to false.",
80
                EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE),
81
            validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true),
1✔
82
            validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true));
1✔
83

84
    // Validate extractor.realtime.mode
85
    if (validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
1✔
86
      validator.validateAttributeValueRange(
1✔
87
          EXTRACTOR_REALTIME_MODE,
88
          true,
89
          EXTRACTOR_REALTIME_MODE_FILE,
90
          EXTRACTOR_REALTIME_MODE_HYBRID,
91
          EXTRACTOR_REALTIME_MODE_LOG,
92
          EXTRACTOR_REALTIME_MODE_FORCED_LOG);
93
    }
94

95
    constructHistoricalExtractor();
1✔
96
    constructRealtimeExtractor(validator.getParameters());
1✔
97

98
    historicalExtractor.validate(validator);
1✔
99
    realtimeExtractor.validate(validator);
1✔
100
  }
1✔
101

102
  private void constructHistoricalExtractor() {
103
    // Enable historical extractor by default
104
    historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
1✔
105
  }
1✔
106

107
  private void constructRealtimeExtractor(PipeParameters parameters) {
108
    // Enable realtime extractor by default
109
    if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
1✔
110
      realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
×
111
      LOGGER.info("'{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE);
×
112
      return;
×
113
    }
114

115
    // Use hybrid mode by default
116
    if (!parameters.hasAttribute(EXTRACTOR_REALTIME_MODE)) {
1✔
117
      realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
×
118
      LOGGER.info("'{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE);
×
119
      return;
×
120
    }
121

122
    switch (parameters.getString(EXTRACTOR_REALTIME_MODE)) {
1✔
123
      case EXTRACTOR_REALTIME_MODE_FILE:
124
        realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
×
125
        break;
×
126
      case EXTRACTOR_REALTIME_MODE_HYBRID:
127
      case EXTRACTOR_REALTIME_MODE_LOG:
128
        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
1✔
129
        break;
1✔
130
      case EXTRACTOR_REALTIME_MODE_FORCED_LOG:
131
        realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
×
132
        break;
×
133
      default:
134
        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
×
135
        if (LOGGER.isWarnEnabled()) {
×
136
          LOGGER.warn(
×
137
              "Unsupported extractor realtime mode: {}, create a hybrid extractor.",
138
              parameters.getString(EXTRACTOR_REALTIME_MODE));
×
139
        }
140
    }
141
  }
1✔
142

143
  @Override
144
  public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
145
      throws Exception {
146
    dataRegionId =
×
147
        ((PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment()).getRegionId();
×
148

149
    historicalExtractor.customize(parameters, configuration);
×
150
    realtimeExtractor.customize(parameters, configuration);
×
151
  }
×
152

153
  @Override
154
  public void start() throws Exception {
155
    if (hasBeenStarted.get()) {
×
156
      return;
×
157
    }
158
    hasBeenStarted.set(true);
×
159

160
    final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null);
×
161
    final DataRegionId dataRegionIdObject = new DataRegionId(this.dataRegionId);
×
162
    while (true) {
163
      // try to start extractors in the data region ...
164
      // first try to run if data region exists, then try to run if data region does not exist.
165
      // both conditions fail is not common, which means the data region is created during the
166
      // runIfPresent and runIfAbsent operations. in this case, we need to retry.
167
      if (StorageEngine.getInstance()
×
168
              .runIfPresent(
×
169
                  dataRegionIdObject,
170
                  (dataRegion -> {
171
                    dataRegion.writeLock(
×
172
                        String.format(
×
173
                            "Pipe: starting %s", IoTDBDataRegionExtractor.class.getName()));
×
174
                    try {
175
                      startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
×
176
                    } finally {
177
                      dataRegion.writeUnlock();
×
178
                    }
179
                  }))
×
180
          || StorageEngine.getInstance()
×
181
              .runIfAbsent(
×
182
                  dataRegionIdObject,
183
                  () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
×
184
        rethrowExceptionIfAny(exceptionHolder);
×
185
        return;
×
186
      }
187
      rethrowExceptionIfAny(exceptionHolder);
×
188
    }
189
  }
190

191
  private void startHistoricalExtractorAndRealtimeExtractor(
192
      AtomicReference<Exception> exceptionHolder) {
193
    try {
194
      historicalExtractor.start();
×
195
      realtimeExtractor.start();
×
196
    } catch (Exception e) {
×
197
      exceptionHolder.set(e);
×
198
      LOGGER.warn(
×
199
          String.format(
×
200
              "Start historical extractor %s and realtime extractor %s error.",
201
              historicalExtractor, realtimeExtractor),
202
          e);
203
    }
×
204
  }
×
205

206
  private void rethrowExceptionIfAny(AtomicReference<Exception> exceptionHolder) {
207
    if (exceptionHolder.get() != null) {
×
208
      throw new PipeException("failed to start extractors.", exceptionHolder.get());
×
209
    }
210
  }
×
211

212
  @Override
213
  public Event supply() throws Exception {
214
    return historicalExtractor.hasConsumedAll()
×
215
        ? realtimeExtractor.supply()
×
216
        : historicalExtractor.supply();
×
217
  }
218

219
  @Override
220
  public void close() throws Exception {
221
    historicalExtractor.close();
×
222
    realtimeExtractor.close();
×
223
  }
×
224
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc