• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9910

23 Aug 2023 10:24AM UTC coverage: 47.829% (-0.03%) from 47.856%
#9910

push

travis_ci

web-flow
[IOTDB-6103] Adding count_time aggregation feature (#10756)

403 of 403 new or added lines in 33 files covered. (100.0%)

80118 of 167508 relevant lines covered (47.83%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.52
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.extractor;
21

22
import org.apache.iotdb.commons.consensus.DataRegionId;
23
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
24
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
25
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor;
26
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
27
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionFakeExtractor;
28
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
29
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionLogExtractor;
30
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionTsFileExtractor;
31
import org.apache.iotdb.db.storageengine.StorageEngine;
32
import org.apache.iotdb.pipe.api.PipeExtractor;
33
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
34
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
35
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
36
import org.apache.iotdb.pipe.api.event.Event;
37
import org.apache.iotdb.pipe.api.exception.PipeException;
38

39
import org.slf4j.Logger;
40
import org.slf4j.LoggerFactory;
41

42
import java.util.concurrent.atomic.AtomicBoolean;
43
import java.util.concurrent.atomic.AtomicReference;
44

45
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
46
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE;
47
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE;
48
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE;
49
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG;
50
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_HYBRID;
51
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_LOG;
52

53
public class IoTDBDataRegionExtractor implements PipeExtractor {
54

55
  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);
1✔
56

57
  private final AtomicBoolean hasBeenStarted;
58

59
  private PipeHistoricalDataRegionExtractor historicalExtractor;
60
  private PipeRealtimeDataRegionExtractor realtimeExtractor;
61

62
  private int dataRegionId;
63

64
  public IoTDBDataRegionExtractor() {
1✔
65
    this.hasBeenStarted = new AtomicBoolean(false);
1✔
66
  }
1✔
67

68
  @Override
69
  public void validate(PipeParameterValidator validator) throws Exception {
70
    // Validate extractor.history.enable and extractor.realtime.enable
71
    validator
1✔
72
        .validateAttributeValueRange(
1✔
73
            EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
1✔
74
        .validateAttributeValueRange(
1✔
75
            EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
1✔
76
        .validate(
1✔
77
            args -> (boolean) args[0] || (boolean) args[1],
1✔
78
            String.format(
1✔
79
                "Should not set both %s and %s to false.",
80
                EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE),
81
            validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true),
1✔
82
            validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true));
1✔
83

84
    // Validate extractor.realtime.mode
85
    if (validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
1✔
86
      validator.validateAttributeValueRange(
1✔
87
          EXTRACTOR_REALTIME_MODE,
88
          true,
89
          EXTRACTOR_REALTIME_MODE_FILE,
90
          EXTRACTOR_REALTIME_MODE_HYBRID,
91
          EXTRACTOR_REALTIME_MODE_LOG,
92
          EXTRACTOR_REALTIME_MODE_FORCED_LOG);
93
    }
94

95
    constructHistoricalExtractor();
1✔
96
    constructRealtimeExtractor(validator.getParameters());
1✔
97

98
    historicalExtractor.validate(validator);
1✔
99
    realtimeExtractor.validate(validator);
1✔
100
  }
1✔
101

102
  private void constructHistoricalExtractor() {
103
    // Enable historical extractor by default
104
    historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
1✔
105
  }
1✔
106

107
  private void constructRealtimeExtractor(PipeParameters parameters) {
108
    // Enable realtime extractor by default
109
    if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
1✔
110
      realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
×
111
      LOGGER.info("'{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE);
×
112
      return;
×
113
    }
114

115
    // Use hybrid mode by default
116
    if (!parameters.hasAttribute(EXTRACTOR_REALTIME_MODE)) {
1✔
117
      realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
×
118
      LOGGER.info("'{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE);
×
119
      return;
×
120
    }
121

122
    switch (parameters.getString(EXTRACTOR_REALTIME_MODE)) {
1✔
123
      case EXTRACTOR_REALTIME_MODE_FILE:
124
        realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
×
125
        break;
×
126
      case EXTRACTOR_REALTIME_MODE_HYBRID:
127
      case EXTRACTOR_REALTIME_MODE_LOG:
128
        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
1✔
129
        break;
1✔
130
      case EXTRACTOR_REALTIME_MODE_FORCED_LOG:
131
        realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
×
132
        break;
×
133
      default:
134
        realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
×
135
        if (LOGGER.isWarnEnabled()) {
×
136
          LOGGER.warn(
×
137
              "Unsupported extractor realtime mode: {}, create a hybrid extractor.",
138
              parameters.getString(EXTRACTOR_REALTIME_MODE));
×
139
        }
140
    }
141
  }
1✔
142

143
  @Override
144
  public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
145
      throws Exception {
146
    dataRegionId =
×
147
        ((PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment()).getRegionId();
×
148

149
    historicalExtractor.customize(parameters, configuration);
×
150
    realtimeExtractor.customize(parameters, configuration);
×
151
  }
×
152

153
  @Override
154
  public void start() throws Exception {
155
    if (hasBeenStarted.get()) {
×
156
      return;
×
157
    }
158
    hasBeenStarted.set(true);
×
159

160
    final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null);
×
161
    final DataRegionId dataRegionIdObject = new DataRegionId(this.dataRegionId);
×
162
    while (true) {
163
      // try to start extractors in the data region ...
164
      // first try to run if data region exists, then try to run if data region does not exist.
165
      // both conditions fail is not common, which means the data region is created during the
166
      // runIfPresent and runIfAbsent operations. in this case, we need to retry.
167
      if (StorageEngine.getInstance()
×
168
              .runIfPresent(
×
169
                  dataRegionIdObject,
170
                  (dataRegion -> {
171
                    dataRegion.writeLock(
×
172
                        String.format(
×
173
                            "Pipe: starting %s", IoTDBDataRegionExtractor.class.getName()));
×
174
                    try {
175
                      startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
×
176
                    } finally {
177
                      dataRegion.writeUnlock();
×
178
                    }
179
                  }))
×
180
          || StorageEngine.getInstance()
×
181
              .runIfAbsent(
×
182
                  dataRegionIdObject,
183
                  () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
×
184
        rethrowExceptionIfAny(exceptionHolder);
×
185
        return;
×
186
      }
187
      rethrowExceptionIfAny(exceptionHolder);
×
188
    }
189
  }
190

191
  private void startHistoricalExtractorAndRealtimeExtractor(
192
      AtomicReference<Exception> exceptionHolder) {
193
    try {
194
      // Start realtimeExtractor first to avoid losing data. This may cause some
195
      // retransmission, yet it is OK according to the idempotency of IoTDB.
196
      // Note: The order of historical collection is flushing data -> adding all tsFile events.
197
      // There can still be writing when tsFile events are added. If we start
198
      // realtimeExtractor after the process, then this part of data will be lost.
199
      realtimeExtractor.start();
×
200
      historicalExtractor.start();
×
201
    } catch (Exception e) {
×
202
      exceptionHolder.set(e);
×
203
      LOGGER.warn(
×
204
          String.format(
×
205
              "Start historical extractor %s and realtime extractor %s error.",
206
              historicalExtractor, realtimeExtractor),
207
          e);
208
    }
×
209
  }
×
210

211
  private void rethrowExceptionIfAny(AtomicReference<Exception> exceptionHolder) {
212
    if (exceptionHolder.get() != null) {
×
213
      throw new PipeException("failed to start extractors.", exceptionHolder.get());
×
214
    }
215
  }
×
216

217
  @Override
218
  public Event supply() throws Exception {
219
    return historicalExtractor.hasConsumedAll()
×
220
        ? realtimeExtractor.supply()
×
221
        : historicalExtractor.supply();
×
222
  }
223

224
  @Override
225
  public void close() throws Exception {
226
    historicalExtractor.close();
×
227
    realtimeExtractor.close();
×
228
  }
×
229
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc