• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9890

22 Aug 2023 09:07AM UTC coverage: 47.922% (-0.07%) from 47.992%
#9890

push

travis_ci

web-flow
[IOTDB-6114] Pipe: Support multi-cluster data sync (#10868)(#10926)

306 of 306 new or added lines in 33 files covered. (100.0%)

79862 of 166649 relevant lines covered (47.92%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.17
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.event.realtime;
21

22
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
23
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
24
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
25
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
26

27
import java.util.Map;
28

29
/**
30
 * PipeRealtimeEvent is an event that decorates the EnrichedEvent with the information of
31
 * TsFileEpoch and schema info. It only exists in the realtime event extractor.
32
 */
33
public class PipeRealtimeEvent extends EnrichedEvent {
34

35
  private final EnrichedEvent event;
36
  private final TsFileEpoch tsFileEpoch;
37

38
  private Map<String, String[]> device2Measurements;
39

40
  public PipeRealtimeEvent(
41
      EnrichedEvent event,
42
      TsFileEpoch tsFileEpoch,
43
      Map<String, String[]> device2Measurements,
44
      String pattern) {
45
    // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeEvent
46
    // is only used in the realtime event extractor, which does not need to report the progress
47
    // of the event, so the pipeTaskMeta is always null.
48
    super(null, pattern);
1✔
49

50
    this.event = event;
1✔
51
    this.tsFileEpoch = tsFileEpoch;
1✔
52
    this.device2Measurements = device2Measurements;
1✔
53
  }
1✔
54

55
  public PipeRealtimeEvent(
56
      EnrichedEvent event,
57
      TsFileEpoch tsFileEpoch,
58
      Map<String, String[]> device2Measurements,
59
      PipeTaskMeta pipeTaskMeta,
60
      String pattern) {
61
    // pipeTaskMeta is used to report the progress of the event, the PipeRealtimeEvent
62
    // is only used in the realtime event extractor, which does not need to report the progress
63
    // of the event, so the pipeTaskMeta is always null.
64
    super(pipeTaskMeta, pattern);
1✔
65

66
    this.event = event;
1✔
67
    this.tsFileEpoch = tsFileEpoch;
1✔
68
    this.device2Measurements = device2Measurements;
1✔
69
  }
1✔
70

71
  public EnrichedEvent getEvent() {
72
    return event;
1✔
73
  }
74

75
  public TsFileEpoch getTsFileEpoch() {
76
    return tsFileEpoch;
1✔
77
  }
78

79
  public Map<String, String[]> getSchemaInfo() {
80
    return device2Measurements;
1✔
81
  }
82

83
  public void gcSchemaInfo() {
84
    device2Measurements = null;
1✔
85
  }
1✔
86

87
  @Override
88
  public boolean increaseReferenceCount(String holderMessage) {
89
    // This method must be overridden, otherwise during the real-time data extraction stage, the
90
    // current PipeRealtimeEvent rather than the member variable EnrichedEvent will increase
91
    // the reference count, resulting in errors in the reference count of the EnrichedEvent
92
    // contained in this PipeRealtimeEvent during the processor and connector stages.
93
    return event.increaseReferenceCount(holderMessage);
1✔
94
  }
95

96
  @Override
97
  public boolean internallyIncreaseResourceReferenceCount(String holderMessage) {
98
    return event.internallyIncreaseResourceReferenceCount(holderMessage);
×
99
  }
100

101
  @Override
102
  public boolean decreaseReferenceCount(String holderMessage) {
103
    // This method must be overridden, otherwise during the real-time data extraction stage, the
104
    // current PipeRealtimeEvent rather than the member variable EnrichedEvent will increase
105
    // the reference count, resulting in errors in the reference count of the EnrichedEvent
106
    // contained in this PipeRealtimeEvent during the processor and connector stages.
107
    return event.decreaseReferenceCount(holderMessage);
1✔
108
  }
109

110
  @Override
111
  public boolean internallyDecreaseResourceReferenceCount(String holderMessage) {
112
    return event.internallyDecreaseResourceReferenceCount(holderMessage);
×
113
  }
114

115
  @Override
116
  public ProgressIndex getProgressIndex() {
117
    return event.getProgressIndex();
×
118
  }
119

120
  @Override
121
  public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
122
      PipeTaskMeta pipeTaskMeta, String pattern) {
123
    return new PipeRealtimeEvent(
1✔
124
        event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta, pattern),
1✔
125
        this.tsFileEpoch,
126
        this.device2Measurements,
127
        pipeTaskMeta,
128
        pattern);
129
  }
130

131
  @Override
132
  public boolean isGeneratedByPipe() {
133
    return event.isGeneratedByPipe();
×
134
  }
135

136
  @Override
137
  public String toString() {
138
    return "PipeRealtimeEvent{"
×
139
        + "event="
140
        + event
141
        + ", tsFileEpoch="
142
        + tsFileEpoch
143
        + ", device2Measurements="
144
        + device2Measurements
145
        + '}';
146
  }
147
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc