• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9992

04 Sep 2023 08:51AM CUT coverage: 47.736% (-0.02%) from 47.756%
#9992

push

travis_ci

web-flow
Pipe: Report queue size in PipeHeartbeatEvent (#10997)

51 of 51 new or added lines in 7 files covered. (100.0%)

80365 of 168353 relevant lines covered (47.74%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.task.connection;
21

22
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
23
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
24
import org.apache.iotdb.pipe.api.collector.EventCollector;
25
import org.apache.iotdb.pipe.api.event.Event;
26

27
import java.util.Deque;
28
import java.util.LinkedList;
29

30
public class PipeEventCollector implements EventCollector {
31

32
  private final BoundedBlockingPendingQueue<Event> pendingQueue;
33

34
  private final Deque<Event> bufferQueue;
35

36
  public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
×
37
    this.pendingQueue = pendingQueue;
×
38
    bufferQueue = new LinkedList<>();
×
39
  }
×
40

41
  @Override
42
  public synchronized void collect(Event event) {
43
    if (event instanceof EnrichedEvent) {
×
44
      ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
×
45
    }
46
    if (event instanceof PipeHeartbeatEvent) {
×
47
      ((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
×
48
      ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
×
49
    }
50

51
    while (!bufferQueue.isEmpty()) {
×
52
      final Event bufferedEvent = bufferQueue.peek();
×
53
      // Try to put already buffered events into pending queue, if pending queue is full, wait for
54
      // pending queue to be available with timeout.
55
      if (pendingQueue.waitedOffer(bufferedEvent)) {
×
56
        bufferQueue.poll();
×
57
      } else {
58
        // We can NOT keep too many PipeHeartbeatEvent in bufferQueue because they may cause OOM.
59
        if (event instanceof PipeHeartbeatEvent
×
60
            && bufferQueue.peekLast() instanceof PipeHeartbeatEvent) {
×
61
          ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName());
×
62
        } else {
63
          bufferQueue.offer(event);
×
64
        }
65
        return;
×
66
      }
67
    }
×
68

69
    if (!pendingQueue.waitedOffer(event)) {
×
70
      bufferQueue.offer(event);
×
71
    }
72
  }
×
73

74
  /**
75
   * Try to collect buffered events into pending queue.
76
   *
77
   * @return true if there are still buffered events after this operation, false otherwise.
78
   */
79
  public synchronized boolean tryCollectBufferedEvents() {
80
    while (!bufferQueue.isEmpty()) {
×
81
      final Event bufferedEvent = bufferQueue.peek();
×
82
      if (pendingQueue.waitedOffer(bufferedEvent)) {
×
83
        bufferQueue.poll();
×
84
      } else {
85
        return true;
×
86
      }
87
    }
×
88
    return false;
×
89
  }
90
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc