• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / rocketmq-spring / 486

pending completion
486

Pull #515

travis-ci-com

web-flow
Merge 19d522a3f into 694d4a1c4
Pull Request #515: [ISSUE #506] support send message with arbitrarily delay time

29 of 29 new or added lines in 2 files covered. (100.0%)

818 of 1301 relevant lines covered (62.87%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.61
/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
package org.apache.rocketmq.spring.core;
19

20
import org.apache.rocketmq.client.Validators;
21
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
22
import org.apache.rocketmq.client.exception.MQClientException;
23
import org.apache.rocketmq.client.producer.DefaultMQProducer;
24
import org.apache.rocketmq.client.producer.MessageQueueSelector;
25
import org.apache.rocketmq.client.producer.RequestCallback;
26
import org.apache.rocketmq.client.producer.SendCallback;
27
import org.apache.rocketmq.client.producer.SendResult;
28
import org.apache.rocketmq.client.producer.TransactionMQProducer;
29
import org.apache.rocketmq.client.producer.TransactionSendResult;
30
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
31
import org.apache.rocketmq.common.message.MessageBatch;
32
import org.apache.rocketmq.common.message.MessageClientIDSetter;
33
import org.apache.rocketmq.common.message.MessageExt;
34
import org.apache.rocketmq.spring.support.DelayMode;
35
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
36
import org.apache.rocketmq.spring.support.RocketMQUtil;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39
import org.springframework.aop.framework.AopProxyUtils;
40
import org.springframework.beans.factory.DisposableBean;
41
import org.springframework.beans.factory.InitializingBean;
42
import org.springframework.messaging.Message;
43
import org.springframework.messaging.MessageHeaders;
44
import org.springframework.messaging.MessagingException;
45
import org.springframework.messaging.converter.SmartMessageConverter;
46
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
47
import org.springframework.messaging.core.MessagePostProcessor;
48
import org.springframework.messaging.support.MessageBuilder;
49
import org.springframework.util.MimeTypeUtils;
50

51
import java.lang.reflect.ParameterizedType;
52
import java.lang.reflect.Type;
53
import java.nio.charset.Charset;
54
import java.util.ArrayList;
55
import java.util.Collection;
56
import java.util.List;
57
import java.util.Map;
58
import java.util.Objects;
59
import java.util.concurrent.ExecutorService;
60

61
@SuppressWarnings({"WeakerAccess", "unused"})
62
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
1✔
63
    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
1✔
64

65
    private DefaultMQProducer producer;
66

67
    private DefaultLitePullConsumer consumer;
68

69
    private String charset = "UTF-8";
1✔
70

71
    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
1✔
72

73
    private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter();
1✔
74

75
    public DefaultMQProducer getProducer() {
76
        return producer;
1✔
77
    }
78

79
    public void setProducer(DefaultMQProducer producer) {
80
        this.producer = producer;
1✔
81
    }
1✔
82

83
    public DefaultLitePullConsumer getConsumer() {
84
        return consumer;
1✔
85
    }
86

87
    public void setConsumer(DefaultLitePullConsumer consumer) {
88
        this.consumer = consumer;
1✔
89
    }
1✔
90

91
    public String getCharset() {
92
        return charset;
×
93
    }
94

95
    public void setCharset(String charset) {
96
        this.charset = charset;
×
97
    }
×
98

99
    public MessageQueueSelector getMessageQueueSelector() {
100
        return messageQueueSelector;
×
101
    }
102

103
    public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
104
        this.messageQueueSelector = messageQueueSelector;
×
105
    }
×
106

107
    public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
108
        this.producer.setAsyncSenderExecutor(asyncSenderExecutor);
1✔
109
    }
1✔
110

111
    /**
112
     * @param destination formats: `topicName:tags`
113
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
114
     * @param type The type of T
115
     * @return
116
     */
117
    public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
118
        return sendAndReceive(destination, message, type, null, producer.getSendMsgTimeout(), 0);
×
119
    }
120

121
    /**
122
     * @param destination formats: `topicName:tags`
123
     * @param payload the payload to be sent.
124
     * @param type The type of T
125
     * @return
126
     */
127
    public <T> T sendAndReceive(String destination, Object payload, Type type) {
128
        return sendAndReceive(destination, payload, type, null, producer.getSendMsgTimeout(), 0);
×
129
    }
130

131
    /**
132
     * @param destination formats: `topicName:tags`
133
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
134
     * @param type The type of T
135
     * @param timeout send timeout in millis
136
     * @return
137
     */
138
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
139
        return sendAndReceive(destination, message, type, null, timeout, 0);
×
140
    }
141

142
    /**
143
     * @param destination formats: `topicName:tags`
144
     * @param payload the payload to be sent.
145
     * @param type The type of T
146
     * @param timeout send timeout in millis
147
     * @return
148
     */
149
    public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
150
        return sendAndReceive(destination, payload, type, null, timeout, 0);
×
151
    }
152

153
    /**
154
     * @param destination formats: `topicName:tags`
155
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
156
     * @param type The type of T
157
     * @param timeout send timeout in millis
158
     * @param delayLevel message delay level(0 means no delay)
159
     * @return
160
     */
161
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
162
        return sendAndReceive(destination, message, type, null, timeout, delayLevel);
×
163
    }
164

165
    /**
166
     * @param destination formats: `topicName:tags`
167
     * @param payload the payload to be sent.
168
     * @param type The type of T
169
     * @param timeout send timeout in millis
170
     * @param delayLevel message delay level(0 means no delay)
171
     * @return
172
     */
173
    public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
174
        return sendAndReceive(destination, payload, type, null, timeout, delayLevel);
×
175
    }
176

177
    /**
178
     * @param destination formats: `topicName:tags`
179
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
180
     * @param type The type of T
181
     * @param hashKey needed when sending message orderly
182
     * @return
183
     */
184
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
185
        return sendAndReceive(destination, message, type, hashKey, producer.getSendMsgTimeout(), 0);
×
186
    }
187

188
    /**
189
     * @param destination formats: `topicName:tags`
190
     * @param payload the payload to be sent.
191
     * @param type The type of T
192
     * @param hashKey needed when sending message orderly
193
     * @return
194
     */
195
    public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
196
        return sendAndReceive(destination, payload, type, hashKey, producer.getSendMsgTimeout(), 0);
×
197
    }
198

199
    /**
200
     * @param destination formats: `topicName:tags`
201
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
202
     * @param type The type of T
203
     * @param hashKey needed when sending message orderly
204
     * @param timeout send timeout in millis
205
     * @return
206
     */
207
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
208
        return sendAndReceive(destination, message, type, hashKey, timeout, 0);
×
209
    }
210

211
    /**
212
     * @param destination formats: `topicName:tags`
213
     * @param payload the payload to be sent.
214
     * @param type The type of T
215
     * @param hashKey
216
     * @return
217
     */
218
    public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey, long timeout) {
219
        return sendAndReceive(destination, payload, type, hashKey, timeout, 0);
×
220
    }
221

222
    /**
223
     * @param destination formats: `topicName:tags`
224
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
225
     * @param type The type that receive
226
     * @param hashKey needed when sending message orderly
227
     * @param timeout send timeout in millis
228
     * @param delayLevel message delay level(0 means no delay)
229
     * @return
230
     */
231
    public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
232
        long timeout, int delayLevel) {
233
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
234
            log.error("send request message failed. destination:{}, message is null ", destination);
1✔
235
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
1✔
236
        }
237

238
        try {
239
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
240
            if (delayLevel > 0) {
1✔
241
                rocketMsg.setDelayTimeLevel(delayLevel);
×
242
            }
243
            MessageExt replyMessage;
244

245
            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
1✔
246
                replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
×
247
            } else {
248
                replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
×
249
            }
250
            return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
×
251
        } catch (Exception e) {
1✔
252
            log.error("send request message failed. destination:{}, message:{} ", destination, message);
1✔
253
            throw new MessagingException(e.getMessage(), e);
1✔
254
        }
255
    }
256

257
    /**
258
     * @param destination formats: `topicName:tags`
259
     * @param payload the payload to be sent.
260
     * @param type The type that receive
261
     * @param hashKey needed when sending message orderly
262
     * @param timeout send timeout in millis
263
     * @param delayLevel message delay level(0 means no delay)
264
     * @return
265
     */
266
    public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
267
        long timeout, int delayLevel) {
268
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
269
        return sendAndReceive(destination, message, type, hashKey, timeout, delayLevel);
×
270
    }
271

272
    /**
273
     * @param destination formats: `topicName:tags`
274
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
275
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
276
     * @return
277
     */
278
    public void sendAndReceive(String destination, Message<?> message,
279
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
280
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, producer.getSendMsgTimeout(), 0);
×
281
    }
×
282

283
    /**
284
     * @param destination formats: `topicName:tags`
285
     * @param payload the payload to be sent.
286
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
287
     * @return
288
     */
289
    public void sendAndReceive(String destination, Object payload,
290
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
291
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, producer.getSendMsgTimeout(), 0);
×
292
    }
×
293

294
    /**
295
     * @param destination formats: `topicName:tags`
296
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
297
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
298
     * @param timeout send timeout in millis
299
     * @return
300
     */
301
    public void sendAndReceive(String destination, Message<?> message,
302
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
303
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, timeout, 0);
×
304
    }
×
305

306
    /**
307
     * @param destination formats: `topicName:tags`
308
     * @param payload the payload to be sent.
309
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
310
     * @param timeout send timeout in millis
311
     * @return
312
     */
313
    public void sendAndReceive(String destination, Object payload,
314
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout) {
315
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, timeout, 0);
×
316
    }
×
317

318
    /**
319
     * @param destination formats: `topicName:tags`
320
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
321
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
322
     * @param timeout send timeout in millis
323
     * @param delayLevel message delay level(0 means no delay)
324
     * @return
325
     */
326
    public void sendAndReceive(String destination, Message<?> message,
327
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
328
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, null, timeout, delayLevel);
×
329
    }
×
330

331
    /**
332
     * @param destination formats: `topicName:tags`
333
     * @param payload the payload to be sent.
334
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
335
     * @param hashKey needed when sending message orderly
336
     * @return
337
     */
338
    public void sendAndReceive(String destination, Object payload,
339
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
340
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, producer.getSendMsgTimeout(), 0);
×
341
    }
×
342

343
    /**
344
     * @param destination formats: `topicName:tags`
345
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
346
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
347
     * @param hashKey needed when sending message orderly
348
     * @param timeout send timeout in millis
349
     * @return
350
     */
351
    public void sendAndReceive(String destination, Message<?> message,
352
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
353
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, timeout, 0);
×
354
    }
×
355

356
    /**
357
     * @param destination formats: `topicName:tags`
358
     * @param payload the payload to be sent.
359
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
360
     * @param hashKey needed when sending message orderly
361
     * @param timeout send timeout in millis
362
     * @return
363
     */
364
    public void sendAndReceive(String destination, Object payload,
365
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout) {
366
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, hashKey, timeout, 0);
×
367
    }
×
368

369
    /**
370
     * @param destination formats: `topicName:tags`
371
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
372
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
373
     * @param hashKey needed when sending message orderly
374
     * @return
375
     */
376
    public void sendAndReceive(String destination, Message<?> message,
377
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey) {
378
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, producer.getSendMsgTimeout(), 0);
×
379
    }
×
380

381
    /**
382
     * @param destination formats: `topicName:tags`
383
     * @param payload the payload to be sent.
384
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
385
     * @param timeout send timeout in millis
386
     * @param delayLevel message delay level(0 means no delay)
387
     * @return
388
     */
389
    public void sendAndReceive(String destination, Object payload,
390
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, long timeout, int delayLevel) {
391
        sendAndReceive(destination, payload, rocketMQLocalRequestCallback, null, timeout, delayLevel);
×
392
    }
×
393

394
    /**
395
     * @param destination formats: `topicName:tags`
396
     * @param payload the payload to be sent.
397
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
398
     * @param hashKey needed when sending message orderly
399
     * @param timeout send timeout in millis
400
     * @param delayLevel message delay level(0 means no delay)
401
     * @return
402
     */
403
    public void sendAndReceive(String destination, Object payload,
404
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
405
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
406
        sendAndReceive(destination, message, rocketMQLocalRequestCallback, hashKey, timeout, delayLevel);
×
407
    }
×
408

409
    /**
410
     * Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message,
411
     * <code>rocketMQLocalRequestCallback</code> will be executed. </p>
412
     *
413
     * @param destination formats: `topicName:tags`
414
     * @param message {@link org.springframework.messaging.Message} the message to be sent.
415
     * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
416
     * @param hashKey needed when sending message orderly
417
     * @param timeout send timeout in millis
418
     * @param delayLevel message delay level(0 means no delay)
419
     * @return
420
     */
421
    public void sendAndReceive(String destination, Message<?> message,
422
        RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long timeout, int delayLevel) {
423
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
424
            log.error("send request message failed. destination:{}, message is null ", destination);
×
425
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
426
        }
427

428
        try {
429
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
430
            if (delayLevel > 0) {
1✔
431
                rocketMsg.setDelayTimeLevel(delayLevel);
×
432
            }
433
            if (timeout <= 0) {
1✔
434
                timeout = producer.getSendMsgTimeout();
×
435
            }
436
            RequestCallback requestCallback = null;
1✔
437
            if (rocketMQLocalRequestCallback != null) {
1✔
438
                requestCallback = new RequestCallback() {
1✔
439
                    @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
440
                        rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
×
441
                    }
×
442

443
                    @Override public void onException(Throwable e) {
444
                        rocketMQLocalRequestCallback.onException(e);
×
445
                    }
×
446
                };
447
            }
448
            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
1✔
449
                producer.request(rocketMsg, requestCallback, timeout);
×
450
            } else {
451
                producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
×
452
            }
453
        } catch (
1✔
454
            Exception e) {
455
            log.error("send request message failed. destination:{}, message:{} ", destination, message);
1✔
456
            throw new MessagingException(e.getMessage(), e);
1✔
457
        }
×
458

459
    }
×
460

461
    /**
462
     * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
463
     * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
464
     * notification, SMS marketing system, etc.. </p>
465
     * <p>
466
     * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
467
     * {@link DefaultMQProducer#getRetryTimesWhenSendFailed} times before claiming failure. As a result, multiple
468
     * messages may potentially delivered to broker(s). It's up to the application developers to resolve potential
469
     * duplication issue.
470
     *
471
     * @param destination formats: `topicName:tags`
472
     * @param message {@link org.springframework.messaging.Message}
473
     * @return {@link SendResult}
474
     */
475
    public SendResult syncSend(String destination, Message<?> message) {
476
        return syncSend(destination, message, producer.getSendMsgTimeout());
×
477
    }
478

479
    /**
480
     * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
481
     *
482
     * @param destination formats: `topicName:tags`
483
     * @param message {@link org.springframework.messaging.Message}
484
     * @param timeout send timeout with millis
485
     * @return {@link SendResult}
486
     */
487
    public SendResult syncSend(String destination, Message<?> message, long timeout) {
488
        return syncSend(destination, message, timeout, 0);
×
489
    }
490

491
    /**
492
     * syncSend batch messages
493
     *
494
     * @param destination formats: `topicName:tags`
495
     * @param messages Collection of {@link org.springframework.messaging.Message}
496
     * @return {@link SendResult}
497
     */
498
    public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
499
        return syncSend(destination, messages, producer.getSendMsgTimeout());
×
500
    }
501

502
    /**
503
     * syncSend batch messages in a given timeout.
504
     *
505
     * @param destination formats: `topicName:tags`
506
     * @param messages Collection of {@link org.springframework.messaging.Message}
507
     * @param timeout send timeout with millis
508
     * @return {@link SendResult}
509
     */
510
    public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
511
        if (Objects.isNull(messages) || messages.size() == 0) {
1✔
512
            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
1✔
513
            throw new IllegalArgumentException("`messages` can not be empty");
1✔
514
        }
515

516
        try {
517
            long now = System.currentTimeMillis();
×
518
            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
×
519
            for (Message msg : messages) {
×
520
                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
×
521
                    log.warn("Found a message empty in the batch, skip it");
×
522
                    continue;
×
523
                }
524
                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
×
525
            }
×
526

527
            SendResult sendResult = producer.send(rmqMsgs, timeout);
×
528
            long costTime = System.currentTimeMillis() - now;
×
529
            if (log.isDebugEnabled()) {
×
530
                log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
×
531
            }
532
            return sendResult;
×
533
        } catch (Exception e) {
×
534
            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
×
535
            throw new MessagingException(e.getMessage(), e);
×
536
        }
537
    }
538

539
    /**
540
     * Same to {@link #syncSend(String, Message)} with send delay time specified in addition.
541
     *
542
     * @param destination formats: `topicName:tags`
543
     * @param message {@link org.springframework.messaging.Message}
544
     * @param delayTime delay time in seconds for message
545
     * @return {@link SendResult}
546
     */
547
    public SendResult syncSendDelayTimeSeconds(String destination, Message<?> message, long delayTime) {
548
        return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
×
549
    }
550

551
    /**
552
     * Same to {@link #syncSend(String, Object)} with send delayTime specified in addition.
553
     *
554
     * @param destination formats: `topicName:tags`
555
     * @param payload the Object to use as payload
556
     * @param delayTime delay time in seconds for message
557
     * @return {@link SendResult}
558
     */
559
    public SendResult syncSendDelayTimeSeconds(String destination, Object payload, long delayTime) {
560
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
561
        return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
×
562
    }
563

564
    /**
565
     * Same to {@link #syncSend(String, Message)} with send timeout and delay time specified in addition.
566
     *
567
     * @param destination formats: `topicName:tags`
568
     * @param message {@link org.springframework.messaging.Message}
569
     * @param timeout send timeout with millis
570
     * @param delayTime delay time for message
571
     * @return {@link SendResult}
572
     */
573
    public SendResult syncSend(String destination, Message<?> message, long timeout, long delayTime, DelayMode mode) {
574
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
575
            log.error("syncSend failed. destination:{}, message is null ", destination);
×
576
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
577
        }
578
        try {
579
            long now = System.currentTimeMillis();
1✔
580
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
581
            if (delayTime > 0 && Objects.nonNull(mode)) {
1✔
582
                switch (mode) {
1✔
583
                    case DELAY_SECONDS:
584
                        rocketMsg.setDelayTimeSec(delayTime);
1✔
585
                        break;
1✔
586
                    case DELAY_MILLISECONDS:
587
                        rocketMsg.setDelayTimeMs(delayTime);
×
588
                        break;
×
589
                    case DELIVER_TIME_MILLISECONDS:
590
                        rocketMsg.setDeliverTimeMs(delayTime);
×
591
                        break;
×
592
                    default:
593
                        log.warn("delay mode: {} not support", mode);
×
594
                }
595
            }
596
            SendResult sendResult = producer.send(rocketMsg, timeout);
×
597
            long costTime = System.currentTimeMillis() - now;
×
598
            if (log.isDebugEnabled()) {
×
599
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
×
600
            }
601
            return sendResult;
×
602
        } catch (Exception e) {
1✔
603
            log.error("syncSend failed. destination:{}, message:{}, detail exception info: ", destination, message, e);
1✔
604
            throw new MessagingException(e.getMessage(), e);
1✔
605
        }
606
    }
607

608

609

610
    /**
611
     * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
612
     *
613
     * @param destination formats: `topicName:tags`
614
     * @param message {@link org.springframework.messaging.Message}
615
     * @param timeout send timeout with millis
616
     * @param delayLevel level for the delay message
617
     * @return {@link SendResult}
618
     */
619
    public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
620
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
621
            log.error("syncSend failed. destination:{}, message is null ", destination);
×
622
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
623
        }
624
        try {
625
            long now = System.currentTimeMillis();
1✔
626
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
627
            if (delayLevel > 0) {
1✔
628
                rocketMsg.setDelayTimeLevel(delayLevel);
×
629
            }
630
            SendResult sendResult = producer.send(rocketMsg, timeout);
×
631
            long costTime = System.currentTimeMillis() - now;
×
632
            if (log.isDebugEnabled()) {
×
633
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
×
634
            }
635
            return sendResult;
×
636
        } catch (Exception e) {
1✔
637
            log.error("syncSend failed. destination:{}, message:{}, detail exception info: ", destination, message, e);
1✔
638
            throw new MessagingException(e.getMessage(), e);
1✔
639
        }
640
    }
641

642
    /**
643
     * Same to {@link #syncSend(String, Message)}.
644
     *
645
     * @param destination formats: `topicName:tags`
646
     * @param payload the Object to use as payload
647
     * @return {@link SendResult}
648
     */
649
    public SendResult syncSend(String destination, Object payload) {
650
        return syncSend(destination, payload, producer.getSendMsgTimeout());
×
651
    }
652

653
    /**
654
     * Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
655
     *
656
     * @param destination formats: `topicName:tags`
657
     * @param payload the Object to use as payload
658
     * @param timeout send timeout with millis
659
     * @return {@link SendResult}
660
     */
661
    public SendResult syncSend(String destination, Object payload, long timeout) {
662
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
663
        return syncSend(destination, message, timeout);
×
664
    }
665

666
    /**
667
     * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
668
     *
669
     * @param destination formats: `topicName:tags`
670
     * @param message {@link org.springframework.messaging.Message}
671
     * @param hashKey use this key to select queue. for example: orderId, productId ...
672
     * @return {@link SendResult}
673
     */
674
    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
675
        return syncSendOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
×
676
    }
677

678
    /**
679
     * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
680
     *
681
     * @param destination formats: `topicName:tags`
682
     * @param message {@link org.springframework.messaging.Message}
683
     * @param hashKey use this key to select queue. for example: orderId, productId ...
684
     * @param timeout send timeout with millis
685
     * @return {@link SendResult}
686
     */
687
    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
688
        return syncSendOrderly(destination, message, hashKey, timeout, 0);
×
689
    }
690

691
    /**
692
     * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
693
     *
694
     * @param destination formats: `topicName:tags`
695
     * @param message {@link org.springframework.messaging.Message}
696
     * @param hashKey use this key to select queue. for example: orderId, productId ...
697
     * @param timeout send timeout with millis
698
     * @param delayLevel level for the delay message
699
     * @return {@link SendResult}
700
     */
701
    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
702
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
703
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
×
704
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
705
        }
706
        try {
707
            long now = System.currentTimeMillis();
1✔
708
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
709
            if (delayLevel > 0) {
1✔
710
                rocketMsg.setDelayTimeLevel(delayLevel);
×
711
            }
712
            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
×
713
            long costTime = System.currentTimeMillis() - now;
×
714
            if (log.isDebugEnabled()) {
×
715
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
×
716
            }
717
            return sendResult;
×
718
        } catch (Exception e) {
1✔
719
            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
1✔
720
            throw new MessagingException(e.getMessage(), e);
1✔
721
        }
722
    }
723

724
    /**
725
     * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
726
     *
727
     * @param destination formats: `topicName:tags`
728
     * @param payload the Object to use as payload
729
     * @param hashKey use this key to select queue. for example: orderId, productId ...
730
     * @return {@link SendResult}
731
     */
732
    public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
733
        return syncSendOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
×
734
    }
735

736
    /**
737
     * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
738
     *
739
     * @param destination formats: `topicName:tags`
740
     * @param payload the Object to use as payload
741
     * @param hashKey use this key to select queue. for example: orderId, productId ...
742
     * @param timeout send timeout with millis
743
     * @return {@link SendResult}
744
     */
745
    public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
746
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
747
        return syncSendOrderly(destination, message, hashKey, timeout);
×
748
    }
749

750
    /**
751
     * syncSend batch messages orderly.
752
     *
753
     * @param destination formats: `topicName:tags`
754
     * @param messages    Collection of {@link org.springframework.messaging.Message}
755
     * @param hashKey     use this key to select queue. for example: orderId, productId ...
756
     * @return {@link SendResult}
757
     */
758
    public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey) {
759
        return syncSendOrderly(destination, messages, hashKey, producer.getSendMsgTimeout());
×
760
    }
761

762
    /**
763
     * Same to {@link #syncSendOrderly(String, Collection, String)} with send timeout specified in addition.
764
     *
765
     * @param destination formats: `topicName:tags`
766
     * @param messages    Collection of {@link org.springframework.messaging.Message}
767
     * @param hashKey     use this key to select queue. for example: orderId, productId ...
768
     * @param timeout     send timeout with millis
769
     * @return {@link SendResult}
770
     */
771
    public <T extends Message> SendResult syncSendOrderly(String destination, Collection<T> messages, String hashKey, long timeout) {
772
        if (Objects.isNull(messages) || messages.isEmpty()) {
×
773
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
×
774
            throw new IllegalArgumentException("`messages` can not be empty");
×
775
        }
776
        try {
777
            long now = System.currentTimeMillis();
×
778
            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
×
779
            for (T message : messages) {
×
780
                if (Objects.isNull(message)) {
×
781
                    continue;
×
782
                }
783
                rmqMsgs.add(this.createRocketMqMessage(destination, message));
×
784
            }
×
785
            MessageBatch messageBatch = batch(rmqMsgs);
×
786
            SendResult sendResult = producer.send(messageBatch, this.messageQueueSelector, hashKey, timeout);
×
787
            long costTime = System.currentTimeMillis() - now;
×
788
            if (log.isDebugEnabled()) {
×
789
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
×
790
            }
791
            return sendResult;
×
792
        } catch (Exception e) {
×
793
            throw new MessagingException(e.getMessage(), e);
×
794
        }
795
    }
796

797
    /**
798
     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
799
     * addition.
800
     *
801
     * @param destination formats: `topicName:tags`
802
     * @param message {@link org.springframework.messaging.Message}
803
     * @param sendCallback {@link SendCallback}
804
     * @param timeout send timeout with millis
805
     * @param delayLevel level for the delay message
806
     */
807
    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
808
        int delayLevel) {
809
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
1✔
810
            log.error("asyncSend failed. destination:{}, message is null ", destination);
×
811
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
812
        }
813
        try {
814
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
815
            if (delayLevel > 0) {
1✔
816
                rocketMsg.setDelayTimeLevel(delayLevel);
×
817
            }
818
            producer.send(rocketMsg, sendCallback, timeout);
1✔
819
        } catch (Exception e) {
×
820
            log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
×
821
            throw new MessagingException(e.getMessage(), e);
×
822
        }
1✔
823
    }
1✔
824

825
    /**
826
     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
827
     *
828
     * @param destination formats: `topicName:tags`
829
     * @param message {@link org.springframework.messaging.Message}
830
     * @param sendCallback {@link SendCallback}
831
     * @param timeout send timeout with millis
832
     */
833
    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
834
        asyncSend(destination, message, sendCallback, timeout, 0);
1✔
835
    }
1✔
836

837
    /**
838
     * <p> Send message to broker asynchronously. asynchronous transmission is generally used in response time
839
     * sensitive business scenarios. </p>
840
     * <p>
841
     * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
842
     * <p>
843
     * Similar to {@link #syncSend(String, Object)}, internal implementation would potentially retry up to {@link
844
     * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
845
     * message duplication and application developers are the one to resolve this potential issue.
846
     *
847
     * @param destination formats: `topicName:tags`
848
     * @param message {@link org.springframework.messaging.Message}
849
     * @param sendCallback {@link SendCallback}
850
     */
851
    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
852
        asyncSend(destination, message, sendCallback, producer.getSendMsgTimeout());
×
853
    }
×
854

855
    /**
856
     * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
857
     *
858
     * @param destination formats: `topicName:tags`
859
     * @param payload the Object to use as payload
860
     * @param sendCallback {@link SendCallback}
861
     * @param timeout send timeout with millis
862
     */
863
    public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
864
        Message<?> message = MessageBuilder.withPayload(payload).build();
1✔
865
        asyncSend(destination, message, sendCallback, timeout);
1✔
866
    }
1✔
867

868
    /**
869
     * Same to {@link #asyncSend(String, Message, SendCallback)}.
870
     *
871
     * @param destination formats: `topicName:tags`
872
     * @param payload the Object to use as payload
873
     * @param sendCallback {@link SendCallback}
874
     */
875
    public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
876
        asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());
1✔
877
    }
1✔
878

879
    /**
880
     * asyncSend batch messages
881
     *
882
     * @param destination formats: `topicName:tags`
883
     * @param messages Collection of {@link org.springframework.messaging.Message}
884
     * @param sendCallback {@link SendCallback}
885
     */
886
    public <T extends Message> void asyncSend(String destination, Collection<T> messages, SendCallback sendCallback) {
887
        asyncSend(destination, messages, sendCallback, producer.getSendMsgTimeout());
1✔
888
    }
1✔
889

890
    /**
891
     * asyncSend batch messages in a given timeout.
892
     *
893
     * @param destination formats: `topicName:tags`
894
     * @param messages Collection of {@link org.springframework.messaging.Message}
895
     * @param sendCallback {@link SendCallback}
896
     * @param timeout send timeout with millis
897
     */
898
    public <T extends Message> void asyncSend(String destination, Collection<T> messages, SendCallback sendCallback, long timeout) {
899
        if (Objects.isNull(messages) || messages.size() == 0) {
1✔
900
            log.error("asyncSend with batch failed. destination:{}, messages is empty ", destination);
×
901
            throw new IllegalArgumentException("`messages` can not be empty");
×
902
        }
903

904
        try {
905
            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
1✔
906
            for (Message msg : messages) {
1✔
907
                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
1✔
908
                    log.warn("Found a message empty in the batch, skip it");
×
909
                    continue;
×
910
                }
911
                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
1✔
912
            }
1✔
913
            producer.send(rmqMsgs, sendCallback, timeout);
1✔
914
        } catch (Exception e) {
×
915
            log.error("asyncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
×
916
            throw new MessagingException(e.getMessage(), e);
×
917
        }
1✔
918
    }
1✔
919

920
    /**
921
     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
922
     * addition.
923
     *
924
     * @param destination formats: `topicName:tags`
925
     * @param message {@link org.springframework.messaging.Message}
926
     * @param hashKey use this key to select queue. for example: orderId, productId ...
927
     * @param sendCallback {@link SendCallback}
928
     * @param timeout send timeout with millis
929
     */
930
    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
931
        long timeout) {
932
        asyncSendOrderly(destination, message, hashKey, sendCallback, timeout, 0);
×
933
    }
×
934

935
    /**
936
     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
937
     * addition.
938
     *
939
     * @param destination formats: `topicName:tags`
940
     * @param message {@link org.springframework.messaging.Message}
941
     * @param hashKey use this key to select queue. for example: orderId, productId ...
942
     * @param sendCallback {@link SendCallback}
943
     * @param timeout send timeout with millis
944
     * @param delayLevel level for the delay message
945
     */
946
    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
947
        long timeout, int delayLevel) {
948
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
×
949
            log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
×
950
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
951
        }
952
        try {
953
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
×
954
            if (delayLevel > 0) {
×
955
                rocketMsg.setDelayTimeLevel(delayLevel);
×
956
            }
957
            producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
×
958
        } catch (Exception e) {
×
959
            log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
×
960
            throw new MessagingException(e.getMessage(), e);
×
961
        }
×
962
    }
×
963

964
    /**
965
     * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
966
     *
967
     * @param destination formats: `topicName:tags`
968
     * @param message {@link org.springframework.messaging.Message}
969
     * @param hashKey use this key to select queue. for example: orderId, productId ...
970
     * @param sendCallback {@link SendCallback}
971
     */
972
    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
973
        asyncSendOrderly(destination, message, hashKey, sendCallback, producer.getSendMsgTimeout());
×
974
    }
×
975

976
    /**
977
     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
978
     *
979
     * @param destination formats: `topicName:tags`
980
     * @param payload the Object to use as payload
981
     * @param hashKey use this key to select queue. for example: orderId, productId ...
982
     * @param sendCallback {@link SendCallback}
983
     */
984
    public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
985
        asyncSendOrderly(destination, payload, hashKey, sendCallback, producer.getSendMsgTimeout());
×
986
    }
×
987

988
    /**
989
     * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
990
     *
991
     * @param destination formats: `topicName:tags`
992
     * @param payload the Object to use as payload
993
     * @param hashKey use this key to select queue. for example: orderId, productId ...
994
     * @param sendCallback {@link SendCallback}
995
     * @param timeout send timeout with millis
996
     */
997
    public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
998
        long timeout) {
999
        Message<?> message = MessageBuilder.withPayload(payload).build();
×
1000
        asyncSendOrderly(destination, message, hashKey, sendCallback, timeout);
×
1001
    }
×
1002

1003
    /**
1004
     * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for
1005
     * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
1006
     * <p>
1007
     * One-way transmission is used for cases requiring moderate reliability, such as log collection.
1008
     *
1009
     * @param destination formats: `topicName:tags`
1010
     * @param message {@link org.springframework.messaging.Message}
1011
     */
1012
    public void sendOneWay(String destination, Message<?> message) {
1013
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
×
1014
            log.error("sendOneWay failed. destination:{}, message is null ", destination);
×
1015
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
1016
        }
1017
        try {
1018
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
×
1019
            producer.sendOneway(rocketMsg);
×
1020
        } catch (Exception e) {
×
1021
            log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
×
1022
            throw new MessagingException(e.getMessage(), e);
×
1023
        }
×
1024
    }
×
1025

1026
    /**
1027
     * Same to {@link #sendOneWay(String, Message)}
1028
     *
1029
     * @param destination formats: `topicName:tags`
1030
     * @param payload the Object to use as payload
1031
     */
1032
    public void sendOneWay(String destination, Object payload) {
1033
        Message<?> message = MessageBuilder.withPayload(payload).build();
×
1034
        sendOneWay(destination, message);
×
1035
    }
×
1036

1037
    /**
1038
     * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
1039
     *
1040
     * @param destination formats: `topicName:tags`
1041
     * @param message {@link org.springframework.messaging.Message}
1042
     * @param hashKey use this key to select queue. for example: orderId, productId ...
1043
     */
1044
    public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
1045
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
×
1046
            log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
×
1047
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
×
1048
        }
1049
        try {
1050
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
×
1051
            producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
×
1052
        } catch (Exception e) {
×
1053
            log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
×
1054
            throw new MessagingException(e.getMessage(), e);
×
1055
        }
×
1056
    }
×
1057

1058
    /**
1059
     * Same to {@link #sendOneWayOrderly(String, Message, String)}
1060
     *
1061
     * @param destination formats: `topicName:tags`
1062
     * @param payload the Object to use as payload
1063
     */
1064
    public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
1065
        Message<?> message = MessageBuilder.withPayload(payload).build();
×
1066
        sendOneWayOrderly(destination, message, hashKey);
×
1067
    }
×
1068

1069
    @Override
1070
    public void afterPropertiesSet() throws Exception {
1071
        if (producer != null) {
1✔
1072
            producer.start();
1✔
1073
        }
1074
        if (Objects.nonNull(consumer)) {
1✔
1075
            try {
1076
                consumer.start();
×
1077
            } catch (Exception e) {
1✔
1078
                log.error("Failed to startup PullConsumer for RocketMQTemplate", e);
1✔
1079
            }
×
1080
        }
1081
    }
1✔
1082

1083
    @Override
1084
    protected void doSend(String destination, Message<?> message) {
1085
        SendResult sendResult = syncSend(destination, message);
×
1086
        if (log.isDebugEnabled()) {
×
1087
            log.debug("send message to `{}` finished. result:{}", destination, sendResult);
×
1088
        }
1089
    }
×
1090

1091
    @Override
1092
    protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
1093
        Message<?> message = super.doConvert(payload, headers, postProcessor);
1✔
1094
        MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
1✔
1095
        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
1✔
1096
        return builder.build();
1✔
1097
    }
1098

1099
    @Override
1100
    public void destroy() {
1101
        if (Objects.nonNull(producer)) {
1✔
1102
            producer.shutdown();
1✔
1103
        }
1104
        if (Objects.nonNull(consumer)) {
1✔
1105
            consumer.shutdown();
1✔
1106
        }
1107
    }
1✔
1108

1109
    /**
1110
     * Send Spring Message in Transaction
1111
     *
1112
     * @param destination destination formats: `topicName:tags`
1113
     * @param message message {@link org.springframework.messaging.Message}
1114
     * @param arg ext arg
1115
     * @return TransactionSendResult
1116
     * @throws MessagingException
1117
     */
1118
    public TransactionSendResult sendMessageInTransaction(final String destination,
1119
        final Message<?> message, final Object arg) throws MessagingException {
1120
        try {
1121
            if (((TransactionMQProducer) producer).getTransactionListener() == null) {
1✔
1122
                throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
1✔
1123
            }
1124
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
1✔
1125
            return producer.sendMessageInTransaction(rocketMsg, arg);
×
1126
        } catch (MQClientException e) {
1✔
1127
            throw RocketMQUtil.convert(e);
1✔
1128
        }
1129
    }
1130

1131
    private org.apache.rocketmq.common.message.Message createRocketMqMessage(
1132
        String destination, Message<?> message) {
1133
        Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);
1✔
1134
        return RocketMQUtil.convertToRocketMessage(getMessageConverter(), charset,
1✔
1135
            destination, msg);
1136
    }
1137

1138
    private Object doConvertMessage(MessageExt messageExt, Type type) {
1139
        if (Objects.equals(type, MessageExt.class)) {
×
1140
            return messageExt;
×
1141
        } else if (Objects.equals(type, byte[].class)) {
×
1142
            return messageExt.getBody();
×
1143
        } else {
1144
            String str = new String(messageExt.getBody(), Charset.forName(charset));
×
1145
            if (Objects.equals(type, String.class)) {
×
1146
                return str;
×
1147
            } else {
1148
                // If msgType not string, use objectMapper change it.
1149
                try {
1150
                    if (type instanceof Class) {
×
1151
                        //if the messageType has not Generic Parameter
1152
                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
×
1153
                    } else {
1154
                        //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
1155
                        //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
1156
                        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
×
1157
                    }
1158
                } catch (Exception e) {
×
1159
                    log.error("convert failed. str:{}, msgType:{}", str, type);
×
1160
                    throw new RuntimeException("cannot convert message to " + type, e);
×
1161
                }
1162
            }
1163
        }
1164
    }
1165

1166
    private Type getMessageType(RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {
1167
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQLocalRequestCallback);
×
1168
        Type matchedGenericInterface = null;
×
1169
        while (Objects.nonNull(targetClass)) {
×
1170
            Type[] interfaces = targetClass.getGenericInterfaces();
×
1171
            if (Objects.nonNull(interfaces)) {
×
1172
                for (Type type : interfaces) {
×
1173
                    if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
×
1174
                        matchedGenericInterface = type;
×
1175
                        break;
×
1176
                    }
1177
                }
1178
            }
1179
            targetClass = targetClass.getSuperclass();
×
1180
        }
×
1181
        if (Objects.isNull(matchedGenericInterface)) {
×
1182
            return Object.class;
×
1183
        }
1184

1185
        Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
×
1186
        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
×
1187
            return actualTypeArguments[0];
×
1188
        }
1189
        return Object.class;
×
1190
    }
1191

1192
    private MessageBatch batch(Collection<org.apache.rocketmq.common.message.Message> msgs) throws MQClientException {
1193
        MessageBatch msgBatch;
1194
        try {
1195
            msgBatch = MessageBatch.generateFromList(msgs);
×
1196
            for (org.apache.rocketmq.common.message.Message message : msgBatch) {
×
1197
                Validators.checkMessage(message, producer);
×
1198
                MessageClientIDSetter.setUniqID(message);
×
1199
                message.setTopic(producer.withNamespace(message.getTopic()));
×
1200
            }
×
1201
            msgBatch.setBody(msgBatch.encode());
×
1202
        } catch (Exception e) {
×
1203
            throw new MQClientException("Failed to initiate the MessageBatch", e);
×
1204
        }
×
1205
        msgBatch.setTopic(producer.withNamespace(msgBatch.getTopic()));
×
1206
        return msgBatch;
×
1207
    }
1208

1209
    /**
1210
     * receive message  in pull mode.
1211
     *
1212
     * @param clazz message object type
1213
     * @param <T>
1214
     * @return message list
1215
     */
1216
    public <T> List<T> receive(Class<T> clazz) {
1217
        return receive(clazz, this.consumer.getPollTimeoutMillis());
1✔
1218
    }
1219

1220
    /**
1221
     * Same to {@link #receive(Class<T>)} with receive timeout specified in addition.
1222
     *
1223
     * @param clazz   message object type
1224
     * @param timeout receive timeout with millis
1225
     * @param <T>
1226
     * @return message list
1227
     */
1228
    public <T> List<T> receive(Class<T> clazz, long timeout) {
1229
        List<MessageExt> messageExts = this.consumer.poll(timeout);
1✔
1230
        List<T> list = new ArrayList<>(messageExts.size());
1✔
1231
        for (MessageExt messageExt : messageExts) {
1✔
1232
            list.add(doConvertMessage(messageExt, clazz));
×
1233
        }
×
1234
        return list;
1✔
1235
    }
1236

1237
    @SuppressWarnings("unchecked")
1238
    private <T> T doConvertMessage(MessageExt messageExt, Class<T> messageType) {
1239
        if (Objects.equals(messageType, MessageExt.class)) {
×
1240
            return (T) messageExt;
×
1241
        } else {
1242
            String str = new String(messageExt.getBody(), Charset.forName(charset));
×
1243
            if (Objects.equals(messageType, String.class)) {
×
1244
                return (T) str;
×
1245
            } else {
1246
                // If msgType not string, use objectMapper change it.
1247
                try {
1248
                    return (T) this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), messageType);
×
1249
                } catch (Exception e) {
×
1250
                    log.info("convert failed. str:{}, msgType:{}", str, messageType);
×
1251
                    throw new RuntimeException("cannot convert message to " + messageType, e);
×
1252
                }
1253
            }
1254
        }
1255
    }
1256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc