• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9749

pending completion
#9749

push

travis_ci

web-flow
[IOTDB-6097] Pipe: Avoid subscription running with the pattern option causing OOM & Fix de/ser of RecoverProgressIndex (#10767) (#10775)

This commit fixes 2 issues:

* Subscription running with the pattern option may cause OOM

  How to reproduce:

  1. execute sql:
  ```
  create pipe test1
  with extractor (
    'extractor.history.enable'='false',
    'extractor'='iotdb-extractor',
    'extractor.realtime.mode'='log',
    'extractor.pattern'='root'
  )
  with connector (
    'connector'='iotdb-thrift-connector-v1',
    'connector.node-urls'='127.0.0.1:6668'
  );

  start pipe test1;
  ```

  2. run benchmark: 1 database, 10 devices, 10 measurements.

* java.lang.UnsupportedOperationException: Unsupported PipeRuntimeException type 0 caused by de/ser issue of RecoverProgressIndex
  <img width="1194" alt="image" src="https://github.com/apache/iotdb/assets/30497621/d2d35ee7-293b-4594-92f3-fc10b2aa8313">

(cherry picked from commit f0f168249)

35 of 35 new or added lines in 7 files covered. (100.0%)

79409 of 165601 relevant lines covered (47.95%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.75
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.pipe.connector.v2;
21

22
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23
import org.apache.iotdb.commons.client.ClientPoolFactory;
24
import org.apache.iotdb.commons.client.IClientManager;
25
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
26
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
27
import org.apache.iotdb.commons.concurrent.ThreadName;
28
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
29
import org.apache.iotdb.commons.conf.CommonDescriptor;
30
import org.apache.iotdb.db.pipe.connector.base.IoTDBThriftConnector;
31
import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1;
32
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
33
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
34
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
35
import org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
36
import org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferRawTabletInsertionEventHandler;
37
import org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferTsFileInsertionEventHandler;
38
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
39
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
40
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
41
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
42
import org.apache.iotdb.pipe.api.PipeConnector;
43
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
44
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
45
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
46
import org.apache.iotdb.pipe.api.event.Event;
47
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
48
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
49
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
50
import org.apache.iotdb.pipe.api.exception.PipeException;
51
import org.apache.iotdb.rpc.TSStatusCode;
52
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
53
import org.apache.iotdb.tsfile.utils.Pair;
54

55
import org.apache.thrift.TException;
56
import org.apache.thrift.async.AsyncMethodCallback;
57
import org.slf4j.Logger;
58
import org.slf4j.LoggerFactory;
59

60
import javax.annotation.Nullable;
61

62
import java.util.Comparator;
63
import java.util.Optional;
64
import java.util.PriorityQueue;
65
import java.util.concurrent.Future;
66
import java.util.concurrent.ScheduledExecutorService;
67
import java.util.concurrent.TimeUnit;
68
import java.util.concurrent.atomic.AtomicBoolean;
69
import java.util.concurrent.atomic.AtomicLong;
70
import java.util.concurrent.atomic.AtomicReference;
71

72
public class IoTDBThriftConnectorV2 extends IoTDBThriftConnector {
73

74
  /** Logger used by this connector. */
  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV2.class);
1✔
75

76
  /** Format string for borrow failures; takes the receiver ip and port, in that order. */
  private static final String FAILED_TO_BORROW_CLIENT_FORMATTER =
77
      "Failed to borrow client from client pool for receiver %s:%s.";
78

79
  /** Static holder of the asynchronous client manager; filled in by the first constructor call. */
  private static final AtomicReference<
80
          IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
81
      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
1✔
82
  /** The manager taken from the static holder when this instance was constructed. */
  private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
83
      asyncPipeDataTransferClientManager;
84

85
  /** Static holder of the retry scheduler; created on demand in addFailureEventToRetryQueue. */
  private static final AtomicReference<ScheduledExecutorService> RETRY_TRIGGER =
1✔
86
      new AtomicReference<>();
87
  /** Used as both the initial delay and the fixed delay of the retry task, in minutes. */
  private static final int RETRY_TRIGGER_INTERVAL_MINUTES = 1;
88
  /** Handle of this instance's scheduled retry task; cancelled in close(). */
  private final AtomicReference<Future<?>> retryTriggerFuture = new AtomicReference<>();
1✔
89
  /** V1 connector that handles handshake and heartbeat, and re-sends queued events. */
  private final IoTDBThriftConnectorV1 retryConnector = new IoTDBThriftConnectorV1();
1✔
90
  /** Events waiting to be re-sent, ordered by commit id (the left element of each pair). */
  private final PriorityQueue<Pair<Long, Event>> retryEventQueue =
1✔
91
      new PriorityQueue<>(Comparator.comparing(o -> o.left));
1✔
92

93
  /** Source of commit ids; incremented once per event accepted by a transfer method. */
  private final AtomicLong commitIdGenerator = new AtomicLong(0);
1✔
94
  /** Id of the most recently committed request; commit() only runs the id that follows it. */
  private final AtomicLong lastCommitId = new AtomicLong(0);
1✔
95
  /** Pending commit actions ordered by commit id (the left element of each pair). */
  private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
1✔
96
      new PriorityQueue<>(Comparator.comparing(o -> o.left));
1✔
97

98
  /**
   * Creates a connector that uses the client manager stored in the static holder.
   *
   * <p>If the holder is still empty, a manager is created and stored while holding the class
   * monitor, so later instances pick up the same manager instead of building another one.
   */
  public IoTDBThriftConnectorV2() {
    IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> sharedManager =
        ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();

    if (sharedManager == null) {
      synchronized (IoTDBThriftConnectorV2.class) {
        // Read again under the lock: another thread may have stored a manager in the meantime.
        sharedManager = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
        if (sharedManager == null) {
          sharedManager =
              new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
                  .createClientManager(
                      new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(sharedManager);
        }
      }
    }

    asyncPipeDataTransferClientManager = sharedManager;
  }
1✔
111

112
  @Override
113
  public void validate(PipeParameterValidator validator) throws Exception {
114
    super.validate(validator);
1✔
115
    retryConnector.validate(validator);
1✔
116
  }
1✔
117

118
  @Override
119
  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
120
      throws Exception {
121
    super.customize(parameters, configuration);
×
122
    retryConnector.customize(parameters, configuration);
×
123
  }
×
124

125
  @Override
126
  // synchronized to avoid close connector when transfer event
127
  public synchronized void handshake() throws Exception {
128
    retryConnector.handshake();
×
129
  }
×
130

131
  @Override
132
  public void heartbeat() {
133
    retryConnector.heartbeat();
×
134
  }
×
135

136
  @Override
137
  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
138
    transferQueuedEventsIfNecessary();
×
139

140
    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
×
141
        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
142
      LOGGER.warn(
×
143
          "IoTDBThriftConnectorV2 only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
144
              + "Current event: {}.",
145
          tabletInsertionEvent);
146
      return;
×
147
    }
148

149
    final long requestCommitId = commitIdGenerator.incrementAndGet();
×
150

151
    if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
×
152
      final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent =
×
153
          (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
154
      final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
×
155
          PipeTransferInsertNodeReq.toTPipeTransferReq(
×
156
              pipeInsertNodeTabletInsertionEvent.getInsertNode());
×
157
      final PipeTransferInsertNodeTabletInsertionEventHandler pipeTransferInsertNodeReqHandler =
×
158
          new PipeTransferInsertNodeTabletInsertionEventHandler(
159
              requestCommitId, pipeInsertNodeTabletInsertionEvent, pipeTransferInsertNodeReq, this);
160

161
      transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
×
162
    } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
×
163
      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
×
164
          (PipeRawTabletInsertionEvent) tabletInsertionEvent;
165
      final PipeTransferTabletReq pipeTransferTabletReq =
×
166
          PipeTransferTabletReq.toTPipeTransferReq(
×
167
              pipeRawTabletInsertionEvent.convertToTablet(),
×
168
              pipeRawTabletInsertionEvent.isAligned());
×
169
      final PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler =
×
170
          new PipeTransferRawTabletInsertionEventHandler(
171
              requestCommitId, pipeRawTabletInsertionEvent, pipeTransferTabletReq, this);
172

173
      transfer(requestCommitId, pipeTransferTabletReqHandler);
×
174
    }
175
  }
×
176

177
  private void transfer(
178
      long requestCommitId,
179
      PipeTransferInsertNodeTabletInsertionEventHandler pipeTransferInsertNodeReqHandler) {
180
    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size()));
×
181

182
    try {
183
      final AsyncPipeDataTransferServiceClient client = borrowClient(targetNodeUrl);
×
184

185
      try {
186
        pipeTransferInsertNodeReqHandler.transfer(client);
×
187
      } catch (TException e) {
×
188
        LOGGER.warn(
×
189
            String.format(
×
190
                "Transfer insert node to receiver %s:%s error, retrying...",
191
                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
×
192
            e);
193
      }
×
194
    } catch (Exception ex) {
×
195
      pipeTransferInsertNodeReqHandler.onError(ex);
×
196
      LOGGER.warn(
×
197
          String.format(
×
198
              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), targetNodeUrl.getPort()),
×
199
          ex);
200
    }
×
201
  }
×
202

203
  private void transfer(
204
      long requestCommitId,
205
      PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler) {
206
    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size()));
×
207

208
    try {
209
      final AsyncPipeDataTransferServiceClient client = borrowClient(targetNodeUrl);
×
210

211
      try {
212
        pipeTransferTabletReqHandler.transfer(client);
×
213
      } catch (TException e) {
×
214
        LOGGER.warn(
×
215
            String.format(
×
216
                "Transfer tablet to receiver %s:%s error, retrying...",
217
                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
×
218
            e);
219
      }
×
220
    } catch (Exception ex) {
×
221
      pipeTransferTabletReqHandler.onError(ex);
×
222
      LOGGER.warn(
×
223
          String.format(
×
224
              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), targetNodeUrl.getPort()),
×
225
          ex);
226
    }
×
227
  }
×
228

229
  @Override
230
  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
231
    transferQueuedEventsIfNecessary();
×
232

233
    if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
×
234
      LOGGER.warn(
×
235
          "IoTDBThriftConnectorV2 only support PipeTsFileInsertionEvent. Current event: {}.",
236
          tsFileInsertionEvent);
237
      return;
×
238
    }
239

240
    final long requestCommitId = commitIdGenerator.incrementAndGet();
×
241

242
    final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
×
243
        (PipeTsFileInsertionEvent) tsFileInsertionEvent;
244
    final PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler =
×
245
        new PipeTransferTsFileInsertionEventHandler(
246
            requestCommitId, pipeTsFileInsertionEvent, this);
247

248
    pipeTsFileInsertionEvent.waitForTsFileClose();
×
249
    transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler);
×
250
  }
×
251

252
  private void transfer(
253
      long requestCommitId,
254
      PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler) {
255
    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % nodeUrls.size()));
×
256

257
    try {
258
      final AsyncPipeDataTransferServiceClient client = borrowClient(targetNodeUrl);
×
259

260
      try {
261
        pipeTransferTsFileInsertionEventHandler.transfer(client);
×
262
      } catch (TException e) {
×
263
        LOGGER.warn(
×
264
            String.format(
×
265
                "Transfer tsfile to receiver %s:%s error, retrying...",
266
                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
×
267
            e);
268
      }
×
269
    } catch (Exception ex) {
×
270
      pipeTransferTsFileInsertionEventHandler.onError(ex);
×
271
      LOGGER.warn(
×
272
          String.format(
×
273
              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), targetNodeUrl.getPort()),
×
274
          ex);
275
    }
×
276
  }
×
277

278
  @Override
279
  public void transfer(Event event) throws Exception {
280
    transferQueuedEventsIfNecessary();
×
281

282
    LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic event: {}.", event);
×
283
  }
×
284

285
  private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint targetNodeUrl)
286
      throws PipeConnectionException {
287
    try {
288
      while (true) {
289
        final AsyncPipeDataTransferServiceClient client =
×
290
            asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
×
291
        if (handshakeIfNecessary(targetNodeUrl, client)) {
×
292
          return client;
×
293
        }
294
      }
×
295
    } catch (Exception e) {
×
296
      throw new PipeConnectionException(
×
297
          String.format(
×
298
              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), targetNodeUrl.getPort()),
×
299
          e);
300
    }
301
  }
302

303
  /**
304
   * Handshake with the target if necessary.
305
   *
306
   * @param client client to handshake
307
   * @return true if the handshake is already finished, false if the handshake is not finished yet
308
   *     and finished in this method
309
   * @throws Exception if an error occurs.
310
   */
311
  private boolean handshakeIfNecessary(
312
      TEndPoint targetNodeUrl, AsyncPipeDataTransferServiceClient client) throws Exception {
313
    if (client.isHandshakeFinished()) {
×
314
      return true;
×
315
    }
316

317
    final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
×
318
    final AtomicReference<Exception> exception = new AtomicReference<>();
×
319

320
    client.pipeTransfer(
×
321
        PipeTransferHandshakeReq.toTPipeTransferReq(
×
322
            CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
×
323
        new AsyncMethodCallback<TPipeTransferResp>() {
×
324
          @Override
325
          public void onComplete(TPipeTransferResp response) {
326
            if (response.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
327
              LOGGER.warn(
×
328
                  "Handshake error with receiver {}:{}, code: {}, message: {}.",
329
                  targetNodeUrl.getIp(),
×
330
                  targetNodeUrl.getPort(),
×
331
                  response.getStatus().getCode(),
×
332
                  response.getStatus().getMessage());
×
333
              exception.set(
×
334
                  new PipeConnectionException(
335
                      String.format(
×
336
                          "Handshake error with receiver %s:%s, code: %d, message: %s.",
337
                          targetNodeUrl.getIp(),
×
338
                          targetNodeUrl.getPort(),
×
339
                          response.getStatus().getCode(),
×
340
                          response.getStatus().getMessage())));
×
341
            } else {
342
              LOGGER.info(
×
343
                  "Handshake successfully with receiver {}:{}.",
344
                  targetNodeUrl.getIp(),
×
345
                  targetNodeUrl.getPort());
×
346
              client.markHandshakeFinished();
×
347
            }
348

349
            isHandshakeFinished.set(true);
×
350
          }
×
351

352
          @Override
353
          public void onError(Exception e) {
354
            LOGGER.warn(
×
355
                "Handshake error with receiver {}:{}.",
356
                targetNodeUrl.getIp(),
×
357
                targetNodeUrl.getPort(),
×
358
                e);
359
            exception.set(e);
×
360

361
            isHandshakeFinished.set(true);
×
362
          }
×
363
        });
364

365
    try {
366
      while (!isHandshakeFinished.get()) {
×
367
        Thread.sleep(10);
×
368
      }
369
    } catch (InterruptedException e) {
×
370
      Thread.currentThread().interrupt();
×
371
      throw new PipeException("Interrupted while waiting for handshake response.", e);
×
372
    }
×
373

374
    if (exception.get() != null) {
×
375
      throw new PipeConnectionException("Failed to handshake.", exception.get());
×
376
    }
377

378
    return false;
×
379
  }
380

381
  /**
382
   * Transfer queued events which are waiting for retry.
383
   *
384
   * @throws Exception if an error occurs. The error will be handled by pipe framework, which will
385
   *     retry the event and mark the event as failure and stop the pipe if the retry times exceeds
386
   *     the threshold.
387
   * @see PipeConnector#transfer(Event) for more details.
388
   * @see PipeConnector#transfer(TabletInsertionEvent) for more details.
389
   * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
390
   */
391
  private synchronized void transferQueuedEventsIfNecessary() throws Exception {
392
    while (!retryEventQueue.isEmpty()) {
×
393
      final Pair<Long, Event> queuedEventPair = retryEventQueue.peek();
×
394
      final long requestCommitId = queuedEventPair.getLeft();
×
395
      final Event event = queuedEventPair.getRight();
×
396

397
      if (event instanceof PipeInsertNodeTabletInsertionEvent) {
×
398
        retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) event);
×
399
      } else if (event instanceof PipeRawTabletInsertionEvent) {
×
400
        retryConnector.transfer((PipeRawTabletInsertionEvent) event);
×
401
      } else if (event instanceof PipeTsFileInsertionEvent) {
×
402
        retryConnector.transfer((PipeTsFileInsertionEvent) event);
×
403
      } else {
404
        LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic event: {}.", event);
×
405
      }
406

407
      if (event instanceof EnrichedEvent) {
×
408
        commit(requestCommitId, (EnrichedEvent) event);
×
409
      }
410

411
      retryEventQueue.poll();
×
412
    }
×
413
  }
×
414

415
  /**
416
   * Commit the event. Decrease the reference count of the event. If the reference count is 0, the
417
   * progress index of the event will be recalculated and the resources of the event will be
418
   * released.
419
   *
420
   * <p>The synchronization is necessary because the commit order must be the same as the order of
421
   * the events. Concurrent commit may cause the commit order to be inconsistent with the order of
422
   * the events.
423
   *
424
   * @param requestCommitId commit id of the request
425
   * @param enrichedEvent event to commit
426
   */
427
  public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent enrichedEvent) {
428
    commitQueue.offer(
×
429
        new Pair<>(
430
            requestCommitId,
×
431
            () ->
432
                Optional.ofNullable(enrichedEvent)
×
433
                    .ifPresent(
×
434
                        event ->
435
                            event.decreaseReferenceCount(IoTDBThriftConnectorV2.class.getName()))));
×
436

437
    while (!commitQueue.isEmpty()) {
×
438
      final Pair<Long, Runnable> committer = commitQueue.peek();
×
439
      if (lastCommitId.get() + 1 != committer.left) {
×
440
        break;
×
441
      }
442

443
      committer.right.run();
×
444
      lastCommitId.incrementAndGet();
×
445

446
      commitQueue.poll();
×
447
    }
×
448
  }
×
449

450
  /**
451
   * Add failure event to retry queue.
452
   *
453
   * @param requestCommitId commit id of the request
454
   * @param event event to retry
455
   */
456
  public void addFailureEventToRetryQueue(long requestCommitId, Event event) {
457
    if (RETRY_TRIGGER.get() == null) {
×
458
      synchronized (IoTDBThriftConnectorV2.class) {
×
459
        if (RETRY_TRIGGER.get() == null) {
×
460
          RETRY_TRIGGER.set(
×
461
              IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
×
462
                  ThreadName.PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER.getName()));
×
463
        }
464
      }
×
465
    }
466

467
    if (retryTriggerFuture.get() == null) {
×
468
      synchronized (IoTDBThriftConnectorV2.class) {
×
469
        if (retryTriggerFuture.get() == null) {
×
470
          retryTriggerFuture.set(
×
471
              ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
×
472
                  RETRY_TRIGGER.get(),
×
473
                  () -> {
474
                    try {
475
                      transferQueuedEventsIfNecessary();
×
476
                    } catch (Exception e) {
×
477
                      LOGGER.warn("Failed to trigger retry.", e);
×
478
                    }
×
479
                  },
×
480
                  RETRY_TRIGGER_INTERVAL_MINUTES,
481
                  RETRY_TRIGGER_INTERVAL_MINUTES,
482
                  TimeUnit.MINUTES));
483
        }
484
      }
×
485
    }
486

487
    retryEventQueue.offer(new Pair<>(requestCommitId, event));
×
488
  }
×
489

490
  @Override
491
  // synchronized to avoid close connector when transfer event
492
  public synchronized void close() throws Exception {
493
    if (retryTriggerFuture.get() != null) {
×
494
      retryTriggerFuture.get().cancel(false);
×
495
    }
496

497
    retryConnector.close();
×
498
  }
×
499
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc