• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / rocketmq / 7926

pending completion
7926

push

travis-ci-com

GitHub
[ISSUE #5965] Fix lmqTopicQueueTable initialization (#5967)

10 of 10 new or added lines in 2 files covered. (100.0%)

22455 of 43079 relevant lines covered (52.13%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.31
/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package org.apache.rocketmq.client.impl.consumer;
18

19
import java.util.ArrayList;
20
import java.util.Collection;
21
import java.util.Collections;
22
import java.util.HashMap;
23
import java.util.HashSet;
24
import java.util.Iterator;
25
import java.util.List;
26
import java.util.Map;
27
import java.util.Properties;
28
import java.util.Set;
29
import java.util.concurrent.BlockingQueue;
30
import java.util.concurrent.ConcurrentHashMap;
31
import java.util.concurrent.ConcurrentMap;
32
import java.util.concurrent.Executors;
33
import java.util.concurrent.LinkedBlockingQueue;
34
import java.util.concurrent.ScheduledExecutorService;
35
import java.util.concurrent.ScheduledThreadPoolExecutor;
36
import java.util.concurrent.ThreadFactory;
37
import java.util.concurrent.TimeUnit;
38
import org.apache.rocketmq.client.Validators;
39
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
40
import org.apache.rocketmq.client.consumer.MessageQueueListener;
41
import org.apache.rocketmq.client.consumer.MessageSelector;
42
import org.apache.rocketmq.client.consumer.PullResult;
43
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
44
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
45
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
46
import org.apache.rocketmq.client.consumer.store.OffsetStore;
47
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
48
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
49
import org.apache.rocketmq.client.exception.MQBrokerException;
50
import org.apache.rocketmq.client.exception.MQClientException;
51
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
52
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
53
import org.apache.rocketmq.client.hook.FilterMessageHook;
54
import org.apache.rocketmq.client.impl.CommunicationMode;
55
import org.apache.rocketmq.client.impl.MQClientManager;
56
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
57
import org.apache.rocketmq.client.log.ClientLogger;
58
import org.apache.rocketmq.common.MixAll;
59
import org.apache.rocketmq.common.ServiceState;
60
import org.apache.rocketmq.common.ThreadFactoryImpl;
61
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
62
import org.apache.rocketmq.common.filter.ExpressionType;
63
import org.apache.rocketmq.common.filter.FilterAPI;
64
import org.apache.rocketmq.common.help.FAQUrl;
65
import org.apache.rocketmq.common.message.MessageExt;
66
import org.apache.rocketmq.common.message.MessageQueue;
67
import org.apache.rocketmq.common.protocol.NamespaceUtil;
68
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
69
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
70
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
71
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
72
import org.apache.rocketmq.common.sysflag.PullSysFlag;
73
import org.apache.rocketmq.logging.InternalLogger;
74
import org.apache.rocketmq.remoting.RPCHook;
75
import org.apache.rocketmq.remoting.exception.RemotingException;
76

77
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
78

79
    private final InternalLogger log = ClientLogger.getLog();
2✔
80

81
    private final long consumerStartTimestamp = System.currentTimeMillis();
2✔
82

83
    private final RPCHook rpcHook;
84

85
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
2✔
86

87
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
2✔
88

89
    protected MQClientInstance mQClientFactory;
90

91
    private PullAPIWrapper pullAPIWrapper;
92

93
    private OffsetStore offsetStore;
94

95
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
2✔
96

97
    private enum SubscriptionType {
2✔
98
        NONE, SUBSCRIBE, ASSIGN
2✔
99
    }
100

101
    private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
102

103
    private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
104
    /**
105
     * the type of subscription
106
     */
107
    private SubscriptionType subscriptionType = SubscriptionType.NONE;
2✔
108
    /**
109
     * Delay some time when exception occur
110
     */
111
    private long pullTimeDelayMillsWhenException = 1000;
2✔
112
    /**
113
     * Flow control interval
114
     */
115
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
116
    /**
117
     * Delay some time when suspend pull service
118
     */
119
    private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
120

121
    private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000;
122

123
    private DefaultLitePullConsumer defaultLitePullConsumer;
124

125
    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
2✔
126
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
127

128
    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
2✔
129

130
    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
2✔
131

132
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
133

134
    private final ScheduledExecutorService scheduledExecutorService;
135

136
    private Map<String, TopicMessageQueueChangeListener> topicMessageQueueChangeListenerMap = new HashMap<String, TopicMessageQueueChangeListener>();
2✔
137

138
    private Map<String, Set<MessageQueue>> messageQueuesForTopic = new HashMap<String, Set<MessageQueue>>();
2✔
139

140
    private long consumeRequestFlowControlTimes = 0L;
2✔
141

142
    private long queueFlowControlTimes = 0L;
2✔
143

144
    private long queueMaxSpanFlowControlTimes = 0L;
2✔
145

146
    private long nextAutoCommitDeadline = -1L;
2✔
147

148
    private final MessageQueueLock messageQueueLock = new MessageQueueLock();
2✔
149

150
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
2✔
151

152
    public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
2✔
153
        this.defaultLitePullConsumer = defaultLitePullConsumer;
2✔
154
        this.rpcHook = rpcHook;
2✔
155
        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
2✔
156
            this.defaultLitePullConsumer.getPullThreadNums(),
2✔
157
            new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
2✔
158
        );
159
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
2✔
160
            @Override
161
            public Thread newThread(Runnable r) {
162
                return new Thread(r, "MonitorMessageQueueChangeThread");
2✔
163
            }
164
        });
165
        this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
2✔
166
    }
2✔
167

168
    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
169
        this.consumeMessageHookList.add(hook);
2✔
170
        log.info("register consumeMessageHook Hook, {}", hook.hookName());
2✔
171
    }
2✔
172

173
    public void executeHookBefore(final ConsumeMessageContext context) {
174
        if (!this.consumeMessageHookList.isEmpty()) {
2✔
175
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
2✔
176
                try {
177
                    hook.consumeMessageBefore(context);
2✔
178
                } catch (Throwable e) {
×
179
                    log.error("consumeMessageHook {} executeHookBefore exception", hook.hookName(), e);
×
180
                }
2✔
181
            }
2✔
182
        }
183
    }
2✔
184

185
    public void executeHookAfter(final ConsumeMessageContext context) {
186
        if (!this.consumeMessageHookList.isEmpty()) {
2✔
187
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
2✔
188
                try {
189
                    hook.consumeMessageAfter(context);
2✔
190
                } catch (Throwable e) {
×
191
                    log.error("consumeMessageHook {} executeHookAfter exception", hook.hookName(), e);
×
192
                }
2✔
193
            }
2✔
194
        }
195
    }
2✔
196

197
    private void checkServiceState() {
198
        if (this.serviceState != ServiceState.RUNNING) {
2✔
199
            throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
2✔
200
        }
201
    }
2✔
202

203
    public void updateNameServerAddr(String newAddresses) {
204
        this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses);
×
205
    }
×
206

207
    private synchronized void setSubscriptionType(SubscriptionType type) {
208
        if (this.subscriptionType == SubscriptionType.NONE) {
2✔
209
            this.subscriptionType = type;
2✔
210
        } else if (this.subscriptionType != type) {
2✔
211
            throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE);
2✔
212
        }
213
    }
2✔
214

215
    private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
216
        this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue);
2✔
217
    }
2✔
218

219
    private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
220
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
2✔
221
        while (it.hasNext()) {
2✔
222
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
×
223
            if (next.getKey().getTopic().equals(topic)) {
×
224
                if (!mqNewSet.contains(next.getKey())) {
×
225
                    next.getValue().setCancelled(true);
×
226
                    it.remove();
×
227
                }
228
            }
229
        }
×
230
        startPullTask(mqNewSet);
2✔
231
    }
2✔
232

233
    class MessageQueueListenerImpl implements MessageQueueListener {
2✔
234
        @Override
235
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
236
            MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
2✔
237
            switch (messageModel) {
2✔
238
                case BROADCASTING:
239
                    updateAssignedMessageQueue(topic, mqAll);
2✔
240
                    updatePullTask(topic, mqAll);
2✔
241
                    break;
2✔
242
                case CLUSTERING:
243
                    updateAssignedMessageQueue(topic, mqDivided);
2✔
244
                    updatePullTask(topic, mqDivided);
2✔
245
                    break;
2✔
246
                default:
247
                    break;
248
            }
249
        }
2✔
250
    }
251

252
    public synchronized void shutdown() {
253
        switch (this.serviceState) {
2✔
254
            case CREATE_JUST:
255
                break;
2✔
256
            case RUNNING:
257
                persistConsumerOffset();
2✔
258
                this.mQClientFactory.unregisterConsumer(this.defaultLitePullConsumer.getConsumerGroup());
2✔
259
                scheduledThreadPoolExecutor.shutdown();
2✔
260
                scheduledExecutorService.shutdown();
2✔
261
                this.mQClientFactory.shutdown();
2✔
262
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
2✔
263
                log.info("the consumer [{}] shutdown OK", this.defaultLitePullConsumer.getConsumerGroup());
2✔
264
                break;
2✔
265
            default:
266
                break;
267
        }
268
    }
2✔
269

270
    public synchronized boolean isRunning() {
271
        return this.serviceState == ServiceState.RUNNING;
2✔
272
    }
273

274
    public synchronized void start() throws MQClientException {
275
        switch (this.serviceState) {
2✔
276
            case CREATE_JUST:
277
                this.serviceState = ServiceState.START_FAILED;
2✔
278

279
                this.checkConfig();
2✔
280

281
                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
2✔
282
                    this.defaultLitePullConsumer.changeInstanceNameToPID();
2✔
283
                }
284

285
                initMQClientFactory();
2✔
286

287
                initRebalanceImpl();
2✔
288

289
                initPullAPIWrapper();
2✔
290

291
                initOffsetStore();
2✔
292

293
                mQClientFactory.start();
2✔
294

295
                startScheduleTask();
2✔
296

297
                this.serviceState = ServiceState.RUNNING;
2✔
298

299
                log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
2✔
300

301
                operateAfterRunning();
2✔
302

303
                break;
2✔
304
            case RUNNING:
305
            case START_FAILED:
306
            case SHUTDOWN_ALREADY:
307
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
×
308
                    + this.serviceState
309
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
×
310
                    null);
311
            default:
312
                break;
313
        }
314
    }
2✔
315

316
    private void initMQClientFactory() throws MQClientException {
317
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
2✔
318
        boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
2✔
319
        if (!registerOK) {
2✔
320
            this.serviceState = ServiceState.CREATE_JUST;
×
321

322
            throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
×
323
                + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
×
324
                null);
325
        }
326
    }
2✔
327

328
    private void initRebalanceImpl() {
329
        this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
2✔
330
        this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
2✔
331
        this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
2✔
332
        this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
2✔
333
    }
2✔
334

335
    private void initPullAPIWrapper() {
336
        this.pullAPIWrapper = new PullAPIWrapper(
2✔
337
            mQClientFactory,
338
            this.defaultLitePullConsumer.getConsumerGroup(), isUnitMode());
2✔
339
        this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
2✔
340
    }
2✔
341

342
    private void initOffsetStore() throws MQClientException {
343
        if (this.defaultLitePullConsumer.getOffsetStore() != null) {
2✔
344
            this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
2✔
345
        } else {
346
            switch (this.defaultLitePullConsumer.getMessageModel()) {
2✔
347
                case BROADCASTING:
348
                    this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
2✔
349
                    break;
2✔
350
                case CLUSTERING:
351
                    this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
2✔
352
                    break;
2✔
353
                default:
354
                    break;
355
            }
356
            this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
2✔
357
        }
358
        this.offsetStore.load();
2✔
359
    }
2✔
360

361
    private void startScheduleTask() {
362
        scheduledExecutorService.scheduleAtFixedRate(
2✔
363
            new Runnable() {
2✔
364
                @Override
365
                public void run() {
366
                    try {
367
                        fetchTopicMessageQueuesAndCompare();
2✔
368
                    } catch (Exception e) {
×
369
                        log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
×
370
                    }
2✔
371
                }
2✔
372
            }, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
2✔
373
    }
2✔
374

375
    private void operateAfterRunning() throws MQClientException {
376
        // If subscribe function invoke before start function, then update topic subscribe info after initialization.
377
        if (subscriptionType == SubscriptionType.SUBSCRIBE) {
2✔
378
            updateTopicSubscribeInfoWhenSubscriptionChanged();
2✔
379
        }
380
        // If assign function invoke before start function, then update pull task after initialization.
381
        if (subscriptionType == SubscriptionType.ASSIGN) {
2✔
382
            updateAssignPullTask(assignedMessageQueue.messageQueues());
2✔
383
        }
384

385
        for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
2✔
386
            Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
×
387
            messageQueuesForTopic.put(topic, messageQueues);
×
388
        }
×
389
        this.mQClientFactory.checkClientInBroker();
2✔
390
    }
2✔
391

392
    private void checkConfig() throws MQClientException {
393
        // Check consumerGroup
394
        Validators.checkGroup(this.defaultLitePullConsumer.getConsumerGroup());
2✔
395

396
        // Check consumerGroup name is not equal default consumer group name.
397
        if (this.defaultLitePullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
2✔
398
            throw new MQClientException(
2✔
399
                "consumerGroup can not equal "
400
                    + MixAll.DEFAULT_CONSUMER_GROUP
401
                    + ", please specify another one."
402
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
2✔
403
                null);
404
        }
405

406
        // Check messageModel is not null.
407
        if (null == this.defaultLitePullConsumer.getMessageModel()) {
2✔
408
            throw new MQClientException(
2✔
409
                "messageModel is null"
410
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
2✔
411
                null);
412
        }
413

414
        // Check allocateMessageQueueStrategy is not null
415
        if (null == this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()) {
2✔
416
            throw new MQClientException(
2✔
417
                "allocateMessageQueueStrategy is null"
418
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
2✔
419
                null);
420
        }
421

422
        if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
2✔
423
            throw new MQClientException(
2✔
424
                "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
425
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
2✔
426
                null);
427
        }
428
    }
2✔
429

430
    public PullAPIWrapper getPullAPIWrapper() {
431
        return pullAPIWrapper;
2✔
432
    }
433

434
    private void startPullTask(Collection<MessageQueue> mqSet) {
435
        for (MessageQueue messageQueue : mqSet) {
2✔
436
            if (!this.taskTable.containsKey(messageQueue)) {
2✔
437
                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
2✔
438
                this.taskTable.put(messageQueue, pullTask);
2✔
439
                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
2✔
440
            }
441
        }
2✔
442
    }
2✔
443

444
    private void updateAssignPullTask(Collection<MessageQueue> mqNewSet) {
445
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
2✔
446
        while (it.hasNext()) {
2✔
447
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
×
448
            if (!mqNewSet.contains(next.getKey())) {
×
449
                next.getValue().setCancelled(true);
×
450
                it.remove();
×
451
            }
452
        }
×
453

454
        startPullTask(mqNewSet);
2✔
455
    }
2✔
456

457
    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
458
        Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
2✔
459
        if (subTable != null) {
2✔
460
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
2✔
461
                final String topic = entry.getKey();
2✔
462
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
2✔
463
            }
2✔
464
        }
465
    }
2✔
466

467
    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
468
        try {
469
            if (topic == null || "".equals(topic)) {
2✔
470
                throw new IllegalArgumentException("Topic can not be null or empty.");
×
471
            }
472
            setSubscriptionType(SubscriptionType.SUBSCRIBE);
2✔
473
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
2✔
474
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
2✔
475
            this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
2✔
476
            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
2✔
477
            if (serviceState == ServiceState.RUNNING) {
2✔
478
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
2✔
479
                updateTopicSubscribeInfoWhenSubscriptionChanged();
2✔
480
            }
481
        } catch (Exception e) {
×
482
            throw new MQClientException("subscribe exception", e);
×
483
        }
2✔
484
    }
2✔
485

486
    public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
487
        try {
488
            if (topic == null || "".equals(topic)) {
×
489
                throw new IllegalArgumentException("Topic can not be null or empty.");
×
490
            }
491
            setSubscriptionType(SubscriptionType.SUBSCRIBE);
×
492
            if (messageSelector == null) {
×
493
                subscribe(topic, SubscriptionData.SUB_ALL);
×
494
                return;
×
495
            }
496
            SubscriptionData subscriptionData = FilterAPI.build(topic,
×
497
                messageSelector.getExpression(), messageSelector.getExpressionType());
×
498
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
×
499
            this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
×
500
            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
×
501
            if (serviceState == ServiceState.RUNNING) {
×
502
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
×
503
                updateTopicSubscribeInfoWhenSubscriptionChanged();
×
504
            }
505
        } catch (Exception e) {
×
506
            throw new MQClientException("subscribe exception", e);
×
507
        }
×
508
    }
×
509

510
    public synchronized void unsubscribe(final String topic) {
511
        this.rebalanceImpl.getSubscriptionInner().remove(topic);
×
512
        removePullTaskCallback(topic);
×
513
        assignedMessageQueue.removeAssignedMessageQueue(topic);
×
514
    }
×
515

516
    public synchronized void assign(Collection<MessageQueue> messageQueues) {
517
        if (messageQueues == null || messageQueues.isEmpty()) {
2✔
518
            throw new IllegalArgumentException("Message queues can not be null or empty.");
×
519
        }
520
        setSubscriptionType(SubscriptionType.ASSIGN);
2✔
521
        assignedMessageQueue.updateAssignedMessageQueue(messageQueues);
2✔
522
        if (serviceState == ServiceState.RUNNING) {
2✔
523
            updateAssignPullTask(messageQueues);
2✔
524
        }
525
    }
2✔
526

527
    private void maybeAutoCommit() {
528
        long now = System.currentTimeMillis();
2✔
529
        if (now >= nextAutoCommitDeadline) {
2✔
530
            commitAll();
2✔
531
            nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitIntervalMillis();
2✔
532
        }
533
    }
2✔
534

535
    public synchronized List<MessageExt> poll(long timeout) {
536
        try {
537
            checkServiceState();
2✔
538
            if (timeout < 0) {
2✔
539
                throw new IllegalArgumentException("Timeout must not be negative");
×
540
            }
541

542
            if (defaultLitePullConsumer.isAutoCommit()) {
2✔
543
                maybeAutoCommit();
2✔
544
            }
545
            long endTime = System.currentTimeMillis() + timeout;
2✔
546

547
            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
2✔
548

549
            if (endTime - System.currentTimeMillis() > 0) {
2✔
550
                while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
2✔
551
                    consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
×
552
                    if (endTime - System.currentTimeMillis() <= 0) {
×
553
                        break;
×
554
                    }
555
                }
556
            }
557

558
            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
2✔
559
                List<MessageExt> messages = consumeRequest.getMessageExts();
2✔
560
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
2✔
561
                assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
2✔
562
                //If namespace not null , reset Topic without namespace.
563
                this.resetTopic(messages);
2✔
564
                return messages;
2✔
565
            }
566
        } catch (InterruptedException ignore) {
×
567

568
        }
2✔
569

570
        return Collections.emptyList();
2✔
571
    }
572

573
    public void pause(Collection<MessageQueue> messageQueues) {
574
        assignedMessageQueue.pause(messageQueues);
2✔
575
    }
2✔
576

577
    public void resume(Collection<MessageQueue> messageQueues) {
578
        assignedMessageQueue.resume(messageQueues);
2✔
579
    }
2✔
580

581
    public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
582
        if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
2✔
583
            if (subscriptionType == SubscriptionType.SUBSCRIBE) {
2✔
584
                throw new MQClientException("The message queue is not in assigned list, may be rebalancing, message queue: " + messageQueue, null);
2✔
585
            } else {
586
                throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null);
2✔
587
            }
588
        }
589
        long minOffset = minOffset(messageQueue);
2✔
590
        long maxOffset = maxOffset(messageQueue);
2✔
591
        if (offset < minOffset || offset > maxOffset) {
2✔
592
            throw new MQClientException("Seek offset illegal, seek offset = " + offset + ", min offset = " + minOffset + ", max offset = " + maxOffset, null);
2✔
593
        }
594
        final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
2✔
595
        synchronized (objLock) {
2✔
596
            clearMessageQueueInCache(messageQueue);
2✔
597

598
            PullTaskImpl oldPullTaskImpl = this.taskTable.get(messageQueue);
2✔
599
            if (oldPullTaskImpl != null) {
2✔
600
                oldPullTaskImpl.tryInterrupt();
2✔
601
                this.taskTable.remove(messageQueue);
2✔
602
            }
603
            assignedMessageQueue.setSeekOffset(messageQueue, offset);
2✔
604
            if (!this.taskTable.containsKey(messageQueue)) {
2✔
605
                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
2✔
606
                this.taskTable.put(messageQueue, pullTask);
2✔
607
                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
2✔
608
            }
609
        }
2✔
610
    }
2✔
611

612
    public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
613
        long begin = minOffset(messageQueue);
2✔
614
        this.seek(messageQueue, begin);
2✔
615
    }
2✔
616

617
    public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
618
        long end = maxOffset(messageQueue);
2✔
619
        this.seek(messageQueue, end);
2✔
620
    }
2✔
621

622
    private long maxOffset(MessageQueue messageQueue) throws MQClientException {
623
        checkServiceState();
2✔
624
        return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue);
2✔
625
    }
626

627
    private long minOffset(MessageQueue messageQueue) throws MQClientException {
628
        checkServiceState();
2✔
629
        return this.mQClientFactory.getMQAdminImpl().minOffset(messageQueue);
2✔
630
    }
631

632
    private void removePullTaskCallback(final String topic) {
633
        removePullTask(topic);
×
634
    }
×
635

636
    private void removePullTask(final String topic) {
637
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
×
638
        while (it.hasNext()) {
×
639
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
×
640
            if (next.getKey().getTopic().equals(topic)) {
×
641
                next.getValue().setCancelled(true);
×
642
                it.remove();
×
643
            }
644
        }
×
645
    }
×
646

647
    public synchronized void commitAll() {
648
        for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
2✔
649
            try {
650
                commit(messageQueue);
2✔
651
            } catch (Exception e) {
×
652
                log.error("An error occurred when update consume offset Automatically.");
×
653
            }
2✔
654
        }
2✔
655
    }
2✔
656

657
    public synchronized void commit(final Set<MessageQueue> messageQueues, boolean persist) {
658
        if (messageQueues == null || messageQueues.size() == 0) {
2✔
659
            return;
×
660
        }
661

662
        for (MessageQueue messageQueue : messageQueues) {
2✔
663
            commit(messageQueue);
2✔
664
        }
2✔
665

666
        if (persist) {
2✔
667
            this.offsetStore.persistAll(messageQueues);
2✔
668
        }
669
    }
2✔
670

671
    private synchronized void commit(MessageQueue messageQueue) {
672
        long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
2✔
673

674
        if (consumerOffset != -1) {
2✔
675
            ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
×
676
            if (processQueue != null && !processQueue.isDropped()) {
×
677
                updateConsumeOffset(messageQueue, consumerOffset);
×
678
            }
679
        } else {
×
680
            log.error("consumerOffset is -1 in messageQueue [" + messageQueue + "].");
2✔
681
        }
682
    }
2✔
683

684
    private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset, ProcessQueue processQueue) {
685
        if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
2✔
686
            assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset, processQueue);
2✔
687
        }
688
    }
2✔
689

690
    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
691
        try {
692
            consumeRequestCache.put(consumeRequest);
2✔
693
        } catch (InterruptedException e) {
×
694
            log.error("Submit consumeRequest error", e);
×
695
        }
2✔
696
    }
2✔
697

698
    private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException {
699
        checkServiceState();
2✔
700
        long offset = this.rebalanceImpl.computePullFromWhereWithException(messageQueue);
2✔
701
        return offset;
2✔
702
    }
703

704
    public long committed(MessageQueue messageQueue) throws MQClientException {
705
        checkServiceState();
2✔
706
        long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
2✔
707
        if (offset == -2) {
2✔
708
            throw new MQClientException("Fetch consume offset from broker exception", null);
×
709
        }
710
        return offset;
2✔
711
    }
712

713
    private void clearMessageQueueInCache(MessageQueue messageQueue) {
714
        ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
2✔
715
        if (processQueue != null) {
2✔
716
            processQueue.clear();
2✔
717
        }
718
        Iterator<ConsumeRequest> iter = consumeRequestCache.iterator();
2✔
719
        while (iter.hasNext()) {
2✔
720
            if (iter.next().getMessageQueue().equals(messageQueue)) {
×
721
                iter.remove();
×
722
            }
723
        }
724
    }
2✔
725

726
    private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
727
        long offset = -1;
2✔
728
        long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
2✔
729
        if (seekOffset != -1) {
2✔
730
            offset = seekOffset;
2✔
731
            assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
2✔
732
            assignedMessageQueue.setSeekOffset(messageQueue, -1);
2✔
733
        } else {
734
            offset = assignedMessageQueue.getPullOffset(messageQueue);
2✔
735
            if (offset == -1) {
2✔
736
                offset = fetchConsumeOffset(messageQueue);
2✔
737
            }
738
        }
739
        return offset;
2✔
740
    }
741

742
    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
743
        checkServiceState();
2✔
744
        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
2✔
745
    }
746

747
    public class PullTaskImpl implements Runnable {
748
        private final MessageQueue messageQueue;
749
        private volatile boolean cancelled = false;
2✔
750
        private Thread currentThread;
751

752
        public PullTaskImpl(final MessageQueue messageQueue) {
2✔
753
            this.messageQueue = messageQueue;
2✔
754
        }
2✔
755

756
        public void tryInterrupt() {
757
            setCancelled(true);
2✔
758
            if (currentThread == null) {
2✔
759
                return;
2✔
760
            }
761
            if (!currentThread.isInterrupted()) {
×
762
                currentThread.interrupt();
×
763
            }
764
        }
×
765

766
        @Override
767
        public void run() {
768

769
            if (!this.isCancelled()) {
2✔
770

771
                this.currentThread = Thread.currentThread();
2✔
772

773
                if (assignedMessageQueue.isPaused(messageQueue)) {
2✔
774
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
2✔
775
                    log.debug("Message Queue: {} has been paused!", messageQueue);
2✔
776
                    return;
2✔
777
                }
778

779
                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
2✔
780

781
                if (null == processQueue || processQueue.isDropped()) {
2✔
782
                    log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
2✔
783
                    return;
2✔
784
                }
785

786
                if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
2✔
787
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
2✔
788
                    if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
2✔
789
                        log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
2✔
790
                    }
791
                    return;
2✔
792
                }
793

794
                long cachedMessageCount = processQueue.getMsgCount().get();
2✔
795
                long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
2✔
796

797
                if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
2✔
798
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
2✔
799
                    if ((queueFlowControlTimes++ % 1000) == 0) {
2✔
800
                        log.warn(
2✔
801
                            "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
802
                            defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
×
803
                    }
804
                    return;
2✔
805
                }
806

807
                if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
2✔
808
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
2✔
809
                    if ((queueFlowControlTimes++ % 1000) == 0) {
2✔
810
                        log.warn(
2✔
811
                            "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
812
                            defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
×
813
                    }
814
                    return;
2✔
815
                }
816

817
                if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
2✔
818
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
2✔
819
                    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
2✔
820
                        log.warn(
2✔
821
                            "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
822
                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
×
823
                    }
824
                    return;
2✔
825
                }
826

827
                long offset = 0L;
2✔
828
                try {
829
                    offset = nextPullOffset(messageQueue);
2✔
830
                } catch (Exception e) {
×
831
                    log.error("Failed to get next pull offset", e);
×
832
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
×
833
                    return;
×
834
                }
2✔
835

836
                if (this.isCancelled() || processQueue.isDropped()) {
2✔
837
                    return;
×
838
                }
839
                long pullDelayTimeMills = 0;
2✔
840
                try {
841
                    SubscriptionData subscriptionData;
842
                    String topic = this.messageQueue.getTopic();
2✔
843
                    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
2✔
844
                        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
2✔
845
                    } else {
846
                        subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
2✔
847
                    }
848

849
                    PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
2✔
850
                    if (this.isCancelled() || processQueue.isDropped()) {
2✔
851
                        return;
×
852
                    }
853
                    switch (pullResult.getPullStatus()) {
2✔
854
                        case FOUND:
855
                            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
2✔
856
                            synchronized (objLock) {
2✔
857
                                if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
2✔
858
                                    processQueue.putMessage(pullResult.getMsgFoundList());
2✔
859
                                    submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
2✔
860
                                }
861
                            }
2✔
862
                            break;
2✔
863
                        case OFFSET_ILLEGAL:
864
                            log.warn("The pull request offset illegal, {}", pullResult.toString());
×
865
                            break;
×
866
                        default:
867
                            break;
868
                    }
869
                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
2✔
870
                } catch (InterruptedException interruptedException) {
×
871
                    log.warn("Polling thread was interrupted.", interruptedException);
×
872
                } catch (Throwable e) {
×
873
                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
×
874
                    log.error("An error occurred in pull message process.", e);
×
875
                }
2✔
876

877
                if (!this.isCancelled()) {
2✔
878
                    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
2✔
879
                } else {
880
                    log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
×
881
                }
882
            }
883
        }
2✔
884

885
        public boolean isCancelled() {
886
            return cancelled;
2✔
887
        }
888

889
        public void setCancelled(boolean cancelled) {
890
            this.cancelled = cancelled;
2✔
891
        }
2✔
892

893
        public MessageQueue getMessageQueue() {
894
            return messageQueue;
×
895
        }
896
    }
897

898
    private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums)
899
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
900
        return pull(mq, subscriptionData, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
2✔
901
    }
902

903
    private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, long timeout)
904
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
905
        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
2✔
906
    }
907

908
    private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums,
909
        boolean block,
910
        long timeout)
911
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
912

913
        if (null == mq) {
2✔
914
            throw new MQClientException("mq is null", null);
×
915
        }
916

917
        if (offset < 0) {
2✔
918
            throw new MQClientException("offset < 0", null);
×
919
        }
920

921
        if (maxNums <= 0) {
2✔
922
            throw new MQClientException("maxNums <= 0", null);
×
923
        }
924

925
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false, true);
2✔
926

927
        long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
2✔
928

929
        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
2✔
930
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
2✔
931
            mq,
932
            subscriptionData.getSubString(),
2✔
933
            subscriptionData.getExpressionType(),
2✔
934
            isTagType ? 0L : subscriptionData.getSubVersion(),
2✔
935
            offset,
936
            maxNums,
937
            sysFlag,
938
            0,
939
            this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis(),
2✔
940
            timeoutMillis,
941
            CommunicationMode.SYNC,
942
            null
943
        );
944
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
2✔
945
        if (!this.consumeMessageHookList.isEmpty()) {
2✔
946
            ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
2✔
947
            consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
2✔
948
            consumeMessageContext.setConsumerGroup(this.groupName());
2✔
949
            consumeMessageContext.setMq(mq);
2✔
950
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
2✔
951
            consumeMessageContext.setSuccess(false);
2✔
952
            this.executeHookBefore(consumeMessageContext);
2✔
953
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
2✔
954
            consumeMessageContext.setSuccess(true);
2✔
955
            this.executeHookAfter(consumeMessageContext);
2✔
956
        }
957
        return pullResult;
2✔
958
    }
959

960
    private void resetTopic(List<MessageExt> msgList) {
961
        if (null == msgList || msgList.size() == 0) {
2✔
962
            return;
×
963
        }
964

965
        //If namespace not null , reset Topic without namespace.
966
        for (MessageExt messageExt : msgList) {
2✔
967
            if (null != this.defaultLitePullConsumer.getNamespace()) {
2✔
968
                messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.defaultLitePullConsumer.getNamespace()));
×
969
            }
970
        }
2✔
971

972
    }
2✔
973

974
    public void updateConsumeOffset(MessageQueue mq, long offset) {
975
        checkServiceState();
×
976
        this.offsetStore.updateOffset(mq, offset, false);
×
977
    }
×
978

979
    @Override
980
    public String groupName() {
981
        return this.defaultLitePullConsumer.getConsumerGroup();
2✔
982
    }
983

984
    @Override
985
    public MessageModel messageModel() {
986
        return this.defaultLitePullConsumer.getMessageModel();
2✔
987
    }
988

989
    @Override
990
    public ConsumeType consumeType() {
991
        return ConsumeType.CONSUME_ACTIVELY;
2✔
992
    }
993

994
    @Override
995
    public ConsumeFromWhere consumeFromWhere() {
996
        return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
2✔
997
    }
998

999
    @Override
1000
    public Set<SubscriptionData> subscriptions() {
1001
        Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();
2✔
1002

1003
        subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
2✔
1004

1005
        return subSet;
2✔
1006
    }
1007

1008
    @Override
1009
    public void doRebalance() {
1010
        if (this.rebalanceImpl != null) {
2✔
1011
            this.rebalanceImpl.doRebalance(false);
2✔
1012
        }
1013
    }
2✔
1014

1015
    @Override
1016
    public void persistConsumerOffset() {
1017
        try {
1018
            checkServiceState();
2✔
1019
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
2✔
1020
            if (this.subscriptionType == SubscriptionType.SUBSCRIBE) {
2✔
1021
                Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
2✔
1022
                mqs.addAll(allocateMq);
2✔
1023
            } else if (this.subscriptionType == SubscriptionType.ASSIGN) {
2✔
1024
                Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueues();
2✔
1025
                mqs.addAll(assignedMessageQueue);
2✔
1026
            }
1027
            this.offsetStore.persistAll(mqs);
2✔
1028
        } catch (Exception e) {
2✔
1029
            log.error("Persist consumer offset error for group: {} ", this.defaultLitePullConsumer.getConsumerGroup(), e);
2✔
1030
        }
2✔
1031
    }
2✔
1032

1033
    @Override
1034
    public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
1035
        Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
2✔
1036
        if (subTable != null) {
2✔
1037
            if (subTable.containsKey(topic)) {
2✔
1038
                this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
2✔
1039
            }
1040
        }
1041
    }
2✔
1042

1043
    @Override
1044
    public boolean isSubscribeTopicNeedUpdate(String topic) {
1045
        Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
×
1046
        if (subTable != null) {
×
1047
            if (subTable.containsKey(topic)) {
×
1048
                return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
×
1049
            }
1050
        }
1051

1052
        return false;
×
1053
    }
1054

1055
    @Override
1056
    public boolean isUnitMode() {
1057
        return this.defaultLitePullConsumer.isUnitMode();
2✔
1058
    }
1059

1060
    @Override
1061
    public ConsumerRunningInfo consumerRunningInfo() {
1062
        ConsumerRunningInfo info = new ConsumerRunningInfo();
×
1063

1064
        Properties prop = MixAll.object2Properties(this.defaultLitePullConsumer);
×
1065
        prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));
×
1066
        info.setProperties(prop);
×
1067

1068
        info.getSubscriptionSet().addAll(this.subscriptions());
×
1069
        return info;
×
1070
    }
1071

1072
    private void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
1073
        MQBrokerException, InterruptedException, MQClientException {
1074
        this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
×
1075
    }
×
1076

1077
    public OffsetStore getOffsetStore() {
1078
        return offsetStore;
2✔
1079
    }
1080

1081
    public DefaultLitePullConsumer getDefaultLitePullConsumer() {
1082
        return defaultLitePullConsumer;
2✔
1083
    }
1084

1085
    public Set<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
1086
        checkServiceState();
2✔
1087
        Set<MessageQueue> result = this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
2✔
1088
        return parseMessageQueues(result);
2✔
1089
    }
1090

1091
    private synchronized void fetchTopicMessageQueuesAndCompare() throws MQClientException {
1092
        for (Map.Entry<String, TopicMessageQueueChangeListener> entry : topicMessageQueueChangeListenerMap.entrySet()) {
2✔
1093
            String topic = entry.getKey();
2✔
1094
            TopicMessageQueueChangeListener topicMessageQueueChangeListener = entry.getValue();
2✔
1095
            Set<MessageQueue> oldMessageQueues = messageQueuesForTopic.get(topic);
2✔
1096
            Set<MessageQueue> newMessageQueues = fetchMessageQueues(topic);
2✔
1097
            boolean isChanged = !isSetEqual(newMessageQueues, oldMessageQueues);
2✔
1098
            if (isChanged) {
2✔
1099
                messageQueuesForTopic.put(topic, newMessageQueues);
2✔
1100
                if (topicMessageQueueChangeListener != null) {
2✔
1101
                    topicMessageQueueChangeListener.onChanged(topic, newMessageQueues);
2✔
1102
                }
1103
            }
1104
        }
2✔
1105
    }
2✔
1106

1107
    private boolean isSetEqual(Set<MessageQueue> set1, Set<MessageQueue> set2) {
1108
        if (set1 == null && set2 == null) {
2✔
1109
            return true;
×
1110
        }
1111

1112
        if (set1 == null || set2 == null || set1.size() != set2.size()
2✔
1113
            || set1.size() == 0 || set2.size() == 0) {
1✔
1114
            return false;
2✔
1115
        }
1116

1117
        Iterator iter = set2.iterator();
1✔
1118
        boolean isEqual = true;
1✔
1119
        while (iter.hasNext()) {
1✔
1120
            if (!set1.contains(iter.next())) {
1✔
1121
                isEqual = false;
×
1122
            }
1123
        }
1124
        return isEqual;
1✔
1125
    }
1126

1127
    public synchronized void registerTopicMessageQueueChangeListener(String topic,
1128
        TopicMessageQueueChangeListener listener) throws MQClientException {
1129
        if (topic == null || listener == null) {
2✔
1130
            throw new MQClientException("Topic or listener is null", null);
×
1131
        }
1132
        if (topicMessageQueueChangeListenerMap.containsKey(topic)) {
2✔
1133
            log.warn("Topic {} had been registered, new listener will overwrite the old one", topic);
×
1134
        }
1135
        topicMessageQueueChangeListenerMap.put(topic, listener);
2✔
1136
        if (this.serviceState == ServiceState.RUNNING) {
2✔
1137
            Set<MessageQueue> messageQueues = fetchMessageQueues(topic);
2✔
1138
            messageQueuesForTopic.put(topic, messageQueues);
2✔
1139
        }
1140
    }
2✔
1141

1142
    private Set<MessageQueue> parseMessageQueues(Set<MessageQueue> queueSet) {
1143
        Set<MessageQueue> resultQueues = new HashSet<MessageQueue>();
2✔
1144
        for (MessageQueue messageQueue : queueSet) {
2✔
1145
            String userTopic = NamespaceUtil.withoutNamespace(messageQueue.getTopic(),
2✔
1146
                this.defaultLitePullConsumer.getNamespace());
2✔
1147
            resultQueues.add(new MessageQueue(userTopic, messageQueue.getBrokerName(), messageQueue.getQueueId()));
2✔
1148
        }
2✔
1149
        return resultQueues;
2✔
1150
    }
1151

1152
    public class ConsumeRequest {
1153
        private final List<MessageExt> messageExts;
1154
        private final MessageQueue messageQueue;
1155
        private final ProcessQueue processQueue;
1156

1157
        public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue,
1158
            final ProcessQueue processQueue) {
2✔
1159
            this.messageExts = messageExts;
2✔
1160
            this.messageQueue = messageQueue;
2✔
1161
            this.processQueue = processQueue;
2✔
1162
        }
2✔
1163

1164
        public List<MessageExt> getMessageExts() {
1165
            return messageExts;
2✔
1166
        }
1167

1168
        public MessageQueue getMessageQueue() {
1169
            return messageQueue;
2✔
1170
        }
1171

1172
        public ProcessQueue getProcessQueue() {
1173
            return processQueue;
2✔
1174
        }
1175

1176
    }
1177

1178
    public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
1179
        this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
×
1180
    }
×
1181
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc