• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5057

17 May 2026 01:15AM UTC coverage: 73.406% (+0.02%) from 73.384%
#5057

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281727 of 383795 relevant lines covered (73.41%)

136101761.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndUser.h"
25
#include "mndVgroup.h"
26
#include "tcompare.h"
27
#include "tname.h"
28

29
#define MND_CONSUMER_VER_NUMBER   4
30
#define MND_CONSUMER_RESERVE_SIZE 64
31

32
#define MND_MAX_GROUP_PER_TOPIC 100
33

34
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
35
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
36
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
37
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
38
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
39

40
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
41
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
42
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
43
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
44

45
/**
 * Install the consumer message handlers and show handlers on the mnode, then
 * register the consumer table with the mnode's SDB.
 *
 * @param pMnode mnode instance; must not be NULL.
 * @return TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise whatever
 *         sdbSetTable() returns.
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // TMQ request handlers
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // handlers backing the consumers management table
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  // SDB callbacks for consumer objects, keyed by a 64-bit integer
  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };

  return sdbSetTable(pMnode->pSdb, consumerTable);
}
69

70
// Intentionally empty: this function performs no work.
void mndCleanupConsumer(SMnode *pMnode) {
}
530,574✔
71

72
/**
 * Build an RPC message whose payload is a single consumer id and put it on
 * the mnode's write queue.
 *
 * @param pMnode     mnode instance; must not be NULL.
 * @param consumerId id copied into the message payload.
 * @param msgType    message type stored in the RPC message.
 * @param info       handle info copied (by value) into the RPC message; must not be NULL.
 * @return 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, otherwise the
 *         code produced by the failing check.
 */
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
  if (info == NULL || pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START

  // payload is exactly one int64_t: the consumer id
  int64_t *pPayload = rpcMallocCont(sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pPayload);
  *pPayload = consumerId;

  SRpcMsg rpcMsg = {
      .msgType = msgType,
      .pCont = pPayload,
      .contLen = sizeof(int64_t),
      .info = *info,
  };

  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));

END:
  PRINT_LOG_END
  return code;
}
97

98
/**
 * Validate one topic of a subscribe request and bind the transaction to the
 * topic's database.
 *
 * Steps, all performed while holding the topic's read latch:
 *   - privilege check for MND_OPER_SUBSCRIBE and subscription grant check;
 *   - when replay is enabled: the topic must be a column topic, and if it has a
 *     stable name its database must have exactly one vgroup;
 *   - the transaction's db name is set from the topic db plus the hex consumer id,
 *     followed by a transaction conflict check.
 *
 * @return 0 on success, otherwise the code of the first failing step.
 */
static int32_t validateOneTopic(STrans *pTrans, char *pOneTopic, SCMSubscribeReq *subscribe, SMnode *pMnode,
                                const char *user, const char *token) {
  SMqTopicObj *pTopic = NULL;
  int32_t      lino = 0;
  int32_t      code = 0;

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
  taosRLockLatch(&pTopic->lock);

  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, user, token, MND_OPER_SUBSCRIBE, pTopic));
  MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));

  if (subscribe->enableReplay) {
    // replay is restricted to column topics
    if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
      code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
      goto END;
    }

    if (pTopic->stbName[0] != 0) {
      SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
      if (pDb == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        goto END;
      }

      // read what we need, then drop the db reference before deciding
      int32_t numOfVgroups = pDb->cfg.numOfVgroups;
      mndReleaseDb(pMnode, pDb);

      if (numOfVgroups != 1) {
        code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
        goto END;
      }
    }
  }

  // the consumer id, rendered in hex, is passed along with the topic db to the transaction
  char consumerKey[TSDB_CONSUMER_ID_LEN] = {0};
  (void)snprintf(consumerKey, sizeof(consumerKey), "%" PRIx64, subscribe->consumerId);
  mndTransSetDbName(pTrans, pTopic->db, consumerKey);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

END:
  PRINT_LOG_END
  // the latch was only taken if the topic was acquired
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
  return code;
}
142

143
/**
 * Run validateOneTopic() over every topic name in the subscribe request,
 * stopping at the first failure.
 *
 * @return TSDB_CODE_INVALID_PARA if pTrans, subscribe, pMnode or user is NULL
 *         (token is not checked here); 0 if all topics pass; otherwise the
 *         first non-zero code from validateOneTopic().
 */
static int32_t validateTopics(STrans *pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *user,
                              const char *token) {
  if (pTrans == NULL || subscribe == NULL || pMnode == NULL || user == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START

  int32_t topicCnt = taosArrayGetSize(subscribe->topicNames);
  for (int32_t idx = 0; idx < topicCnt; idx++) {
    char *pTopicName = taosArrayGetP(subscribe->topicNames, idx);
    MND_TMQ_RETURN_CHECK(validateOneTopic(pTrans, pTopicName, subscribe, pMnode, user, token));
  }

END:
  PRINT_LOG_END
  return code;
}
161

162
/**
 * Handle a "lost consumer clear" message: build a CONSUMER_CLEAR consumer
 * object for the id carried in the message and submit a transaction that
 * appends its drop log.
 *
 * The existing consumer is held under its read latch from acquisition until
 * cleanup at END.
 *
 * @param pMsg RPC message whose pCont is an SMqConsumerClearMsg; must not be
 *             NULL and must carry a payload.
 * @return TSDB_CODE_INVALID_PARA on bad input, otherwise the code of the first
 *         failing step or the result of mndTransPrepare().
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t              code = 0;
  int32_t              lino = 0;
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  SMqConsumerObj      *pConsumerNew = NULL;
  STrans              *pTrans = NULL;
  SMqConsumerObj      *pConsumer = NULL;
  // paired with PRINT_LOG_END below, as in the other handlers of this file
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClearMsg->consumerId, &pConsumer));
  taosRLockLatch(&pConsumer->lock);
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  MND_TMQ_RETURN_CHECK(
      tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  MND_TMQ_NULL_CHECK(pTrans);
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
  code = mndTransPrepare(pMnode, pTrans);

END:
  PRINT_LOG_END
  // the latch was only taken if the consumer was acquired
  if (pConsumer != NULL) {
    taosRUnLockLatch(&pConsumer->lock);
  }
  mndReleaseConsumer(pMnode, pConsumer);
  tDeleteSMqConsumerObj(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}
196

197
/**
 * Append one STopicPrivilege entry for `topic` to rsp->topicPrivileges.
 *
 * The entry's noPrivilege flag is 1 when the subscribe privilege check returns
 * non-zero or the subscription grant check returns a negative value, else 0.
 * If the topic cannot be acquired or no array slot can be reserved, nothing is
 * appended; the failure is not reported to the caller (the function is void).
 */
static void checkOnePrivilege(const char *topic, SMnode *pMnode, SMqHbRsp *rsp, const char *user,
                              const char *token) {
  SMqTopicObj *pTopic = NULL;
  int32_t      lino = 0;
  int32_t      code = 0;
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
  taosRLockLatch(&pTopic->lock);

  STopicPrivilege *pEntry = taosArrayReserve(rsp->topicPrivileges, 1);
  MND_TMQ_NULL_CHECK(pEntry);
  tstrncpy(pEntry->topic, topic, TSDB_TOPIC_FNAME_LEN);

  // the grant check only runs when the privilege check passed (short-circuit)
  int32_t denied = (mndCheckTopicPrivilege(pMnode, user, token, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
                    grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0);
  pEntry->noPrivilege = denied ? 1 : 0;

END:
  PRINT_LOG_END
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
}
1,480,648✔
221

222
/**
 * Fill rsp->topicPrivileges with one entry per topic in the consumer's
 * currentTopics list, by calling checkOnePrivilege() for each.
 *
 * @return TSDB_CODE_INVALID_PARA if pMnode, pConsumer, rsp or user is NULL;
 *         the MND_TMQ_NULL_CHECK failure code if the array cannot be created;
 *         0 otherwise (per-topic failures are not propagated).
 */
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, const char *user,
                              const char *token) {
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL || user == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START

  // the topic list is not modified in this function, so its size is read once
  int32_t topicCnt = (int32_t)taosArrayGetSize(pConsumer->currentTopics);

  rsp->topicPrivileges = taosArrayInit(topicCnt, sizeof(STopicPrivilege));
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);

  for (int32_t idx = 0; idx < topicCnt; idx++) {
    char *pTopicName = taosArrayGetP(pConsumer->currentTopics, idx);
    checkOnePrivilege(pTopicName, pMnode, rsp, user, token);
  }

END:
  PRINT_LOG_END
  return code;
}
241

242
/**
 * For every topic reported in a heartbeat request, move its offsetRows array
 * into the matching consumer entry of the subscription object.
 *
 * The subscription is looked up by the key "<cgroup><TMQ_SEPARATOR><topicName>".
 * When the consumer is found in the subscription's consumerHash, the entry's
 * previous offsetRows array is destroyed and replaced by the one from the
 * request, and the request's pointer is set to NULL so the array has a single
 * owner. Topics whose subscription cannot be acquired are logged and skipped.
 */
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer) {
  if (pMnode == NULL || req == NULL || pConsumer == NULL) {
    return;
  }

  int32_t reportedCnt = (int32_t)taosArrayGetSize(req->topics);
  for (int32_t idx = 0; idx < reportedCnt; idx++) {
    TopicOffsetRows *pReport = taosArrayGet(req->topics, idx);
    if (pReport == NULL) {
      continue;
    }
    mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, pReport->topicName);

    char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, sizeof(subKey), "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, pReport->topicName);

    SMqSubscribeObj *pSub = NULL;
    int32_t          rc = mndAcquireSubscribeByKey(pMnode, subKey, &pSub);
    if (rc != 0) {
      mError("failed to acquire subscribe by key:%s, code:%d", subKey, rc);
      continue;
    }

    taosWLockLatch(&pSub->lock);
    SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pEp != NULL) {
      // hand the reported array over to the subscription entry
      taosArrayDestroy(pEp->offsetRows);
      pEp->offsetRows = pReport->offsetRows;
      pReport->offsetRows = NULL;
    }
    taosWUnLockLatch(&pSub->lock);

    mndReleaseSubscribe(pMnode, pSub);
  }
}
273

274
/**
 * Serialize a heartbeat response into a freshly allocated RPC buffer and
 * attach it to pMsg->info (rsp / rspLen).
 *
 * @return 0 on success; TSDB_CODE_INVALID_PARA on NULL arguments;
 *         TSDB_CODE_TMQ_INVALID_MSG when either serialization pass returns a
 *         non-positive length; terrno when the buffer cannot be allocated.
 */
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp) {
  if (rsp == NULL || pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // first pass with a NULL buffer yields the required length
  int32_t rspLen = tSerializeSMqHbRsp(NULL, 0, rsp);
  if (rspLen <= 0) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  void *pRspBuf = rpcMallocCont(rspLen);
  if (pRspBuf == NULL) {
    return terrno;
  }

  // second pass writes into the buffer; release it if that fails
  if (tSerializeSMqHbRsp(pRspBuf, rspLen, rsp) <= 0) {
    rpcFreeCont(pRspBuf);
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  pMsg->info.rspLen = rspLen;
  pMsg->info.rsp = pRspBuf;
  return 0;
}
295

296
/**
 * Handle a consumer heartbeat request.
 *
 * Under the consumer's write latch: collects per-topic privilege info into the
 * response, resets hbStatus to 0, and when the request's pollFlag is 1 also
 * resets pollStatus to 0 and stamps pollTime. It then stores the reported
 * offset rows and builds the response.
 *
 * @return TSDB_CODE_INVALID_PARA if pMsg is NULL, otherwise the code of the
 *         first failing step or the result of buildMqHbRsp().
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqConsumerObj *pConsumer = NULL;
  SMnode         *pMnode = pMsg->info.node;
  SMqHbRsp        rsp = {0};
  SMqHbReq        req = {0};
  int32_t         lino = 0;
  int32_t         code = 0;
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));
  int64_t consumerId = req.consumerId;

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
  taosWLockLatch(&pConsumer->lock);

  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, RPC_MSG_USER(pMsg), RPC_MSG_TOKEN(pMsg)));

  atomic_store_32(&pConsumer->hbStatus, 0);
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, req.pollFlag, pConsumer->pollStatus);

  if (req.pollFlag == 1) {
    atomic_store_32(&pConsumer->pollStatus, 0);
    pConsumer->pollTime = taosGetTimestampMs();
  }

  storeOffsetRows(pMnode, &req, pConsumer);

  rsp.debugFlag = tqClientDebugFlag;
  code = buildMqHbRsp(pMsg, &rsp);

END:
  // the latch was only taken if the consumer was acquired
  if (pConsumer != NULL) {
    taosWUnLockLatch(&pConsumer->lock);
  }
  tDestroySMqHbRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  tDestroySMqHbReq(&req);
  PRINT_LOG_END
  return code;
}
334

335
/**
 * Build the endpoint entry (SMqSubTopicEp) for one topic of a consumer and
 * append it to rsp->topics.
 *
 * The subscription is looked up by "<cgroup><TMQ_SEPARATOR><topic>". If it
 * cannot be acquired, a warning is logged and the function returns 0 without
 * appending anything. Otherwise, under the subscription's write latch, every
 * vgroup id assigned to this consumer is resolved to an ep set; a vgroup that
 * cannot be acquired aborts the topic with terrno.
 *
 * On successful push the vgs array is owned by rsp->topics; on any other path
 * it is destroyed here.
 */
static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, char *topic, SMqAskEpRsp *rsp) {
  char             subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  SMqSubTopicEp    topicEp = {0};
  SMqSubscribeObj *pSub = NULL;
  int32_t          lino = 0;
  int32_t          code = 0;
  PRINT_LOG_START

  (void)snprintf(subKey, sizeof(subKey), "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
  if (mndAcquireSubscribeByKey(pMnode, subKey, &pSub) != 0) {
    // NOTE(review): cleanup at END relies on pSub staying NULL when acquisition fails - confirm
    mWarn("%s failed to acquire subscribe by key:%s", __func__, subKey);
    goto END;
  }
  tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);

  taosWLockLatch(&pSub->lock);

  // walk every vgroup assigned to this consumer for the topic
  SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pConsumerEp);

  int32_t vgCnt = taosArrayGetSize(pConsumerEp->vgs);
  topicEp.vgs = taosArrayInit(vgCnt, sizeof(SMqSubVgEp));
  MND_TMQ_NULL_CHECK(topicEp.vgs);

  tstrncpy(topicEp.db, pSub->dbName, TSDB_DB_FNAME_LEN);

  for (int32_t vgIdx = 0; vgIdx < vgCnt; vgIdx++) {
    int32_t *pVgId = taosArrayGet(pConsumerEp->vgs, vgIdx);
    if (pVgId == NULL) {
      continue;
    }

    SMqSubVgEp vgEp = {.epSet = {0}, .vgId = *pVgId, .offset = -1};

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, *pVgId);
    if (pVgroup == NULL) {
      mWarn("failed to acquire vgroup:%d", *pVgId);
      code = terrno;
      goto END;
    }
    vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    mndReleaseVgroup(pMnode, pVgroup);

    MND_TMQ_NULL_CHECK(taosArrayPush(topicEp.vgs, &vgEp));
  }

  MND_TMQ_NULL_CHECK(taosArrayPush(rsp->topics, &topicEp));
  // rsp->topics now holds the vgs array; keep the cleanup below from destroying it
  topicEp.vgs = NULL;

END:
  if (pSub != NULL) {
    taosWUnLockLatch(&pSub->lock);
  }
  taosArrayDestroy(topicEp.vgs);
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  return code;
}
386

387
/**
 * Allocate rsp->topics and fill it with one endpoint entry per topic in the
 * consumer's currentTopics list via processEachTopicEp().
 *
 * @return TSDB_CODE_INVALID_PARA on NULL arguments; 0 on success; otherwise the
 *         code of the first failing step (the loop stops at the first topic
 *         that fails).
 */
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqAskEpRsp *rsp) {
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START

  int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);

  rsp->topics = taosArrayInit(topicCnt, sizeof(SMqSubTopicEp));
  MND_TMQ_NULL_CHECK(rsp->topics);

  // one entry per topic this consumer currently subscribes to
  for (int32_t idx = 0; idx < topicCnt; idx++) {
    char *pTopicName = taosArrayGetP(pConsumer->currentTopics, idx);
    MND_TMQ_RETURN_CHECK(processEachTopicEp(pMnode, pConsumer, pTopicName, rsp));
  }

END:
  PRINT_LOG_END
  return code;
}
409

410
/**
 * Encode an ask-ep response (SMqRspHead followed by the encoded SMqAskEpRsp)
 * into a freshly allocated RPC buffer and attach it to pMsg->info.
 *
 * @param pMsg        request message that receives the response buffer.
 * @param rsp         response body to encode.
 * @param serverEpoch epoch written into the response head.
 * @param consumerId  consumer id written into the response head.
 * @return 0 on success; TSDB_CODE_INVALID_PARA on NULL arguments;
 *         TSDB_CODE_TSC_INTERNAL_ERROR when either encoding pass reports a
 *         negative result; terrno when the buffer cannot be allocated.
 */
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId) {
  if (pMsg == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Size probe. The write pass below is checked for a negative result, so the
  // probe is checked too before its value is used to size the allocation.
  int32_t bodyLen = tEncodeSMqAskEpRsp(NULL, rsp);
  if (bodyLen < 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t tlen = (int32_t)sizeof(SMqRspHead) + bodyLen;
  void   *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return terrno;
  }

  SMqRspHead *pHead = buf;
  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

  // body starts right after the head
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
  if (tEncodeSMqAskEpRsp(&abuf, rsp) < 0) {
    rpcFreeCont(buf);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // hand the buffer to the RPC layer via the message info
  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
  return 0;
}
441

442
/**
 * Handle an ask-ep request from a consumer.
 *
 * Under the consumer's read latch:
 *   - rejects the request with TSDB_CODE_MND_CONSUMER_NOT_EXIST when the
 *     request's cgroup differs from the stored one;
 *   - when the consumer status is not MQ_CONSUMER_STATUS_READY, answers with
 *     rsp.code = TSDB_CODE_MND_CONSUMER_NOT_READY;
 *   - otherwise, only if the request epoch differs from the server epoch, adds
 *     the endpoint info for all current topics to the response.
 *
 * @return TSDB_CODE_INVALID_PARA if pMsg is NULL, otherwise the code of the
 *         first failing step or the result of buildAskEpRsp().
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqConsumerObj *pConsumer = NULL;
  SMnode         *pMnode = pMsg->info.node;
  SMqAskEpRsp     rsp = {0};
  SMqAskEpReq     req = {0};
  int32_t         lino = 0;
  int32_t         code = 0;
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
  int64_t consumerId = req.consumerId;

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
  taosRLockLatch(&pConsumer->lock);

  if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto END;
  }

  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
  int32_t status = atomic_load_32(&pConsumer->status);

  if (status != MQ_CONSUMER_STATUS_READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    rsp.code = TSDB_CODE_MND_CONSUMER_NOT_READY;
  } else if (req.epoch != serverEpoch) {
    // endpoint info is only sent when the consumer's epoch differs from the server's
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
          consumerId, req.epoch, serverEpoch);
    MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, &rsp));
  }

  code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);

END:
  // the latch was only taken if the consumer was acquired
  if (pConsumer != NULL) {
    taosRUnLockLatch(&pConsumer->lock);
  }
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  PRINT_LOG_END
  return code;
}
492

493
/**
 * Encode the consumer, append the raw record to the transaction's commit log
 * and mark the record SDB_STATUS_DROPPED.
 *
 * The raw record is freed here only when appending it to the transaction fails.
 *
 * @return TSDB_CODE_INVALID_PARA on NULL arguments; 0 on success; otherwise the
 *         code of the failing step.
 */
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pTrans == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START

  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    goto END;
  }

  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

END:
  PRINT_LOG_END
  return code;
}
513

514
/**
 * Encode the consumer, append the raw record to the transaction's commit log
 * and mark the record SDB_STATUS_READY.
 *
 * The raw record is freed here only when appending it to the transaction fails.
 *
 * @return TSDB_CODE_INVALID_PARA on NULL arguments; 0 on success; otherwise the
 *         code of the failing step.
 */
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pCommitRaw);
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    sdbFreeRaw(pCommitRaw);
    goto END;
  }
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));

END:
  // closes the PRINT_LOG_START above, matching mndSetConsumerDropLogs()
  PRINT_LOG_END
  return code;
}
533

534
/**
 * Array-element destructor: `param` points at a slot that holds a heap
 * pointer; the pointed-to memory is released when both the slot address and
 * its content are non-NULL.
 */
static void freeItem(void *param) {
  void **ppItem = param;
  if (ppItem != NULL && *ppItem != NULL) {
    taosMemoryFree(*ppItem);
  }
}
543

544
/*
 * Duplicate `element` with taosStrdup() and push the copy onto
 * pConsumerNew->array.
 *
 * Requirements on the calling scope: a `pConsumerNew` pointer, a `code`
 * variable and an `END:` label, plus whatever MND_TMQ_NULL_CHECK itself needs.
 * If the push fails the copy is freed, `code` is set to terrno and control
 * jumps to END.
 *
 * The expansion is wrapped in do { } while (0) so it behaves as one statement
 * and its temporary does not leak into the caller's scope; invoke it with a
 * trailing semicolon.
 */
#define ADD_TOPIC_TO_ARRAY(element, array)                           \
  do {                                                               \
    char *newTopicCopy = taosStrdup((element));                      \
    MND_TMQ_NULL_CHECK(newTopicCopy);                                \
    if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL) { \
      taosMemoryFree(newTopicCopy);                                  \
      code = terrno;                                                 \
      goto END;                                                      \
    }                                                                \
  } while (0)
552

553
/**
 * Compute which topics were added and which were removed between the existing
 * consumer's currentTopics and the new consumer's assignedTopics.
 *
 * Both lists are walked in lockstep and compared with strcmp(), so they are
 * presumably expected to be sorted - verify against the callers. Names only in
 * the new list are duplicated into pConsumerNew->rebNewTopics, names only in
 * the old list into pConsumerNew->rebRemovedTopics.
 *
 * The existing consumer's read latch is held for the whole computation.
 *
 * @return TSDB_CODE_INVALID_PARA on NULL arguments;
 *         TSDB_CODE_TMQ_NO_NEED_REBALANCE when both result lists are empty;
 *         0 when there is a difference; otherwise the code of the failing step.
 */
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew) {
  if (pExistedConsumer == NULL || pConsumerNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START
  taosRLockLatch(&pExistedConsumer->lock);

  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);

  int32_t newCnt = taosArrayGetSize(pConsumerNew->assignedTopics);
  int32_t oldCnt = taosArrayGetSize(pExistedConsumer->currentTopics);
  int32_t oldIdx = 0;
  int32_t newIdx = 0;

  while (oldIdx < oldCnt || newIdx < newCnt) {
    if (oldIdx >= oldCnt) {
      // old list exhausted: everything left in the new list is an addition
      void *pAdded = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
      MND_TMQ_NULL_CHECK(pAdded);
      ADD_TOPIC_TO_ARRAY(pAdded, rebNewTopics);
      newIdx++;
    } else if (newIdx >= newCnt) {
      // new list exhausted: everything left in the old list is a removal
      void *pRemoved = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
      MND_TMQ_NULL_CHECK(pRemoved);
      ADD_TOPIC_TO_ARRAY(pRemoved, rebRemovedTopics);
      oldIdx++;
    } else {
      char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
      MND_TMQ_NULL_CHECK(oldTopic);
      char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
      MND_TMQ_NULL_CHECK(newTopic);

      int order = strcmp(oldTopic, newTopic);
      if (order == 0) {
        // present in both lists: unchanged
        oldIdx++;
        newIdx++;
      } else if (order < 0) {
        // old name sorts first, so it has no counterpart in the new list
        ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
        oldIdx++;
      } else {
        // new name sorts first, so it has no counterpart in the old list
        ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
        newIdx++;
      }
    }
  }

  // identical topic sets: tell the caller no rebalance is needed
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
  }

END:
  taosRUnLockLatch(&pExistedConsumer->lock);
  PRINT_LOG_END
  return code;
}
614

615
/**
 * Sort the topic list, drop duplicate names (freeing them through freeItem),
 * and verify that no topic has reached MND_MAX_GROUP_PER_TOPIC groups.
 *
 * @return TSDB_CODE_INVALID_PARA on NULL arguments;
 *         TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE for the first topic whose group
 *         count is at or above the limit; 0 otherwise.
 */
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList) {
  if (pMnode == NULL || pTopicList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t topicCnt = taosArrayGetSize(pTopicList);
  for (int32_t idx = 0; idx < topicCnt; idx++) {
    const char *pTopicName = (const char *)taosArrayGetP(pTopicList, idx);
    int32_t     groupCnt = mndGetGroupNumByTopic(pMnode, pTopicName);
    if (groupCnt >= MND_MAX_GROUP_PER_TOPIC) {
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
    }
  }

  return 0;
}
631

632
/*
 * Build the consumer object that a subscribe request should commit.
 *
 * - If mndAcquireConsumer fails, the request is treated as coming from a new
 *   consumer and a CONSUMER_INSERT_SUB object is created.
 * - Otherwise the existing consumer must be in MQ_CONSUMER_STATUS_READY
 *   (else TSDB_CODE_MND_CONSUMER_NOT_READY); a CONSUMER_UPDATE_SUB object is
 *   created and getTopicAddDelete is run against the existing consumer.
 *
 * On success the new object is handed over through *ppConsumer when
 * ppConsumer is not NULL; on every other path it is deleted here. The
 * acquired consumer is always released before returning.
 */
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer){
  if (subscribe == NULL || pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqConsumerObj *pExistedConsumer = NULL;
  SMqConsumerObj *pConsumerNew     = NULL;
  char           *groupName        = subscribe->cgroup;
  int64_t         cid              = subscribe->consumerId;
  int32_t         lino             = 0;
  PRINT_LOG_START
  int32_t code = mndAcquireConsumer(pMnode, cid, &pExistedConsumer);
  if (code == 0) {
    // consumer is already registered: only a READY consumer may change its subscription
    int32_t curStatus = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive tmq subscribe request from existed consumer:0x%" PRIx64
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          cid, subscribe->cgroup, curStatus, mndConsumerStatusName(curStatus),
          (int32_t)taosArrayGetSize(subscribe->topicNames));

    if (curStatus != MQ_CONSUMER_STATUS_READY) {
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto END;
    }
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(cid, groupName, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
  } else {
    // acquire failed: handle the request as a first-time subscription
    mInfo("receive tmq subscribe request from new consumer:0x%" PRIx64
              ",cgroup:%s, numOfTopics:%d", cid,
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));

    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(cid, groupName, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
  }

  if (ppConsumer != NULL) {
    // ownership moves to the caller; clear the local so END does not delete it
    *ppConsumer = pConsumerNew;
    pConsumerNew = NULL;
  }

END:
  PRINT_LOG_END
  mndReleaseConsumer(pMnode, pExistedConsumer);
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
675

676
/*
 * Handle a TMQ subscribe / unsubscribe RPC.
 *
 * An empty topic list is treated as an unsubscribe: if the consumer currently
 * has no assigned topics the request ends immediately. Otherwise the topic
 * list is checked and sorted, a "subscribe" transaction is created, the
 * topics are validated, the new consumer object is built, its commit logs are
 * appended and the transaction is prepared.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared.
 * TSDB_CODE_TMQ_NO_NEED_REBALANCE and TSDB_CODE_MND_CONSUMER_NOT_EXIST are
 * reported to the caller as 0; any other failure code is returned as is.
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  char           *pPayload = pMsg->pCont;
  SMqConsumerObj *pConsumerNew = NULL;
  STrans         *pTrans = NULL;
  int32_t         code = 0;
  int32_t         lino = 0;

  PRINT_LOG_START
  SCMSubscribeReq subscribe = {0};
  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(pPayload, &subscribe, pMsg->contLen));

  bool isUnsubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
  if (isUnsubscribe) {
    SMqConsumerObj *pExisting = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pExisting));

    // read the assigned-topic count under the consumer's read latch
    taosRLockLatch(&pExisting->lock);
    size_t assignedCnt = taosArrayGetSize(pExisting->assignedTopics);
    taosRUnLockLatch(&pExisting->lock);
    mndReleaseConsumer(pMnode, pExisting);

    if (assignedCnt == 0) {
      // nothing is assigned, so there is nothing to unsubscribe from
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
                          (isUnsubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
                          pMsg, "subscribe");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, RPC_MSG_USER(pMsg), RPC_MSG_TOKEN(pMsg)));
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
  code = TSDB_CODE_ACTION_IN_PROGRESS;

END:
  mndTransDrop(pTrans);
  tDeleteSMqConsumerObj(pConsumerNew);
  taosArrayDestroyP(subscribe.topicNames, NULL);

  // these two outcomes are not errors from the client's point of view
  if (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
    code = 0;
  }

  if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mInfo("tmq subscribe request from consumer:0x%" PRIx64 " processed, code:%d", subscribe.consumerId, code);
  } else {
    mError("tmq subscribe request from consumer:0x%" PRIx64 " failed, code:%d", subscribe.consumerId, code);
  }
  return code;
}
726

727
/*
 * Serialize a consumer object into a newly allocated SSdbRaw.
 *
 * Raw layout written here: int32 encoded length, the encoded consumer bytes,
 * then MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and only reset to
 * TSDB_CODE_SUCCESS after every step succeeded, so any early jump to
 * CM_ENCODE_OVER is reported as a failure. Returns NULL on failure (the raw
 * is freed), otherwise the raw record.
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *encBuf = NULL;
  // first pass with a NULL buffer only computes the encoded length
  int32_t encLen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t rawSize = sizeof(int32_t) + encLen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto CM_ENCODE_OVER;

  encBuf = taosMemoryMalloc(encLen);
  if (encBuf == NULL) goto CM_ENCODE_OVER;

  // the encoder receives a separate cursor so encBuf keeps pointing at the start
  void *cursor = encBuf;
  if (tEncodeSMqConsumerObj(&cursor, pConsumer) < 0) {
    goto CM_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, encLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, encBuf, encLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(encBuf);
  if (terrno == 0) {
    mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
    return pRaw;
  }

  mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
769

770
/*
 * Rebuild a consumer row from an SSdbRaw record.
 *
 * Steps: read the soft version and require 1 <= sver <= MND_CONSUMER_VER_NUMBER,
 * allocate the row, read the int32 payload length, copy the payload, skip
 * MND_CONSUMER_RESERVE_SIZE reserved bytes and decode the payload into the
 * row's SMqConsumerObj.
 *
 * Returns the row on success. When terrno is non-zero at the end, the row
 * memory is freed and NULL is returned.
 *
 * NOTE(review): the length read from the raw record is passed to
 * taosMemoryMalloc and SDB_GET_BINARY without a range check here; confirm
 * that a negative or oversized value is rejected by those helpers.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }
  int32_t         code = 0;
  int32_t         lino = 0;
  SMqConsumerObj *pConsumer = NULL;
  SSdbRow        *pRow = NULL;
  void           *payload = NULL;

  terrno = 0;
  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver > MND_CONSUMER_VER_NUMBER || sver < 1) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t payloadLen;
  SDB_GET_INT32(pRaw, dataPos, &payloadLen, CM_DECODE_OVER);
  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, payload, payloadLen, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(payload, pConsumer, sver) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

CM_DECODE_OVER:
  taosMemoryFreeClear(payload);
  if (terrno != TSDB_CODE_SUCCESS) {
    // pConsumer may still be NULL if the failure happened before the row existed
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}
828

829
/*
 * Insert action for a consumer row: logs the consumer and sets its
 * subscribeTime to its createTime. pSdb is not used.
 * Returns TSDB_CODE_INVALID_PARA when pConsumer is NULL, otherwise 0.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (!pConsumer) {
    return TSDB_CODE_INVALID_PARA;
  }

  pConsumer->subscribeTime = pConsumer->createTime;
  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
        pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);
  return 0;
}
838

839
/*
 * Delete action for a consumer row: logs the consumer's status and then
 * clears the object with tClearSMqConsumerObj. pSdb is not used.
 * Returns TSDB_CODE_INVALID_PARA when pConsumer is NULL, otherwise 0.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (!pConsumer) {
    return TSDB_CODE_INVALID_PARA;
  }

  // log first: the fields are read before the object is cleared
  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  tClearSMqConsumerObj(pConsumer);
  return 0;
}
848

849
// remove from topic list
850
/*
 * Remove the first entry of topicList whose string equals pTopic and free
 * that entry's memory. Does nothing when topicList or pTopic is NULL or when
 * no entry matches.
 *
 * consumerId and type are only used in the log line; type names the list
 * being edited (callers in this file pass "new", "remove" or "current").
 *
 * NULL entries in the list are skipped instead of being handed to strcmp,
 * matching the guard used by existInCurrentTopicList.
 */
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
  if (topicList == NULL || pTopic == NULL) {
    return;
  }
  int32_t size = taosArrayGetSize(topicList);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(topicList, i);
    if (p == NULL || strcmp(pTopic, p) != 0) {
      continue;
    }

    // detach the entry from the array first, then release the string it owned
    taosArrayRemove(topicList, i);
    taosMemoryFree(p);

    mInfo("tmq rebalance consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
          consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
    break;
  }
}
867

868
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
191,151✔
869
  if (pConsumer == NULL || pTopic == NULL) {
191,151✔
870
    return false;
×
871
  }
872
  bool    existing = false;
191,151✔
873
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
191,151✔
874
  for (int32_t i = 0; i < size; i++) {
194,578✔
875
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
3,427✔
876
    if (topic && strcmp(topic, pTopic) == 0) {
3,427✔
877
      existing = true;
×
878
      break;
×
879
    }
880
  }
881

882
  return existing;
191,151✔
883
}
884

885
/*
 * Update action for a consumer row. Applies pNewConsumer->updateType to
 * pOldConsumer while holding pOldConsumer's write latch:
 *
 * - CONSUMER_UPDATE_SUB: swap rebNewTopics / rebRemovedTopics / assignedTopics
 *   with the new object, refresh subscribeTime, set status to REBALANCE.
 * - CONSUMER_UPDATE_REB: bump epoch, set status to READY, refresh rebalanceTime.
 * - CONSUMER_ADD_REB: move the first topic of pNewConsumer->rebNewTopics out of
 *   the old consumer's rebNewTopics and into its (re-sorted) currentTopics.
 * - CONSUMER_REMOVE_REB: drop the first topic of pNewConsumer->rebRemovedTopics
 *   from the old consumer's rebRemovedTopics and currentTopics.
 *
 * For the ADD/REMOVE cases the status becomes READY once both pending lists
 * are empty, and rebalanceTime / epoch are refreshed.
 *
 * pSdb is not used. Returns TSDB_CODE_INVALID_PARA for NULL consumers,
 * otherwise the value left in `code` (set by the MND_TMQ_* check macros).
 *
 * Ownership: the duplicated topic string is tracked by the single
 * function-level pNewTopic. It is cleared once currentTopics owns it, so the
 * cleanup at END frees it on every other path (topic already present, or
 * taosArrayPush failure).
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  if (pOldConsumer == NULL || pNewConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t lino = 0;
  int32_t code = 0;
  char *pNewTopic = NULL;
  PRINT_LOG_START
  taosWLockLatch(&pOldConsumer->lock);
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);

  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

    pOldConsumer->subscribeTime = taosGetTimestampMs();
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
    MND_TMQ_NULL_CHECK(tmp);
    // assign to the function-level pointer (no re-declaration) so END can free it
    pNewTopic = taosStrdup(tmp);
    MND_TMQ_NULL_CHECK(pNewTopic);
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
    if (existing) {
      mError("tmq rebalance consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
    } else {
      MND_TMQ_NULL_CHECK(taosArrayPush(pOldConsumer->currentTopics, &pNewTopic));
      // currentTopics now owns the string
      pNewTopic = NULL;
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }

    int32_t status = pOldConsumer->status;
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
    MND_TMQ_NULL_CHECK(topic);
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");

    int32_t status = pOldConsumer->status;
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalanceconsumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

END:
  // NULL unless the duplicated topic was never handed to currentTopics
  taosMemoryFree(pNewTopic);
  taosWUnLockLatch(&pOldConsumer->lock);

  PRINT_LOG_END

  return code;
}
969

970
/*
 * Look up a consumer by id through sdbAcquire and store the result in
 * *pConsumer.
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_MND_CONSUMER_NOT_EXIST when the lookup yields NULL (in which case
 * *pConsumer is NULL), otherwise 0. A successful result is paired with
 * mndReleaseConsumer by the callers in this file.
 */
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
  if (pConsumer == NULL || pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  *pConsumer = pFound;
  return (pFound != NULL) ? 0 : TSDB_CODE_MND_CONSUMER_NOT_EXIST;
}
981

982
/*
 * Hand a consumer back to the sdb via sdbRelease.
 * A NULL pMnode or NULL pConsumer makes this a no-op, so cleanup paths can
 * call it unconditionally.
 */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  if (pMnode != NULL && pConsumer != NULL) {
    sdbRelease(pMnode->pSdb, pConsumer);
  }
}
989

990
/*
 * Write one result row (row index numOfRows) for a consumer into pBlock.
 *
 * Columns are filled in this order: consumer id (hex string), cgroup,
 * client id, user, fqdn, status name, topic show-name (NULL when hasTopic is
 * false), createTime, subscribeTime, rebalanceTime (NULL when 0), a
 * parameter summary string, and pollTime (NULL when 0).
 *
 * The status and parameter strings are heap buffers sized from
 * pShow->pMeta->pSchemas[...].bytes and are always freed before returning.
 * Returns the value left in `code` by the MND_TMQ_* check macros (0 when
 * every column was written).
 */
static int32_t buildResult(SMqConsumerObj *pConsumer, SShowObj *pShow, SSDataBlock *pBlock, int32_t numOfRows, const char* showTopic, bool hasTopic) {
  int32_t          code = 0;
  int32_t          lino = 0;
  int32_t          colIdx = 0;
  SColumnInfoData *pCol = NULL;
  char            *statusText = NULL;
  char            *paramText = NULL;

  PRINT_LOG_START
  // consumer id, formatted as 0x<hex>
  char idText[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
  (void)snprintf(varDataVal(idText), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
  varDataSetLen(idText, strlen(varDataVal(idText)));

  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)idText, false));

  // consumer group
  char groupText[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(groupText, pConsumer->cgroup);

  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)groupText, false));

  // client id
  char clientText[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(clientText, pConsumer->clientId);

  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)clientText, false));

  // user
  char userText[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(userText, pConsumer->user);

  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)userText, false));

  // fqdn
  char fqdnText[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(fqdnText, pConsumer->fqdn);

  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)fqdnText, false));

  // status name; the buffer is sized from this column's schema width
  const char *pStatusName = mndConsumerStatusName(pConsumer->status);
  statusText = taosMemoryCalloc(1, pShow->pMeta->pSchemas[colIdx].bytes);
  MND_TMQ_NULL_CHECK(statusText);
  STR_TO_VARSTR(statusText, pStatusName);

  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)statusText, false));

  // one subscribed topic (or NULL when the caller says there is none)
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  if (!hasTopic) {
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, NULL, true));
  } else {
    char        topicText[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    mndTopicGetShowName(showTopic, topicText + VARSTR_HEADER_SIZE);
    *(VarDataLenT *)(topicText) = strlen(topicText + VARSTR_HEADER_SIZE);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)topicText, false));
  }

  // create time
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)&pConsumer->createTime, false));

  // subscribe time
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)&pConsumer->subscribeTime, false));

  // rebalance time, NULL while it is still 0
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));

  // parameter summary: tbname / commit / interval / reset offset
  char         offsetText[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offsetVal = {.type = pConsumer->resetOffsetCfg};
  tFormatOffset(offsetText, TSDB_OFFSET_LEN, &offsetVal);

  paramText = taosMemoryCalloc(1, pShow->pMeta->pSchemas[colIdx].bytes);
  MND_TMQ_NULL_CHECK(paramText);
  (void)snprintf(varDataVal(paramText), pShow->pMeta->pSchemas[colIdx].bytes - VARSTR_HEADER_SIZE, "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
          pConsumer->autoCommit, pConsumer->autoCommitInterval, offsetText);
  varDataSetLen(paramText, strlen(varDataVal(paramText)));

  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)paramText, false));

  // poll time, NULL while it is still 0
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  MND_TMQ_NULL_CHECK(pCol);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, numOfRows, (const char *)&pConsumer->pollTime, pConsumer->pollTime == 0));

END:
  PRINT_LOG_END
  taosMemoryFreeClear(statusText);
  taosMemoryFreeClear(paramText);
  return code;
}
1102

1103
/*
 * Append the show-result rows of one consumer to pBlock, one row per
 * assigned topic, while holding the consumer's read latch.
 *
 * A consumer without assigned topics produces no rows. When showAll is false
 * and the consumer belongs to a different user than pOperUser, a topic's row
 * is only emitted if mndCheckObjPrivilegeRecF grants PRIV_CONSUMER_SHOW on
 * that topic; otherwise the topic is skipped.
 *
 * *numOfRows is advanced by the number of rows written. The block capacity
 * is grown first when the current rows plus this consumer's topic count
 * exceed rowsCapacity.
 *
 * Returns the value left in `code` by the MND_TMQ_* check macros (0 on
 * success); a NULL topic entry is reported through MND_TMQ_NULL_CHECK.
 */
static int32_t retrieveOneConsumer(SRpcMsg *pReq, SMqConsumerObj *pConsumer, SUserObj *pOperUser, bool showAll,
                                   int32_t *numOfRows, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  SMnode *pMnode = pReq->info.node;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);
  mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

  // read the size once; everything below runs under the same read latch
  int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
  if (topicSz == 0) {
    mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
    goto END;
  }

  if (*numOfRows + topicSz > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + topicSz));
  }

  for (int32_t i = 0; i < topicSz; i++) {
    char *pTopicFName = taosArrayGetP(pConsumer->assignedTopics, i);
    // the name is used by the privilege lookup and by buildResult below
    MND_TMQ_NULL_CHECK(pTopicFName);

    if (!showAll && (strncmp(pOperUser->name, pConsumer->user, TSDB_USER_LEN) != 0)) {
      // another user's consumer: show it only with an explicit privilege on the topic
      bool         showConsumer = false;
      SMqTopicObj *pTopic = NULL;
      (void)mndAcquireTopic(pMnode, pTopicFName, &pTopic);
      if (pTopic) {
        SName name = {0};  // 1.topic1
        if (0 == tNameFromString(&name, pTopic->name, T_NAME_ACCT | T_NAME_DB)) {
          if (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CONSUMER_SHOW, PRIV_OBJ_TOPIC, pTopic->ownerId,
                                            pTopic->db, name.dbname)) {
            showConsumer = true;
          }
        }
        mndReleaseTopic(pMnode, pTopic);
      }
      if (!showConsumer) {
        continue;
      }
    }

    // at least one topic is assigned on this path, so a topic name is always passed
    MND_TMQ_RETURN_CHECK(buildResult(pConsumer, pShow, pBlock, *numOfRows, pTopicFName, true));
    (*numOfRows)++;
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
1158

1159
/*
 * Retrieve callback for the consumer show table: iterates consumers with
 * sdbFetch (resuming from pShow->pIter) and appends their rows to pBlock
 * until the iterator is exhausted or numOfRows reaches rowsCapacity.
 *
 * showAll is true when mndCheckSysObjPrivilege grants PRIV_CONSUMER_SHOW on
 * "<acctId>.*"; otherwise retrieveOneConsumer filters per topic.
 *
 * Returns the number of rows written on success, or the failure code.
 *
 * NOTE(review): sdbCancelFetch(pSdb, pShow->pIter) runs on the success path
 * as well, including when the loop stopped because the block was full and
 * pShow->pIter is still non-NULL; confirm the iterator may be reused by the
 * next retrieve call after being cancelled.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  SUserObj       *pOperUser = NULL;
  int32_t         rowCnt = 0;
  int32_t         code = 0;
  int32_t         lino = 0;
  bool            showAll = false;
  char            objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pOperUser->acctId);
  showAll = (0 == mndCheckSysObjPrivilege(pMnode, pOperUser, RPC_MSG_TOKEN(pReq), PRIV_CONSUMER_SHOW,
                                          PRIV_OBJ_TOPIC, 0, objFName, "*"));

  while (rowCnt < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }
    MND_TMQ_RETURN_CHECK(retrieveOneConsumer(pReq, pConsumer, pOperUser, showAll,  &rowCnt, pShow, pBlock, rowsCapacity));

    pBlock->info.rows = rowCnt;
    // release per iteration and clear, so END does not release it a second time
    sdbRelease(pSdb, pConsumer);
    pConsumer = NULL;
  }

  pShow->numOfRows += rowCnt;

END:
  sdbRelease(pSdb, pConsumer);
  sdbCancelFetch(pSdb, pShow->pIter);
  mndReleaseUser(pMnode, pOperUser);
  if (code == 0) {
    mDebug("show consumer processed, numOfRows:%d", rowCnt);
    return rowCnt;
  }

  mError("show consumer failed, code:%d", code);
  return code;
}
1204

1205
/*
 * Cancel an in-progress consumer iteration by calling sdbCancelFetchByType
 * with SDB_CONSUMER. Does nothing when pMnode or pIter is NULL.
 */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL && pIter != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_CONSUMER);
  }
}
1210

1211
const char *mndConsumerStatusName(int status) {
5,643,091✔
1212
  switch (status) {
5,643,091✔
1213
    case MQ_CONSUMER_STATUS_READY:
2,406,422✔
1214
      return "ready";
2,406,422✔
1215
    case MQ_CONSUMER_STATUS_REBALANCE:
3,236,669✔
1216
      return "rebalancing";
3,236,669✔
1217
    default:
×
1218
      return "unknown";
×
1219
  }
1220
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc