• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / rocketmq-flink / 147

pending completion
147

push

travis-ci-com

web-flow
[ISSUE #76] Fix bug when the job restore from ck (#77)

Fix bug when the job restore from ck

Co-authored-by: 高思伟 <siwei.gao@amh-group.com>

3 of 3 new or added lines in 1 file covered. (100.0%)

813 of 2518 relevant lines covered (32.29%)

1.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.2
/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
package org.apache.rocketmq.flink.legacy;
19

20
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
21
import org.apache.rocketmq.client.consumer.MessageSelector;
22
import org.apache.rocketmq.client.exception.MQClientException;
23
import org.apache.rocketmq.common.message.MessageExt;
24
import org.apache.rocketmq.common.message.MessageQueue;
25
import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
26
import org.apache.rocketmq.flink.legacy.common.config.StartupMode;
27
import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
28
import org.apache.rocketmq.flink.legacy.common.util.MetricUtils;
29
import org.apache.rocketmq.flink.legacy.common.util.RetryUtil;
30
import org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils;
31
import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkForAll;
32
import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkPerQueue;
33

34
import org.apache.flink.api.common.functions.RuntimeContext;
35
import org.apache.flink.api.common.state.ListState;
36
import org.apache.flink.api.common.state.ListStateDescriptor;
37
import org.apache.flink.api.common.typeinfo.TypeHint;
38
import org.apache.flink.api.common.typeinfo.TypeInformation;
39
import org.apache.flink.api.java.tuple.Tuple2;
40
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
41
import org.apache.flink.configuration.Configuration;
42
import org.apache.flink.metrics.Counter;
43
import org.apache.flink.metrics.Meter;
44
import org.apache.flink.metrics.MeterView;
45
import org.apache.flink.metrics.SimpleCounter;
46
import org.apache.flink.runtime.state.CheckpointListener;
47
import org.apache.flink.runtime.state.FunctionInitializationContext;
48
import org.apache.flink.runtime.state.FunctionSnapshotContext;
49
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
50
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
51
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
52
import org.apache.flink.util.Preconditions;
53

54
import org.apache.flink.shaded.curator5.com.google.common.collect.Lists;
55
import org.apache.flink.shaded.curator5.com.google.common.util.concurrent.ThreadFactoryBuilder;
56

57
import org.apache.commons.collections.CollectionUtils;
58
import org.apache.commons.collections.map.LinkedMap;
59
import org.apache.commons.lang.Validate;
60
import org.apache.commons.lang3.StringUtils;
61
import org.slf4j.Logger;
62
import org.slf4j.LoggerFactory;
63

64
import java.lang.management.ManagementFactory;
65
import java.nio.charset.StandardCharsets;
66
import java.util.Collection;
67
import java.util.Collections;
68
import java.util.HashMap;
69
import java.util.List;
70
import java.util.Map;
71
import java.util.Properties;
72
import java.util.concurrent.ConcurrentHashMap;
73
import java.util.concurrent.ExecutorService;
74
import java.util.concurrent.Executors;
75
import java.util.concurrent.ScheduledExecutorService;
76
import java.util.concurrent.ThreadFactory;
77
import java.util.concurrent.TimeUnit;
78
import java.util.concurrent.locks.ReentrantLock;
79

80
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_BATCH_SIZE;
81
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_TIMEOUT;
82
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE;
83
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TIMEOUT;
84
import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
85

86
/**
87
 * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability
88
 * guarantees when checkpoints are enabled. Otherwise, the source doesn't provide any reliability
89
 * guarantees.
90
 */
91
public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
92
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
93

94
    private static final long serialVersionUID = 1L;
95

96
    private static final Logger log = LoggerFactory.getLogger(RocketMQSourceFunction.class);
4✔
97
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
98
    private RunningChecker runningChecker;
99

100
    private transient DefaultLitePullConsumer consumer;
101

102
    private KeyValueDeserializationSchema<OUT> schema;
103
    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
104
    private Map<MessageQueue, Long> offsetTable;
105
    private Map<MessageQueue, Long> restoredOffsets;
106
    private List<MessageQueue> messageQueues;
107
    private ExecutorService executor;
108

109
    // watermark in source
110
    private WaterMarkPerQueue waterMarkPerQueue;
111
    private WaterMarkForAll waterMarkForAll;
112

113
    private ScheduledExecutorService timer;
114
    /** Data for pending but uncommitted offsets. */
115
    private LinkedMap pendingOffsetsToCommit;
116

117
    private Properties props;
118
    private String topic;
119
    private String group;
120
    private transient volatile boolean restored;
121
    private transient boolean enableCheckpoint;
122
    private volatile Object checkPointLock;
123

124
    private Meter tpsMetric;
125
    private MetricUtils.TimestampGauge fetchDelay = new MetricUtils.TimestampGauge();
4✔
126
    private MetricUtils.TimestampGauge emitDelay = new MetricUtils.TimestampGauge();
4✔
127

128
    /** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
129
    private StartupMode startMode = StartupMode.GROUP_OFFSETS;
4✔
130

131
    /**
132
     * If StartupMode#GROUP_OFFSETS has no commit offset.OffsetResetStrategy would offer init
133
     * strategy.
134
     */
135
    private OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.LATEST;
4✔
136

137
    /**
138
     * Specific startup offsets; only relevant when startup mode is {@link
139
     * StartupMode#SPECIFIC_OFFSETS}.
140
     */
141
    private Map<MessageQueue, Long> specificStartupOffsets;
142

143
    /**
144
     * Specific startup offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.
145
     */
146
    private long specificTimeStamp;
147

148
    /**
     * Creates a RocketMQ source function.
     *
     * @param schema deserializer applied to the key and body of each pulled message
     * @param props consumer configuration (topic, group, tag/sql filter, batch size, ...)
     */
    public RocketMQSourceFunction(KeyValueDeserializationSchema<OUT> schema, Properties props) {
        this.props = props;
        this.schema = schema;
    }
4✔
152

153
    @Override
154
    public void open(Configuration parameters) throws Exception {
155
        log.debug("source open....");
×
156
        Validate.notEmpty(props, "Consumer properties can not be empty");
×
157

158
        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
×
159
        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
×
160

161
        Validate.notEmpty(topic, "Consumer topic can not be empty");
×
162
        Validate.notEmpty(group, "Consumer group can not be empty");
×
163

164
        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG);
×
165
        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
×
166
        Validate.isTrue(
×
167
                !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
×
168
                "Consumer tag and sql can not set value at the same time");
169

170
        this.enableCheckpoint =
×
171
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
×
172

173
        if (offsetTable == null) {
×
174
            offsetTable = new ConcurrentHashMap<>();
×
175
        }
176
        if (restoredOffsets == null) {
×
177
            restoredOffsets = new ConcurrentHashMap<>();
×
178
        }
179
        if (pendingOffsetsToCommit == null) {
×
180
            pendingOffsetsToCommit = new LinkedMap();
×
181
        }
182
        if (checkPointLock == null) {
×
183
            checkPointLock = new ReentrantLock();
×
184
        }
185
        if (waterMarkPerQueue == null) {
×
186
            waterMarkPerQueue = new WaterMarkPerQueue(5000);
×
187
        }
188
        if (waterMarkForAll == null) {
×
189
            waterMarkForAll = new WaterMarkForAll(5000);
×
190
        }
191
        if (timer == null) {
×
192
            timer = Executors.newSingleThreadScheduledExecutor();
×
193
        }
194

195
        runningChecker = new RunningChecker();
×
196
        runningChecker.setRunning(true);
×
197

198
        final ThreadFactory threadFactory =
×
199
                new ThreadFactoryBuilder()
200
                        .setDaemon(true)
×
201
                        .setNameFormat("rmq-pull-thread-%d")
×
202
                        .build();
×
203
        executor = Executors.newCachedThreadPool(threadFactory);
×
204

205
        int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
×
206
        consumer = new DefaultLitePullConsumer(group, RocketMQConfig.buildAclRPCHook(props));
×
207
        RocketMQConfig.buildConsumerConfigs(props, consumer);
×
208

209
        // set unique instance name, avoid exception:
210
        // https://help.aliyun.com/document_detail/29646.html
211
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
×
212
        String instanceName =
×
213
                RocketMQUtils.getInstanceName(
×
214
                        runtimeName,
215
                        topic,
216
                        group,
217
                        String.valueOf(indexOfThisSubTask),
×
218
                        String.valueOf(System.nanoTime()));
×
219
        consumer.setInstanceName(instanceName);
×
220
        consumer.start();
×
221

222
        Counter outputCounter =
×
223
                getRuntimeContext()
×
224
                        .getMetricGroup()
×
225
                        .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
×
226
        tpsMetric =
×
227
                getRuntimeContext()
×
228
                        .getMetricGroup()
×
229
                        .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
×
230

231
        getRuntimeContext()
×
232
                .getMetricGroup()
×
233
                .gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, fetchDelay);
×
234
        getRuntimeContext()
×
235
                .getMetricGroup()
×
236
                .gauge(MetricUtils.CURRENT_EMIT_EVENT_TIME_LAG, emitDelay);
×
237

238
        final RuntimeContext ctx = getRuntimeContext();
×
239
        // The lock that guarantees that record emission and state updates are atomic,
240
        // from the view of taking a checkpoint.
241
        int taskNumber = ctx.getNumberOfParallelSubtasks();
×
242
        int taskIndex = ctx.getIndexOfThisSubtask();
×
243
        log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex);
×
244
        Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(topic);
×
245
        messageQueues =
×
246
                RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
×
247
        // If the job recovers from the state, the state has already contained the offsets of last
248
        // commit.
249
        if (restored) {
×
250
            initOffsetTableFromRestoredOffsets(messageQueues);
×
251
        } else {
252
            initOffsets(messageQueues);
×
253
        }
254
    }
×
255

256
    @Override
257
    public void run(SourceContext context) throws Exception {
258
        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
×
259
        String tag =
×
260
                props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
×
261
        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
×
262
        timer.scheduleAtFixedRate(
×
263
                () -> {
264
                    // context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
265
                    context.emitWatermark(waterMarkForAll.getCurrentWatermark());
×
266
                },
×
267
                5,
268
                5,
269
                TimeUnit.SECONDS);
270
        if (StringUtils.isEmpty(sql)) {
×
271
            consumer.subscribe(topic, tag);
×
272
        } else {
273
            // pull with sql do not support block pull.
274
            consumer.subscribe(topic, MessageSelector.bySql(sql));
×
275
        }
276
        for (MessageQueue mq : messageQueues) {
×
277
            this.executor.execute(
×
278
                    () ->
279
                            RetryUtil.call(
×
280
                                    () -> {
281
                                        while (runningChecker.isRunning()) {
×
282
                                            try {
283
                                                Long offset = offsetTable.get(mq);
×
284
                                                consumer.setPullBatchSize(pullBatchSize);
×
285
                                                consumer.seek(mq, offset);
×
286
                                                boolean found = false;
×
287
                                                List<MessageExt> messages =
×
288
                                                        consumer.poll(
×
289
                                                                getInteger(
×
290
                                                                        props,
291
                                                                        CONSUMER_TIMEOUT,
292
                                                                        DEFAULT_CONSUMER_TIMEOUT));
293
                                                if (CollectionUtils.isNotEmpty(messages)) {
×
294
                                                    long fetchTime = System.currentTimeMillis();
×
295
                                                    for (MessageExt msg : messages) {
×
296
                                                        byte[] key =
297
                                                                msg.getKeys() != null
×
298
                                                                        ? msg.getKeys()
×
299
                                                                                .getBytes(
×
300
                                                                                        StandardCharsets
301
                                                                                                .UTF_8)
302
                                                                        : null;
303
                                                        byte[] value = msg.getBody();
×
304
                                                        OUT data =
×
305
                                                                schema.deserializeKeyAndValue(
×
306
                                                                        key, value);
307

308
                                                        // output and state update are atomic
309
                                                        synchronized (checkPointLock) {
×
310
                                                            log.debug(
×
311
                                                                    msg.getMsgId()
×
312
                                                                            + "_"
313
                                                                            + msg.getBrokerName()
×
314
                                                                            + " "
315
                                                                            + msg.getQueueId()
×
316
                                                                            + " "
317
                                                                            + msg.getQueueOffset());
×
318
                                                            context.collectWithTimestamp(
×
319
                                                                    data, msg.getBornTimestamp());
×
320
                                                            long emitTime =
321
                                                                    System.currentTimeMillis();
×
322
                                                            // update max eventTime per queue
323
                                                            // waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
324
                                                            waterMarkForAll.extractTimestamp(
×
325
                                                                    msg.getBornTimestamp());
×
326
                                                            tpsMetric.markEvent();
×
327
                                                            long eventTime =
×
328
                                                                    msg.getStoreTimestamp();
×
329
                                                            fetchDelay.report(
×
330
                                                                    Math.abs(
×
331
                                                                            fetchTime - eventTime));
332
                                                            emitDelay.report(
×
333
                                                                    Math.abs(emitTime - eventTime));
×
334
                                                        }
×
335
                                                    }
×
336
                                                    found = true;
×
337
                                                }
338
                                                synchronized (checkPointLock) {
×
339
                                                    updateMessageQueueOffset(
×
340
                                                            mq, consumer.committed(mq));
×
341
                                                }
×
342

343
                                                if (!found) {
×
344
                                                    RetryUtil.waitForMs(
×
345
                                                            RocketMQConfig
346
                                                                    .DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
347
                                                }
348
                                            } catch (Exception e) {
×
349
                                                throw new RuntimeException(e);
×
350
                                            }
×
351
                                        }
352
                                        return true;
×
353
                                    },
354
                                    "RuntimeException",
355
                                    runningChecker));
356
        }
×
357

358
        awaitTermination();
×
359
    }
×
360

361
    private void awaitTermination() throws InterruptedException {
362
        while (runningChecker.isRunning()) {
×
363
            Thread.sleep(50);
×
364
        }
365
    }
×
366

367
    /**
368
     * only flink job start with no state can init offsets from broker
369
     *
370
     * @param messageQueues
371
     * @throws MQClientException
372
     */
373
    private void initOffsets(List<MessageQueue> messageQueues) throws MQClientException {
374
        for (MessageQueue mq : messageQueues) {
×
375
            long offset;
376
            switch (startMode) {
×
377
                case LATEST:
378
                    consumer.seekToEnd(mq);
×
379
                    offset = consumer.committed(mq);
×
380
                    break;
×
381
                case EARLIEST:
382
                    consumer.seekToBegin(mq);
×
383
                    offset = consumer.committed(mq);
×
384
                    break;
×
385
                case GROUP_OFFSETS:
386
                    offset = consumer.committed(mq);
×
387
                    // the min offset return if consumer group first join,return a negative number
388
                    // if
389
                    // catch exception when fetch from broker.
390
                    // If you want consumer from earliest,please use OffsetResetStrategy.EARLIEST
391
                    if (offset <= 0) {
×
392
                        switch (offsetResetStrategy) {
×
393
                            case LATEST:
394
                                consumer.seekToEnd(mq);
×
395
                                offset = consumer.committed(mq);
×
396
                                log.info(
×
397
                                        "current consumer thread:{} has no committed offset,use Strategy:{} instead",
398
                                        mq,
399
                                        offsetResetStrategy);
400
                                break;
×
401
                            case EARLIEST:
402
                                log.info(
×
403
                                        "current consumer thread:{} has no committed offset,use Strategy:{} instead",
404
                                        mq,
405
                                        offsetResetStrategy);
406
                                consumer.seekToBegin(mq);
×
407
                                offset = consumer.committed(mq);
×
408
                                break;
×
409
                            default:
410
                                break;
×
411
                        }
412
                    }
413
                    break;
414
                case TIMESTAMP:
415
                    offset = consumer.offsetForTimestamp(mq, specificTimeStamp);
×
416
                    break;
×
417
                case SPECIFIC_OFFSETS:
418
                    if (specificStartupOffsets == null) {
×
419
                        throw new RuntimeException(
×
420
                                "StartMode is specific_offsets.But none offsets has been specified");
421
                    }
422
                    Long specificOffset = specificStartupOffsets.get(mq);
×
423
                    if (specificOffset != null) {
×
424
                        offset = specificOffset;
×
425
                    } else {
426
                        offset = consumer.committed(mq);
×
427
                    }
428
                    break;
×
429
                default:
430
                    throw new IllegalArgumentException(
×
431
                            "current startMode is not supported" + startMode);
432
            }
433
            log.info(
×
434
                    "current consumer queue:{} start from offset of: {}",
435
                    mq.getBrokerName() + "-" + mq.getQueueId(),
×
436
                    offset);
×
437
            offsetTable.put(mq, offset);
×
438
        }
×
439
    }
×
440

441
    /** consume from the min offset at every restart with no state */
442
    public RocketMQSourceFunction<OUT> setStartFromEarliest() {
443
        this.startMode = StartupMode.EARLIEST;
4✔
444
        return this;
4✔
445
    }
446

447
    /** consume from the max offset of each broker's queue at every restart with no state */
448
    public RocketMQSourceFunction<OUT> setStartFromLatest() {
449
        this.startMode = StartupMode.LATEST;
4✔
450
        return this;
4✔
451
    }
452

453
    /** consume from the closest offset */
454
    public RocketMQSourceFunction<OUT> setStartFromTimeStamp(long timeStamp) {
455
        this.startMode = StartupMode.TIMESTAMP;
4✔
456
        this.specificTimeStamp = timeStamp;
4✔
457
        return this;
4✔
458
    }
459

460
    /** consume from the group offsets those was stored in brokers. */
461
    public RocketMQSourceFunction<OUT> setStartFromGroupOffsets() {
462
        this.startMode = StartupMode.GROUP_OFFSETS;
4✔
463
        return this;
4✔
464
    }
465

466
    /**
467
     * consume from the group offsets those was stored in brokers. If there is no committed
468
     * offset,#{@link OffsetResetStrategy} would provide initialization policy.
469
     */
470
    public RocketMQSourceFunction<OUT> setStartFromGroupOffsets(
471
            OffsetResetStrategy offsetResetStrategy) {
472
        this.startMode = StartupMode.GROUP_OFFSETS;
4✔
473
        this.offsetResetStrategy = offsetResetStrategy;
4✔
474
        return this;
4✔
475
    }
476

477
    /**
478
     * consume from the specific offset. Group offsets is enable while the broker didn't specify
479
     * offset.
480
     */
481
    public RocketMQSourceFunction<OUT> setStartFromSpecificOffsets(
482
            Map<MessageQueue, Long> specificOffsets) {
483
        this.specificStartupOffsets = specificOffsets;
4✔
484
        this.startMode = StartupMode.SPECIFIC_OFFSETS;
4✔
485
        return this;
4✔
486
    }
487

488
    private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
489
        offsetTable.put(mq, offset);
×
490
        if (!enableCheckpoint) {
×
491
            consumer.getOffsetStore().updateOffset(mq, offset, false);
×
492
        }
493
    }
×
494

495
    @Override
496
    public void cancel() {
497
        log.debug("cancel ...");
×
498
        runningChecker.setRunning(false);
×
499

500
        if (timer != null) {
×
501
            timer.shutdown();
×
502
            timer = null;
×
503
        }
504

505
        if (executor != null) {
×
506
            executor.shutdown();
×
507
            executor = null;
×
508
        }
509

510
        if (consumer != null) {
×
511
            consumer.shutdown();
×
512
            consumer = null;
×
513
        }
514

515
        if (offsetTable != null) {
×
516
            offsetTable.clear();
×
517
            offsetTable = null;
×
518
        }
519
        if (restoredOffsets != null) {
×
520
            restoredOffsets.clear();
×
521
            restoredOffsets = null;
×
522
        }
523
        if (pendingOffsetsToCommit != null) {
×
524
            pendingOffsetsToCommit.clear();
×
525
            pendingOffsetsToCommit = null;
×
526
        }
527
    }
×
528

529
    @Override
530
    public void close() throws Exception {
531
        log.debug("close ...");
×
532
        // pretty much the same logic as cancelling
533
        try {
534
            cancel();
×
535
        } finally {
536
            super.close();
×
537
        }
538
    }
×
539

540
    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) {
541
        Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
4✔
542
        restoredOffsets.forEach(
4✔
543
                (mq, offset) -> {
544
                    if (messageQueues.contains(mq)) {
4✔
545
                        offsetTable.put(mq, offset);
4✔
546
                    }
547
                });
4✔
548
        log.info("init offset table [{}] from restoredOffsets successful.", offsetTable);
4✔
549
    }
4✔
550

551
    @Override
552
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
553
        // called when a snapshot for a checkpoint is requested
554
        log.info("Snapshotting state {} ...", context.getCheckpointId());
×
555
        if (!runningChecker.isRunning()) {
×
556
            log.info("snapshotState() called on closed source; returning null.");
×
557
            return;
×
558
        }
559

560
        Map<MessageQueue, Long> currentOffsets;
561
        try {
562
            // Discovers topic route change when snapshot
563
            RetryUtil.call(
×
564
                    () -> {
565
                        Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(topic);
×
566
                        int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks();
×
567
                        int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
×
568
                        List<MessageQueue> newQueues =
×
569
                                RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
×
570
                        Collections.sort(newQueues);
×
571
                        log.debug(taskIndex + " Topic route is same.");
×
572
                        if (!messageQueues.equals(newQueues)) {
×
573
                            throw new RuntimeException();
×
574
                        }
575
                        return true;
×
576
                    },
577
                    "RuntimeException due to topic route changed");
578

579
            unionOffsetStates.clear();
×
580
            currentOffsets = new HashMap<>(offsetTable.size());
×
581
        } catch (RuntimeException e) {
×
582
            log.warn("Retry failed multiple times for topic route change, keep previous offset.");
×
583
            // If the retry fails for multiple times, the message queue and its offset in the
584
            // previous checkpoint will be retained.
585
            List<Tuple2<MessageQueue, Long>> unionOffsets =
×
586
                    Lists.newArrayList(unionOffsetStates.get().iterator());
×
587
            Map<MessageQueue, Long> queueOffsets = new HashMap<>(unionOffsets.size());
×
588
            unionOffsets.forEach(queueOffset -> queueOffsets.put(queueOffset.f0, queueOffset.f1));
×
589
            currentOffsets = new HashMap<>(unionOffsets.size() + offsetTable.size());
×
590
            currentOffsets.putAll(queueOffsets);
×
591
        }
×
592

593
        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
×
594
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
×
595
            currentOffsets.put(entry.getKey(), entry.getValue());
×
596
        }
×
597
        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
×
598
        log.info(
×
599
                "Snapshot state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
600
                offsetTable,
601
                context.getCheckpointId(),
×
602
                context.getCheckpointTimestamp());
×
603
    }
×
604

605
    /**
606
     * called every time the user-defined function is initialized, be that when the function is
607
     * first initialized or be that when the function is actually recovering from an earlier
608
     * checkpoint. Given this, initializeState() is not only the place where different types of
609
     * state are initialized, but also where state recovery logic is included.
610
     */
611
    @Override
612
    public void initializeState(FunctionInitializationContext context) throws Exception {
613
        log.info("initialize State ...");
×
614

615
        this.unionOffsetStates =
×
616
                context.getOperatorStateStore()
×
617
                        .getUnionListState(
×
618
                                new ListStateDescriptor<>(
619
                                        OFFSETS_STATE_NAME,
620
                                        TypeInformation.of(
×
621
                                                new TypeHint<Tuple2<MessageQueue, Long>>() {})));
×
622
        this.restored = context.isRestored();
×
623

624
        if (restored) {
×
625
            if (restoredOffsets == null) {
×
626
                restoredOffsets = new ConcurrentHashMap<>();
×
627
            }
628
            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
×
629
                if (!restoredOffsets.containsKey(mqOffsets.f0)
×
630
                        || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
×
631
                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
×
632
                }
633
            }
×
634
            log.info(
×
635
                    "Setting restore state in the consumer. Using the following offsets: {}",
636
                    restoredOffsets);
637
        } else {
638
            log.info("No restore state for the consumer.");
×
639
        }
640
    }
×
641

642
    @Override
643
    public TypeInformation<OUT> getProducedType() {
644
        return schema.getProducedType();
×
645
    }
646

647
    @Override
648
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
649
        // callback when checkpoint complete
650
        if (!runningChecker.isRunning()) {
×
651
            log.info("notifyCheckpointComplete() called on closed source; returning null.");
×
652
            return;
×
653
        }
654

655
        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
×
656
        if (posInMap == -1) {
×
657
            log.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
×
658
            return;
×
659
        }
660

661
        Map<MessageQueue, Long> offsets =
×
662
                (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
×
663

664
        // remove older checkpoints in map
665
        for (int i = 0; i < posInMap; i++) {
×
666
            pendingOffsetsToCommit.remove(0);
×
667
        }
668

669
        if (offsets == null || offsets.size() == 0) {
×
670
            log.debug("Checkpoint state was empty.");
×
671
            return;
×
672
        }
673

674
        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
×
675
            consumer.getOffsetStore().updateOffset(entry.getKey(), entry.getValue(), false);
×
676
            consumer.getOffsetStore().persist(consumer.queueWithNamespace(entry.getKey()));
×
677
        }
×
678
    }
×
679
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc