• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10029

07 Sep 2023 05:31PM UTC coverage: 47.638% (+0.008%) from 47.63%
#10029

push

travis_ci

web-flow
Pipe: Clear reference count of on-the-fly EnrichedEvent in queues when pipe is dropped (#11077) (#11087)

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit c4ff2b134)

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>

57 of 57 new or added lines in 8 files covered. (100.0%)

80267 of 168493 relevant lines covered (47.64%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.event.realtime;
21

22
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
23
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
24
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
25
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
26

27
import java.util.Map;
28

29
/**
30
 * PipeRealtimeEvent is an event that decorates the EnrichedEvent with the information of
31
 * TsFileEpoch and schema info. It only exists in the realtime event extractor.
32
 */
33
public class PipeRealtimeEvent extends EnrichedEvent {
34

35
  private final EnrichedEvent event;
36
  private final TsFileEpoch tsFileEpoch;
37

38
  private Map<String, String[]> device2Measurements;
39

40
  public PipeRealtimeEvent(
41
      EnrichedEvent event,
42
      TsFileEpoch tsFileEpoch,
43
      Map<String, String[]> device2Measurements,
44
      String pattern) {
45
    // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeEvent
46
    // is only used in the realtime event extractor, which does not need to report the progress
47
    // of the event, so the pipeTaskMeta is always null.
48
    super(null, pattern);
1✔
49

50
    this.event = event;
1✔
51
    this.tsFileEpoch = tsFileEpoch;
1✔
52
    this.device2Measurements = device2Measurements;
1✔
53
  }
1✔
54

55
  public PipeRealtimeEvent(
56
      EnrichedEvent event,
57
      TsFileEpoch tsFileEpoch,
58
      Map<String, String[]> device2Measurements,
59
      PipeTaskMeta pipeTaskMeta,
60
      String pattern) {
61
    // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeEvent
62
    // is only used in the realtime event extractor, which does not need to report the progress
63
    // of the event, so the pipeTaskMeta is always null.
64
    super(pipeTaskMeta, pattern);
1✔
65

66
    this.event = event;
1✔
67
    this.tsFileEpoch = tsFileEpoch;
1✔
68
    this.device2Measurements = device2Measurements;
1✔
69
  }
1✔
70

71
  public EnrichedEvent getEvent() {
72
    return event;
1✔
73
  }
74

75
  public TsFileEpoch getTsFileEpoch() {
76
    return tsFileEpoch;
1✔
77
  }
78

79
  public Map<String, String[]> getSchemaInfo() {
80
    return device2Measurements;
1✔
81
  }
82

83
  public void gcSchemaInfo() {
84
    device2Measurements = null;
1✔
85
  }
1✔
86

87
  @Override
88
  public boolean increaseReferenceCount(String holderMessage) {
89
    // This method must be overridden, otherwise during the real-time data extraction stage, the
90
    // current PipeRealtimeEvent rather than the member variable EnrichedEvent will increase
91
    // the reference count, resulting in errors in the reference count of the EnrichedEvent
92
    // contained in this PipeRealtimeEvent during the processor and connector stages.
93
    return event.increaseReferenceCount(holderMessage);
1✔
94
  }
95

96
  @Override
97
  public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
98
    return event.internallyIncreaseResourceReferenceCount(holderMessage);
×
99
  }
100

101
  @Override
102
  public boolean decreaseReferenceCount(String holderMessage) {
103
    // This method must be overridden, otherwise during the real-time data extraction stage, the
104
    // current PipeRealtimeEvent rather than the member variable EnrichedEvent will decrease
105
    // the reference count, resulting in errors in the reference count of the EnrichedEvent
106
    // contained in this PipeRealtimeEvent during the processor and connector stages.
107
    return event.decreaseReferenceCount(holderMessage);
1✔
108
  }
109

110
  @Override
111
  public boolean clearReferenceCount(String holderMessage) {
112
    // This method must be overridden, otherwise during the real-time data extraction stage, the
113
    // current PipeRealtimeEvent rather than the member variable EnrichedEvent will clear
114
    // the reference count.
115
    return event.clearReferenceCount(holderMessage);
×
116
  }
117

118
  @Override
119
  public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
120
    return event.internallyDecreaseResourceReferenceCount(holderMessage);
×
121
  }
122

123
  @Override
124
  public ProgressIndex getProgressIndex() {
125
    return event.getProgressIndex();
×
126
  }
127

128
  @Override
129
  public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
130
      PipeTaskMeta pipeTaskMeta, String pattern) {
131
    return new PipeRealtimeEvent(
1✔
132
        event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta, pattern),
1✔
133
        this.tsFileEpoch,
134
        this.device2Measurements,
135
        pipeTaskMeta,
136
        pattern);
137
  }
138

139
  @Override
140
  public boolean isGeneratedByPipe() {
141
    return event.isGeneratedByPipe();
×
142
  }
143

144
  @Override
145
  public String toString() {
146
    return "PipeRealtimeEvent{"
×
147
        + "event="
148
        + event
149
        + ", tsFileEpoch="
150
        + tsFileEpoch
151
        + ", device2Measurements="
152
        + device2Measurements
153
        + '}';
154
  }
155
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc