• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4944

30 Jan 2026 06:19AM UTC coverage: 66.849% (+0.1%) from 66.718%
#4944

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1124 of 2018 new or added lines in 72 files covered. (55.7%)

13677 existing lines in 155 files now uncovered.

205211 of 306978 relevant lines covered (66.85%)

125657591.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.38
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndUser.h"
25
#include "mndVgroup.h"
26
#include "tcompare.h"
27
#include "tname.h"
28

29
#define MND_CONSUMER_VER_NUMBER   3
30
#define MND_CONSUMER_RESERVE_SIZE 64
31

32
#define MND_MAX_GROUP_PER_TOPIC 100
33

34
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
35
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
36
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
37
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
38
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
39

40
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
41
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
42
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
43
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
44

45
int32_t mndInitConsumer(SMnode *pMnode) {
414,834✔
46
  SSdbTable table = {
414,834✔
47
      .sdbType = SDB_CONSUMER,
48
      .keyType = SDB_KEY_INT64,
49
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
50
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
51
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
52
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
53
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
54
  };
55

56
  if (pMnode == NULL){
414,834✔
UNCOV
57
    return TSDB_CODE_INVALID_PARA;
×
58
  }
59
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
414,834✔
60
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
414,834✔
61
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
414,834✔
62
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
414,834✔
63

64
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
414,834✔
65
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);
414,834✔
66

67
  return sdbSetTable(pMnode->pSdb, table);
414,834✔
68
}
69

70
void mndCleanupConsumer(SMnode *pMnode) {}
414,774✔
71

72
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
105,460✔
73
  if (pMnode == NULL || info == NULL) {
105,460✔
UNCOV
74
    return TSDB_CODE_INVALID_PARA;
×
75
  }
76
  int32_t code = 0;
105,460✔
77
  int32_t lino = 0;
105,460✔
78
  PRINT_LOG_START
105,460✔
79
  void   *msg  = rpcMallocCont(sizeof(int64_t));
105,460✔
80
  MND_TMQ_NULL_CHECK(msg);
105,460✔
81

82
  *(int64_t*)msg = consumerId;
105,460✔
83
  SRpcMsg rpcMsg = {
105,460✔
84
      .msgType = msgType,
85
      .pCont = msg,
86
      .contLen = sizeof(int64_t),
87
      .info = *info,
88
  };
89

90
  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
105,460✔
91
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
105,460✔
92

93
END:
105,460✔
94
  PRINT_LOG_END
105,460✔
95
  return code;
105,460✔
96
}
97

98
static int32_t validateOneTopic(STrans* pTrans,char *pOneTopic, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *user, const char* token) {
161,146✔
99
  int32_t      code = 0;
161,146✔
100
  int32_t lino = 0;
161,146✔
101
  SMqTopicObj *pTopic = NULL;
161,146✔
102

103
  PRINT_LOG_START
161,146✔
104
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
161,146✔
105
  taosRLockLatch(&pTopic->lock);
159,702✔
106

107
  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, user, token, MND_OPER_SUBSCRIBE, pTopic));
159,702✔
108
  MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));
159,538✔
109

110
  if (subscribe->enableReplay) {
159,538✔
111
    if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
1,939✔
112
      code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
554✔
113
      goto END;
554✔
114
    } 
115
    if (pTopic->stbName[0] != 0) {
1,385✔
116
      SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
1,108✔
117
      if (pDb == NULL) {
1,108✔
UNCOV
118
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
119
        goto END;
×
120
      }
121
      if (pDb->cfg.numOfVgroups != 1) {
1,108✔
122
        mndReleaseDb(pMnode, pDb);
277✔
123
        code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
277✔
124
        goto END;
277✔
125
      }
126
      mndReleaseDb(pMnode, pDb);
831✔
127
    }
128
  }
129
  char  key[TSDB_CONSUMER_ID_LEN] = {0};
158,707✔
130
  (void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%"PRIx64, subscribe->consumerId);
158,707✔
131
  mndTransSetDbName(pTrans, pTopic->db, key);
158,707✔
132
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
158,707✔
133

134
END:
161,146✔
135
  PRINT_LOG_END
161,146✔
136
  if (pTopic != NULL) {
161,146✔
137
    taosRUnLockLatch(&pTopic->lock);
159,702✔
138
  }
139
  mndReleaseTopic(pMnode, pTopic);
161,146✔
140
  return code;
161,146✔
141
}
142

143
static int32_t validateTopics(STrans* pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *user, const char* token) {
267,192✔
144
  if (pTrans == NULL || subscribe == NULL || pMnode == NULL || user == NULL) {
267,192✔
UNCOV
145
    return TSDB_CODE_INVALID_PARA;
×
146
  }
147
  int32_t      code = 0;
267,192✔
148
  int32_t lino = 0;
267,192✔
149

150
  PRINT_LOG_START
267,192✔
151
  int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames);
267,192✔
152
  for (int32_t i = 0; i < numOfTopics; i++) {
425,899✔
153
    char *pOneTopic = taosArrayGetP(subscribe->topicNames, i);
161,146✔
154
    MND_TMQ_RETURN_CHECK(validateOneTopic(pTrans, pOneTopic, subscribe, pMnode, user, token));
161,146✔
155
  }
156

157
END:
264,753✔
158
  PRINT_LOG_END
267,192✔
159
  return code;
267,192✔
160
}
161

162
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
105,460✔
163
  if (pMsg == NULL || pMsg->pCont == NULL) {
105,460✔
UNCOV
164
    return TSDB_CODE_INVALID_PARA;
×
165
  }
166
  int32_t              code = 0;
105,460✔
167
  int32_t              lino = 0;
105,460✔
168
  SMnode              *pMnode = pMsg->info.node;
105,460✔
169
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
105,460✔
170
  SMqConsumerObj      *pConsumerNew = NULL;
105,460✔
171
  STrans              *pTrans = NULL;
105,460✔
172
  SMqConsumerObj      *pConsumer = NULL;
105,460✔
173

174
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClearMsg->consumerId, &pConsumer));
105,460✔
175
  taosRLockLatch(&pConsumer->lock);
104,536✔
176
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
104,536✔
177
        mndConsumerStatusName(pConsumer->status));
178

179
  MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
104,536✔
180

181
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
104,536✔
182
  MND_TMQ_NULL_CHECK(pTrans);
104,536✔
183
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
104,536✔
184
  code = mndTransPrepare(pMnode, pTrans);
104,536✔
185

186
END:
105,460✔
187
  PRINT_LOG_END
105,460✔
188
  if (pConsumer != NULL) {
105,460✔
189
    taosRUnLockLatch(&pConsumer->lock);
104,536✔
190
  }
191
  mndReleaseConsumer(pMnode, pConsumer);
105,460✔
192
  tDeleteSMqConsumerObj(pConsumerNew);
105,460✔
193
  mndTransDrop(pTrans);
105,460✔
194
  return code;
105,460✔
195
}
196

197
static void checkOnePrivilege(const char* topic, SMnode *pMnode, SMqHbRsp *rsp, const char *user, const char* token) {
914,238✔
198
  int32_t code = 0;
914,238✔
199
  int32_t lino = 0;
914,238✔
200
  SMqTopicObj *pTopic = NULL;
914,238✔
201
  PRINT_LOG_START
914,238✔
202
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
914,238✔
203
  taosRLockLatch(&pTopic->lock);
914,238✔
204
  STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
914,238✔
205
  MND_TMQ_NULL_CHECK(data);
914,238✔
206
  tstrncpy(data->topic, topic, TSDB_TOPIC_FNAME_LEN);
914,238✔
207
  if (mndCheckTopicPrivilege(pMnode, user, token, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
1,827,820✔
208
      grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
913,582✔
209
    data->noPrivilege = 1;
656✔
210
  } else {
211
    data->noPrivilege = 0;
913,582✔
212
  }
213

214
END:
914,238✔
215
  PRINT_LOG_END
914,238✔
216
  if (pTopic != NULL) {
914,238✔
217
    taosRUnLockLatch(&pTopic->lock);
914,238✔
218
  }
219
  mndReleaseTopic(pMnode, pTopic);
914,238✔
220
}
914,238✔
221

222
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, const char *user, const char* token) {
988,775✔
223
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL || user == NULL) {
988,775✔
UNCOV
224
    return TSDB_CODE_INVALID_PARA;
×
225
  }
226
  int32_t code = 0;
988,775✔
227
  int32_t lino = 0;
988,775✔
228
  PRINT_LOG_START
988,775✔
229

230
  rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
988,775✔
231
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);
988,775✔
232
  for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
1,903,013✔
233
    char        *topic = taosArrayGetP(pConsumer->currentTopics, i);
914,238✔
234
    checkOnePrivilege(topic, pMnode, rsp, user, token);
914,238✔
235
  }
236

237
END:
988,775✔
238
  PRINT_LOG_END
988,775✔
239
  return code;
988,775✔
240
}
241

242
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
988,775✔
243
  if (pMnode == NULL || req == NULL || pConsumer == NULL){
988,775✔
UNCOV
244
    return;
×
245
  }
246
  for (int i = 0; i < taosArrayGetSize(req->topics); i++) {
1,880,744✔
247
    TopicOffsetRows *data = taosArrayGet(req->topics, i);
891,969✔
248
    if (data == NULL){
891,969✔
UNCOV
249
      continue;
×
250
    }
251
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
891,969✔
252

253
    SMqSubscribeObj *pSub = NULL;
891,969✔
254
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
891,969✔
255
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, data->topicName);
891,969✔
256
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
891,969✔
257
    if (code != 0) {
891,969✔
UNCOV
258
      mError("failed to acquire subscribe by key:%s, code:%d", key, code);
×
259
      continue;
×
260
    }
261
    taosWLockLatch(&pSub->lock);
891,969✔
262
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
891,969✔
263
    if (pConsumerEp) {
891,969✔
264
      taosArrayDestroy(pConsumerEp->offsetRows);
880,616✔
265
      pConsumerEp->offsetRows = data->offsetRows;
880,616✔
266
      data->offsetRows = NULL;
880,616✔
267
    }
268
    taosWUnLockLatch(&pSub->lock);
891,969✔
269

270
    mndReleaseSubscribe(pMnode, pSub);
891,969✔
271
  }
272
}
273

274
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
988,775✔
275
  if (pMsg == NULL || rsp == NULL){
988,775✔
UNCOV
276
    return TSDB_CODE_INVALID_PARA;
×
277
  }
278
  int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
988,775✔
279
  if (tlen <= 0){
988,775✔
UNCOV
280
    return TSDB_CODE_TMQ_INVALID_MSG;
×
281
  }
282
  void   *buf = rpcMallocCont(tlen);
988,775✔
283
  if (buf == NULL) {
988,775✔
UNCOV
284
    return terrno;
×
285
  }
286

287
  if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){
988,775✔
UNCOV
288
    rpcFreeCont(buf);
×
289
    return TSDB_CODE_TMQ_INVALID_MSG;
×
290
  }
291
  pMsg->info.rsp = buf;
988,775✔
292
  pMsg->info.rspLen = tlen;
988,775✔
293
  return 0;
988,775✔
294
}
295

296
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
993,637✔
297
  if (pMsg == NULL) {
993,637✔
UNCOV
298
    return TSDB_CODE_INVALID_PARA;
×
299
  }
300
  int32_t         code = 0;
993,637✔
301
  int32_t         lino = 0;
993,637✔
302
  SMnode         *pMnode = pMsg->info.node;
993,637✔
303
  SMqHbReq        req = {0};
993,637✔
304
  SMqHbRsp        rsp = {0};
993,637✔
305
  SMqConsumerObj *pConsumer = NULL;
993,637✔
306
  PRINT_LOG_START
993,637✔
307

308
  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));
993,637✔
309
  int64_t consumerId = req.consumerId;
993,637✔
310
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
993,637✔
311
  taosWLockLatch(&pConsumer->lock);
988,775✔
312
  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, RPC_MSG_USER(pMsg), RPC_MSG_TOKEN(pMsg)));
988,775✔
313
  atomic_store_32(&pConsumer->hbStatus, 0);
988,775✔
314
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, req.pollFlag, pConsumer->pollStatus);
988,775✔
315
  if (req.pollFlag == 1){
988,775✔
316
    atomic_store_32(&pConsumer->pollStatus, 0);
434,067✔
317
    pConsumer->pollTime = taosGetTimestampMs();
868,134✔
318
  }
319

320
  storeOffsetRows(pMnode, &req, pConsumer);
988,775✔
321
  rsp.debugFlag = tqClientDebugFlag;
988,775✔
322
  code = buildMqHbRsp(pMsg, &rsp);
988,775✔
323

324
END:
993,637✔
325
  if (pConsumer != NULL) {
993,637✔
326
    taosWUnLockLatch(&pConsumer->lock);
988,775✔
327
  }
328
  tDestroySMqHbRsp(&rsp);
993,637✔
329
  mndReleaseConsumer(pMnode, pConsumer);
993,637✔
330
  tDestroySMqHbReq(&req);
993,637✔
331
  PRINT_LOG_END
993,637✔
332
  return code;
993,637✔
333
}
334

335
static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, char *topic, SMqAskEpRsp *rsp, int32_t epoch) {
1,110,263✔
336
  int32_t         code = 0;
1,110,263✔
337
  int32_t         lino = 0;
1,110,263✔
338
  SMqSubscribeObj *pSub = NULL;
1,110,263✔
339
  SMqSubTopicEp topicEp = {0};
1,110,263✔
340
  char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
1,110,263✔
341
  PRINT_LOG_START
1,110,263✔
342
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
1,110,263✔
343
  if(mndAcquireSubscribeByKey(pMnode, key, &pSub) != 0) {
1,110,263✔
UNCOV
344
    mWarn("%s failed to acquire subscribe by key:%s", __func__, key);
×
345
    goto END;
×
346
  }
347
  tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);
1,110,263✔
348

349
  taosWLockLatch(&pSub->lock);
1,110,263✔
350
  // 2.2 iterate all vg assigned to the consumer of that topic
351
  SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
1,110,263✔
352
  MND_TMQ_NULL_CHECK(pConsumerEp);
1,110,263✔
353
  int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
1,110,263✔
354
  topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
1,110,263✔
355
  MND_TMQ_NULL_CHECK(topicEp.vgs);
1,110,263✔
356

357
  tstrncpy(topicEp.db, pSub->dbName, TSDB_DB_FNAME_LEN);
1,110,263✔
358
  for (int32_t j = 0; j < vgNum; j++) {
2,501,919✔
359
    SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, j);
1,391,656✔
360
    if (pVgEp == NULL) {
1,391,656✔
UNCOV
361
      continue;
×
362
    }
363
    if (epoch == -1) {
1,391,656✔
364
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
950,629✔
365
      if (pVgroup) {
950,629✔
366
        pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
56,655✔
367
        mndReleaseVgroup(pMnode, pVgroup);
56,655✔
368
      }
369
    }
370
    SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
1,391,656✔
371
    MND_TMQ_NULL_CHECK(taosArrayPush(topicEp.vgs, &vgEp));
2,783,312✔
372
  }
373
  MND_TMQ_NULL_CHECK(taosArrayPush(rsp->topics, &topicEp));
2,220,526✔
374
  topicEp.vgs = NULL;
1,110,263✔
375

376
END:
1,110,263✔
377
  if (pSub != NULL) {
1,110,263✔
378
    taosWUnLockLatch(&pSub->lock);
1,110,263✔
379
  }
380
  taosArrayDestroy(topicEp.vgs);
1,110,263✔
381
  mndReleaseSubscribe(pMnode, pSub);
1,110,263✔
382
  PRINT_LOG_END
1,110,263✔
383
  return code;
1,110,263✔
384
}
385

386
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
1,216,309✔
387
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL){
1,216,309✔
UNCOV
388
    return TSDB_CODE_INVALID_PARA;
×
389
  }
390
  int32_t code = 0;
1,216,309✔
391
  int32_t lino = 0;
1,216,309✔
392
  PRINT_LOG_START
1,216,309✔
393

394
  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
1,216,309✔
395
  rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
1,216,309✔
396
  MND_TMQ_NULL_CHECK(rsp->topics);
1,216,309✔
397

398
  // handle all topics subscribed by this consumer
399
  for (int32_t i = 0; i < numOfTopics; i++) {
2,326,572✔
400
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
1,110,263✔
401
    MND_TMQ_RETURN_CHECK(processEachTopicEp(pMnode, pConsumer, topic, rsp, epoch));
1,110,263✔
402
  }
403

404
END:
1,216,309✔
405
  PRINT_LOG_END
1,216,309✔
406
  return code;
1,216,309✔
407
}
408

409
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
4,174,565✔
410
  if (pMsg == NULL || rsp == NULL) {
4,174,565✔
UNCOV
411
    return TSDB_CODE_INVALID_PARA;
×
412
  }
413
  int32_t code = 0;
4,174,565✔
414
  // encode rsp
415
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
4,174,244✔
416
  void   *buf = rpcMallocCont(tlen);
4,174,244✔
417
  if (buf == NULL) {
4,173,902✔
UNCOV
418
    return terrno;
×
419
  }
420

421
  SMqRspHead *pHead = buf;
4,173,902✔
422

423
  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
4,173,902✔
424
  pHead->epoch = serverEpoch;
4,173,902✔
425
  pHead->consumerId = consumerId;
4,173,902✔
426
  pHead->walsver = 0;
4,174,244✔
427
  pHead->walever = 0;
4,174,244✔
428

429
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
4,173,581✔
430
  if (tEncodeSMqAskEpRsp(&abuf, rsp) < 0) {
4,174,244✔
UNCOV
431
    rpcFreeCont(buf);
×
432
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
433
  }
434

435
  // send rsp
436
  pMsg->info.rsp = buf;
4,174,244✔
437
  pMsg->info.rspLen = tlen;
4,173,557✔
438
  return code;
4,173,554✔
439
}
440

441
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
4,177,905✔
442
  if (pMsg == NULL) {
4,177,905✔
UNCOV
443
    return TSDB_CODE_INVALID_PARA;
×
444
  }
445
  SMnode     *pMnode = pMsg->info.node;
4,177,905✔
446
  SMqAskEpReq req = {0};
4,177,905✔
447
  SMqAskEpRsp rsp = {0};
4,177,862✔
448
  int32_t     code = 0;
4,177,220✔
449
  int32_t     lino = 0;
4,177,220✔
450
  SMqConsumerObj *pConsumer = NULL;
4,177,220✔
451
  PRINT_LOG_START
4,177,541✔
452

453
  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
4,177,541✔
454
  int64_t consumerId = req.consumerId;
4,177,905✔
455
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
4,177,905✔
456
  taosRLockLatch(&pConsumer->lock);
4,174,565✔
457
  if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
4,174,565✔
UNCOV
458
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
×
459
           pConsumer->cgroup);
UNCOV
460
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
×
461
    goto END;
×
462
  }
463

464
  // 1. check consumer status
465
  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
4,174,220✔
466
  int32_t status = atomic_load_32(&pConsumer->status);
4,174,220✔
467
  if (status != MQ_CONSUMER_STATUS_READY) {
4,174,565✔
468
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
879,767✔
469
    rsp.code = TSDB_CODE_MND_CONSUMER_NOT_READY;
879,767✔
470
  } else {
471
    int32_t epoch = req.epoch;
3,294,798✔
472

473
    // 2. check epoch, only send ep info when epochs do not match
474
    if (epoch != serverEpoch) {
3,294,798✔
475
      mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
1,216,309✔
476
            consumerId, epoch, serverEpoch);
477
      MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, epoch, &rsp));
1,216,309✔
478
    }
479
  }
480
  code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);
4,174,565✔
481

482
END:
4,177,215✔
483
  if (pConsumer != NULL) {
4,177,215✔
484
    taosRUnLockLatch(&pConsumer->lock);
4,173,875✔
485
  }
486
  tDeleteSMqAskEpRsp(&rsp);
487
  mndReleaseConsumer(pMnode, pConsumer);
4,176,557✔
488
  PRINT_LOG_END
4,177,905✔
489
  return code;
4,177,905✔
490
}
491

492
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
105,535✔
493
  if (pConsumer == NULL || pTrans == NULL) {
105,535✔
UNCOV
494
    return TSDB_CODE_INVALID_PARA;
×
495
  }
496
  int32_t  code = 0;
105,535✔
497
  int32_t lino = 0;
105,535✔
498
  PRINT_LOG_START
105,535✔
499
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
105,535✔
500
  MND_TMQ_NULL_CHECK(pCommitRaw);
105,535✔
501
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
105,535✔
502
  if (code != 0) {
105,535✔
UNCOV
503
    sdbFreeRaw(pCommitRaw);
×
504
    goto END;
×
505
  }
506
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
105,535✔
507

508
END:
105,535✔
509
  PRINT_LOG_END
105,535✔
510
  return code;
105,535✔
511
}
512

513
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
531,509✔
514
  if (pConsumer == NULL || pTrans == NULL) {
531,509✔
UNCOV
515
    return TSDB_CODE_INVALID_PARA;
×
516
  }
517
  int32_t  code = 0;
531,509✔
518
  int32_t lino = 0;
531,509✔
519
  PRINT_LOG_START
531,509✔
520
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
531,509✔
521
  MND_TMQ_NULL_CHECK(pCommitRaw);
531,509✔
522
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
531,509✔
523
  if (code != 0) {
531,509✔
UNCOV
524
    sdbFreeRaw(pCommitRaw);
×
525
    goto END;
×
526
  }
527
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
531,509✔
528
END:
531,509✔
529

530
  return code;
531,509✔
531
}
532

533
static void freeItem(void *param) {
313✔
534
  if (param == NULL) {
313✔
UNCOV
535
    return;
×
536
  }
537
  void *pItem = *(void **)param;
313✔
538
  if (pItem != NULL) {
313✔
539
    taosMemoryFree(pItem);
313✔
540
  }
541
}
542

543
#define ADD_TOPIC_TO_ARRAY(element, array) \
544
char *newTopicCopy = taosStrdup(element); \
545
MND_TMQ_NULL_CHECK(newTopicCopy);\
546
if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL){\
547
  taosMemoryFree(newTopicCopy);\
548
  code = terrno;\
549
  goto END;\
550
}
551

552
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
116,861✔
553
  if (pExistedConsumer == NULL || pConsumerNew == NULL) {
116,861✔
UNCOV
554
    return TSDB_CODE_INVALID_PARA;
×
555
  }
556
  int32_t code = 0;
116,861✔
557
  int32_t lino = 0;
116,861✔
558
  PRINT_LOG_START
116,861✔
559
  taosRLockLatch(&pExistedConsumer->lock);
116,861✔
560

561
  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
116,861✔
562
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
116,861✔
563
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
116,861✔
564
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);
116,861✔
565

566
  int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
116,861✔
567
  int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
116,861✔
568
  int32_t i = 0, j = 0;
116,861✔
569
  while (i < oldTopicNum || j < newTopicNum) {
236,302✔
570
    if (i >= oldTopicNum) {
119,441✔
571
      void* tmp = taosArrayGetP(pConsumerNew->assignedTopics, j);
627✔
572
      MND_TMQ_NULL_CHECK(tmp);
627✔
573
      ADD_TOPIC_TO_ARRAY(tmp, rebNewTopics);
1,254✔
574
      j++;
627✔
575
      continue;
627✔
576
    } else if (j >= newTopicNum) {
118,814✔
577
      void* tmp = taosArrayGetP(pExistedConsumer->currentTopics, i);
111,206✔
578
      MND_TMQ_NULL_CHECK(tmp);
111,206✔
579
      ADD_TOPIC_TO_ARRAY(tmp, rebRemovedTopics);
222,412✔
580
      i++;
111,206✔
581
      continue;
111,206✔
582
    } else {
583
      char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
7,608✔
584
      MND_TMQ_NULL_CHECK(oldTopic);
7,608✔
585
      char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j);
7,608✔
586
      MND_TMQ_NULL_CHECK(newTopic);
7,608✔
587
      int   comp = strcmp(oldTopic, newTopic);
7,608✔
588
      if (comp == 0) {
7,608✔
589
        i++;
7,608✔
590
        j++;
7,608✔
591
        continue;
7,608✔
UNCOV
592
      } else if (comp < 0) {
×
593
        ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
×
594
        i++;
×
595
        continue;
×
596
      } else {
×
597
        ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
×
598
        j++;
×
599
        continue;
×
600
      }
601
    }
602
  }
603
  // no topics need to be rebalanced
604
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
116,861✔
605
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
7,608✔
606
  }
607

608
END:
109,253✔
609
  taosRUnLockLatch(&pExistedConsumer->lock);
116,861✔
610
  PRINT_LOG_END
116,861✔
611
  return code;
116,861✔
612
}
613

614
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
267,537✔
615
  if (pTopicList == NULL || pMnode == NULL) {
267,537✔
UNCOV
616
    return TSDB_CODE_INVALID_PARA;
×
617
  }
618
  taosArraySort(pTopicList, taosArrayCompareString);
267,537✔
619
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
267,537✔
620

621
  int32_t newTopicNum = taosArrayGetSize(pTopicList);
267,537✔
622
  for (int i = 0; i < newTopicNum; i++) {
428,683✔
623
    int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i));
161,491✔
624
    if (gNum >= MND_MAX_GROUP_PER_TOPIC) {
161,491✔
625
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
345✔
626
    }
627
  }
628
  return 0;
267,192✔
629
}
630

631
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer){
264,753✔
632
  if (pMnode == NULL || subscribe == NULL) {
264,753✔
UNCOV
633
    return TSDB_CODE_INVALID_PARA;
×
634
  }
635
  int64_t         consumerId = subscribe->consumerId;
264,753✔
636
  char           *cgroup     = subscribe->cgroup;
264,753✔
637
  SMqConsumerObj *pConsumerNew     = NULL;
264,753✔
638
  SMqConsumerObj *pExistedConsumer = NULL;
264,753✔
639
  int32_t lino = 0;
264,753✔
640
  PRINT_LOG_START
264,753✔
641
  int32_t code = mndAcquireConsumer(pMnode, consumerId, &pExistedConsumer);
264,753✔
642
  if (code != 0) {
264,753✔
643
    mInfo("receive tmq subscribe request from new consumer:0x%" PRIx64
147,892✔
644
              ",cgroup:%s, numOfTopics:%d", consumerId,
645
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));
646

647
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
147,892✔
648
  } else {
649
    int32_t status = atomic_load_32(&pExistedConsumer->status);
116,861✔
650

651
    mInfo("receive tmq subscribe request from existed consumer:0x%" PRIx64
116,861✔
652
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
653
          consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
654
          (int32_t)taosArrayGetSize(subscribe->topicNames));
655

656
    if (status != MQ_CONSUMER_STATUS_READY) {
116,861✔
UNCOV
657
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
×
658
      goto END;
×
659
    }
660
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
116,861✔
661
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
116,861✔
662
  }
663
  if (ppConsumer){
257,145✔
664
    *ppConsumer = pConsumerNew;
257,145✔
665
    pConsumerNew = NULL;
257,145✔
666
  }
667

668
END:
264,441✔
669
  PRINT_LOG_END
264,753✔
670
  mndReleaseConsumer(pMnode, pExistedConsumer);
264,753✔
671
  tDeleteSMqConsumerObj(pConsumerNew);
264,753✔
672
  return code;
264,753✔
673
}
674

675
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
355,522✔
676
  if (pMsg == NULL) {
355,522✔
UNCOV
677
    return TSDB_CODE_INVALID_PARA;
×
678
  }
679
  SMnode         *pMnode = pMsg->info.node;
355,522✔
680
  char           *msgStr = pMsg->pCont;
355,522✔
681
  int32_t         code = 0;
355,522✔
682
  int32_t         lino = 0;
355,522✔
683
  SMqConsumerObj *pConsumerNew = NULL;
355,522✔
684
  STrans         *pTrans = NULL;
355,522✔
685

686
  PRINT_LOG_START
355,522✔
687
  SCMSubscribeReq subscribe = {0};
355,522✔
688
  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
711,044✔
689
  bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
355,522✔
690
  if(unSubscribe){
355,522✔
691
    SMqConsumerObj *pConsumerTmp = NULL;
196,611✔
692
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
196,611✔
693
    taosRLockLatch(&pConsumerTmp->lock);
194,856✔
694
    size_t topicNum = taosArrayGetSize(pConsumerTmp->assignedTopics);
194,856✔
695
    taosRUnLockLatch(&pConsumerTmp->lock);
194,856✔
696
    mndReleaseConsumer(pMnode, pConsumerTmp);
194,856✔
697
    if (topicNum == 0){
194,856✔
698
      goto END;
86,230✔
699
    }
700
  }
701
  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
267,537✔
702
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
267,192✔
703
                          (unSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
704
                          pMsg, "subscribe");
705
  MND_TMQ_NULL_CHECK(pTrans);
267,192✔
706

707
  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, RPC_MSG_USER(pMsg), RPC_MSG_TOKEN(pMsg)));
267,192✔
708
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
264,753✔
709
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
257,145✔
710
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
257,145✔
711
  code = TSDB_CODE_ACTION_IN_PROGRESS;
257,145✔
712

713
END:
355,522✔
714
  mndTransDrop(pTrans);
355,522✔
715
  tDeleteSMqConsumerObj(pConsumerNew);
355,522✔
716
  taosArrayDestroyP(subscribe.topicNames, NULL);
355,522✔
717
  code = (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code;
355,522✔
718
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS){
355,522✔
719
    mError("tmq subscribe request from consumer:0x%" PRIx64 " failed, code:%d", subscribe.consumerId, code);
2,784✔
720
  } else {
721
    mInfo("tmq subscribe request from consumer:0x%" PRIx64 " processed, code:%d", subscribe.consumerId, code);
352,738✔
722
  }
723
  return code;
355,522✔
724
}
725

726
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
752,332✔
727
  if (pConsumer == NULL) {
752,332✔
UNCOV
728
    return NULL;
×
729
  }
730
  int32_t code = 0;
752,332✔
731
  int32_t lino = 0;
752,332✔
732
  terrno = TSDB_CODE_OUT_OF_MEMORY;
752,332✔
733

734
  void   *buf = NULL;
752,332✔
735
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
752,332✔
736
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
752,332✔
737

738
  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
752,332✔
739
  if (pRaw == NULL) goto CM_ENCODE_OVER;
752,332✔
740

741
  buf = taosMemoryMalloc(tlen);
752,332✔
742
  if (buf == NULL) goto CM_ENCODE_OVER;
752,332✔
743

744
  void *abuf = buf;
752,332✔
745
  if(tEncodeSMqConsumerObj(&abuf, pConsumer) < 0){
752,332✔
UNCOV
746
    goto CM_ENCODE_OVER;
×
747
  }
748

749
  int32_t dataPos = 0;
752,332✔
750
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
752,332✔
751
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
752,332✔
752
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
752,332✔
753
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
752,332✔
754

755
  terrno = TSDB_CODE_SUCCESS;
752,332✔
756

757
CM_ENCODE_OVER:
752,332✔
758
  taosMemoryFreeClear(buf);
752,332✔
759
  if (terrno != 0) {
752,332✔
UNCOV
760
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
×
761
    sdbFreeRaw(pRaw);
×
762
    return NULL;
×
763
  }
764

765
  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
752,332✔
766
  return pRaw;
752,332✔
767
}
768

769
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
649,326✔
770
  if (pRaw == NULL) {
649,326✔
UNCOV
771
    return NULL;
×
772
  }
773
  int32_t         code = 0;
649,326✔
774
  int32_t         lino = 0;
649,326✔
775
  SSdbRow        *pRow = NULL;
649,326✔
776
  SMqConsumerObj *pConsumer = NULL;
649,326✔
777
  void           *buf = NULL;
649,326✔
778

779
  terrno = 0;
649,326✔
780
  int8_t sver = 0;
649,326✔
781
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
649,326✔
UNCOV
782
    goto CM_DECODE_OVER;
×
783
  }
784

785
  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
649,326✔
UNCOV
786
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
787
    goto CM_DECODE_OVER;
×
788
  }
789

790
  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
649,326✔
791
  if (pRow == NULL) {
649,326✔
UNCOV
792
    goto CM_DECODE_OVER;
×
793
  }
794

795
  pConsumer = sdbGetRowObj(pRow);
649,326✔
796
  if (pConsumer == NULL) {
649,326✔
UNCOV
797
    goto CM_DECODE_OVER;
×
798
  }
799

800
  int32_t dataPos = 0;
649,326✔
801
  int32_t len;
648,598✔
802
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
649,326✔
803
  buf = taosMemoryMalloc(len);
649,326✔
804
  if (buf == NULL) {
649,326✔
UNCOV
805
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
806
    goto CM_DECODE_OVER;
×
807
  }
808

809
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
649,326✔
810
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
649,326✔
811

812
  if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
649,326✔
UNCOV
813
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
×
814
    goto CM_DECODE_OVER;
×
815
  }
816

817
  tmsgUpdateDnodeEpSet(&pConsumer->ep);
649,326✔
818

819
CM_DECODE_OVER:
649,326✔
820
  taosMemoryFreeClear(buf);
649,326✔
821
  if (terrno != TSDB_CODE_SUCCESS) {
649,326✔
UNCOV
822
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
×
823
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
UNCOV
824
    taosMemoryFreeClear(pRow);
×
825
  }
826

827
  return pRow;
649,326✔
828
}
829

830
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
155,899✔
831
  if (pConsumer == NULL) {
155,899✔
UNCOV
832
    return TSDB_CODE_INVALID_PARA;
×
833
  }
834
  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
155,899✔
835
        pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);
836
  pConsumer->subscribeTime = pConsumer->createTime;
155,899✔
837
  return 0;
155,899✔
838
}
839

840
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
649,326✔
841
  if (pConsumer == NULL) {
649,326✔
UNCOV
842
    return TSDB_CODE_INVALID_PARA;
×
843
  }
844
  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
649,326✔
845
        mndConsumerStatusName(pConsumer->status));
846
  tClearSMqConsumerObj(pConsumer);
649,326✔
847
  return 0;
649,326✔
848
}
849

850
// remove from topic list
851
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
384,254✔
852
  if (topicList == NULL || pTopic == NULL) {
384,254✔
UNCOV
853
    return;
×
854
  }
855
  int32_t size = taosArrayGetSize(topicList);
384,254✔
856
  for (int32_t i = 0; i < size; i++) {
387,585✔
857
    char *p = taosArrayGetP(topicList, i);
383,971✔
858
    if (strcmp(pTopic, p) == 0) {
383,971✔
859
      taosArrayRemove(topicList, i);
380,640✔
860
      taosMemoryFree(p);
380,640✔
861

862
      mInfo("tmq rebalance consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
380,640✔
863
            consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
864
      break;
380,640✔
865
    }
866
  }
867
}
868

869
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
152,430✔
870
  if (pConsumer == NULL || pTopic == NULL) {
152,430✔
UNCOV
871
    return false;
×
872
  }
873
  bool    existing = false;
152,430✔
874
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
152,430✔
875
  for (int32_t i = 0; i < size; i++) {
155,285✔
876
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
2,855✔
877
    if (topic && strcmp(topic, pTopic) == 0) {
2,855✔
UNCOV
878
      existing = true;
×
879
      break;
×
880
    }
881
  }
882

883
  return existing;
152,430✔
884
}
885

886
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
387,449✔
887
  if (pOldConsumer == NULL || pNewConsumer == NULL) {
387,449✔
UNCOV
888
    return TSDB_CODE_INVALID_PARA;
×
889
  }
890
  int32_t lino = 0;
387,449✔
891
  int32_t code = 0;
387,449✔
892
  char *pNewTopic = NULL;
387,449✔
893
  PRINT_LOG_START
387,449✔
894
  taosWLockLatch(&pOldConsumer->lock);
387,449✔
895
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
387,449✔
896
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);
897

898
  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
387,449✔
899
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
110,345✔
900
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
110,345✔
901
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
110,345✔
902

903
    pOldConsumer->subscribeTime = taosGetTimestampMs();
110,345✔
904
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
110,345✔
905
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
110,345✔
906
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
277,104✔
907
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
8,762✔
908
    pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
8,762✔
909
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
8,762✔
910
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
8,762✔
911
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
268,342✔
912
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
152,430✔
913
    MND_TMQ_NULL_CHECK(tmp);
152,430✔
914
    char *pNewTopic = taosStrdup(tmp);
152,430✔
915
    MND_TMQ_NULL_CHECK(pNewTopic);
152,430✔
916
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
152,430✔
917
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
152,430✔
918
    if (existing) {
152,430✔
UNCOV
919
      mError("tmq rebalance consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
×
920
    } else {
921
      MND_TMQ_NULL_CHECK(taosArrayPush(pOldConsumer->currentTopics, &pNewTopic));
304,860✔
922
      pNewTopic = NULL;
152,430✔
923
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
152,430✔
924
    }
925

926
    int32_t status = pOldConsumer->status;
152,430✔
927
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
152,430✔
928
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
149,850✔
929
    }
930

931
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
152,430✔
932
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
152,430✔
933

934
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
152,430✔
935
          ", current topics:%d, newTopics:%d, removeTopics:%d",
936
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
937
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
938
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
939
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
940

941
  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
115,912✔
942
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
115,912✔
943
    MND_TMQ_NULL_CHECK(topic);
115,912✔
944
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
115,912✔
945
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
115,912✔
946

947
    int32_t status = pOldConsumer->status;
115,912✔
948
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
115,912✔
949
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
113,332✔
950
    }
951
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
115,912✔
952
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
115,912✔
953

954
    mInfo("tmq rebalanceconsumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
115,912✔
955
          ", current topics:%d, newTopics:%d, removeTopics:%d",
956
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
957
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
958
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
959
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
960
  }
961

UNCOV
962
END:
×
963
  taosMemoryFree(pNewTopic);
387,449✔
964
  taosWUnLockLatch(&pOldConsumer->lock);
387,449✔
965

966
  PRINT_LOG_END
387,449✔
967

968
  return code;
387,449✔
969
}
970

971
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
5,738,366✔
972
  if (pMnode == NULL || pConsumer == NULL) {
5,738,366✔
UNCOV
973
    return TSDB_CODE_INVALID_PARA;
×
974
  }
975
  SSdb           *pSdb = pMnode->pSdb;
5,738,366✔
976
  *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
5,738,366✔
977
  if (*pConsumer == NULL) {
5,738,366✔
978
    return TSDB_CODE_MND_CONSUMER_NOT_EXIST;
158,773✔
979
  }
980
  return 0;
5,579,593✔
981
}
982

983
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
20,652,964✔
984
  if (pMnode == NULL || pConsumer == NULL) {
20,652,964✔
985
    return;
13,666,344✔
986
  }
987
  SSdb *pSdb = pMnode->pSdb;
6,986,620✔
988
  sdbRelease(pSdb, pConsumer);
6,986,620✔
989
}
990

991
static int32_t buildResult(SMqConsumerObj *pConsumer, SShowObj *pShow, SSDataBlock *pBlock, int32_t numOfRows, const char* showTopic, bool hasTopic) {
45,774✔
992
  int32_t         code = 0;
45,774✔
993
  int32_t         lino = 0;
45,774✔
994
  SColumnInfoData *pColInfo = NULL;
45,774✔
995
  int32_t          cols = 0;
45,774✔
996
  char * parasStr = NULL;
45,774✔
997
  char           *status = NULL;
45,774✔
998

999
  PRINT_LOG_START
45,774✔
1000
  // consumer id
1001
  char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
45,774✔
1002
  (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
45,774✔
1003
  varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
45,774✔
1004

1005
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1006
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1007
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));
45,774✔
1008

1009
  // consumer group
1010
  char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
45,774✔
1011
  STR_TO_VARSTR(cgroup, pConsumer->cgroup);
45,774✔
1012

1013
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1014
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1015
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));
45,774✔
1016

1017
  // client id
1018
  char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
45,774✔
1019
  STR_TO_VARSTR(clientId, pConsumer->clientId);
45,774✔
1020

1021
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1022
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1023
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
45,774✔
1024

1025
  // user
1026
  char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
45,774✔
1027
  STR_TO_VARSTR(user, pConsumer->user);
45,774✔
1028

1029
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1030
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1031
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));
45,774✔
1032

1033
  // fqdn
1034
  char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
45,774✔
1035
  STR_TO_VARSTR(fqdn, pConsumer->fqdn);
45,774✔
1036

1037
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1038
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1039
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));
45,774✔
1040

1041
  // status
1042
  const char *pStatusName = mndConsumerStatusName(pConsumer->status);
45,774✔
1043
  status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
45,774✔
1044
  MND_TMQ_NULL_CHECK(status);
45,774✔
1045
  STR_TO_VARSTR(status, pStatusName);
45,774✔
1046

1047
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1048
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1049
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
45,774✔
1050

1051
  // one subscribed topic
1052
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1053
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1054
  if (hasTopic) {
45,774✔
1055
    char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
45,774✔
1056
    mndTopicGetShowName(showTopic, topic + VARSTR_HEADER_SIZE);
45,774✔
1057
    *(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
45,774✔
1058
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
45,774✔
1059
  } else {
UNCOV
1060
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
×
1061
  }
1062

1063
  // up time
1064
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1065
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1066
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));
45,774✔
1067

1068
  // subscribe time
1069
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1070
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1071
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));
45,774✔
1072

1073
  // rebalance time
1074
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1075
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1076
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));
45,774✔
1077

1078
  char         buf[TSDB_OFFSET_LEN] = {0};
45,774✔
1079
  STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
45,774✔
1080
  tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
45,774✔
1081

1082
  parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
45,774✔
1083
  MND_TMQ_NULL_CHECK(parasStr);
45,774✔
1084
  (void)snprintf(varDataVal(parasStr), pShow->pMeta->pSchemas[cols].bytes - VARSTR_HEADER_SIZE, "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
91,548✔
1085
          pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
45,774✔
1086
  varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
45,774✔
1087

1088
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1089
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1090
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
45,774✔
1091

1092
  // rebalance time
1093
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
45,774✔
1094
  MND_TMQ_NULL_CHECK(pColInfo);
45,774✔
1095
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->pollTime, pConsumer->pollTime == 0));
45,774✔
1096

1097
END:
45,774✔
1098
  PRINT_LOG_END
45,774✔
1099
  taosMemoryFreeClear(status);
45,774✔
1100
  taosMemoryFreeClear(parasStr);
45,774✔
1101
  return code;
45,774✔
1102
}
1103

1104
static int32_t retrieveOneConsumer(SRpcMsg *pReq, SMqConsumerObj *pConsumer, SUserObj *pOperUser, bool showAll,
46,361✔
1105
                                   int32_t *numOfRows, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
1106
  int32_t code = 0;
46,361✔
1107
  int32_t lino = 0;
46,361✔
1108
  SMnode *pMnode = pReq->info.node;
46,361✔
1109
  PRINT_LOG_START
46,361✔
1110
  taosRLockLatch(&pConsumer->lock);
46,361✔
1111
  mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);
46,361✔
1112
  if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
46,361✔
1113
    mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
587✔
1114
    goto END;
587✔
1115
  }
1116

1117
  int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
45,774✔
1118
  bool    hasTopic = true;
45,774✔
1119
  if (topicSz == 0) {
45,774✔
UNCOV
1120
    hasTopic = false;
×
UNCOV
1121
    topicSz = 1;
×
1122
  }
1123

1124
  if (*numOfRows + topicSz > rowsCapacity) {
45,774✔
UNCOV
1125
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + topicSz));
×
1126
  }
1127

1128
  for (int32_t i = 0; i < topicSz; i++) {
91,548✔
1129
    char *pTopicFName = taosArrayGetP(pConsumer->assignedTopics, i);
45,774✔
1130
    if (!showAll && (strncmp(pOperUser->name, pConsumer->user, TSDB_USER_LEN) != 0)) {
45,774✔
NEW
1131
      bool         showConsumer = false;
×
NEW
1132
      SMqTopicObj *pTopic = NULL;
×
NEW
1133
      (void)mndAcquireTopic(pMnode, pTopicFName, &pTopic);
×
NEW
1134
      if (pTopic) {
×
NEW
1135
        SName name = {0};  // 1.topic1
×
NEW
1136
        if (0 == tNameFromString(&name, pTopic->name, T_NAME_ACCT | T_NAME_DB)) {
×
NEW
1137
          if (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CONSUMER_SHOW, PRIV_OBJ_TOPIC, pTopic->ownerId,
×
NEW
1138
                                            pTopic->db, name.dbname)) {
×
NEW
1139
            showConsumer = true;
×
1140
          }
1141
        }
NEW
1142
        mndReleaseTopic(pMnode, pTopic);
×
1143
      }
NEW
1144
      if (!showConsumer) {
×
NEW
1145
        continue;
×
1146
      }
1147
    }
1148
    // char  topic[TSDB_TOPIC_FNAME_LEN] = {0};
1149
    // mndTopicGetShowName(showTopic, topic);
1150
    MND_TMQ_RETURN_CHECK(buildResult(pConsumer, pShow, pBlock, *numOfRows, pTopicFName, hasTopic));
45,774✔
1151
    (*numOfRows)++;
45,774✔
1152
  }
1153

1154
END:
45,774✔
1155
  taosRUnLockLatch(&pConsumer->lock);
46,361✔
1156
  PRINT_LOG_END
46,361✔
1157
  return code;
46,361✔
1158
}
1159

1160
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
15,391✔
1161
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
15,391✔
1162
    return TSDB_CODE_INVALID_PARA;
×
1163
  }
1164
  SMnode         *pMnode = pReq->info.node;
15,391✔
1165
  SSdb           *pSdb = pMnode->pSdb;
15,391✔
1166
  int32_t         numOfRows = 0;
15,391✔
1167
  SMqConsumerObj *pConsumer = NULL;
15,391✔
1168
  SUserObj       *pOperUser = NULL;
15,391✔
1169
  int32_t         code = 0;
15,391✔
1170
  int32_t         lino = 0;
15,391✔
1171
  bool            showAll = false;
15,391✔
1172
  char            objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
15,391✔
1173
  PRINT_LOG_START
15,391✔
1174

1175
  MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
15,391✔
1176
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pOperUser->acctId);
15,391✔
1177
  showAll = (0 == mndCheckSysObjPrivilege(pMnode, pOperUser, RPC_MSG_TOKEN(pReq), PRIV_CONSUMER_SHOW,
15,391✔
1178
                                          PRIV_OBJ_TOPIC, 0, objFName, "*"));
1179
  while (numOfRows < rowsCapacity) {
61,752✔
1180
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
61,752✔
1181
    if (pShow->pIter == NULL) {
61,752✔
1182
      break;
15,391✔
1183
    }
1184
    MND_TMQ_RETURN_CHECK(retrieveOneConsumer(pReq, pConsumer, pOperUser, showAll,  &numOfRows, pShow, pBlock, rowsCapacity));
46,361✔
1185
    
1186
    pBlock->info.rows = numOfRows;
46,361✔
1187
    sdbRelease(pSdb, pConsumer);
46,361✔
1188
    pConsumer = NULL;
46,361✔
1189
  }
1190

1191
  pShow->numOfRows += numOfRows;
15,391✔
1192

1193
END:
15,391✔
1194
  sdbRelease(pSdb, pConsumer);
15,391✔
1195
  sdbCancelFetch(pSdb, pShow->pIter);
15,391✔
1196
  if (code != 0) {
15,391✔
UNCOV
1197
    mError("show consumer failed, code:%d", code);
×
UNCOV
1198
    return code;
×
1199
  } else {
1200
    mDebug("show consumer processed, numOfRows:%d", numOfRows);
15,391✔
1201
    return numOfRows;
15,391✔
1202
  }
1203
}
1204

UNCOV
1205
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
×
UNCOV
1206
  if (pMnode == NULL || pIter == NULL) return;
×
UNCOV
1207
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
1208
  sdbCancelFetchByType(pSdb, pIter, SDB_CONSUMER);
×
1209
}
1210

1211
const char *mndConsumerStatusName(int status) {
3,900,154✔
1212
  switch (status) {
3,900,154✔
1213
    case MQ_CONSUMER_STATUS_READY:
1,811,722✔
1214
      return "ready";
1,811,722✔
1215
    case MQ_CONSUMER_STATUS_REBALANCE:
2,088,432✔
1216
      return "rebalancing";
2,088,432✔
UNCOV
1217
    default:
×
UNCOV
1218
      return "unknown";
×
1219
  }
1220
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc