• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4893

20 Dec 2025 01:15PM UTC coverage: 65.57% (-0.001%) from 65.571%
#4893

push

travis-ci

web-flow
feat: support taos_connect_with func (#33952)

* feat: support taos_connect_with

* refactor: enhance connection options and add tests for taos_set_option and taos_connect_with

* fix: handle NULL keys and values in taos_connect_with options

* fix: revert TAOSWS_GIT_TAG to default value "main"

* docs: add TLS configuration options for WebSocket connections in documentation

* docs: modify zh docs and add en docs

* chore: update taos.cfg

* docs: add examples

* docs: add error handling for connection failure in example code

2 of 82 new or added lines in 3 files covered. (2.44%)

1158 existing lines in 109 files now uncovered.

182854 of 278870 relevant lines covered (65.57%)

105213271.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.58
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_CONSUMER_VER_NUMBER   3
29
#define MND_CONSUMER_RESERVE_SIZE 64
30

31
#define MND_MAX_GROUP_PER_TOPIC 100
32

33
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
34
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
35
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
36
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
37
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
38

39
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
40
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
41
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
42
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
43

44
/*
 * Wire the consumer module into the mnode.
 *
 * Installs the four TMQ message handlers (subscribe, heartbeat, ask-ep and
 * lost-consumer-clear), the retrieve / free-iterator callbacks for
 * TSDB_MGMT_TABLE_CONSUMERS, and finally registers the SDB_CONSUMER table
 * descriptor with the sdb.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise whatever
 * sdbSetTable() returns.
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* request handlers */
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  /* "show" support for the consumers table */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  /* sdb callbacks for rows keyed by the 64-bit consumer id */
  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, consumerTable);
}
68

69
/* Counterpart of mndInitConsumer(); the consumer module has nothing to tear down. */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}
504,548✔
70

71
/*
 * Post an internal message about one consumer onto the mnode write queue.
 *
 * The message body is just the 8-byte consumer id, allocated with
 * rpcMallocCont(); msgType selects the handler and *info is copied into the
 * message.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL pMnode/info, an error code if the
 * body cannot be allocated, otherwise the result of tmsgPutToQueue().
 */
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (info == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int64_t *pPayload = rpcMallocCont(sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pPayload);
  *pPayload = consumerId;

  SRpcMsg outMsg = {
      .msgType = msgType,
      .pCont = pPayload,
      .contLen = sizeof(int64_t),
      .info = *info,
  };

  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &outMsg));

END:
  PRINT_LOG_END
  return code;
}
96

97
/*
 * Validate one topic of a subscribe request and bind the transaction to it.
 *
 * Steps, all performed while holding the topic's read latch:
 *   1. the user must pass mndCheckTopicPrivilege(MND_OPER_SUBSCRIBE) and
 *      grantCheckExpire(TSDB_GRANT_SUBSCRIPTION);
 *   2. when the request enables replay, the topic's subType must be
 *      TOPIC_SUB_TYPE__COLUMN, and if the topic has a non-empty stbName its
 *      database must be configured with exactly one vgroup;
 *   3. the transaction's db name is set to the topic's db with the hex
 *      consumer id as the second key, then checked for conflicts.
 *
 * Returns 0 on success or the first error encountered.
 */
static int32_t validateOneTopic(STrans* pTrans,char *pOneTopic, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SMqTopicObj *pTopic = NULL;
  char         consumerKey[TSDB_CONSUMER_ID_LEN] = {0};

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
  taosRLockLatch(&pTopic->lock);

  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic));
  MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));

  if (subscribe->enableReplay) {
    if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
      code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
      goto END;
    }
    if (pTopic->stbName[0] != 0) {
      SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
      if (pDb == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        goto END;
      }
      /* read the setting, then drop the db reference on every path */
      bool oneVgroup = (pDb->cfg.numOfVgroups == 1);
      mndReleaseDb(pMnode, pDb);
      if (!oneVgroup) {
        code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
        goto END;
      }
    }
  }

  (void)snprintf(consumerKey, TSDB_CONSUMER_ID_LEN, "%"PRIx64, subscribe->consumerId);
  mndTransSetDbName(pTrans, pTopic->db, consumerKey);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

END:
  PRINT_LOG_END
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
  return code;
}
141

142
/*
 * Run validateOneTopic() over every topic name in the subscribe request,
 * stopping at the first failure.
 *
 * Returns TSDB_CODE_INVALID_PARA if any argument is NULL, 0 when all topics
 * validate (including the empty list), otherwise the failing topic's code.
 */
static int32_t validateTopics(STrans* pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) {
  if (pTrans == NULL || subscribe == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (pMnode == NULL || pUser == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;

  PRINT_LOG_START
  int32_t topicCount = taosArrayGetSize(subscribe->topicNames);
  int32_t idx = 0;
  while (idx < topicCount) {
    char *topicName = taosArrayGetP(subscribe->topicNames, idx);
    MND_TMQ_RETURN_CHECK(validateOneTopic(pTrans, topicName, subscribe, pMnode, pUser));
    idx++;
  }

END:
  PRINT_LOG_END
  return code;
}
160

161
/*
 * Handler for TDMT_MND_TMQ_LOST_CONSUMER_CLEAR.
 *
 * Looks up the consumer named in the message, builds a CONSUMER_CLEAR copy
 * of it and submits a "clear-csm" transaction whose commit log drops that
 * consumer from the sdb. The consumer's read latch is held for the duration.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL message/body, otherwise the first
 * failing step's code or the result of mndTransPrepare().
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t              code = 0;
  int32_t              lino = 0;
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClear = pMsg->pCont;
  SMqConsumerObj      *pConsumer = NULL;
  SMqConsumerObj      *pDropObj = NULL;
  STrans              *pTrans = NULL;

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClear->consumerId, &pConsumer));
  taosRLockLatch(&pConsumer->lock);
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClear->consumerId,
        mndConsumerStatusName(pConsumer->status));

  MND_TMQ_RETURN_CHECK(
      tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pDropObj));

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  MND_TMQ_NULL_CHECK(pTrans);
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pDropObj));
  code = mndTransPrepare(pMnode, pTrans);

END:
  PRINT_LOG_END
  if (pConsumer != NULL) {
    taosRUnLockLatch(&pConsumer->lock);
  }
  mndReleaseConsumer(pMnode, pConsumer);
  tDeleteSMqConsumerObj(pDropObj);
  mndTransDrop(pTrans);
  return code;
}
195

196
/*
 * Append one STopicPrivilege entry for `topic` to rsp->topicPrivileges.
 *
 * The entry's noPrivilege flag is 1 when the user fails the subscribe
 * privilege check or the subscription grant check reports < 0, else 0.
 * If the topic cannot be acquired or the array cannot grow, no entry is
 * completed and the failure is only visible through PRINT_LOG_END.
 */
static void checkOnePrivilege(const char* topic, SMnode *pMnode, SMqHbRsp *rsp, const char *user) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SMqTopicObj *pTopic = NULL;

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
  taosRLockLatch(&pTopic->lock);

  STopicPrivilege *entry = taosArrayReserve(rsp->topicPrivileges, 1);
  MND_TMQ_NULL_CHECK(entry);
  tstrncpy(entry->topic, topic, TSDB_TOPIC_FNAME_LEN);

  /* the grant is only consulted when the privilege check passed */
  bool denied = mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
                grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0;
  entry->noPrivilege = denied ? 1 : 0;

END:
  PRINT_LOG_END
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
}
276,825✔
220

221
/*
 * Fill rsp->topicPrivileges with one entry per topic in
 * pConsumer->currentTopics, via checkOnePrivilege().
 *
 * Returns TSDB_CODE_INVALID_PARA if any argument is NULL, an error code if
 * the result array cannot be created, otherwise 0 (per-topic failures are
 * not propagated because checkOnePrivilege() returns void).
 */
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
  if (pMnode == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  if (rsp == NULL || user == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pConsumer->currentTopics)) {
    char *topicName = taosArrayGetP(pConsumer->currentTopics, idx);
    checkOnePrivilege(topicName, pMnode, rsp, user);
    idx++;
  }

END:
  PRINT_LOG_END
  return code;
}
240

241
/*
 * Move the offset rows reported in a heartbeat into the matching
 * subscription objects.
 *
 * For each TopicOffsetRows in req->topics, the subscription keyed by
 * "<cgroup><TMQ_SEPARATOR><topicName>" is acquired and write-latched; if this
 * consumer has an entry in its consumerHash, the entry's old offsetRows array
 * is destroyed and replaced by the one from the request (which is then set to
 * NULL in the request so it is not freed twice). Topics whose subscription
 * cannot be acquired are logged and skipped.
 */
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
  if (pMnode == NULL || req == NULL || pConsumer == NULL) {
    return;
  }

  for (int i = 0; i < taosArrayGetSize(req->topics); i++) {
    TopicOffsetRows *reported = taosArrayGet(req->topics, i);
    if (reported == NULL) {
      continue;
    }
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, reported->topicName);

    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, reported->topicName);

    SMqSubscribeObj *pSub = NULL;
    int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
    if (code != 0) {
      mError("failed to acquire subscribe by key:%s, code:%d", key, code);
      continue;
    }

    taosWLockLatch(&pSub->lock);
    SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pEp != NULL) {
      /* ownership of the array moves from the request to the subscription */
      taosArrayDestroy(pEp->offsetRows);
      pEp->offsetRows = reported->offsetRows;
      reported->offsetRows = NULL;
    }
    taosWUnLockLatch(&pSub->lock);

    mndReleaseSubscribe(pMnode, pSub);
  }
}
272

273
/*
 * Serialize a heartbeat response into a freshly allocated rpc buffer and
 * attach it to pMsg->info.rsp / rspLen.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_TMQ_INVALID_MSG when either serialization pass yields <= 0,
 * terrno when the buffer cannot be allocated, and 0 on success.
 */
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
  if (pMsg == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* first pass only measures the encoded size */
  int32_t encodedLen = tSerializeSMqHbRsp(NULL, 0, rsp);
  if (encodedLen <= 0) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  void *rspBuf = rpcMallocCont(encodedLen);
  if (rspBuf == NULL) {
    return terrno;
  }

  if (tSerializeSMqHbRsp(rspBuf, encodedLen, rsp) <= 0) {
    rpcFreeCont(rspBuf);
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  pMsg->info.rsp = rspBuf;
  pMsg->info.rspLen = encodedLen;
  return 0;
}
294

295
/*
 * Handler for TDMT_MND_TMQ_HB (consumer heartbeat).
 *
 * With the consumer's write latch held it:
 *   - builds the per-topic privilege list for the response,
 *   - resets hbStatus to 0,
 *   - when the request's pollFlag is 1, resets pollStatus to 0 and stamps
 *     pollTime with the current time in ms,
 *   - stores the reported offset rows into the subscriptions,
 *   - serializes the response (including tqClientDebugFlag).
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL message, otherwise the first
 * failing step's code or the result of buildMqHbRsp().
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t         code = 0;
  int32_t         lino = 0;
  SMqHbReq        req = {0};
  SMqHbRsp        rsp = {0};
  SMqConsumerObj *pConsumer = NULL;
  SMnode         *pMnode = pMsg->info.node;
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));
  int64_t consumerId = req.consumerId;
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
  taosWLockLatch(&pConsumer->lock);
  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));

  atomic_store_32(&pConsumer->hbStatus, 0);
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, req.pollFlag, pConsumer->pollStatus);
  if (req.pollFlag == 1) {
    atomic_store_32(&pConsumer->pollStatus, 0);
    pConsumer->pollTime = taosGetTimestampMs();
  }

  storeOffsetRows(pMnode, &req, pConsumer);
  rsp.debugFlag = tqClientDebugFlag;
  code = buildMqHbRsp(pMsg, &rsp);

END:
  if (pConsumer != NULL) {
    taosWUnLockLatch(&pConsumer->lock);
  }
  tDestroySMqHbRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  tDestroySMqHbReq(&req);
  PRINT_LOG_END
  return code;
}
333

334
/*
 * Append the endpoint description of one topic to rsp->topics.
 *
 * The subscription keyed by "<cgroup><TMQ_SEPARATOR><topic>" is acquired and
 * write-latched; every vgroup assigned to this consumer is copied into a
 * SMqSubVgEp with offset -1. When `epoch` is -1 each vgroup's epSet is first
 * refreshed from the vgroup object (if that vgroup can be acquired).
 *
 * A subscription that cannot be acquired is only warned about: the topic is
 * skipped and 0 is returned. Other failures return an error code.
 */
static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, char *topic, SMqAskEpRsp *rsp, int32_t epoch) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqSubscribeObj *pSub = NULL;
  SMqSubTopicEp    topicEp = {0};
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  PRINT_LOG_START
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
  if (mndAcquireSubscribeByKey(pMnode, key, &pSub) != 0) {
    mWarn("%s failed to acquire subscribe by key:%s", __func__, key);
    goto END;
  }
  tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);

  taosWLockLatch(&pSub->lock);

  /* collect every vgroup assigned to this consumer for the topic */
  SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pConsumerEp);
  int32_t vgCount = taosArrayGetSize(pConsumerEp->vgs);
  topicEp.vgs = taosArrayInit(vgCount, sizeof(SMqSubVgEp));
  MND_TMQ_NULL_CHECK(topicEp.vgs);

  tstrncpy(topicEp.db, pSub->dbName, TSDB_DB_FNAME_LEN);
  for (int32_t k = 0; k < vgCount; k++) {
    SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, k);
    if (pVgEp == NULL) {
      continue;
    }

    /* only a client without any epoch (-1) triggers an epSet refresh */
    SVgObj *pVgroup = (epoch == -1) ? mndAcquireVgroup(pMnode, pVgEp->vgId) : NULL;
    if (pVgroup != NULL) {
      pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
      mndReleaseVgroup(pMnode, pVgroup);
    }

    SMqSubVgEp outVg = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
    MND_TMQ_NULL_CHECK(taosArrayPush(topicEp.vgs, &outVg));
  }

  MND_TMQ_NULL_CHECK(taosArrayPush(rsp->topics, &topicEp));
  topicEp.vgs = NULL; /* now owned by rsp->topics */

END:
  if (pSub != NULL) {
    taosWUnLockLatch(&pSub->lock);
  }
  taosArrayDestroy(topicEp.vgs);
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  return code;
}
384

385
/*
 * Build rsp->topics with one endpoint entry per topic currently held by the
 * consumer (pConsumer->currentTopics), delegating to processEachTopicEp().
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, an error code if the
 * array cannot be created, or the first non-zero code from a topic.
 */
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t topicCount = taosArrayGetSize(pConsumer->currentTopics);
  rsp->topics = taosArrayInit(topicCount, sizeof(SMqSubTopicEp));
  MND_TMQ_NULL_CHECK(rsp->topics);

  int32_t idx = 0;
  while (idx < topicCount) {
    char *topicName = taosArrayGetP(pConsumer->currentTopics, idx);
    MND_TMQ_RETURN_CHECK(processEachTopicEp(pMnode, pConsumer, topicName, rsp, epoch));
    idx++;
  }

END:
  PRINT_LOG_END
  return code;
}
407

408
/*
 * Encode an ask-ep response (SMqRspHead followed by the encoded SMqAskEpRsp)
 * into a new rpc buffer and attach it to pMsg->info.rsp / rspLen.
 *
 * The head carries TMQ_MSG_TYPE__EP_RSP, the server epoch and the consumer
 * id; walsver/walever are zeroed.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, terrno if allocation
 * fails, TSDB_CODE_TSC_INTERNAL_ERROR if encoding fails, and 0 on success.
 */
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
  if (pMsg == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* sizing pass: head + encoded body */
  int32_t totalLen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
  void   *rspBuf = rpcMallocCont(totalLen);
  if (rspBuf == NULL) {
    return terrno;
  }

  SMqRspHead *head = rspBuf;
  head->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  head->epoch = serverEpoch;
  head->consumerId = consumerId;
  head->walsver = 0;
  head->walever = 0;

  void *bodyPos = POINTER_SHIFT(rspBuf, sizeof(SMqRspHead));
  if (tEncodeSMqAskEpRsp(&bodyPos, rsp) < 0) {
    rpcFreeCont(rspBuf);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  pMsg->info.rsp = rspBuf;
  pMsg->info.rspLen = totalLen;
  return 0;
}
439

440
/*
 * Handler for TDMT_MND_TMQ_ASK_EP.
 *
 * With the consumer's read latch held:
 *   - the request's cgroup must match the stored one, else
 *     TSDB_CODE_MND_CONSUMER_NOT_EXIST;
 *   - the consumer must be MQ_CONSUMER_STATUS_READY, else
 *     TSDB_CODE_MND_CONSUMER_NOT_READY;
 *   - endpoint info is added to the response only when the client's epoch
 *     differs from the server's; the response itself is always built once
 *     the checks pass.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL message, otherwise one of the
 * codes above, the first failing step's code, or buildAskEpRsp()'s result.
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t         code = 0;
  int32_t         lino = 0;
  SMqAskEpReq     req = {0};
  SMqAskEpRsp     rsp = {0};
  SMqConsumerObj *pConsumer = NULL;
  SMnode         *pMnode = pMsg->info.node;
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
  int64_t consumerId = req.consumerId;
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
  taosRLockLatch(&pConsumer->lock);

  if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto END;
  }

  /* step 1: the consumer must be ready */
  int32_t status = atomic_load_32(&pConsumer->status);
  if (status != MQ_CONSUMER_STATUS_READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    code = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto END;
  }

  /* step 2: ep info is only sent when the epochs disagree */
  int32_t clientEpoch = req.epoch;
  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
  if (clientEpoch != serverEpoch) {
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
          consumerId, clientEpoch, serverEpoch);
    MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, clientEpoch, &rsp));
  }

  code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);

END:
  if (pConsumer != NULL) {
    taosRUnLockLatch(&pConsumer->lock);
  }
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  PRINT_LOG_END
  return code;
}
492

493
/*
 * Encode pConsumer and append it to pTrans as a commit log marked
 * SDB_STATUS_DROPPED.
 *
 * The raw is freed here only if appending it to the transaction fails.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the
 * first failing step's code.
 */
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pTrans == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    goto END;
  }
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

END:
  PRINT_LOG_END
  return code;
}
513

514
/*
 * Encode pConsumer and append it to pTrans as a commit log marked
 * SDB_STATUS_READY.
 *
 * The raw is freed here only if appending it to the transaction fails.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the
 * first failing step's code.
 */
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pCommitRaw);
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    sdbFreeRaw(pCommitRaw);
    goto END;
  }
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));

END:
  /* Close the PRINT_LOG_START above, as mndSetConsumerDropLogs() does, so a
   * failure in this function is reported like everywhere else in the file. */
  PRINT_LOG_END
  return code;
}
533

534
/*
 * Element destructor for arrays that store heap pointers: `param` is the
 * address of the slot, and the pointer held in that slot is freed.
 * NULL slots addresses and NULL slot contents are ignored.
 */
static void freeItem(void *param) {
  if (param == NULL) {
    return;
  }

  void *stored = *(void **)param;
  if (stored == NULL) {
    return;
  }
  taosMemoryFree(stored);
}
543

544
#define ADD_TOPIC_TO_ARRAY(element, array) \
545
char *newTopicCopy = taosStrdup(element); \
546
MND_TMQ_NULL_CHECK(newTopicCopy);\
547
if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL){\
548
  taosMemoryFree(newTopicCopy);\
549
  code = terrno;\
550
  goto END;\
551
}
552

553
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
35,430✔
554
  if (pExistedConsumer == NULL || pConsumerNew == NULL) {
35,430✔
555
    return TSDB_CODE_INVALID_PARA;
×
556
  }
557
  int32_t code = 0;
35,430✔
558
  int32_t lino = 0;
35,430✔
559
  PRINT_LOG_START
35,430✔
560
  taosRLockLatch(&pExistedConsumer->lock);
35,430✔
561

562
  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
35,430✔
563
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
35,430✔
564
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
35,430✔
565
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);
35,430✔
566

567
  int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
35,430✔
568
  int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
35,430✔
569
  int32_t i = 0, j = 0;
35,430✔
570
  while (i < oldTopicNum || j < newTopicNum) {
71,417✔
571
    if (i >= oldTopicNum) {
35,987✔
572
      void* tmp = taosArrayGetP(pConsumerNew->assignedTopics, j);
354✔
573
      MND_TMQ_NULL_CHECK(tmp);
354✔
574
      ADD_TOPIC_TO_ARRAY(tmp, rebNewTopics);
708✔
575
      j++;
354✔
576
      continue;
354✔
577
    } else if (j >= newTopicNum) {
35,633✔
578
      void* tmp = taosArrayGetP(pExistedConsumer->currentTopics, i);
34,816✔
579
      MND_TMQ_NULL_CHECK(tmp);
34,816✔
580
      ADD_TOPIC_TO_ARRAY(tmp, rebRemovedTopics);
69,632✔
581
      i++;
34,816✔
582
      continue;
34,816✔
583
    } else {
584
      char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
817✔
585
      MND_TMQ_NULL_CHECK(oldTopic);
817✔
586
      char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j);
817✔
587
      MND_TMQ_NULL_CHECK(newTopic);
817✔
588
      int   comp = strcmp(oldTopic, newTopic);
817✔
589
      if (comp == 0) {
817✔
590
        i++;
817✔
591
        j++;
817✔
592
        continue;
817✔
593
      } else if (comp < 0) {
×
594
        ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
×
595
        i++;
×
596
        continue;
×
597
      } else {
×
598
        ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
×
599
        j++;
×
600
        continue;
×
601
      }
602
    }
603
  }
604
  // no topics need to be rebalanced
605
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
35,430✔
606
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
817✔
607
  }
608

609
END:
34,613✔
610
  taosRUnLockLatch(&pExistedConsumer->lock);
35,430✔
611
  PRINT_LOG_END
35,430✔
612
  return code;
35,430✔
613
}
614

615
/*
 * Normalize a requested topic list and enforce the per-topic group limit.
 *
 * The list is sorted with taosArrayCompareString, duplicates are removed
 * (their strings freed through freeItem), and each remaining topic is
 * rejected if mndGetGroupNumByTopic() reports MND_MAX_GROUP_PER_TOPIC or
 * more groups.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE when a topic is at the limit, else 0.
 */
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
  if (pMnode == NULL || pTopicList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t topicCount = taosArrayGetSize(pTopicList);
  int     idx = 0;
  while (idx < topicCount) {
    const char *topicName = (const char *)taosArrayGetP(pTopicList, idx);
    int32_t     groupCount = mndGetGroupNumByTopic(pMnode, topicName);
    if (groupCount >= MND_MAX_GROUP_PER_TOPIC) {
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
    }
    idx++;
  }
  return 0;
}
631

632
/*
 * Build the consumer object that a subscribe transaction will commit.
 *
 * - Unknown consumer id: a CONSUMER_INSERT_SUB object is created from the
 *   request.
 * - Known consumer id: it must be MQ_CONSUMER_STATUS_READY (else
 *   TSDB_CODE_MND_CONSUMER_NOT_READY); a CONSUMER_UPDATE_SUB object is
 *   created and its added/removed topic lists are filled by
 *   getTopicAddDelete().
 *
 * On success the new object is handed to *ppConsumer when ppConsumer is
 * non-NULL; otherwise, and on every failure, it is deleted here.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL pMnode/subscribe, otherwise 0 or
 * the first failing step's code.
 */
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer){
  if (pMnode == NULL || subscribe == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t         code = 0;
  int32_t         lino = 0;
  int64_t         consumerId = subscribe->consumerId;
  char           *cgroup = subscribe->cgroup;
  SMqConsumerObj *pExisting = NULL;
  SMqConsumerObj *pBuilt = NULL;

  PRINT_LOG_START
  code = mndAcquireConsumer(pMnode, consumerId, &pExisting);
  if (code == 0) {
    int32_t status = atomic_load_32(&pExisting->status);

    mInfo("receive tmq subscribe request from existed consumer:0x%" PRIx64
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
          (int32_t)taosArrayGetSize(subscribe->topicNames));

    if (status != MQ_CONSUMER_STATUS_READY) {
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto END;
    }
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pBuilt));
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExisting, pBuilt));
  } else {
    mInfo("receive tmq subscribe request from new consumer:0x%" PRIx64
              ",cgroup:%s, numOfTopics:%d", consumerId,
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));

    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pBuilt));
  }

  if (ppConsumer != NULL) {
    /* hand ownership to the caller so the cleanup below leaves it alone */
    *ppConsumer = pBuilt;
    pBuilt = NULL;
  }

END:
  PRINT_LOG_END
  mndReleaseConsumer(pMnode, pExisting);
  tDeleteSMqConsumerObj(pBuilt);
  return code;
}
675

676
/*
 * Handler for TDMT_MND_TMQ_SUBSCRIBE.
 *
 * An empty topic list means "unsubscribe"; if the consumer then has no
 * assigned topics there is nothing to do and 0 is returned. Otherwise the
 * topic list is sorted/deduplicated and checked, a "subscribe" transaction
 * is created (TRN_CONFLICT_NOTHING for unsubscribe, TRN_CONFLICT_DB_INSIDE
 * otherwise), the topics are validated, the new consumer object is built and
 * committed through the transaction.
 *
 * TSDB_CODE_TMQ_NO_NEED_REBALANCE and TSDB_CODE_MND_CONSUMER_NOT_EXIST are
 * reported to the caller as 0. A successfully prepared transaction returns
 * TSDB_CODE_ACTION_IN_PROGRESS.
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t         code = 0;
  int32_t         lino = 0;
  SMnode         *pMnode = pMsg->info.node;
  char           *rawReq = pMsg->pCont;
  STrans         *pTrans = NULL;
  SMqConsumerObj *pConsumerNew = NULL;

  PRINT_LOG_START
  SCMSubscribeReq subscribe = {0};
  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(rawReq, &subscribe, pMsg->contLen));

  bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
  if (unSubscribe) {
    SMqConsumerObj *pCurrent = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pCurrent));
    taosRLockLatch(&pCurrent->lock);
    size_t assignedCount = taosArrayGetSize(pCurrent->assignedTopics);
    taosRUnLockLatch(&pCurrent->lock);
    mndReleaseConsumer(pMnode, pCurrent);
    if (assignedCount == 0) {
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
                          (unSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
                          pMsg, "subscribe");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user));
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
  code = TSDB_CODE_ACTION_IN_PROGRESS;

END:
  mndTransDrop(pTrans);
  tDeleteSMqConsumerObj(pConsumerNew);
  taosArrayDestroyP(subscribe.topicNames, NULL);

  /* these two outcomes are not errors from the client's point of view */
  if (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
    code = 0;
  }

  bool failed = (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS);
  if (failed) {
    mError("tmq subscribe request from consumer:0x%" PRIx64 " failed, code:%d", subscribe.consumerId, code);
  } else {
    mInfo("tmq subscribe request from consumer:0x%" PRIx64 " processed, code:%d", subscribe.consumerId, code);
  }
  return code;
}
726

727
/*
 * Serialize a consumer object into a newly allocated sdb raw record.
 *
 * Raw layout: int32 payload length, the encoded SMqConsumerObj bytes, then
 * MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and only cleared once every
 * step has succeeded; on any failure the raw is freed and NULL is returned
 * with terrno still non-zero. Returns NULL for a NULL pConsumer as well.
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL) {
    return NULL;
  }

  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *payload = NULL;
  int32_t payloadLen = tEncodeSMqConsumerObj(NULL, pConsumer); /* sizing pass */
  int32_t rawSize = sizeof(int32_t) + payloadLen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    goto CM_ENCODE_OVER;
  }

  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) {
    goto CM_ENCODE_OVER;
  }

  void *cursor = payload;
  if (tEncodeSMqConsumerObj(&cursor, pConsumer) < 0) {
    goto CM_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, payloadLen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, payload, payloadLen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(payload);
  if (terrno != 0) {
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}
769

770
/*
 * Rebuild a consumer row from an sdb raw record written by
 * mndConsumerActionEncode().
 *
 * Accepts soft versions 1..MND_CONSUMER_VER_NUMBER. Reads the int32 payload
 * length, the payload and the reserved area, decodes the payload into the
 * row's SMqConsumerObj and refreshes the consumer's ep via
 * tmsgUpdateDnodeEpSet().
 *
 * terrno is cleared on entry; if it is non-zero at the end the row is freed.
 * Returns the row, or NULL for a NULL pRaw or on failure.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }

  int32_t         code = 0;
  int32_t         lino = 0;
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *payload = NULL;

  terrno = 0;
  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t payloadLen;
  SDB_GET_INT32(pRaw, dataPos, &payloadLen, CM_DECODE_OVER);

  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, payload, payloadLen, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(payload, pConsumer, sver) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

CM_DECODE_OVER:
  taosMemoryFreeClear(payload);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}
830

831
/**
 * sdb insert hook for consumer objects: logs the insert and initializes the
 * subscribe time from the creation time.
 *
 * @param pSdb      unused.
 * @param pConsumer consumer being inserted.
 * @return 0 on success, TSDB_CODE_INVALID_PARA when pConsumer is NULL.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (!pConsumer) {
    return TSDB_CODE_INVALID_PARA;
  }

  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
        pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);

  // a freshly inserted consumer counts as subscribed from its creation time
  pConsumer->subscribeTime = pConsumer->createTime;
  return 0;
}
840

841
/**
 * sdb delete hook for consumer objects: logs the deletion and clears the
 * consumer through tClearSMqConsumerObj().
 *
 * @param pSdb      unused.
 * @param pConsumer consumer being deleted.
 * @return 0 on success, TSDB_CODE_INVALID_PARA when pConsumer is NULL.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (!pConsumer) {
    return TSDB_CODE_INVALID_PARA;
  }

  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  tClearSMqConsumerObj(pConsumer);
  return 0;
}
850

851
// remove from topic list
852
/**
 * Remove the first entry equal to pTopic from a list of topic-name strings and
 * free that entry.
 *
 * @param topicList  array of char* topic names; NULL is a no-op.
 * @param pTopic     topic name to look for; NULL is a no-op.
 * @param consumerId consumer id, used for logging only.
 * @param type       label of the list ("new", "remove", "current"), used for logging only.
 */
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
  if (topicList == NULL || pTopic == NULL) {
    return;
  }
  int32_t size = taosArrayGetSize(topicList);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(topicList, i);
    // skip empty slots instead of handing NULL to strcmp
    if (p == NULL || strcmp(pTopic, p) != 0) {
      continue;
    }

    taosArrayRemove(topicList, i);

    // log before freeing so pTopic is never read after the entry is released,
    // even if the caller passed the stored pointer itself
    mInfo("tmq rebalance consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
          consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
    taosMemoryFree(p);
    break;
  }
}
869

870
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
38,871✔
871
  if (pConsumer == NULL || pTopic == NULL) {
38,871✔
872
    return false;
×
873
  }
874
  bool    existing = false;
38,871✔
875
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
38,871✔
876
  for (int32_t i = 0; i < size; i++) {
39,446✔
877
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
575✔
878
    if (topic && strcmp(topic, pTopic) == 0) {
575✔
879
      existing = true;
×
880
      break;
×
881
    }
882
  }
883

884
  return existing;
38,871✔
885
}
886

887
/**
 * sdb update hook for consumer objects. Applies pNewConsumer->updateType to
 * pOldConsumer while holding the old consumer's write latch:
 *   - CONSUMER_UPDATE_SUB: swap in the new rebNewTopics / rebRemovedTopics /
 *     assignedTopics lists, stamp the subscribe time, mark as rebalancing.
 *   - CONSUMER_UPDATE_REB: bump the epoch, mark ready, stamp the rebalance time.
 *   - CONSUMER_ADD_REB: move the first topic of pNewConsumer->rebNewTopics from
 *     the old consumer's pending-new list into its currentTopics.
 *   - CONSUMER_REMOVE_REB: drop the first topic of pNewConsumer->rebRemovedTopics
 *     from the old consumer's pending-remove and current lists.
 *
 * @param pSdb         unused.
 * @param pOldConsumer consumer stored in sdb; modified in place.
 * @param pNewConsumer update carrier; its lists may be swapped with the old ones.
 * @return 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, otherwise the
 *         code set by the MND_TMQ_* check macros.
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  if (pOldConsumer == NULL || pNewConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t lino = 0;
  int32_t code = 0;
  char   *pNewTopic = NULL;  // owned here until pushed into currentTopics; released at END otherwise
  PRINT_LOG_START
  taosWLockLatch(&pOldConsumer->lock);
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);

  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

    pOldConsumer->subscribeTime = taosGetTimestampMs();
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
    MND_TMQ_NULL_CHECK(tmp);
    // assign to the function-scope pointer (no redeclaration) so END frees the
    // copy whenever ownership was not handed over to currentTopics
    pNewTopic = taosStrdup(tmp);
    MND_TMQ_NULL_CHECK(pNewTopic);
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
    if (existing) {
      mError("tmq rebalance consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
    } else {
      MND_TMQ_NULL_CHECK(taosArrayPush(pOldConsumer->currentTopics, &pNewTopic));
      pNewTopic = NULL;  // the array now owns the string
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }

    int32_t status = pOldConsumer->status;
    // nothing left to add or remove: the rebalance for this consumer is complete
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
    MND_TMQ_NULL_CHECK(topic);
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");

    int32_t status = pOldConsumer->status;
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

END:
  taosMemoryFree(pNewTopic);
  taosWUnLockLatch(&pOldConsumer->lock);

  PRINT_LOG_END

  return code;
}
971

972
/**
 * Look up a consumer by id in the mnode's sdb and hand it back through
 * *pConsumer.
 *
 * @param pMnode     mnode whose sdb is searched.
 * @param consumerId id of the consumer to acquire.
 * @param pConsumer  out: the sdbAcquire() result (NULL when not found).
 * @return 0 when found, TSDB_CODE_MND_CONSUMER_NOT_EXIST when not found,
 *         TSDB_CODE_INVALID_PARA when pMnode or pConsumer is NULL.
 */
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
  if (!pMnode || !pConsumer) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  *pConsumer = pFound;

  return (pFound != NULL) ? 0 : TSDB_CODE_MND_CONSUMER_NOT_EXIST;
}
983

984
/**
 * Release a consumer back to the mnode's sdb via sdbRelease().
 * Does nothing when either argument is NULL.
 *
 * @param pMnode    mnode owning the sdb.
 * @param pConsumer consumer to release.
 */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  if (pMnode != NULL && pConsumer != NULL) {
    sdbRelease(pMnode->pSdb, pConsumer);
  }
}
991

992
/**
 * Fill one result row of the consumer listing for a single (consumer, topic) pair.
 *
 * Columns are written in order: consumer id (hex), consumer group, client id,
 * user, fqdn, status, topic, create time, subscribe time, rebalance time,
 * parameter summary, poll time.
 *
 * @param pConsumer consumer to describe.
 * @param pShow     show object; its column schema sizes the status and parameter buffers.
 * @param pBlock    data block receiving the row.
 * @param numOfRows index of the row to fill.
 * @param showTopic topic name for this row; when NULL the topic cell is set to NULL.
 * @param hasTopic  false to force a NULL topic cell.
 * @return 0 on success, otherwise the code set by the MND_TMQ_* check macros.
 */
static int32_t buildResult(SMqConsumerObj *pConsumer, SShowObj *pShow, SSDataBlock *pBlock, int32_t numOfRows, const char* showTopic, bool hasTopic) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SColumnInfoData *pColInfo = NULL;
  int32_t          cols = 0;
  char            *parasStr = NULL;
  char            *status = NULL;

  PRINT_LOG_START
  // consumer id
  char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
  (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
  varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));

  // consumer group
  char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(cgroup, pConsumer->cgroup);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));

  // client id
  char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(clientId, pConsumer->clientId);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));

  // user
  char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(user, pConsumer->user);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));

  // fqdn
  char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(fqdn, pConsumer->fqdn);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));

  // status: the buffer is sized from the schema of the column about to be written
  const char *pStatusName = mndConsumerStatusName(pConsumer->status);
  status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
  MND_TMQ_NULL_CHECK(status);
  STR_TO_VARSTR(status, pStatusName);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));

  // one subscribed topic
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  // a missing topic name is shown as NULL rather than passed on to the name formatter
  if (hasTopic && showTopic != NULL) {
    char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    mndTopicGetShowName(showTopic, topic + VARSTR_HEADER_SIZE);
    *(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
  } else {
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
  }

  // up time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));

  // subscribe time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));

  // rebalance time (NULL while it is still 0)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));

  // parameter summary: tbname / auto commit / commit interval / reset offset
  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
  tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);

  parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
  MND_TMQ_NULL_CHECK(parasStr);
  (void)snprintf(varDataVal(parasStr), pShow->pMeta->pSchemas[cols].bytes - VARSTR_HEADER_SIZE, "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
          pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
  varDataSetLen(parasStr, strlen(varDataVal(parasStr)));

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));

  // poll time (NULL while it is still 0)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  MND_TMQ_NULL_CHECK(pColInfo);
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->pollTime, pConsumer->pollTime == 0));

END:
  PRINT_LOG_END
  taosMemoryFreeClear(status);
  taosMemoryFreeClear(parasStr);
  return code;
}
1104

1105
/**
 * Append one result row per assigned topic of pConsumer to pBlock, holding the
 * consumer's read latch for the duration.
 *
 * A consumer without assigned topics is skipped (logged, no row emitted).
 *
 * @param pConsumer    consumer to show.
 * @param numOfRows    in/out: number of rows already in the block; incremented per row added.
 * @param pShow        show object forwarded to buildResult().
 * @param pBlock       data block receiving the rows; grown when the new rows exceed rowsCapacity.
 * @param rowsCapacity capacity the caller assumed for pBlock.
 * @return 0 on success, otherwise the code set by the MND_TMQ_* check macros.
 */
static int32_t retrieveOneConsumer(SMqConsumerObj *pConsumer, int32_t* numOfRows, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  int32_t         code = 0;
  int32_t         lino = 0;
  int32_t         topicSz = 0;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);
  mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);

  topicSz = taosArrayGetSize(pConsumer->assignedTopics);
  if (topicSz == 0) {
    mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
    goto END;
  }

  if (*numOfRows + topicSz > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + topicSz));
  }

  // topicSz > 0 here, so every row carries a real topic
  for (int32_t i = 0; i < topicSz; i++) {
    MND_TMQ_RETURN_CHECK(buildResult(pConsumer, pShow, pBlock, *numOfRows, taosArrayGetP(pConsumer->assignedTopics, i), true));
    (*numOfRows)++;
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
1138

1139
/**
 * Show-retrieve callback for consumers: walks the SDB_CONSUMER table from
 * pShow->pIter and fills pBlock until rowsCapacity rows are produced or the
 * table is exhausted.
 *
 * @param pReq         request; pReq->info.node supplies the mnode.
 * @param pShow        show state (iterator and running row count).
 * @param pBlock       data block receiving the rows.
 * @param rowsCapacity row budget for this call.
 * @return the number of rows produced by this call on success, the failing
 *         code otherwise; TSDB_CODE_INVALID_PARA when an argument is NULL.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         code = 0;
  int32_t         lino = 0;
  int32_t         rowCnt = 0;
  SMqConsumerObj *pCur = NULL;
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  PRINT_LOG_START

  for (;;) {
    if (rowCnt >= rowsCapacity) {
      break;
    }

    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pCur);
    if (pShow->pIter == NULL) {
      break;
    }

    // on failure the macro jumps to END, where the still-held consumer is released
    MND_TMQ_RETURN_CHECK(retrieveOneConsumer(pCur, &rowCnt, pShow, pBlock, rowsCapacity));

    pBlock->info.rows = rowCnt;
    sdbRelease(pSdb, pCur);
    pCur = NULL;
  }

  pShow->numOfRows += rowCnt;

END:
  sdbRelease(pSdb, pCur);
  // NOTE(review): this also runs when the loop stopped on a full batch with a
  // live iterator - confirm that cancelling here is intended for that case.
  sdbCancelFetch(pSdb, pShow->pIter);

  if (code != 0) {
    mError("show consumer failed, code:%d", code);
    return code;
  }

  mDebug("show consumer processed, numOfRows:%d", rowCnt);
  return rowCnt;
}
1176

1177
/**
 * Cancel an in-progress SDB_CONSUMER iteration.
 * Does nothing when either argument is NULL.
 *
 * @param pMnode mnode owning the sdb.
 * @param pIter  iterator to cancel.
 */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL && pIter != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_CONSUMER);
  }
}
1182

1183
const char *mndConsumerStatusName(int status) {
1,075,380✔
1184
  switch (status) {
1,075,380✔
1185
    case MQ_CONSUMER_STATUS_READY:
533,384✔
1186
      return "ready";
533,384✔
1187
    case MQ_CONSUMER_STATUS_REBALANCE:
541,996✔
1188
      return "rebalancing";
541,996✔
1189
    default:
×
1190
      return "unknown";
×
1191
  }
1192
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc