• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9749

pending completion
#9749

push

travis_ci

web-flow
[IOTDB-6097] Pipe: Avoid subscription running with the pattern option causing OOM & Fix de/ser of RecoverProgressIndex (#10767) (#10775)

This commit fixes 2 issues:

* Subscription running with the pattern option may cause OOM

  How to reproduce:

  1. execute sql:
  ```
  create pipe test1
  with extractor (
    'extractor.history.enable'='false',
    'extractor'='iotdb-extractor',
    'extractor.realtime.mode'='log',
    'extractor.pattern'='root'
  )
  with connector (
    'connector'='iotdb-thrift-connector-v1',
    'connector.node-urls'='127.0.0.1:6668'
  );

  start pipe test1;
  ```

  2. run benchmark: 1 database, 10 devices, 10 measurements.

* java.lang.UnsupportedOperationException: Unsupported PipeRuntimeException type 0 caused by de/ser issue of RecoverProgressIndex
  <img width="1194" alt="image" src="https://github.com/apache/iotdb/assets/30497621/d2d35ee7-293b-4594-92f3-fc10b2aa8313">

(cherry picked from commit f0f168249)

35 of 35 new or added lines in 7 files covered. (100.0%)

79409 of 165601 relevant lines covered (47.95%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.task.connection;
21

22
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
23
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
24
import org.apache.iotdb.pipe.api.collector.EventCollector;
25
import org.apache.iotdb.pipe.api.event.Event;
26

27
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
29

30
import java.util.LinkedList;
31
import java.util.Queue;
32

33
public class PipeEventCollector implements EventCollector {
34

35
  private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCollector.class);
×
36

37
  private final BoundedBlockingPendingQueue<Event> pendingQueue;
38

39
  private final Queue<Event> bufferQueue;
40

41
  public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
×
42
    this.pendingQueue = pendingQueue;
×
43
    bufferQueue = new LinkedList<>();
×
44
  }
×
45

46
  @Override
47
  public synchronized void collect(Event event) {
48
    if (event instanceof EnrichedEvent) {
×
49
      ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
×
50
    }
51

52
    while (!bufferQueue.isEmpty()) {
×
53
      final Event bufferedEvent = bufferQueue.peek();
×
54
      // Try to put already buffered events into pending queue, if pending queue is full, wait for
55
      // pending queue to be available with timeout.
56
      if (pendingQueue.waitedOffer(bufferedEvent)) {
×
57
        bufferQueue.poll();
×
58
      } else {
59
        // If timeout, we judge whether the new event is a PipeRawTabletInsertionEvent. If it is,
60
        // we wait for pending queue to be available without timeout until the pending queue is
61
        // available. We don't put PipeRawTabletInsertionEvent into buffer queue, because it is
62
        // memory consuming, holding too many PipeRawTabletInsertionEvent in buffer queue may cause
63
        // OOM.
64
        if (event instanceof PipeRawTabletInsertionEvent) {
×
65
          if (pendingQueue.put(bufferedEvent)) {
×
66
            bufferQueue.poll();
×
67
          } else {
68
            LOGGER.warn("interrupted when putting event into pending queue, event: {}", event);
×
69
            bufferQueue.offer(event);
×
70
            return;
×
71
          }
72
        } else {
73
          bufferQueue.offer(event);
×
74
          return;
×
75
        }
76
      }
77
    }
×
78

79
    if (!pendingQueue.waitedOffer(event)) {
×
80
      // PipeRawTabletInsertionEvent is memory consuming, so we should not put it into buffer queue
81
      // when pending queue is full. Otherwise, it may cause OOM.
82
      if (event instanceof PipeRawTabletInsertionEvent) {
×
83
        if (!pendingQueue.put(event)) {
×
84
          LOGGER.warn("interrupted when putting event into pending queue, event: {}", event);
×
85
          bufferQueue.offer(event);
×
86
        }
87
      } else {
88
        bufferQueue.offer(event);
×
89
      }
90
    }
91
  }
×
92
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc