• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / rocketmq-spring / 474

pending completion
474

push

travis-ci-com

web-flow
[ISSUE #507]Feat: support send oderly delay message for RocketMQTemplate

7 of 7 new or added lines in 1 file covered. (100.0%)

803 of 1272 relevant lines covered (63.13%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.7
/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
package org.apache.rocketmq.spring.core;
19

20
import org.apache.rocketmq.client.Validators;
21
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
22
import org.apache.rocketmq.client.exception.MQClientException;
23
import org.apache.rocketmq.client.producer.DefaultMQProducer;
24
import org.apache.rocketmq.client.producer.MessageQueueSelector;
25
import org.apache.rocketmq.client.producer.RequestCallback;
26
import org.apache.rocketmq.client.producer.SendCallback;
27
import org.apache.rocketmq.client.producer.SendResult;
28
import org.apache.rocketmq.client.producer.TransactionMQProducer;
29
import org.apache.rocketmq.client.producer.TransactionSendResult;
30
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
31
import org.apache.rocketmq.common.message.MessageBatch;
32
import org.apache.rocketmq.common.message.MessageClientIDSetter;
33
import org.apache.rocketmq.common.message.MessageExt;
34
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
35
import org.apache.rocketmq.spring.support.RocketMQUtil;
36
import org.slf4j.Logger;
37
import org.slf4j.LoggerFactory;
38
import org.springframework.aop.framework.AopProxyUtils;
39
import org.springframework.beans.factory.DisposableBean;
40
import org.springframework.beans.factory.InitializingBean;
41
import org.springframework.messaging.Message;
42
import org.springframework.messaging.MessageHeaders;
43
import org.springframework.messaging.MessagingException;
44
import org.springframework.messaging.converter.SmartMessageConverter;
45
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
46
import org.springframework.messaging.core.MessagePostProcessor;
47
import org.springframework.messaging.support.MessageBuilder;
48
import org.springframework.util.MimeTypeUtils;
49

50
import java.lang.reflect.ParameterizedType;
51
import java.lang.reflect.Type;
52
import java.nio.charset.Charset;
53
import java.util.ArrayList;
54
import java.util.Collection;
55
import java.util.List;
56
import java.util.Map;
57
import java.util.Objects;
58
import java.util.concurrent.ExecutorService;
59

60
@SuppressWarnings({"WeakerAccess", "unused"})
61
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
1✔
62
    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
1✔
63

64
    private DefaultMQProducer producer;
65

66
    private DefaultLitePullConsumer consumer;
67

68
    private String charset = "UTF-8";
1✔
69

70
    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
1✔
71

72
    private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();
1✔
73

74
    public DefaultMQProducer getProducer() {
75
        return producer;
1✔
76
    }
77

78
    public void setProducer(DefaultMQProducer producer) {
79
        this.producer = producer;
1✔
80
    }
1✔
81

82
    public DefaultLitePullConsumer getConsumer() {
83
        return consumer;
1✔
84
    }
85

86
    public void setConsumer(DefaultLitePullConsumer consumer) {
87
        this.consumer = consumer;
1✔
88
    }
1✔
89

90
    public String getCharset() {
91
        return charset;
×
92
    }
93

94
    public void setCharset(String charset) {
95
        this.charset = charset;
×
96
    }
×
97

98
    public MessageQueueSelector getMessageQueueSelector() {
99
        return messageQueueSelector;
×
100
    }
101

102
    public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
103
        this.messageQueueSelector = messageQueueSelector;
×
104
    }
×
105

106
    public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
107
        this.producer.setAsyncSenderExecutor(asyncSenderExecutor);
1✔
108
    }
1✔
109

110
    /**
111
     * @param destination formats: `topicName:tags`
112
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
113
     * @param type The type of T
114
     * @return
115
     */
116
    public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
117
        return sendAndReceive(destination, message, type, null, producer.getSendMsgTimeout(), 0);
×
118
    }
119

120
    /**
121
     * @param destination formats: `topicName:tags`
122
     * @param payload the payload to be sent.
123
     * @param type The type of T
124
     * @return
125
     */
126
    public <T> T sendAndReceive(String destination, Object payload, Type type) {
127
        return sendAndReceive(destination, payload, type, null, producer.getSendMsgTimeout(), 0);
×
128
    }
129

130
    /**
131
     * @param destination formats: `topicName:tags`
132
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
133
     * @param type The type of T
134
     * @param timeout send timeout in millis
135
     * @return
136
     */
137
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
138
        return sendAndReceive(destination, message, type, null, timeout, 0);
×
139
    }
140

141
    /**
142
     * @param destination formats: `topicName:tags`
143
     * @param payload the payload to be sent.
144
     * @param type The type of T
145
     * @param timeout send timeout in millis
146
     * @return
147
     */
148
    public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
149
        return sendAndReceive(destination, payload, type, null, timeout, 0);
×
150
    }
151

152
    /**
153
     * @param destination formats: `topicName:tags`
154
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
155
     * @param type The type of T
156
     * @param timeout send timeout in millis
157
     * @param delayLevel message delay level(0 means no delay)
158
     * @return
159
     */
160
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
161
        return sendAndReceive(destination, message, type, null, timeout, delayLevel);
×
162
    }
163

164
    /**
165
     * @param destination formats: `topicName:tags`
166
     * @param payload the payload to be sent.
167
     * @param type The type of T
168
     * @param timeout send timeout in millis
169
     * @param delayLevel message delay level(0 means no delay)
170
     * @return
171
     */
172
    public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
173
        return sendAndReceive(destination, payload, type, null, timeout, delayLevel);
×
174
    }
175

176
    /**
177
     * @param destination formats: `topicName:tags`
178
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
179
     * @param type The type of T
180
     * @param hashKey needed when sending message orderly
181
     * @return
182
     */
183
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
184
        return sendAndReceive(destination, message, type, hashKey, producer.getSendMsgTimeout(), 0);
×
185
    }
186

187
    /**
188
     * @param destination formats: `topicName:tags`
189
     * @param payload the payload to be sent.
190
     * @param type The type of T
191
     * @param hashKey needed when sending message orderly
192
     * @return
193
     */
194
    public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
195
        return sendAndReceive(destination, payload, type, hashKey, producer.getSendMsgTimeout(), 0);
×
196
    }
197

198
    /**
199
     * @param destination formats: `topicName:tags`
200
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
201
     * @param type The type of T
202
     * @param hashKey needed when sending message orderly
203
     * @param timeout send timeout in millis
204
     * @return
205
     */
206
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
207
        return sendAndReceive(destination, message, type, hashKey, timeout, 0);
×
208
    }
209

210
    /**
211
     * @param destination formats: `topicName:tags`
212
     * @param payload the payload to be sent.
213
     * @param type The type of T
214
     * @param hashKey
215
     * @return
216
     */
217
    public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey, long timeout) {
218
        return sendAndReceive(destination, payload, type, hashKey, timeout, 0);
×
219
    }
220

221
    /**
222
     * @param destination formats: `topicName:tags`
223
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
224
     * @param type The type that receive
225
     * @param hashKey needed when sending message orderly
226
     * @param timeout send timeout in millis
227
     * @param delayLevel message delay level(0 means no delay)
228
     * @return
229
     */
230
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
231
        long timeout, int delayLevel) {
232
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
233
            log.error("send request message failed. destination:{}, message is null ", destination);
1✔
234
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
1✔
235
        }
236

237
        try {
238
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
239
            if (delayLevel > 0) {
1✔
240
                rocketMsg.setDelayTimeLevel(delayLevel);
×
241
            }
242
            MessageExt replyMessage;
243

244
            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
1✔
245
                replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
×
246
            } else {
247
                replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
×
248
            }
249
            return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
×
250
        } catch (Exception e) {
1✔
251
            log.error("send request message failed. destination:{}, message:{} ", destination, message);
1✔
252
            throw new MessagingException(e.getMessage(), e);
1✔
253
        }
254
    }
255

256
    /**
257
     * @param destination formats: `topicName:tags`
258
     * @param payload the payload to be sent.
259
     * @param type The type that receive
260
     * @param hashKey needed when sending message orderly
261
     * @param timeout send timeout in millis
262
     * @param delayLevel message delay level(0 means no delay)
263
     * @return
264
     */
265
    public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
266
        long timeout, int delayLevel) {
267
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
268
        return sendAndReceive(destination, message, type, hashKey, timeout, delayLevel);
×
269
    }
270

271
    /**
272
     * @param destination formats: `topicName:tags`
273
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
274
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
275
     * @return
276
     */
277
    public void sendAndReceive(String destination, Message<?> message,
278
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
279
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, producer.getSendMsgTimeout(), 0);
×
280
    }
×
281

282
    /**
283
     * @param destination formats: `topicName:tags`
284
     * @param payload the payload to be sent.
285
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
286
     * @return
287
     */
288
    public void sendAndReceive(String destination, Object payload,
289
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
290
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, producer.getSendMsgTimeout(), 0);
×
291
    }
×
292

293
    /**
294
     * @param destination formats: `topicName:tags`
295
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
296
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
297
     * @param timeout send timeout in millis
298
     * @return
299
     */
300
    public void sendAndReceive(String destination, Message<?> message,
301
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
302
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, timeout, 0);
×
303
    }
×
304

305
    /**
306
     * @param destination formats: `topicName:tags`
307
     * @param payload the payload to be sent.
308
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
309
     * @param timeout send timeout in millis
310
     * @return
311
     */
312
    public void sendAndReceive(String destination, Object payload,
313
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
314
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, timeout, 0);
×
315
    }
×
316

317
    /**
318
     * @param destination formats: `topicName:tags`
319
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
320
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
321
     * @param timeout send timeout in millis
322
     * @param delayLevel message delay level(0 means no delay)
323
     * @return
324
     */
325
    public void sendAndReceive(String destination, Message<?> message,
326
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
327
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, timeout, delayLevel);
×
328
    }
×
329

330
    /**
331
     * @param destination formats: `topicName:tags`
332
     * @param payload the payload to be sent.
333
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
334
     * @param hashKey needed when sending message orderly
335
     * @return
336
     */
337
    public void sendAndReceive(String destination, Object payload,
338
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
339
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, producer.getSendMsgTimeout(), 0);
×
340
    }
×
341

342
    /**
343
     * @param destination formats: `topicName:tags`
344
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
345
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
346
     * @param hashKey needed when sending message orderly
347
     * @param timeout send timeout in millis
348
     * @return
349
     */
350
    public void sendAndReceive(String destination, Message<?> message,
351
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
352
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, timeout, 0);
×
353
    }
×
354

355
    /**
356
     * @param destination formats: `topicName:tags`
357
     * @param payload the payload to be sent.
358
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
359
     * @param hashKey needed when sending message orderly
360
     * @param timeout send timeout in millis
361
     * @return
362
     */
363
    public void sendAndReceive(String destination, Object payload,
364
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
365
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, timeout, 0);
×
366
    }
×
367

368
    /**
369
     * @param destination formats: `topicName:tags`
370
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
371
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
372
     * @param hashKey needed when sending message orderly
373
     * @return
374
     */
375
    public void sendAndReceive(String destination, Message<?> message,
376
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
377
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, producer.getSendMsgTimeout(), 0);
×
378
    }
×
379

380
    /**
381
     * @param destination formats: `topicName:tags`
382
     * @param payload the payload to be sent.
383
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
384
     * @param timeout send timeout in millis
385
     * @param delayLevel message delay level(0 means no delay)
386
     * @return
387
     */
388
    public void sendAndReceive(String destination, Object payload,
389
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
390
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, timeout, delayLevel);
×
391
    }
×
392

393
    /**
394
     * @param destination formats: `topicName:tags`
395
     * @param payload the payload to be sent.
396
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
397
     * @param hashKey needed when sending message orderly
398
     * @param timeout send timeout in millis
399
     * @param delayLevel message delay level(0 means no delay)
400
     * @return
401
     */
402
    public void sendAndReceive(String destination, Object payload,
403
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
404
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
405
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, timeout, delayLevel);
×
406
    }
×
407

408
    /**
409
     * Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message,
410
     * <code>rocketMQLocalRequestCallback</code> will be executed. </p>
411
     *
412
     * @param destination formats: `topicName:tags`
413
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
414
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
415
     * @param hashKey needed when sending message orderly
416
     * @param timeout send timeout in millis
417
     * @param delayLevel message delay level(0 means no delay)
418
     * @return
419
     */
420
    public void sendAndReceive(String destination, Message<?> message,
421
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
422
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
423
            log.error("send request message failed. destination:{}, message is null ", destination);
×
424
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
425
        }
426

427
        try {
428
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
429
            if (delayLevel > 0) {
1✔
430
                rocketMsg.setDelayTimeLevel(delayLevel);
×
431
            }
432
            if (timeout <= 0) {
1✔
433
                timeout = producer.getSendMsgTimeout();
×
434
            }
435
            RequestCallback requestCallback = null;
1✔
436
            if (rocketMQLocalRequestCallback != null) {
1✔
437
                requestCallback = new RequestCallback() {
1✔
438
                    @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
439
                        rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
×
440
                    }
×
441

442
                    @Override public void onException(Throwable e) {
443
                        rocketMQLocalRequestCallback.onException(e);
×
444
                    }
×
445
                };
446
            }
447
            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
1✔
448
                producer.request(rocketMsg, requestCallback, timeout);
×
449
            } else {
450
                producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
×
451
            }
452
        } catch (
1✔
453
            Exception e) {
454
            log.error("send request message failed. destination:{}, message:{} ", destination, message);
1✔
455
            throw new MessagingException(e.getMessage(), e);
1✔
456
        }
×
457

458
    }
×
459

460
    /**
461
     * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
462
     * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
463
     * notification, SMS marketing system, etc.. </p>
464
     * <p>
465
     * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
466
     * {@link DefaultMQProducer#getRetryTimesWhenSendFailed} times before claiming failure. As a result, multiple
467
     * messages may potentially delivered to broker(s). It's up to the application developers to resolve potential
468
     * duplication issue.
469
     *
470
     * @param destination formats: `topicName:tags`
471
     * @param message {@link org.springframework.messaging.Message}
472
     * @return {@link SendResult}
473
     */
474
    public SendResult syncSend(String destination, Message<?> message) {
475
        return syncSend(destination, message, producer.getSendMsgTimeout());
×
476
    }
477

478
    /**
479
     * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
480
     *
481
     * @param destination formats: `topicName:tags`
482
     * @param message {@link org.springframework.messaging.Message}
483
     * @param timeout send timeout with millis
484
     * @return {@link SendResult}
485
     */
486
    public SendResult syncSend(String destination, Message<?> message, long timeout) {
487
        return syncSend(destination, message, timeout, 0);
×
488
    }
489

490
    /**
491
     * syncSend batch messages
492
     *
493
     * @param destination formats: `topicName:tags`
494
     * @param messages Collection of {@link org.springframework.messaging.Message}
495
     * @return {@link SendResult}
496
     */
497
    public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
498
        return syncSend(destination, messages, producer.getSendMsgTimeout());
×
499
    }
500

501
    /**
502
     * syncSend batch messages in a given timeout.
503
     *
504
     * @param destination formats: `topicName:tags`
505
     * @param messages Collection of {@link org.springframework.messaging.Message}
506
     * @param timeout send timeout with millis
507
     * @return {@link SendResult}
508
     */
509
    public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
510
        if (Objects.isNull(messages) || messages.size() == 0) {
1✔
511
            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
1✔
512
            throw new IllegalArgumentException("`messages` can not be empty");
1✔
513
        }
514

515
        try {
516
            long now = System.currentTimeMillis();
×
517
            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
×
518
            for (Message msg : messages) {
×
519
                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
×
520
                    log.warn("Found a message empty in the batch, skip it");
×
521
                    continue;
×
522
                }
523
                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
×
524
            }
×
525

526
            SendResult sendResult = producer.send(rmqMsgs, timeout);
×
527
            long costTime = System.currentTimeMillis() - now;
×
528
            if (log.isDebugEnabled()) {
×
529
                log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
×
530
            }
531
            return sendResult;
×
532
        } catch (Exception e) {
×
533
            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
×
534
            throw new MessagingException(e.getMessage(), e);
×
535
        }
536
    }
537

538
    /**
539
     * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
540
     *
541
     * @param destination formats: `topicName:tags`
542
     * @param message {@link org.springframework.messaging.Message}
543
     * @param timeout send timeout with millis
544
     * @param delayLevel level for the delay message
545
     * @return {@link SendResult}
546
     */
547
    public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
548
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
549
            log.error("syncSend failed. destination:{}, message is null ", destination);
×
550
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
551
        }
552
        try {
553
            long now = System.currentTimeMillis();
1✔
554
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
555
            if (delayLevel > 0) {
1✔
556
                rocketMsg.setDelayTimeLevel(delayLevel);
×
557
            }
558
            SendResult sendResult = producer.send(rocketMsg, timeout);
×
559
            long costTime = System.currentTimeMillis() - now;
×
560
            if (log.isDebugEnabled()) {
×
561
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
×
562
            }
563
            return sendResult;
×
564
        } catch (Exception e) {
1✔
565
            log.error("syncSend failed. destination:{}, message:{}, detail exception info: ", destination, message, e);
1✔
566
            throw new MessagingException(e.getMessage(), e);
1✔
567
        }
568
    }
569

570
    /**
571
     * Same to {@link #syncSend(String, Message)}.
572
     *
573
     * @param destination formats: `topicName:tags`
574
     * @param payload the Object to use as payload
575
     * @return {@link SendResult}
576
     */
577
    public SendResult syncSend(String destination, Object payload) {
578
        return syncSend(destination, payload, producer.getSendMsgTimeout());
×
579
    }
580

581
    /**
582
     * Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
583
     *
584
     * @param destination formats: `topicName:tags`
585
     * @param payload the Object to use as payload
586
     * @param timeout send timeout with millis
587
     * @return {@link SendResult}
588
     */
589
    public SendResult syncSend(String destination, Object payload, long timeout) {
590
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
591
        return syncSend(destination, message, timeout);
×
592
    }
593

594
    /**
595
     * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
596
     *
597
     * @param destination formats: `topicName:tags`
598
     * @param message {@link org.springframework.messaging.Message}
599
     * @param hashKey use this key to select queue. for example: orderId, productId ...
600
     * @return {@link SendResult}
601
     */
602
    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
603
        return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
×
604
    }
605

606
    /**
607
     * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
608
     *
609
     * @param destination formats: `topicName:tags`
610
     * @param message {@link org.springframework.messaging.Message}
611
     * @param hashKey use this key to select queue. for example: orderId, productId ...
612
     * @param timeout send timeout with millis
613
     * @return {@link SendResult}
614
     */
615
    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
616
        return syncSendOrderly(destination, message, hashKey, timeout, 0);
×
617
    }
618

619
    /**
620
     * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
621
     *
622
     * @param destination formats: `topicName:tags`
623
     * @param message {@link org.springframework.messaging.Message}
624
     * @param hashKey use this key to select queue. for example: orderId, productId ...
625
     * @param timeout send timeout with millis
626
     * @param delayLevel level for the delay message
627
     * @return {@link SendResult}
628
     */
629
    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
630
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
631
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
×
632
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
633
        }
634
        try {
635
            long now = System.currentTimeMillis();
1✔
636
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
637
            if (delayLevel > 0) {
1✔
638
                rocketMsg.setDelayTimeLevel(delayLevel);
×
639
            }
640
            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
×
641
            long costTime = System.currentTimeMillis() - now;
×
642
            if (log.isDebugEnabled()) {
×
643
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
×
644
            }
645
            return sendResult;
×
646
        } catch (Exception e) {
1✔
647
            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
1✔
648
            throw new MessagingException(e.getMessage(), e);
1✔
649
        }
650
    }
651

652
    /**
653
     * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
654
     *
655
     * @param destination formats: `topicName:tags`
656
     * @param payload the Object to use as payload
657
     * @param hashKey use this key to select queue. for example: orderId, productId ...
658
     * @return {@link SendResult}
659
     */
660
    public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
661
        return syncSendOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
×
662
    }
663

664
    /**
665
     * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
666
     *
667
     * @param destination formats: `topicName:tags`
668
     * @param payload the Object to use as payload
669
     * @param hashKey use this key to select queue. for example: orderId, productId ...
670
     * @param timeout send timeout with millis
671
     * @return {@link SendResult}
672
     */
673
    public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
674
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
675
        return syncSendOrderly(destination, message, hashKey, timeout);
×
676
    }
677

678
    /**
679
     * syncSend batch messages orderly.
680
     *
681
     * @param destination formats: `topicName:tags`
682
     * @param messages    Collection of {@link org.springframework.messaging.Message}
683
     * @param hashKey     use this key to select queue. for example: orderId, productId ...
684
     * @return {@link SendResult}
685
     */
686
    public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey) {
687
        return syncSendOrderly(destination, messages, hashKey, producer.getSendMsgTimeout());
×
688
    }
689

690
    /**
691
     * Same to {@link #syncSendOrderly(String, Collection, String)} with send timeout specified in addition.
692
     *
693
     * @param destination formats: `topicName:tags`
694
     * @param messages    Collection of {@link org.springframework.messaging.Message}
695
     * @param hashKey     use this key to select queue. for example: orderId, productId ...
696
     * @param timeout     send timeout with millis
697
     * @return {@link SendResult}
698
     */
699
    public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey, long timeout) {
700
        if (Objects.isNull(messages) || messages.isEmpty()) {
×
701
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
×
702
            throw new IllegalArgumentException("`messages` can not be empty");
×
703
        }
704
        try {
705
            long now = System.currentTimeMillis();
×
706
            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
×
707
            for (T message : messages) {
×
708
                if (Objects.isNull(message)) {
×
709
                    continue;
×
710
                }
711
                rmqMsgs.add(this.createRocketMqMessage(destination, message));
×
712
            }
×
713
            MessageBatch messageBatch = batch(rmqMsgs);
×
714
            SendResult sendResult = producer.send(messageBatch, this.messageQueueSelector, hashKey, timeout);
×
715
            long costTime = System.currentTimeMillis() - now;
×
716
            if (log.isDebugEnabled()) {
×
717
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
×
718
            }
719
            return sendResult;
×
720
        } catch (Exception e) {
×
721
            throw new MessagingException(e.getMessage(), e);
×
722
        }
723
    }
724

725
    /**
726
     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
727
     * addition.
728
     *
729
     * @param destination formats: `topicName:tags`
730
     * @param message {@link org.springframework.messaging.Message}
731
     * @param sendCallback {@link SendCallback}
732
     * @param timeout send timeout with millis
733
     * @param delayLevel level for the delay message
734
     */
735
    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
736
        int delayLevel) {
737
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
738
            log.error("asyncSend failed. destination:{}, message is null ", destination);
×
739
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
740
        }
741
        try {
742
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
743
            if (delayLevel > 0) {
1✔
744
                rocketMsg.setDelayTimeLevel(delayLevel);
×
745
            }
746
            producer.send(rocketMsg, sendCallback, timeout);
1✔
747
        } catch (Exception e) {
×
748
            log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
×
749
            throw new MessagingException(e.getMessage(), e);
×
750
        }
1✔
751
    }
1✔
752

753
    /**
754
     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
755
     *
756
     * @param destination formats: `topicName:tags`
757
     * @param message {@link org.springframework.messaging.Message}
758
     * @param sendCallback {@link SendCallback}
759
     * @param timeout send timeout with millis
760
     */
761
    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
762
        asyncSend(destination, message, sendCallback, timeout, 0);
1✔
763
    }
1✔
764

765
    /**
766
     * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time
767
     * sensitive business scenarios. </p>
768
     * <p>
769
     * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
770
     * <p>
771
     * Similar to {@link #syncSend(String, Object)}, internal implementation would potentially retry up to {@link
772
     * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
773
     * message duplication and application developers are the one to resolve this potential issue.
774
     *
775
     * @param destination formats: `topicName:tags`
776
     * @param message {@link org.springframework.messaging.Message}
777
     * @param sendCallback {@link SendCallback}
778
     */
779
    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
780
        asyncSend(destination, message, sendCallback, producer.getSendMsgTimeout());
×
781
    }
×
782

783
    /**
784
     * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
785
     *
786
     * @param destination formats: `topicName:tags`
787
     * @param payload the Object to use as payload
788
     * @param sendCallback {@link SendCallback}
789
     * @param timeout send timeout with millis
790
     */
791
    public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
792
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
793
        asyncSend(destination, message, sendCallback, timeout);
1✔
794
    }
1✔
795

796
    /**
797
     * Same to {@link #asyncSend(String, Message, SendCallback)}.
798
     *
799
     * @param destination formats: `topicName:tags`
800
     * @param payload the Object to use as payload
801
     * @param sendCallback {@link SendCallback}
802
     */
803
    public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
804
        asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());
1✔
805
    }
1✔
806

807
    /**
808
     * asyncSend batch messages
809
     *
810
     * @param destination formats: `topicName:tags`
811
     * @param messages Collection of {@link org.springframework.messaging.Message}
812
     * @param sendCallback {@link SendCallback}
813
     */
814
    public <T extends Message> void asyncSend(String destination, Collection<T> messages, SendCallback sendCallback) {
815
        asyncSend(destination, messages, sendCallback, producer.getSendMsgTimeout());
1✔
816
    }
1✔
817

818
    /**
819
     * asyncSend batch messages in a given timeout.
820
     *
821
     * @param destination formats: `topicName:tags`
822
     * @param messages Collection of {@link org.springframework.messaging.Message}
823
     * @param sendCallback {@link SendCallback}
824
     * @param timeout send timeout with millis
825
     */
826
    public <T extends Message> void asyncSend(String destination, Collection<T> messages, SendCallback sendCallback, long timeout) {
827
        if (Objects.isNull(messages) || messages.size() == 0) {
1✔
828
            log.error("asyncSend with batch failed. destination:{}, messages is empty ", destination);
×
829
            throw new IllegalArgumentException("`messages` can not be empty");
×
830
        }
831

832
        try {
833
            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
1✔
834
            for (Message msg : messages) {
1✔
835
                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
1✔
836
                    log.warn("Found a message empty in the batch, skip it");
×
837
                    continue;
×
838
                }
839
                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
1✔
840
            }
1✔
841
            producer.send(rmqMsgs, sendCallback, timeout);
1✔
842
        } catch (Exception e) {
×
843
            log.error("asyncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
×
844
            throw new MessagingException(e.getMessage(), e);
×
845
        }
1✔
846
    }
1✔
847

848
    /**
849
     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
850
     * addition.
851
     *
852
     * @param destination formats: `topicName:tags`
853
     * @param message {@link org.springframework.messaging.Message}
854
     * @param hashKey use this key to select queue. for example: orderId, productId ...
855
     * @param sendCallback {@link SendCallback}
856
     * @param timeout send timeout with millis
857
     */
858
    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
859
        long timeout) {
860
        asyncSendOrderly(destination, message, hashKey, sendCallback, timeout, 0);
×
861
    }
×
862

863
    /**
864
     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
865
     * addition.
866
     *
867
     * @param destination formats: `topicName:tags`
868
     * @param message {@link org.springframework.messaging.Message}
869
     * @param hashKey use this key to select queue. for example: orderId, productId ...
870
     * @param sendCallback {@link SendCallback}
871
     * @param timeout send timeout with millis
872
     * @param delayLevel level for the delay message
873
     */
874
    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
875
        long timeout, int delayLevel) {
876
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
×
877
            log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
×
878
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
879
        }
880
        try {
881
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
×
882
            if (delayLevel > 0) {
×
883
                rocketMsg.setDelayTimeLevel(delayLevel);
×
884
            }
885
            producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
×
886
        } catch (Exception e) {
×
887
            log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
×
888
            throw new MessagingException(e.getMessage(), e);
×
889
        }
×
890
    }
×
891

892
    /**
893
     * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
894
     *
895
     * @param destination formats: `topicName:tags`
896
     * @param message {@link org.springframework.messaging.Message}
897
     * @param hashKey use this key to select queue. for example: orderId, productId ...
898
     * @param sendCallback {@link SendCallback}
899
     */
900
    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
901
        asyncSendOrderly(destination, message, hashKey, sendCallback, producer.getSendMsgTimeout());
×
902
    }
×
903

904
    /**
905
     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
906
     *
907
     * @param destination formats: `topicName:tags`
908
     * @param payload the Object to use as payload
909
     * @param hashKey use this key to select queue. for example: orderId, productId ...
910
     * @param sendCallback {@link SendCallback}
911
     */
912
    public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
913
        asyncSendOrderly(destination, payload, hashKey, sendCallback, producer.getSendMsgTimeout());
×
914
    }
×
915

916
    /**
917
     * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
918
     *
919
     * @param destination formats: `topicName:tags`
920
     * @param payload the Object to use as payload
921
     * @param hashKey use this key to select queue. for example: orderId, productId ...
922
     * @param sendCallback {@link SendCallback}
923
     * @param timeout send timeout with millis
924
     */
925
    public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
926
        long timeout) {
927
        Message<?> message = MessageBuilder.withPayload(payload).build();
×
928
        asyncSendOrderly(destination, message, hashKey, sendCallback, timeout);
×
929
    }
×
930

931
    /**
932
     * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for
933
     * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
934
     * <p>
935
     * One-way transmission is used for cases requiring moderate reliability, such as log collection.
936
     *
937
     * @param destination formats: `topicName:tags`
938
     * @param message {@link org.springframework.messaging.Message}
939
     */
940
    public void sendOneWay(String destination, Message<?> message) {
941
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
×
942
            log.error("sendOneWay failed. destination:{}, message is null ", destination);
×
943
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
944
        }
945
        try {
946
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
×
947
            producer.sendOneway(rocketMsg);
×
948
        } catch (Exception e) {
×
949
            log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
×
950
            throw new MessagingException(e.getMessage(), e);
×
951
        }
×
952
    }
×
953

954
    /**
955
     * Same to {@link #sendOneWay(String, Message)}
956
     *
957
     * @param destination formats: `topicName:tags`
958
     * @param payload the Object to use as payload
959
     */
960
    public void sendOneWay(String destination, Object payload) {
961
        Message<?> message = MessageBuilder.withPayload(payload).build();
×
962
        sendOneWay(destination, message);
×
963
    }
×
964

965
    /**
966
     * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
967
     *
968
     * @param destination formats: `topicName:tags`
969
     * @param message {@link org.springframework.messaging.Message}
970
     * @param hashKey use this key to select queue. for example: orderId, productId ...
971
     */
972
    public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
973
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
×
974
            log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
×
975
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
976
        }
977
        try {
978
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
×
979
            producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
×
980
        } catch (Exception e) {
×
981
            log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
×
982
            throw new MessagingException(e.getMessage(), e);
×
983
        }
×
984
    }
×
985

986
    /**
987
     * Same to {@link #sendOneWayOrderly(String, Message, String)}
988
     *
989
     * @param destination formats: `topicName:tags`
990
     * @param payload the Object to use as payload
991
     */
992
    public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
993
        Message<?> message = MessageBuilder.withPayload(payload).build();
×
994
        sendOneWayOrderly(destination, message, hashKey);
×
995
    }
×
996

997
    @Override
998
    public void afterPropertiesSet() throws Exception {
999
        if (producer != null) {
1✔
1000
            producer.start();
1✔
1001
        }
1002
        if (Objects.nonNull(consumer)) {
1✔
1003
            try {
1004
                consumer.start();
×
1005
            } catch (Exception e) {
1✔
1006
                log.error("Failed to startup PullConsumer for RocketMQTemplate", e);
1✔
1007
            }
×
1008
        }
1009
    }
1✔
1010

1011
    @Override
1012
    protected void doSend(String destination, Message<?> message) {
1013
        SendResult sendResult = syncSend(destination, message);
×
1014
        if (log.isDebugEnabled()) {
×
1015
            log.debug("send message to `{}` finished. result:{}", destination, sendResult);
×
1016
        }
1017
    }
×
1018

1019
    @Override
1020
    protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
1021
        Message<?> message = super.doConvert(payload, headers, postProcessor);
1✔
1022
        MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
1✔
1023
        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
1✔
1024
        return builder.build();
1✔
1025
    }
1026

1027
    @Override
1028
    public void destroy() {
1029
        if (Objects.nonNull(producer)) {
1✔
1030
            producer.shutdown();
1✔
1031
        }
1032
        if (Objects.nonNull(consumer)) {
1✔
1033
            consumer.shutdown();
1✔
1034
        }
1035
    }
1✔
1036

1037
    /**
1038
     * Send Spring Message in Transaction
1039
     *
1040
     * @param destination destination formats: `topicName:tags`
1041
     * @param message message {@link org.springframework.messaging.Message}
1042
     * @param arg ext arg
1043
     * @return TransactionSendResult
1044
     * @throws MessagingException
1045
     */
1046
    public TransactionSendResult sendMessageInTransaction(final String destination,
1047
        final Message<?> message, final Object arg) throws MessagingException {
1048
        try {
1049
            if (((TransactionMQProducer) producer).getTransactionListener() == null) {
1✔
1050
                throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
1✔
1051
            }
1052
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
1053
            return producer.sendMessageInTransaction(rocketMsg, arg);
×
1054
        } catch (MQClientException e) {
1✔
1055
            throw RocketMQUtil.convert(e);
1✔
1056
        }
1057
    }
1058

1059
    private org.apache.rocketmq.common.message.Message createRocketMqMessage(
1060
        String destination, Message<?> message) {
1061
        Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
1✔
1062
        return RocketMQUtil.convertToRocketMessage(getMessageConverter(), charset,
1✔
1063
            destination, msg);
1064
    }
1065

1066
    private Object doConvertMessage(MessageExt messageExt, Type type) {
1067
        if (Objects.equals(type, MessageExt.class)) {
×
1068
            return messageExt;
×
1069
        } else if (Objects.equals(type, byte[].class)) {
×
1070
            return messageExt.getBody();
×
1071
        } else {
1072
            String str = new String(messageExt.getBody(), Charset.forName(charset));
×
1073
            if (Objects.equals(type, String.class)) {
×
1074
                return str;
×
1075
            } else {
1076
                // If msgType not string, use objectMapper change it.
1077
                try {
1078
                    if (type instanceof Class) {
×
1079
                        //if the messageType has not Generic Parameter
1080
                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
×
1081
                    } else {
1082
                        //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
1083
                        //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
1084
                        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
×
1085
                    }
1086
                } catch (Exception e) {
×
1087
                    log.error("convert failed. str:{}, msgType:{}", str, type);
×
1088
                    throw new RuntimeException("cannot convert message to " + type, e);
×
1089
                }
1090
            }
1091
        }
1092
    }
1093

1094
    private Type getMessageType(RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
1095
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQLocalRequestCallback);
×
1096
        Type matchedGenericInterface = null;
×
1097
        while (Objects.nonNull(targetClass)) {
×
1098
            Type[] interfaces = targetClass.getGenericInterfaces();
×
1099
            if (Objects.nonNull(interfaces)) {
×
1100
                for (Type type : interfaces) {
×
1101
                    if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
×
1102
                        matchedGenericInterface = type;
×
1103
                        break;
×
1104
                    }
1105
                }
1106
            }
1107
            targetClass = targetClass.getSuperclass();
×
1108
        }
×
1109
        if (Objects.isNull(matchedGenericInterface)) {
×
1110
            return Object.class;
×
1111
        }
1112

1113
        Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
×
1114
        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
×
1115
            return actualTypeArguments[0];
×
1116
        }
1117
        return Object.class;
×
1118
    }
1119

1120
    private MessageBatch batch(Collection<org.apache.rocketmq.common.message.Message> msgs) throws MQClientException {
1121
        MessageBatch msgBatch;
1122
        try {
1123
            msgBatch = MessageBatch.generateFromList(msgs);
×
1124
            for (org.apache.rocketmq.common.message.Message message : msgBatch) {
×
1125
                Validators.checkMessage(message, producer);
×
1126
                MessageClientIDSetter.setUniqID(message);
×
1127
                message.setTopic(producer.withNamespace(message.getTopic()));
×
1128
            }
×
1129
            msgBatch.setBody(msgBatch.encode());
×
1130
        } catch (Exception e) {
×
1131
            throw new MQClientException("Failed to initiate the MessageBatch", e);
×
1132
        }
×
1133
        msgBatch.setTopic(producer.withNamespace(msgBatch.getTopic()));
×
1134
        return msgBatch;
×
1135
    }
1136

1137
    /**
1138
     * receive message  in pull mode.
1139
     *
1140
     * @param clazz message object type
1141
     * @param <T>
1142
     * @return message list
1143
     */
1144
    public <T> List<T> receive(Class<T> clazz) {
1145
        return receive(clazz, this.consumer.getPollTimeoutMillis());
1✔
1146
    }
1147

1148
    /**
1149
     * Same to {@link #receive(Class<T>)} with receive timeout specified in addition.
1150
     *
1151
     * @param clazz   message object type
1152
     * @param timeout receive timeout with millis
1153
     * @param <T>
1154
     * @return message list
1155
     */
1156
    public <T> List<T> receive(Class<T> clazz, long timeout) {
1157
        List<MessageExt> messageExts = this.consumer.poll(timeout);
1✔
1158
        List<T> list = new ArrayList<>(messageExts.size());
1✔
1159
        for (MessageExt messageExt : messageExts) {
1✔
1160
            list.add(doConvertMessage(messageExt, clazz));
×
1161
        }
×
1162
        return list;
1✔
1163
    }
1164

1165
    @SuppressWarnings("unchecked")
1166
    private <T> T doConvertMessage(MessageExt messageExt, Class<T> messageType) {
1167
        if (Objects.equals(messageType, MessageExt.class)) {
×
1168
            return (T) messageExt;
×
1169
        } else {
1170
            String str = new String(messageExt.getBody(), Charset.forName(charset));
×
1171
            if (Objects.equals(messageType, String.class)) {
×
1172
                return (T) str;
×
1173
            } else {
1174
                // If msgType not string, use objectMapper change it.
1175
                try {
1176
                    return (T) this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), messageType);
×
1177
                } catch (Exception e) {
×
1178
                    log.info("convert failed. str:{}, msgType:{}", str, messageType);
×
1179
                    throw new RuntimeException("cannot convert message to " + messageType, e);
×
1180
                }
1181
            }
1182
        }
1183
    }
1184
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc