• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9911

23 Aug 2023 12:56PM UTC coverage: 47.795% (-0.02%) from 47.813%
#9911

push

travis_ci

web-flow
[IOTDB-6083] Pipe: Fix subscription running with the pattern option causing OOM & make PipeRawTabletInsertionEvent able to report progress to avoid losing data (#10865) (#10944)

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit 332521a32)

112 of 112 new or added lines in 12 files covered. (100.0%)

79935 of 167246 relevant lines covered (47.79%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.14
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.extractor;
21

22
import org.apache.iotdb.commons.consensus.DataRegionId;
23
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
24
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
25
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor;
26
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
27
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionFakeExtractor;
28
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionTsFileExtractor;
29
import org.apache.iotdb.db.storageengine.StorageEngine;
30
import org.apache.iotdb.pipe.api.PipeExtractor;
31
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
32
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
33
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
34
import org.apache.iotdb.pipe.api.event.Event;
35
import org.apache.iotdb.pipe.api.exception.PipeException;
36

37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39

40
import java.util.concurrent.atomic.AtomicBoolean;
41
import java.util.concurrent.atomic.AtomicReference;
42

43
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
44
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE;
45
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE;
46
import static org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE;
47

48
public class IoTDBDataRegionExtractor implements PipeExtractor {
49

50
  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);
1✔
51

52
  private final AtomicBoolean hasBeenStarted;
53

54
  private PipeHistoricalDataRegionExtractor historicalExtractor;
55
  private PipeRealtimeDataRegionExtractor realtimeExtractor;
56

57
  private int dataRegionId;
58

59
  public IoTDBDataRegionExtractor() {
1✔
60
    this.hasBeenStarted = new AtomicBoolean(false);
1✔
61
  }
1✔
62

63
  @Override
64
  public void validate(PipeParameterValidator validator) throws Exception {
65
    // Validate extractor.history.enable and extractor.realtime.enable
66
    validator
1✔
67
        .validateAttributeValueRange(
1✔
68
            EXTRACTOR_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
1✔
69
        .validateAttributeValueRange(
1✔
70
            EXTRACTOR_REALTIME_ENABLE, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
1✔
71
        .validate(
1✔
72
            args -> (boolean) args[0] || (boolean) args[1],
1✔
73
            String.format(
1✔
74
                "Should not set both %s and %s to false.",
75
                EXTRACTOR_HISTORY_ENABLE_KEY, EXTRACTOR_REALTIME_ENABLE),
76
            validator.getParameters().getBooleanOrDefault(EXTRACTOR_HISTORY_ENABLE_KEY, true),
1✔
77
            validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true));
1✔
78

79
    // Validate extractor.realtime.mode
80
    if (validator.getParameters().getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
1✔
81
      validator.validateAttributeValueRange(
1✔
82
          EXTRACTOR_REALTIME_MODE, true, EXTRACTOR_REALTIME_MODE_FILE);
83
    }
84

85
    constructHistoricalExtractor();
1✔
86
    constructRealtimeExtractor(validator.getParameters());
1✔
87

88
    historicalExtractor.validate(validator);
1✔
89
    realtimeExtractor.validate(validator);
1✔
90
  }
1✔
91

92
  private void constructHistoricalExtractor() {
93
    // Enable historical extractor by default
94
    historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
1✔
95
  }
1✔
96

97
  private void constructRealtimeExtractor(PipeParameters parameters) {
98
    // Enable realtime extractor by default
99
    if (!parameters.getBooleanOrDefault(EXTRACTOR_REALTIME_ENABLE, true)) {
1✔
100
      realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
×
101
      LOGGER.info("'{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE);
×
102
      return;
×
103
    }
104

105
    realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
1✔
106
  }
1✔
107

108
  @Override
109
  public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
110
      throws Exception {
111
    dataRegionId =
×
112
        ((PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment()).getRegionId();
×
113

114
    historicalExtractor.customize(parameters, configuration);
×
115
    realtimeExtractor.customize(parameters, configuration);
×
116
  }
×
117

118
  @Override
119
  public void start() throws Exception {
120
    if (hasBeenStarted.get()) {
×
121
      return;
×
122
    }
123
    hasBeenStarted.set(true);
×
124

125
    final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null);
×
126
    final DataRegionId dataRegionIdObject = new DataRegionId(this.dataRegionId);
×
127
    while (true) {
128
      // try to start extractors in the data region ...
129
      // first try to run if data region exists, then try to run if data region does not exist.
130
      // both conditions fail is not common, which means the data region is created during the
131
      // runIfPresent and runIfAbsent operations. in this case, we need to retry.
132
      if (StorageEngine.getInstance()
×
133
              .runIfPresent(
×
134
                  dataRegionIdObject,
135
                  (dataRegion -> {
136
                    dataRegion.writeLock(
×
137
                        String.format(
×
138
                            "Pipe: starting %s", IoTDBDataRegionExtractor.class.getName()));
×
139
                    try {
140
                      startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
×
141
                    } finally {
142
                      dataRegion.writeUnlock();
×
143
                    }
144
                  }))
×
145
          || StorageEngine.getInstance()
×
146
              .runIfAbsent(
×
147
                  dataRegionIdObject,
148
                  () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder))) {
×
149
        rethrowExceptionIfAny(exceptionHolder);
×
150
        return;
×
151
      }
152
      rethrowExceptionIfAny(exceptionHolder);
×
153
    }
154
  }
155

156
  private void startHistoricalExtractorAndRealtimeExtractor(
157
      AtomicReference<Exception> exceptionHolder) {
158
    try {
159
      // Start realtimeExtractor first to avoid losing data. This may cause some
160
      // retransmission, yet it is OK according to the idempotency of IoTDB.
161
      // Note: The order of historical collection is flushing data -> adding all tsFile events.
162
      // There can still be writing when tsFile events are added. If we start
163
      // realtimeExtractor after the process, then this part of data will be lost.
164
      realtimeExtractor.start();
×
165
      historicalExtractor.start();
×
166
    } catch (Exception e) {
×
167
      exceptionHolder.set(e);
×
168
      LOGGER.warn(
×
169
          String.format(
×
170
              "Start historical extractor %s and realtime extractor %s error.",
171
              historicalExtractor, realtimeExtractor),
172
          e);
173
    }
×
174
  }
×
175

176
  private void rethrowExceptionIfAny(AtomicReference<Exception> exceptionHolder) {
177
    if (exceptionHolder.get() != null) {
×
178
      throw new PipeException("failed to start extractors.", exceptionHolder.get());
×
179
    }
180
  }
×
181

182
  @Override
183
  public Event supply() throws Exception {
184
    return historicalExtractor.hasConsumedAll()
×
185
        ? realtimeExtractor.supply()
×
186
        : historicalExtractor.supply();
×
187
  }
188

189
  @Override
190
  public void close() throws Exception {
191
    historicalExtractor.close();
×
192
    realtimeExtractor.close();
×
193
  }
×
194
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc