• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9911

23 Aug 2023 12:56PM UTC coverage: 47.795% (-0.02%) from 47.813%
#9911

push

travis_ci

web-flow
[IOTDB-6083] Pipe: Fix subscrption running with the pattern option causing OOM & make PipeRawTabletInsertionEvent able to report progress to avoid losing data (#10865) (#10944)

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit 332521a32)

112 of 112 new or added lines in 12 files covered. (100.0%)

79935 of 167246 relevant lines covered (47.79%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.task.connection;
21

22
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
23
import org.apache.iotdb.pipe.api.collector.EventCollector;
24
import org.apache.iotdb.pipe.api.event.Event;
25

26
import org.slf4j.Logger;
27
import org.slf4j.LoggerFactory;
28

29
import java.util.LinkedList;
30
import java.util.Queue;
31

32
public class PipeEventCollector implements EventCollector {
33

34
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCollector.class);
×
35

36
  private final BoundedBlockingPendingQueue<Event> pendingQueue;
37

38
  private final Queue<Event> bufferQueue;
39

40
  public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
×
41
    this.pendingQueue = pendingQueue;
×
42
    bufferQueue = new LinkedList<>();
×
43
  }
×
44

45
  @Override
46
  public synchronized void collect(Event event) {
47
    if (event instanceof EnrichedEvent) {
×
48
      ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
×
49
    }
50

51
    while (!bufferQueue.isEmpty()) {
×
52
      final Event bufferedEvent = bufferQueue.peek();
×
53
      // Try to put already buffered events into pending queue, if pending queue is full, wait for
54
      // pending queue to be available with timeout.
55
      if (pendingQueue.waitedOffer(bufferedEvent)) {
×
56
        bufferQueue.poll();
×
57
      } else {
58
        bufferQueue.offer(event);
×
59
        return;
×
60
      }
61
    }
×
62

63
    if (!pendingQueue.waitedOffer(event)) {
×
64
      bufferQueue.offer(event);
×
65
    }
66
  }
×
67
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc