• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / rocketmq-dashboard / 208

pending completion
208

Pull #138

travis-ci-com

web-flow
Merge 66d7fa0e1 into 86bdb0636
Pull Request #138: [ISSUE #137]Fix the bug of topic create or update when order is true

3 of 3 new or added lines in 1 file covered. (100.0%)

2375 of 2872 relevant lines covered (82.69%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.85
/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
package org.apache.rocketmq.dashboard.service.impl;
19

20
import com.google.common.base.Throwables;
21
import com.google.common.collect.Lists;
22
import com.google.common.collect.Sets;
23
import java.util.stream.Collectors;
24
import org.apache.commons.lang3.StringUtils;
25
import org.apache.rocketmq.acl.common.AclClientRPCHook;
26
import org.apache.rocketmq.acl.common.SessionCredentials;
27
import org.apache.rocketmq.client.producer.DefaultMQProducer;
28
import org.apache.rocketmq.client.producer.SendResult;
29
import org.apache.rocketmq.client.trace.TraceContext;
30
import org.apache.rocketmq.client.trace.TraceDispatcher;
31
import org.apache.rocketmq.common.MixAll;
32
import org.apache.rocketmq.common.TopicConfig;
33
import org.apache.rocketmq.common.admin.TopicStatsTable;
34
import org.apache.rocketmq.common.message.Message;
35
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
36
import org.apache.rocketmq.common.protocol.body.GroupList;
37
import org.apache.rocketmq.common.protocol.body.TopicList;
38
import org.apache.rocketmq.common.protocol.route.BrokerData;
39
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
40
import org.apache.rocketmq.common.topic.TopicValidator;
41
import org.apache.rocketmq.dashboard.config.RMQConfigure;
42
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
43
import org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
44
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
45
import org.apache.rocketmq.dashboard.service.TopicService;
46
import org.apache.rocketmq.remoting.RPCHook;
47
import org.apache.rocketmq.tools.command.CommandUtil;
48
import org.joor.Reflect;
49
import org.springframework.beans.BeanUtils;
50
import org.springframework.beans.factory.annotation.Autowired;
51
import org.springframework.stereotype.Service;
52

53
import java.util.Arrays;
54
import java.util.HashSet;
55
import java.util.List;
56
import java.util.Set;
57
import java.util.concurrent.ArrayBlockingQueue;
58

59
@Service
60
public class TopicServiceImpl extends AbstractCommonService implements TopicService {
2✔
61

62
    @Autowired
63
    private RMQConfigure configure;
64

65
    @Override
66
    public TopicList fetchAllTopicList(boolean skipSysProcess, boolean skipRetryAndDlq) {
67
        try {
68
            TopicList allTopics = mqAdminExt.fetchAllTopicList();
2✔
69
            TopicList sysTopics = getSystemTopicList();
2✔
70
            Set<String> topics =
2✔
71
                allTopics.getTopicList().stream().map(topic -> {
2✔
72
                    if (!skipSysProcess && sysTopics.getTopicList().contains(topic)) {
2✔
73
                        topic = String.format("%s%s", "%SYS%", topic);
2✔
74
                    }
75
                    return topic;
2✔
76
                }).filter(topic -> {
2✔
77
                    if (skipRetryAndDlq) {
2✔
78
                        return !(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
2✔
79
                            || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
2✔
80
                    }
81
                    return true;
2✔
82
                }).collect(Collectors.toSet());
2✔
83
            allTopics.getTopicList().clear();
2✔
84
            allTopics.getTopicList().addAll(topics);
2✔
85
            return allTopics;
2✔
86
        } catch (Exception e) {
×
87
            throw Throwables.propagate(e);
×
88
        }
89
    }
90

91
    @Override
92
    public TopicStatsTable stats(String topic) {
93
        try {
94
            return mqAdminExt.examineTopicStats(topic);
2✔
95
        } catch (Exception e) {
×
96
            throw Throwables.propagate(e);
×
97
        }
98
    }
99

100
    @Override
101
    public TopicRouteData route(String topic) {
102
        try {
103
            return mqAdminExt.examineTopicRouteInfo(topic);
2✔
104
        } catch (Exception ex) {
×
105
            throw Throwables.propagate(ex);
×
106
        }
107
    }
108

109
    @Override
110
    public GroupList queryTopicConsumerInfo(String topic) {
111
        try {
112
            return mqAdminExt.queryTopicConsumeByWho(topic);
2✔
113
        } catch (Exception e) {
×
114
            throw Throwables.propagate(e);
×
115
        }
116
    }
117

118
    @Override
119
    public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
120
        TopicConfig topicConfig = new TopicConfig();
2✔
121
        BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
2✔
122
        try {
123
            ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
2✔
124
            for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
2✔
125
                topicCreateOrUpdateRequest.getClusterNameList(), topicCreateOrUpdateRequest.getBrokerNameList())) {
2✔
126
                mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topicConfig);
2✔
127
                if (topicCreateOrUpdateRequest.isOrder()) {
2✔
128
                    String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
×
129
                    mqAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
×
130
                }
131
            }
2✔
132
        } catch (Exception err) {
×
133
            throw Throwables.propagate(err);
×
134
        }
2✔
135
    }
2✔
136

137
    @Override
138
    public TopicConfig examineTopicConfig(String topic, String brokerName) {
139
        ClusterInfo clusterInfo = null;
2✔
140
        try {
141
            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
2✔
142
            return mqAdminExt.examineTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), topic);
2✔
143
        } catch (Exception e) {
×
144
            throw Throwables.propagate(e);
×
145
        }
146
    }
147

148
    @Override
149
    public List<TopicConfigInfo> examineTopicConfig(String topic) {
150
        List<TopicConfigInfo> topicConfigInfoList = Lists.newArrayList();
2✔
151
        TopicRouteData topicRouteData = route(topic);
2✔
152
        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
2✔
153
            TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
2✔
154
            TopicConfig topicConfig = examineTopicConfig(topic, brokerData.getBrokerName());
2✔
155
            BeanUtils.copyProperties(topicConfig, topicConfigInfo);
2✔
156
            topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
2✔
157
            topicConfigInfoList.add(topicConfigInfo);
2✔
158
        }
2✔
159
        return topicConfigInfoList;
2✔
160
    }
161

162
    @Override
163
    public boolean deleteTopic(String topic, String clusterName) {
164
        try {
165
            if (StringUtils.isBlank(clusterName)) {
2✔
166
                return deleteTopic(topic);
2✔
167
            }
168
            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, clusterName);
2✔
169
            mqAdminExt.deleteTopicInBroker(masterSet, topic);
2✔
170
            Set<String> nameServerSet = null;
2✔
171
            if (StringUtils.isNotBlank(configure.getNamesrvAddr())) {
2✔
172
                String[] ns = configure.getNamesrvAddr().split(";");
2✔
173
                nameServerSet = new HashSet<String>(Arrays.asList(ns));
2✔
174
            }
175
            mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
2✔
176
        } catch (Exception err) {
×
177
            throw Throwables.propagate(err);
×
178
        }
2✔
179
        return true;
2✔
180
    }
181

182
    @Override
183
    public boolean deleteTopic(String topic) {
184
        ClusterInfo clusterInfo = null;
2✔
185
        try {
186
            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
2✔
187
        } catch (Exception err) {
×
188
            throw Throwables.propagate(err);
×
189
        }
2✔
190
        for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
2✔
191
            deleteTopic(topic, clusterName);
2✔
192
        }
2✔
193
        return true;
2✔
194
    }
195

196
    @Override
197
    public boolean deleteTopicInBroker(String brokerName, String topic) {
198

199
        try {
200
            ClusterInfo clusterInfo = null;
2✔
201
            try {
202
                clusterInfo = mqAdminExt.examineBrokerClusterInfo();
2✔
203
            } catch (Exception e) {
×
204
                throw Throwables.propagate(e);
×
205
            }
2✔
206
            mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic);
2✔
207
        } catch (Exception e) {
×
208
            throw Throwables.propagate(e);
×
209
        }
2✔
210
        return true;
2✔
211
    }
212

213
    public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook) {
214
        return buildDefaultMQProducer(producerGroup, rpcHook, false);
2✔
215
    }
216

217
    public DefaultMQProducer buildDefaultMQProducer(String producerGroup, RPCHook rpcHook, boolean traceEnabled) {
218
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup, rpcHook, traceEnabled, TopicValidator.RMQ_SYS_TRACE_TOPIC);
×
219
        defaultMQProducer.setUseTLS(configure.isUseTLS());
×
220
        return defaultMQProducer;
×
221
    }
222

223
    private TopicList getSystemTopicList() {
224
        RPCHook rpcHook = null;
2✔
225
        boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) && !StringUtils.isEmpty(configure.getSecretKey());
2✔
226
        if (isEnableAcl) {
2✔
227
            rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
2✔
228
        }
229
        DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
2✔
230
        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
2✔
231
        producer.setNamesrvAddr(configure.getNamesrvAddr());
2✔
232

233
        try {
234
            producer.start();
2✔
235
            return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L);
2✔
236
        } catch (Exception e) {
×
237
            throw Throwables.propagate(e);
×
238
        } finally {
239
            producer.shutdown();
2✔
240
        }
241
    }
242

243
    @Override
244
    public SendResult sendTopicMessageRequest(SendTopicMessageRequest sendTopicMessageRequest) {
245
        DefaultMQProducer producer = null;
2✔
246
        AclClientRPCHook rpcHook = null;
2✔
247
        if (configure.isACLEnabled()) {
2✔
248
            rpcHook = new AclClientRPCHook(new SessionCredentials(
2✔
249
                configure.getAccessKey(),
2✔
250
                configure.getSecretKey()
2✔
251
            ));
252
        }
253
        producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, sendTopicMessageRequest.isTraceEnabled());
2✔
254
        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
2✔
255
        producer.setNamesrvAddr(configure.getNamesrvAddr());
2✔
256
        try {
257
            producer.start();
2✔
258
            Message msg = new Message(sendTopicMessageRequest.getTopic(),
2✔
259
                sendTopicMessageRequest.getTag(),
2✔
260
                sendTopicMessageRequest.getKey(),
2✔
261
                sendTopicMessageRequest.getMessageBody().getBytes()
2✔
262
            );
263
            return producer.send(msg);
2✔
264
        } catch (Exception e) {
×
265
            throw Throwables.propagate(e);
×
266
        } finally {
267
            waitSendTraceFinish(producer, sendTopicMessageRequest.isTraceEnabled());
2✔
268
            producer.shutdown();
2✔
269
        }
270
    }
271

272
    private void waitSendTraceFinish(DefaultMQProducer producer, boolean traceEnabled) {
273
        if (!traceEnabled) {
2✔
274
            return;
2✔
275
        }
276
        try {
277
            TraceDispatcher traceDispatcher = Reflect.on(producer).field("traceDispatcher").get();
×
278
            if (traceDispatcher != null) {
×
279
                ArrayBlockingQueue<TraceContext> traceContextQueue = Reflect.on(traceDispatcher).field("traceContextQueue").get();
×
280
                while (traceContextQueue.size() > 0) {
×
281
                    Thread.sleep(1);
×
282
                }
283
            }
284
            // wait another 150ms until async request send finish
285
            // after new RocketMQ version released, this logic can be removed
286
            // https://github.com/apache/rocketmq/pull/2989
287
            Thread.sleep(150);
×
288
        } catch (Exception ignore) {
×
289
        }
×
290
    }
×
291
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc