• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9941

28 Aug 2023 04:15PM UTC coverage: 47.749% (+0.01%) from 47.736%
#9941

push

travis_ci

web-flow
Pipe: drop HeartbeatEvent when pipe is stopped to avoid OOM (#10977)

17 of 17 new or added lines in 5 files covered. (100.0%)

80250 of 168065 relevant lines covered (47.75%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.task.connection;
21

22
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
23
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
24
import org.apache.iotdb.pipe.api.collector.EventCollector;
25
import org.apache.iotdb.pipe.api.event.Event;
26

27
import java.util.Deque;
28
import java.util.LinkedList;
29

30
public class PipeEventCollector implements EventCollector {
31

32
  private final BoundedBlockingPendingQueue<Event> pendingQueue;
33

34
  private final Deque<Event> bufferQueue;
35

36
  public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
×
37
    this.pendingQueue = pendingQueue;
×
38
    bufferQueue = new LinkedList<>();
×
39
  }
×
40

41
  @Override
42
  public synchronized void collect(Event event) {
43
    if (event instanceof EnrichedEvent) {
×
44
      ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
×
45
    }
46

47
    while (!bufferQueue.isEmpty()) {
×
48
      final Event bufferedEvent = bufferQueue.peek();
×
49
      // Try to put already buffered events into pending queue, if pending queue is full, wait for
50
      // pending queue to be available with timeout.
51
      if (pendingQueue.waitedOffer(bufferedEvent)) {
×
52
        bufferQueue.poll();
×
53
      } else {
54
        // We can NOT keep too many PipeHeartbeatEvent in bufferQueue because they may cause OOM.
55
        if (event instanceof PipeHeartbeatEvent
×
56
            && bufferQueue.peekLast() instanceof PipeHeartbeatEvent) {
×
57
          ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName());
×
58
        } else {
59
          bufferQueue.offer(event);
×
60
        }
61
        return;
×
62
      }
63
    }
×
64

65
    if (!pendingQueue.waitedOffer(event)) {
×
66
      bufferQueue.offer(event);
×
67
    }
68
  }
×
69

70
  /**
71
   * Try to collect buffered events into pending queue.
72
   *
73
   * @return true if there are still buffered events after this operation, false otherwise.
74
   */
75
  public synchronized boolean tryCollectBufferedEvents() {
76
    while (!bufferQueue.isEmpty()) {
×
77
      final Event bufferedEvent = bufferQueue.peek();
×
78
      if (pendingQueue.waitedOffer(bufferedEvent)) {
×
79
        bufferQueue.poll();
×
80
      } else {
81
        return true;
×
82
      }
83
    }
×
84
    return false;
×
85
  }
86
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc