• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9931

28 Aug 2023 02:32AM UTC coverage: 47.714% (-0.03%) from 47.739%
#9931

push

travis_ci

web-flow
[IOTDB-6127] Pipe: buffered events in processor stage can not be consumed by connector (#10962) (#10963)

(cherry picked from commit 1ae952ce1)

29 of 29 new or added lines in 7 files covered. (100.0%)

79971 of 167605 relevant lines covered (47.71%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.task.connection;
21

22
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
23
import org.apache.iotdb.pipe.api.collector.EventCollector;
24
import org.apache.iotdb.pipe.api.event.Event;
25

26
import java.util.LinkedList;
27
import java.util.Queue;
28

29
public class PipeEventCollector implements EventCollector {
30

31
  private final BoundedBlockingPendingQueue<Event> pendingQueue;
32

33
  private final Queue<Event> bufferQueue;
34

35
  public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
×
36
    this.pendingQueue = pendingQueue;
×
37
    bufferQueue = new LinkedList<>();
×
38
  }
×
39

40
  @Override
41
  public synchronized void collect(Event event) {
42
    if (event instanceof EnrichedEvent) {
×
43
      ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
×
44
    }
45

46
    while (!bufferQueue.isEmpty()) {
×
47
      final Event bufferedEvent = bufferQueue.peek();
×
48
      // Try to put already buffered events into pending queue, if pending queue is full, wait for
49
      // pending queue to be available with timeout.
50
      if (pendingQueue.waitedOffer(bufferedEvent)) {
×
51
        bufferQueue.poll();
×
52
      } else {
53
        bufferQueue.offer(event);
×
54
        return;
×
55
      }
56
    }
×
57

58
    if (!pendingQueue.waitedOffer(event)) {
×
59
      bufferQueue.offer(event);
×
60
    }
61
  }
×
62

63
  /**
64
   * Try to collect buffered events into pending queue.
65
   *
66
   * @return true if there are still buffered events after this operation, false otherwise.
67
   */
68
  public synchronized boolean tryCollectBufferedEvents() {
69
    while (!bufferQueue.isEmpty()) {
×
70
      final Event bufferedEvent = bufferQueue.peek();
×
71
      if (pendingQueue.waitedOffer(bufferedEvent)) {
×
72
        bufferQueue.poll();
×
73
      } else {
74
        return true;
×
75
      }
76
    }
×
77
    return false;
×
78
  }
79
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc