• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10029

07 Sep 2023 05:31PM UTC coverage: 47.638% (+0.008%) from 47.63%
#10029

push

travis_ci

web-flow
Pipe: Clear reference count of on-the-fly EnrichedEvent in queues when pipe is dropped (#11077) (#11087)

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit c4ff2b134)

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>

57 of 57 new or added lines in 8 files covered. (100.0%)

80267 of 168493 relevant lines covered (47.64%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.event;
21

22
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
23
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
24
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
25
import org.apache.iotdb.db.pipe.agent.PipeAgent;
26
import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
27
import org.apache.iotdb.pipe.api.event.Event;
28

29
import java.util.concurrent.atomic.AtomicInteger;
30

31
/**
32
 * EnrichedEvent is an event that can be enriched with additional runtime information. The
33
 * additional information mainly includes the reference count of the event.
34
 */
35
public abstract class EnrichedEvent implements Event {
36

37
  private final AtomicInteger referenceCount;
38

39
  protected final PipeTaskMeta pipeTaskMeta;
40

41
  private final String pattern;
42
  protected boolean isPatternAndTimeParsed;
43

44
  protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
1✔
45
    referenceCount = new AtomicInteger(0);
1✔
46
    this.pipeTaskMeta = pipeTaskMeta;
1✔
47
    this.pattern = pattern;
1✔
48
    isPatternAndTimeParsed =
1✔
49
        getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
1✔
50
  }
1✔
51

52
  /**
53
   * increase the reference count of this event. when the reference count is positive, the data in
54
   * the resource of this event should be safe to use.
55
   *
56
   * @param holderMessage the message of the invoker
57
   * @return true if the reference count is increased successfully, false if the event is not
58
   */
59
  public boolean increaseReferenceCount(String holderMessage) {
60
    boolean isSuccessful = true;
1✔
61
    synchronized (this) {
1✔
62
      if (referenceCount.get() == 0) {
1✔
63
        isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage);
1✔
64
      }
65
      referenceCount.incrementAndGet();
1✔
66
    }
1✔
67
    return isSuccessful;
1✔
68
  }
69

70
  /**
71
   * Increase the reference count of the resource of this event.
72
   *
73
   * @param holderMessage the message of the invoker
74
   * @return true if the reference count is increased successfully, false if the event is not
75
   *     controlled by the invoker, which means the data stored in the event is not safe to use
76
   */
77
  public abstract boolean internallyIncreaseResourceReferenceCount(String holderMessage);
78

79
  /**
80
   * Decrease the reference count of this event by 1. If the reference count is decreased to 0, the
81
   * event can be recycled and the data stored in the event is not safe to use, the processing
82
   * progress of the event should be reported to the pipe task meta.
83
   *
84
   * @param holderMessage the message of the invoker
85
   * @return true if the reference count is decreased successfully, false otherwise
86
   */
87
  public boolean decreaseReferenceCount(String holderMessage) {
88
    boolean isSuccessful = true;
1✔
89
    synchronized (this) {
1✔
90
      if (referenceCount.get() == 1) {
1✔
91
        isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
1✔
92
        reportProgress();
1✔
93
      }
94
      referenceCount.decrementAndGet();
1✔
95
    }
1✔
96
    return isSuccessful;
1✔
97
  }
98

99
  /**
100
   * Decrease the reference count of this event to 0. The event can be recycled and the data stored
101
   * in the event is not safe to use, the processing progress of the event should be reported to the
102
   * pipe task meta.
103
   *
104
   * @param holderMessage the message of the invoker
105
   * @return true if the reference count is decreased successfully, false otherwise
106
   */
107
  public boolean clearReferenceCount(String holderMessage) {
108
    boolean isSuccessful = true;
×
109
    synchronized (this) {
×
110
      if (referenceCount.get() >= 1) {
×
111
        isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
×
112
        reportProgress();
×
113
      }
114
      referenceCount.set(0);
×
115
    }
×
116
    return isSuccessful;
×
117
  }
118

119
  /**
120
   * Decrease the reference count of this event. If the reference count is decreased to 0, the event
121
   * can be recycled and the data stored in the event is not safe to use.
122
   *
123
   * @param holderMessage the message of the invoker
124
   * @return true if the reference count is decreased successfully, false otherwise
125
   */
126
  public abstract boolean internallyDecreaseResourceReferenceCount(String holderMessage);
127

128
  protected void reportProgress() {
129
    if (pipeTaskMeta != null) {
1✔
130
      pipeTaskMeta.updateProgressIndex(getProgressIndex());
×
131
    }
132
  }
1✔
133

134
  public abstract ProgressIndex getProgressIndex();
135

136
  /**
137
   * Get the reference count of this event.
138
   *
139
   * @return the reference count
140
   */
141
  public int getReferenceCount() {
142
    return referenceCount.get();
×
143
  }
144

145
  /**
146
   * Get the pattern of this event.
147
   *
148
   * @return the pattern
149
   */
150
  public final String getPattern() {
151
    return pattern == null ? PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
1✔
152
  }
153

154
  public boolean shouldParsePatternOrTime() {
155
    return !isPatternAndTimeParsed;
1✔
156
  }
157

158
  public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
159
      PipeTaskMeta pipeTaskMeta, String pattern);
160

161
  public void reportException(PipeRuntimeException pipeRuntimeException) {
162
    if (pipeTaskMeta != null) {
×
163
      PipeAgent.runtime().report(pipeTaskMeta, pipeRuntimeException);
×
164
    }
165
  }
×
166

167
  public abstract boolean isGeneratedByPipe();
168
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc