• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9911

23 Aug 2023 12:56PM UTC coverage: 47.795% (-0.02%) from 47.813%
#9911

push

travis_ci

web-flow
[IOTDB-6083] Pipe: Fix subscrption running with the pattern option causing OOM & make PipeRawTabletInsertionEvent able to report progress to avoid losing data (#10865) (#10944)

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit 332521a32)

112 of 112 new or added lines in 12 files covered. (100.0%)

79935 of 167246 relevant lines covered (47.79%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.event;
21

22
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
23
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
24
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
25
import org.apache.iotdb.db.pipe.agent.PipeAgent;
26
import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
27
import org.apache.iotdb.pipe.api.event.Event;
28

29
import java.util.concurrent.atomic.AtomicInteger;
30

31
/**
32
 * EnrichedEvent is an event that can be enriched with additional runtime information. The
33
 * additional information mainly includes the reference count of the event.
34
 */
35
public abstract class EnrichedEvent implements Event {
36

37
  private final AtomicInteger referenceCount;
38

39
  protected final PipeTaskMeta pipeTaskMeta;
40

41
  private final String pattern;
42
  private final boolean isPatternParsed;
43

44
  protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
1✔
45
    referenceCount = new AtomicInteger(0);
1✔
46
    this.pipeTaskMeta = pipeTaskMeta;
1✔
47
    this.pattern = pattern;
1✔
48
    isPatternParsed = getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
1✔
49
  }
1✔
50

51
  /**
52
   * increase the reference count of this event. when the reference count is positive, the data in
53
   * the resource of this event should be safe to use.
54
   *
55
   * @param holderMessage the message of the invoker
56
   * @return true if the reference count is increased successfully, false if the event is not
57
   */
58
  public boolean increaseReferenceCount(String holderMessage) {
59
    boolean isSuccessful = true;
1✔
60
    synchronized (this) {
1✔
61
      if (referenceCount.get() == 0) {
1✔
62
        isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage);
1✔
63
      }
64
      referenceCount.incrementAndGet();
1✔
65
    }
1✔
66
    return isSuccessful;
1✔
67
  }
68

69
  /**
70
   * Increase the reference count of the resource of this event.
71
   *
72
   * @param holderMessage the message of the invoker
73
   * @return true if the reference count is increased successfully, false if the event is not
74
   *     controlled by the invoker, which means the data stored in the event is not safe to use
75
   */
76
  public abstract boolean internallyIncreaseResourceReferenceCount(String holderMessage);
77

78
  /**
79
   * Decrease the reference count of this event. If the reference count is decreased to 0, the event
80
   * can be recycled and the data stored in the event is not safe to use, the processing progress of
81
   * the event should be reported to the pipe task meta.
82
   *
83
   * @param holderMessage the message of the invoker
84
   * @return true if the reference count is decreased successfully, false otherwise
85
   */
86
  public boolean decreaseReferenceCount(String holderMessage) {
87
    boolean isSuccessful = true;
1✔
88
    synchronized (this) {
1✔
89
      if (referenceCount.get() == 1) {
1✔
90
        isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
1✔
91
        reportProgress();
1✔
92
      }
93
      referenceCount.decrementAndGet();
1✔
94
    }
1✔
95
    return isSuccessful;
1✔
96
  }
97

98
  /**
99
   * Decrease the reference count of this event. If the reference count is decreased to 0, the event
100
   * can be recycled and the data stored in the event is not safe to use.
101
   *
102
   * @param holderMessage the message of the invoker
103
   * @return true if the reference count is decreased successfully, false otherwise
104
   */
105
  public abstract boolean internallyDecreaseResourceReferenceCount(String holderMessage);
106

107
  protected void reportProgress() {
108
    if (pipeTaskMeta != null) {
1✔
109
      pipeTaskMeta.updateProgressIndex(getProgressIndex());
×
110
    }
111
  }
1✔
112

113
  public abstract ProgressIndex getProgressIndex();
114

115
  /**
116
   * Get the reference count of this event.
117
   *
118
   * @return the reference count
119
   */
120
  public int getReferenceCount() {
121
    return referenceCount.get();
×
122
  }
123

124
  /**
125
   * Get the pattern of this event.
126
   *
127
   * @return the pattern
128
   */
129
  public final String getPattern() {
130
    return pattern == null ? PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
1✔
131
  }
132

133
  public boolean shouldParsePattern() {
134
    return !isPatternParsed;
1✔
135
  }
136

137
  public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
138
      PipeTaskMeta pipeTaskMeta, String pattern);
139

140
  public void reportException(PipeRuntimeException pipeRuntimeException) {
141
    if (pipeTaskMeta != null) {
×
142
      PipeAgent.runtime().report(pipeTaskMeta, pipeRuntimeException);
×
143
    }
144
  }
×
145

146
  public abstract boolean isGeneratedByPipe();
147
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc