• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4687

25 Aug 2025 07:22AM UTC coverage: 57.894% (-2.2%) from 60.092%
#4687

push

travis-ci

web-flow
fix: add taosBenchmark windows support params (#32708)

132643 of 292257 branches covered (45.39%)

Branch coverage included in aggregate %.

201266 of 284501 relevant lines covered (70.74%)

4743408.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.4
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndShow.h"
20
#include "mndTopic.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "tcompare.h"
24
#include "tname.h"
25

26
#define MND_SUBSCRIBE_VER_NUMBER   3
27
#define MND_SUBSCRIBE_RESERVE_SIZE 64
28

29
//#define MND_CONSUMER_LOST_HB_CNT          6
30

31
static int32_t mqRebInExecCnt = 0;
32

33
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
34
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
35
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
36
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
38
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
39
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
40
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
42
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
43

44
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
760✔
45
  if (pTrans == NULL || pSub == NULL) {
760!
46
    return TSDB_CODE_INVALID_PARA;
×
47
  }
48
  int32_t  code = 0;
760✔
49
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
760✔
50
  MND_TMQ_NULL_CHECK(pCommitRaw);
760!
51
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
760✔
52
  if (code != 0) {
760!
53
    sdbFreeRaw(pCommitRaw);
×
54
    goto END;
×
55
  }
56
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
760✔
57

58
END:
760✔
59
  return code;
760✔
60
}
61

62
int32_t mndInitSubscribe(SMnode *pMnode) {
1,923✔
63
  SSdbTable table = {
1,923✔
64
      .sdbType = SDB_SUBSCRIBE,
65
      .keyType = SDB_KEY_BINARY,
66
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
67
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
68
      .insertFp = (SdbInsertFp)mndSubActionInsert,
69
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
70
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
71
  };
72

73
  if (pMnode == NULL) {
1,923!
74
    return TSDB_CODE_INVALID_PARA;
×
75
  }
76
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
1,923✔
77
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
1,923✔
78
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
1,923✔
79
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
1,923✔
80
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);
1,923✔
81

82
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
1,923✔
83
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
1,923✔
84

85
  return sdbSetTable(pMnode->pSdb, table);
1,923✔
86
}
87

88
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
373✔
89
  int32_t     code = 0;
373✔
90
  SSdb*       pSdb = pMnode->pSdb;
373✔
91
  SVgObj*     pVgroup = NULL;
373✔
92
  SQueryPlan* pPlan = NULL;
373✔
93
  SSubplan*   pSubplan = NULL;
373✔
94

95
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
373✔
96
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
300✔
97
    if (pPlan == NULL) {
300!
98
      return TSDB_CODE_QRY_INVALID_INPUT;
×
99
    }
100
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
73✔
101
    SNode* pAst = NULL;
6✔
102
    code = nodesStringToNode(pTopic->ast, &pAst);
6✔
103
    if (code != 0) {
6!
104
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
×
105
      return code;
×
106
    }
107

108
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
6✔
109
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
6✔
110
    if (code != 0) {
6!
111
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
×
112
      nodesDestroyNode(pAst);
×
113
      return code;
×
114
    }
115
    nodesDestroyNode(pAst);
6✔
116
  }
117

118
  if (pPlan) {
373✔
119
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
306!
120
    if (levelNum != 1) {
306!
121
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
122
      goto END;
×
123
    }
124

125
    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
306✔
126
    if (pNodeListNode == NULL){
306!
127
      code = TSDB_CODE_OUT_OF_MEMORY;
×
128
      goto END;
×
129
    }
130
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
306!
131
    if (opNum != 1) {
306!
132
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
133
      goto END;
×
134
    }
135

136
    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
306✔
137
  }
138

139
  void* pIter = NULL;
373✔
140
  while (1) {
1,878✔
141
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
2,251✔
142
    if (pIter == NULL) {
2,251✔
143
      break;
373✔
144
    }
145

146
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
1,878✔
147
      sdbRelease(pSdb, pVgroup);
918✔
148
      continue;
918✔
149
    }
150

151
    pSub->vgNum++;
960✔
152

153
    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
960!
154
    if (pVgEp == NULL){
960!
155
      code = terrno;
×
156
      goto END;
×
157
    }
158
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
960✔
159
    pVgEp->vgId = pVgroup->vgId;
960✔
160
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL){
1,920!
161
      code = terrno;
×
162
      taosMemoryFree(pVgEp);
×
163
      goto END;
×
164
    }
165
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
960!
166
    sdbRelease(pSdb, pVgroup);
960✔
167
  }
168

169
  if (pSubplan) {
373✔
170
    int32_t msgLen;
171

172
    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
306!
173
      code = TSDB_CODE_QRY_INVALID_INPUT;
×
174
      goto END;
×
175
    }
176
  } else {
177
    pSub->qmsg = taosStrdup("");
67!
178
  }
179

180
END:
373✔
181
  qDestroyQueryPlan(pPlan);
373✔
182
  return code;
373✔
183
}
184

185

186
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey, SMqSubscribeObj** pSub) {
373✔
187
  if(pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
373!
188
    return TSDB_CODE_INVALID_PARA;
×
189
  }
190
  int32_t code = 0;
373✔
191
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
373!
192
  (*pSub)->dbUid = pTopic->dbUid;
373✔
193
  (*pSub)->stbUid = pTopic->stbUid;
373✔
194
  (*pSub)->subType = pTopic->subType;
373✔
195
  (*pSub)->withMeta = pTopic->withMeta;
373✔
196

197
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
373!
198
  return code;
373✔
199

200
END:
×
201
  tDeleteSubscribeObj(*pSub);
×
202
  taosMemoryFree(*pSub);
×
203
  *pSub = NULL;
×
204
  return code;
×
205
}
206

207
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
2,116✔
208
                                    SSubplan *pPlan) {
209
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
2,116!
210
    return TSDB_CODE_INVALID_PARA;
×
211
  }
212
  SMqRebVgReq req = {0};
2,116✔
213
  int32_t     code = 0;
2,116✔
214
  SEncoder encoder = {0};
2,116✔
215

216
  req.oldConsumerId = pRebVg->oldConsumerId;
2,116✔
217
  req.newConsumerId = pRebVg->newConsumerId;
2,116✔
218
  req.vgId = pRebVg->pVgEp->vgId;
2,116✔
219
  if (pPlan) {
2,116✔
220
    pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
1,687✔
221
    pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
1,687✔
222
    int32_t msgLen = 0;
1,687✔
223
    MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
1,687!
224
  } else {
225
    req.qmsg = taosStrdup("");
429!
226
    MND_TMQ_NULL_CHECK(req.qmsg);
429!
227
  }
228
  req.subType = pSub->subType;
2,116✔
229
  req.withMeta = pSub->withMeta;
2,116✔
230
  req.suid = pSub->stbUid;
2,116✔
231
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
2,116✔
232

233
  int32_t tlen = 0;
2,116✔
234
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
2,116!
235
  if (code < 0) {
2,116!
236
    goto END;
×
237
  }
238

239
  tlen += sizeof(SMsgHead);
2,116✔
240
  void *buf = taosMemoryMalloc(tlen);
2,116!
241
  MND_TMQ_NULL_CHECK(buf);
2,116!
242
  SMsgHead *pMsgHead = (SMsgHead *)buf;
2,116✔
243
  pMsgHead->contLen = htonl(tlen);
2,116✔
244
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
2,116✔
245

246
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
2,116✔
247
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
2,116!
248
  *pBuf = buf;
2,116✔
249
  *pLen = tlen;
2,116✔
250

251
END:
2,116✔
252
  tEncoderClear(&encoder);
2,116✔
253
  taosMemoryFree(req.qmsg);
2,116!
254
  return code;
2,116✔
255
}
256

257
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
2,116✔
258
                                        const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
259
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
2,116!
260
    return TSDB_CODE_INVALID_PARA;
×
261
  }
262
  int32_t code = 0;
2,116✔
263
  void   *buf  = NULL;
2,116✔
264

265
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
2,116!
266
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
267
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
268
    goto END;
×
269
  }
270

271
  int32_t tlen = 0;
2,116✔
272
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan));
2,116!
273
  int32_t vgId = pRebVg->pVgEp->vgId;
2,116✔
274
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
2,116✔
275
  if (pVgObj == NULL) {
2,116!
276
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
277
    goto END;
×
278
  }
279

280
  STransAction action = {0};
2,116✔
281
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
2,116✔
282
  action.pCont = buf;
2,116✔
283
  action.contLen = tlen;
2,116✔
284
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
2,116✔
285

286
  mndReleaseVgroup(pMnode, pVgObj);
2,116✔
287
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
2,116!
288
  return code;
2,116✔
289

290
END:
×
291
  taosMemoryFree(buf);
×
292
  return code;
×
293
}
294

295
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
2,651✔
296
  if (key == NULL || topic == NULL || cgroup == NULL) {
2,651!
297
    return;
×
298
  }
299
  int32_t i = 0;
2,651✔
300
  while (key[i] != TMQ_SEPARATOR_CHAR) {
21,328✔
301
    i++;
18,677✔
302
  }
303
  (void)memcpy(cgroup, key, i);
2,651✔
304
  cgroup[i] = 0;
2,651✔
305
  if (fullName) {
2,651✔
306
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
2,329✔
307
  } else {
308
    while (key[i] != '.') {
966✔
309
      i++;
644✔
310
    }
311
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
322✔
312
  }
313
}
314

315
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
1,021✔
316
  if (pHash == NULL || key == NULL) {
1,021!
317
    return TSDB_CODE_INVALID_PARA;
×
318
  }
319
  int32_t code = 0;
1,021✔
320
  SMqRebInfo* pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
1,021✔
321
  if (pRebInfo == NULL) {
1,021✔
322
    pRebInfo = tNewSMqRebSubscribe(key);
1,006✔
323
    if (pRebInfo == NULL) {
1,006!
324
      code = terrno;
×
325
      goto END;
×
326
    }
327
    code = taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
1,006✔
328
    taosMemoryFreeClear(pRebInfo);
1,006!
329
    if (code != 0) {
1,006!
330
      goto END;
×
331
    }
332
    pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
1,006✔
333
    MND_TMQ_NULL_CHECK(pRebInfo);
1,006!
334
  }
335
  if (pReb){
1,021✔
336
    *pReb = pRebInfo;
774✔
337
  }
338

339
END:
247✔
340
  return code;
1,021✔
341
}
342

343
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
2,598✔
344
  if (vgs == NULL || pHash == NULL || key == NULL) {
2,598!
345
    return TSDB_CODE_INVALID_PARA;
×
346
  }
347
  int32_t         code = 0;
2,598✔
348
  SMqVgEp       **pVgEp = (SMqVgEp **)taosArrayPop(vgs);
2,598✔
349
  MND_TMQ_NULL_CHECK(pVgEp);
2,598!
350
  SMqRebOutputVg outputVg = {consumerId, -1, *pVgEp};
2,598✔
351
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &(*pVgEp)->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)));
2,598!
352
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, (*pVgEp)->vgId, consumerId);
2,598!
353
END:
×
354
  return code;
2,598✔
355
}
356

357
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
1,006✔
358
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
1,006!
359
    return TSDB_CODE_INVALID_PARA;
×
360
  }
361
  int32_t code = 0;
1,006✔
362
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
1,006✔
363
  int32_t actualRemoved = 0;
1,006✔
364
  for (int32_t i = 0; i < numOfRemoved; i++) {
1,347✔
365
    int64_t*      consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
341✔
366
    MND_TMQ_NULL_CHECK(consumerId);
341!
367
    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t));
341✔
368
    if (pConsumerEp == NULL) {
341!
369
      continue;
×
370
    }
371

372
    int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
341✔
373
    for (int32_t j = 0; j < consumerVgNum; j++) {
1,359✔
374
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *consumerId, pOutput->pSub->key));
1,018!
375
    }
376

377
    taosArrayDestroy(pConsumerEp->vgs);
341✔
378
    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t)));
341!
379
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, consumerId));
682!
380
    actualRemoved++;
341✔
381
  }
382

383
  if (numOfRemoved != actualRemoved) {
1,006!
384
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved,
×
385
           actualRemoved);
386
  } else {
387
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
1,006!
388
  }
389
END:
×
390
  return code;
1,006✔
391
}
392

393
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
1,006✔
394
  if (pOutput == NULL || pInput == NULL) {
1,006!
395
    return TSDB_CODE_INVALID_PARA;
×
396
  }
397
  int32_t code = 0;
1,006✔
398
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);
1,006✔
399

400
  for (int32_t i = 0; i < numOfNewConsumers; i++) {
1,439✔
401
    int64_t* consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
433✔
402
    MND_TMQ_NULL_CHECK(consumerId);
433!
403
    SMqConsumerEp newConsumerEp = {0};
433✔
404
    newConsumerEp.consumerId = *consumerId;
433✔
405
    newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
433✔
406
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);
433!
407
    MND_TMQ_RETURN_CHECK(taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)));
433!
408
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
866!
409
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
433!
410
  }
411
END:
1,006✔
412
  return code;
1,006✔
413
}
414

415
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
1,006✔
416
  if (pOutput == NULL || pHash == NULL) {
1,006!
417
    return TSDB_CODE_INVALID_PARA;
×
418
  }
419
  int32_t code = 0;
1,006✔
420
  int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
1,006✔
421
  for (int32_t i = 0; i < numOfVgroups; i++) {
2,576✔
422
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
1,570!
423
  }
424
END:
1,006✔
425
  return code;
1,006✔
426
}
427

428
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
1,006✔
429
                                     int32_t remainderVgCnt) {
430
  if (pOutput == NULL || pHash == NULL) {
1,006!
431
    return TSDB_CODE_INVALID_PARA;
×
432
  }
433
  int32_t code = 0;
1,006✔
434
  int32_t cnt = 0;
1,006✔
435
  void   *pIter = NULL;
1,006✔
436

437
  while (1) {
270✔
438
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,276✔
439
    if (pIter == NULL) {
1,276✔
440
      break;
1,006✔
441
    }
442

443
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
270✔
444
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
270✔
445

446
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
540!
447
    if (consumerVgNum > minVgCnt) {
270✔
448
      if (cnt < remainderVgCnt) {
7✔
449
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {  // pop until equal minVg + 1
2!
450
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
×
451
        }
452
        cnt++;
2✔
453
      } else {
454
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
15✔
455
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
10!
456
        }
457
      }
458
    }
459
  }
460
END:
1,006✔
461
  return code;
1,006✔
462
}
463

464
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
1,006✔
465
  if (pMnode == NULL || pOutput == NULL) {
1,006!
466
    return TSDB_CODE_INVALID_PARA;
×
467
  }
468
  int32_t code = 0;
1,006✔
469
  int32_t totalVgNum = 0;
1,006✔
470
  SVgObj *pVgroup = NULL;
1,006✔
471
  SMqVgEp *pVgEp = NULL;
1,006✔
472
  void   *pIter = NULL;
1,006✔
473
  SArray *newVgs = taosArrayInit(0, POINTER_BYTES);
1,006✔
474
  MND_TMQ_NULL_CHECK(newVgs);
1,006!
475
  while (1) {
476
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
5,996✔
477
    if (pIter == NULL) {
5,996✔
478
      break;
1,006✔
479
    }
480
    if (pVgroup->mountVgId) {
4,990!
481
      sdbRelease(pMnode->pSdb, pVgroup);
×
482
      continue;
×
483
    }
484

485
    if (!mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
4,990✔
486
      sdbRelease(pMnode->pSdb, pVgroup);
2,359✔
487
      continue;
2,359✔
488
    }
489

490
    totalVgNum++;
2,631✔
491
    pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
2,631!
492
    MND_TMQ_NULL_CHECK(pVgEp);
2,631!
493
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
2,631✔
494
    pVgEp->vgId = pVgroup->vgId;
2,631✔
495
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
2,631!
496
    pVgEp = NULL;
2,631✔
497
    sdbRelease(pMnode->pSdb, pVgroup);
2,631✔
498
  }
499

500
  pIter = NULL;
1,006✔
501
  while (1) {
611✔
502
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,617✔
503
    if (pIter == NULL) break;
1,617✔
504
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
611✔
505
    int32_t j = 0;
611✔
506
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
1,919✔
507
      SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
1,308✔
508
      MND_TMQ_NULL_CHECK(pVgEpTmp);
1,308!
509
      bool     find = false;
1,308✔
510
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
1,847✔
511
        SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
1,600✔
512
        MND_TMQ_NULL_CHECK(pnewVgEp);
1,600!
513
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
1,600✔
514
          tDeleteSMqVgEp(pnewVgEp);
1,061✔
515
          taosArrayRemove(newVgs, k);
1,061✔
516
          find = true;
1,061✔
517
          break;
1,061✔
518
        }
519
      }
520
      if (!find) {
1,308✔
521
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
247!
522
        tDeleteSMqVgEp(pVgEpTmp);
247✔
523
        taosArrayRemove(pConsumerEp->vgs, j);
247✔
524
        continue;
247✔
525
      }
526
      j++;
1,061✔
527
    }
528
  }
529

530
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
1,006✔
531
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
247!
532
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
247!
533
    taosArrayDestroy(newVgs);
247✔
534
  } else {
535
    taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
759✔
536
  }
537
  return totalVgNum;
1,006✔
538

539
END:
×
540
  sdbRelease(pMnode->pSdb, pVgroup);
×
541
  taosMemoryFree(pVgEp);
×
542
  taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
×
543
  return code;
×
544
}
545

546
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
1,006✔
547
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
1,006!
548
    return TSDB_CODE_INVALID_PARA;
×
549
  }
550
  SMqSubscribeObj *pSub = NULL;
1,006✔
551
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
1,006✔
552
  if( code != 0){
1,006✔
553
    return 0;
373✔
554
  }
555
  taosRLockLatch(&pSub->lock);
633✔
556
  if (pOutput->pSub->offsetRows == NULL) {
633✔
557
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
532✔
558
    if(pOutput->pSub->offsetRows == NULL) {
532!
559
      taosRUnLockLatch(&pSub->lock);
×
560
      code = terrno;
×
561
      goto END;
×
562
    }
563
  }
564
  void *pIter = NULL;
633✔
565
  while (1) {
611✔
566
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,244✔
567
    if (pIter == NULL) break;
1,244✔
568
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
611✔
569
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
611✔
570

571
    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
1,873✔
572
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
1,262✔
573
      MND_TMQ_NULL_CHECK(d1);
1,262!
574
      bool        jump = false;
1,262✔
575
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
1,766✔
576
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
525✔
577
        MND_TMQ_NULL_CHECK(pVgEp);
525!
578
        if (pVgEp->vgId == d1->vgId) {
525✔
579
          jump = true;
21✔
580
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
21!
581
                pConsumerEp->consumerId, pVgEp->vgId);
582
          break;
21✔
583
        }
584
      }
585
      if (jump) continue;
1,262✔
586
      bool find = false;
1,241✔
587
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
2,983✔
588
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
1,860✔
589
        MND_TMQ_NULL_CHECK(d2);
1,860!
590
        if (d1->vgId == d2->vgId) {
1,860✔
591
          d2->rows += d1->rows;
118✔
592
          d2->offset = d1->offset;
118✔
593
          d2->ever = d1->ever;
118✔
594
          find = true;
118✔
595
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
118!
596
          break;
118✔
597
        }
598
      }
599
      if (!find) {
1,241✔
600
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
2,246!
601
      }
602
    }
603
  }
604
  taosRUnLockLatch(&pSub->lock);
633✔
605
  mndReleaseSubscribe(pMnode, pSub);
633✔
606

607
END:
633✔
608
  return code;
633✔
609
}
610

611
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
1,006✔
612
  if (pOutput == NULL) return;
1,006!
613
  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
1,006!
614
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
3,604✔
615
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
2,598✔
616
    if (pOutputRebVg == NULL) continue;
2,598!
617
    mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
2,598!
618
          pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
619
  }
620

621
  void *pIter = NULL;
1,006✔
622
  while (1) {
703✔
623
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,709✔
624
    if (pIter == NULL) break;
1,709✔
625
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
703✔
626
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
703✔
627
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
703!
628
          pConsumerEp->consumerId, sz);
629
    for (int32_t i = 0; i < sz; i++) {
2,342✔
630
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
1,639✔
631
      if (pVgEp == NULL) continue;
1,639!
632
      mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
1,639!
633
            pConsumerEp->consumerId);
634
    }
635
  }
636
}
637

638
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
1,006✔
639
                           int32_t *remainderVgCnt) {
640
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
1,006!
641
    return;
×
642
  }
643
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
1,006✔
644
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
1,006✔
645
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
1,006✔
646

647
  // calc num
648
  if (numOfFinal != 0) {
1,006✔
649
    *minVgCnt = totalVgNum / numOfFinal;
684✔
650
    *remainderVgCnt = totalVgNum % numOfFinal;
684✔
651
  } else {
652
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
322!
653
  }
654
  mInfo(
1,006!
655
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
656
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
657
}
658

659
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
1,006✔
660
  if (pOutput == NULL || pHash == NULL) {
1,006!
661
    return TSDB_CODE_INVALID_PARA;
×
662
  }
663
  SMqRebOutputVg *pRebVg = NULL;
1,006✔
664
  void           *pAssignIter = NULL;
1,006✔
665
  void           *pIter = NULL;
1,006✔
666
  int32_t         code = 0;
1,006✔
667

668
  while (1) {
703✔
669
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,709✔
670
    if (pIter == NULL) {
1,709✔
671
      break;
1,006✔
672
    }
673
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
703✔
674
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
2,306✔
675
      pAssignIter = taosHashIterate(pHash, pAssignIter);
1,603✔
676
      if (pAssignIter == NULL) {
1,603!
677
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
×
678
        break;
×
679
      }
680

681
      pRebVg = (SMqRebOutputVg *)pAssignIter;
1,603✔
682
      pRebVg->newConsumerId = pConsumerEp->consumerId;
1,603✔
683
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
3,206!
684
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId,
1,603!
685
            pConsumerEp->consumerId);
686
    }
687
  }
688

689
  while (1) {
4✔
690
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,010✔
691
    if (pIter == NULL) {
1,010✔
692
      break;
322✔
693
    }
694
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
688✔
695
    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
688✔
696
      pAssignIter = taosHashIterate(pHash, pAssignIter);
687✔
697
      if (pAssignIter == NULL) {
687✔
698
        mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
684!
699
        break;
684✔
700
      }
701

702
      pRebVg = (SMqRebOutputVg *)pAssignIter;
3✔
703
      pRebVg->newConsumerId = pConsumerEp->consumerId;
3✔
704
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
6!
705
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId,
3!
706
            pConsumerEp->consumerId);
707
    }
708
  }
709

710
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
1,006✔
711
  if (pAssignIter != NULL) {
1,006!
712
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
×
713
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
714
    goto END;
×
715
  }
716
  while (1) {
2,598✔
717
    pAssignIter = taosHashIterate(pHash, pAssignIter);
3,604✔
718
    if (pAssignIter == NULL) {
3,604✔
719
      break;
1,006✔
720
    }
721

722
    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
2,598✔
723
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
5,196!
724
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {            // if all consumer is removed
2,598✔
725
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
1,984!
726
    }
727
  }
728

729
END:
1,006✔
730
  return code;
1,006✔
731
}
732

733
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
1,006✔
734
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
1,006!
735
    return TSDB_CODE_INVALID_PARA;
×
736
  }
737
  int32_t     totalVgNum = processRemoveAddVgs(pMnode, pOutput);
1,006✔
738
  if (totalVgNum < 0){
1,006!
739
    return totalVgNum;
×
740
  }
741
  const char *pSubKey = pOutput->pSub->key;
1,006✔
742
  int32_t     minVgCnt = 0;
1,006✔
743
  int32_t     remainderVgCnt = 0;
1,006✔
744
  int32_t     code = 0;
1,006✔
745
  SHashObj   *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
1,006✔
746
  MND_TMQ_NULL_CHECK(pHash);
1,006!
747
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
1,006!
748
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
1,006!
749
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
1,006✔
750
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
1,006!
751
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
1,006!
752
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
1,006!
753
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
1,006!
754
  printRebalanceLog(pOutput);
1,006✔
755
  taosHashCleanup(pHash);
1,006✔
756

757
END:
1,006✔
758
  return code;
1,006✔
759
}
760

761
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
2,280✔
762
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
2,280!
763
    return TSDB_CODE_INVALID_PARA;
×
764
  }
765
  int32_t         code = 0;
2,280✔
766
  SMqConsumerObj *pConsumerNew = NULL;
2,280✔
767
  int32_t         consumerNum = taosArrayGetSize(consumers);
2,280✔
768
  for (int32_t i = 0; i < consumerNum; i++) {
3,078✔
769
    int64_t* consumerId = (int64_t *)taosArrayGet(consumers, i);
798✔
770
    MND_TMQ_NULL_CHECK(consumerId);
798!
771
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
798!
772
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
798!
773
    tDeleteSMqConsumerObj(pConsumerNew);
798✔
774
  }
775
  pConsumerNew = NULL;
2,280✔
776

777
END:
2,280✔
778
  tDeleteSMqConsumerObj(pConsumerNew);
2,280✔
779
  return code;
2,280✔
780
}
781

782
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
760✔
783
  if (pTrans == NULL || pOutput == NULL || cgroup == NULL || topic == NULL) {
760!
784
    return TSDB_CODE_INVALID_PARA;
×
785
  }
786
  int32_t code = 0;
760✔
787
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL));
760!
788
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic));
760!
789
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic));
760!
790
END:
760✔
791
  return code;
760✔
792
}
793

794
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
1,006✔
795
  if (pMnode == NULL || pMsg == NULL || pOutput == NULL) {
1,006!
796
    return TSDB_CODE_INVALID_PARA;
×
797
  }
798
  struct SSubplan *pPlan = NULL;
1,006✔
799
  int32_t          code = 0;
1,006✔
800
  STrans          *pTrans = NULL;
1,006✔
801

802
  if (strcmp(pOutput->pSub->qmsg, "") != 0) {
1,006✔
803
    MND_TMQ_RETURN_CHECK(qStringToSubplan(pOutput->pSub->qmsg, &pPlan));
748!
804
  }
805

806
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
1,006✔
807
  char cgroup[TSDB_CGROUP_LEN] = {0};
1,006✔
808
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
1,006✔
809

810
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
1,006✔
811
  if (pTrans == NULL) {
1,006!
812
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
813
    if (terrno != 0) code = terrno;
×
814
    goto END;
×
815
  }
816

817
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
1,006✔
818
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
1,006✔
819

820
  // 1. redo action: action to all vg
821
  const SArray *rebVgs = pOutput->rebVgs;
760✔
822
  int32_t       vgNum = taosArrayGetSize(rebVgs);
760✔
823
  for (int32_t i = 0; i < vgNum; i++) {
2,876✔
824
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
2,116✔
825
    MND_TMQ_NULL_CHECK(pRebVg);
2,116!
826
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan));
2,116!
827
  }
828

829
  // 2. commit log: subscribe and vg assignment
830
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));
760!
831

832
  // 3. commit log: consumer to update status and epoch
833
  MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
760!
834

835
  // 4. set cb
836
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
760✔
837

838
  // 5. execution
839
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
760!
840

841
END:
760✔
842
  nodesDestroyNode((SNode *)pPlan);
1,006✔
843
  mndTransDrop(pTrans);
1,006✔
844
  TAOS_RETURN(code);
1,006✔
845
}
846

847
static void freeRebalanceItem(void *param) {
1,006✔
848
  if (param == NULL) return;
1,006!
849
  SMqRebInfo *pInfo = param;
1,006✔
850
  taosArrayDestroy(pInfo->newConsumers);
1,006✔
851
  taosArrayDestroy(pInfo->removedConsumers);
1,006✔
852
}
853

854
// type = 0 remove  type = 1 add
855
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
1,520✔
856
  if (rebSubHash == NULL || topicList == NULL) {
1,520!
857
    return TSDB_CODE_INVALID_PARA;
×
858
  }
859
  taosRLockLatch(&pConsumer->lock);
1,520✔
860
  int32_t code = 0;
1,520✔
861
  int32_t topicNum = taosArrayGetSize(topicList);
1,520✔
862
  for (int32_t i = 0; i < topicNum; i++) {
2,294✔
863
    char *removedTopic = taosArrayGetP(topicList, i);
774✔
864
    MND_TMQ_NULL_CHECK(removedTopic);
774!
865
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
774✔
866
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
774✔
867
    SMqRebInfo *pRebSub = NULL;
774✔
868
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
774!
869
    if (type == 0)
774✔
870
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
682!
871
    else if (type == 1)
433!
872
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
866!
873
  }
874

875
END:
1,520✔
876
  taosRUnLockLatch(&pConsumer->lock);
1,520✔
877
  return code;
1,520✔
878
}
879

880
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
2,388✔
881
  if (pMnode == NULL || pConsumer == NULL || rebSubHash == NULL) {
2,388!
882
    return;
×
883
  }
884
  int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics);
2,388✔
885
  for (int32_t i = 0; i < newTopicNum; i++) {
4,777✔
886
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
2,389✔
887
    if (topic == NULL){
2,389!
888
      continue;
×
889
    }
890
    SMqSubscribeObj *pSub = NULL;
2,389✔
891
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
2,389✔
892
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
2,389✔
893
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
2,389✔
894
    if (code != 0) {
2,389!
895
      continue;
×
896
    }
897
    taosRLockLatch(&pSub->lock);
2,389✔
898

899
    // iterate all vg assigned to the consumer of that topic
900
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
2,389✔
901
    if (pConsumerEp == NULL){
2,389!
902
      taosRUnLockLatch(&pSub->lock);
×
903
      mndReleaseSubscribe(pMnode, pSub);
×
904
      continue;
×
905
    }
906
    int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
2,389✔
907
    for (int32_t j = 0; j < vgNum; j++) {
7,331✔
908
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
4,942✔
909
      if (pVgEp == NULL) {
4,942!
910
        continue;
×
911
      }
912
      SVgObj  *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
4,942✔
913
      if (!pVgroup) {
4,942✔
914
        code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
247✔
915
        if (code != 0){
247!
916
          mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code))
×
917
        }else{
918
          mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
247!
919
        }
920
      }
921
      mndReleaseVgroup(pMnode, pVgroup);
4,942✔
922
    }
923
    taosRUnLockLatch(&pSub->lock);
2,389✔
924
    mndReleaseSubscribe(pMnode, pSub);
2,389✔
925
  }
926
}
927

928
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
27,302✔
929
  if (pMsg == NULL || rebSubHash == NULL) {
27,302!
930
    return TSDB_CODE_INVALID_PARA;
×
931
  }
932
  SMnode         *pMnode = pMsg->info.node;
27,302✔
933
  SSdb           *pSdb = pMnode->pSdb;
27,302✔
934
  SMqConsumerObj *pConsumer = NULL;
27,302✔
935
  void           *pIter = NULL;
27,302✔
936
  int32_t         code = 0;
27,302✔
937

938
  // iterate all consumers, find all modification
939
  while (1) {
3,440✔
940
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
30,742✔
941
    if (pIter == NULL) {
30,742✔
942
      break;
27,302✔
943
    }
944

945
    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
3,440✔
946
    int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
3,440✔
947
    int32_t status = atomic_load_32(&pConsumer->status);
3,440✔
948

949
    mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
3,440!
950
           ", hbstatus:%d, pollStatus:%d",
951
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
952
           pConsumer->createTime, hbStatus, pollStatus);
953

954
    if (status == MQ_CONSUMER_STATUS_READY) {
3,440✔
955
      if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
2,684✔
956
        MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
288!
957
      } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
2,396✔
958
                 pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
2,389✔
959
        mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
8!
960
           ", hb lost cnt:%d, or long time no poll cnt:%d",
961
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
962
           pConsumer->createTime, hbStatus, pollStatus);
963
        MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
8!
964
      } else {
965
        checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
2,388✔
966
      }
967
    } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
756!
968
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
756!
969
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
756!
970
    } else {
971
      MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
×
972
    }
973

974
    mndReleaseConsumer(pMnode, pConsumer);
3,440✔
975
  }
976
END:
27,302✔
977
  return code;
27,302✔
978
}
979

980
bool mndRebTryStart() {
27,302✔
981
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
27,302✔
982
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
27,302!
983
}
984

985
void mndRebCntInc() {
771✔
986
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
771✔
987
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
771!
988
}
771✔
989

990
void mndRebCntDec() {
28,073✔
991
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
28,073✔
992
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
28,073!
993
}
28,073✔
994

995
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
1,006✔
996
  if (rebOutput == NULL) {
1,006!
997
    return;
×
998
  }
999
  taosArrayDestroy(rebOutput->newConsumers);
1,006✔
1000
  taosArrayDestroy(rebOutput->modifyConsumers);
1,006✔
1001
  taosArrayDestroy(rebOutput->removedConsumers);
1,006✔
1002
  taosArrayDestroy(rebOutput->rebVgs);
1,006✔
1003
  tDeleteSubscribeObj(rebOutput->pSub);
1,006✔
1004
  taosMemoryFree(rebOutput->pSub);
1,006!
1005
  rebOutput->pSub = NULL;
1,006✔
1006
}
1007

1008
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
1,006✔
1009
  if (rebOutput == NULL) {
1,006!
1010
    return TSDB_CODE_INVALID_PARA;
×
1011
  }
1012
  int32_t code = 0;
1,006✔
1013
  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
1,006✔
1014
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);
1,006!
1015
  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
1,006✔
1016
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);
1,006!
1017
  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
1,006✔
1018
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);
1,006!
1019
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
1,006✔
1020
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);
1,006!
1021
  return code;
1,006✔
1022

1023
END:
×
1024
  clearRebOutput(rebOutput);
×
1025
  return code;
×
1026
}
1027

1028
// This function only works when there are dirty consumers
1029
static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
633✔
1030
  if (pMnode == NULL || pSub == NULL) {
633!
1031
    return TSDB_CODE_INVALID_PARA;
×
1032
  }
1033
  int32_t code = 0;
633✔
1034
  void   *pIter = NULL;
633✔
1035
  while (1) {
611✔
1036
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,244✔
1037
    if (pIter == NULL) {
1,244✔
1038
      break;
633✔
1039
    }
1040

1041
    SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
611✔
1042
    SMqConsumerObj *pConsumer = NULL;
611✔
1043
    code = mndAcquireConsumer(pMnode, pConsumerEp->consumerId, &pConsumer);
611✔
1044
    if (code == 0) {
611!
1045
      mndReleaseConsumer(pMnode, pConsumer);
611✔
1046
      continue;
611✔
1047
    }
1048
    mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
×
1049
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));
×
1050

1051
    taosArrayDestroy(pConsumerEp->vgs);
×
1052
    MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
×
1053
  }
1054
END:
633✔
1055
  return code;
633✔
1056
}
1057

1058
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
1,006✔
1059
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
1,006!
1060
    return TSDB_CODE_INVALID_PARA;
×
1061
  }
1062
  const char      *key = rebInput->pRebInfo->key;
1,006✔
1063
  SMqSubscribeObj *pSub = NULL;
1,006✔
1064
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
1,006✔
1065

1066
  if (code != 0) {
1,006✔
1067
    // split sub key and extract topic
1068
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
373✔
1069
    char cgroup[TSDB_CGROUP_LEN] = {0};
373✔
1070
    mndSplitSubscribeKey(key, topic, cgroup, true);
373✔
1071
    SMqTopicObj *pTopic = NULL;
373✔
1072
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
373!
1073
    taosRLockLatch(&pTopic->lock);
373✔
1074

1075
    rebInput->oldConsumerNum = 0;
373✔
1076
    code = mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub);
373✔
1077
    if (code != 0) {
373!
1078
      mError("tmq rebalance mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
×
1079
      taosRUnLockLatch(&pTopic->lock);
×
1080
      mndReleaseTopic(pMnode, pTopic);
×
1081
      return code;
×
1082
    }
1083

1084
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
373✔
1085
    taosRUnLockLatch(&pTopic->lock);
373✔
1086
    mndReleaseTopic(pMnode, pTopic);
373✔
1087

1088
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
373!
1089
  } else {
1090
    taosRLockLatch(&pSub->lock);
633✔
1091
    code = tCloneSubscribeObj(pSub, &rebOutput->pSub);
633✔
1092
    if(code != 0){
633!
1093
      taosRUnLockLatch(&pSub->lock);
×
1094
      goto END;
×
1095
    }
1096
    code = checkConsumer(pMnode, rebOutput->pSub);
633✔
1097
    if(code != 0){
633!
1098
      taosRUnLockLatch(&pSub->lock);
×
1099
      goto END;
×
1100
    }
1101
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
633✔
1102
    taosRUnLockLatch(&pSub->lock);
633✔
1103

1104
    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
633!
1105
    mndReleaseSubscribe(pMnode, pSub);
633✔
1106
  }
1107

1108
END:
1,006✔
1109
  return code;
1,006✔
1110
}
1111

1112
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
27,302✔
1113
  if (pMsg == NULL) {
27,302!
1114
    return TSDB_CODE_INVALID_PARA;
×
1115
  }
1116
  int     code = 0;
27,302✔
1117
  void   *pIter = NULL;
27,302✔
1118
  SMnode *pMnode = pMsg->info.node;
27,302✔
1119
  PRINT_LOG_START;
27,302✔
1120
  if (!mndRebTryStart()) {
27,302!
1121
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
×
1122
    return code;
×
1123
  }
1124

1125
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
27,302✔
1126
  MND_TMQ_NULL_CHECK(rebSubHash);
27,302!
1127

1128
  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
27,302✔
1129

1130
  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
27,302!
1131
  if (taosHashGetSize(rebSubHash) > 0) {
27,302✔
1132
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
889!
1133
  }
1134

1135
  while (1) {
1,006✔
1136
    pIter = taosHashIterate(rebSubHash, pIter);
28,308✔
1137
    if (pIter == NULL) {
28,308✔
1138
      break;
27,302✔
1139
    }
1140

1141
    SMqRebInputObj  rebInput = {0};
1,006✔
1142
    SMqRebOutputObj rebOutput = {0};
1,006✔
1143
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
1,006!
1144
    rebInput.pRebInfo = (SMqRebInfo *)pIter;
1,006✔
1145
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
1,006✔
1146
    if (code != 0) {
1,006!
1147
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code))
×
1148
    }
1149

1150
    if (code == 0){
1,006!
1151
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
1,006✔
1152
      if (code != 0) {
1,006!
1153
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code))
×
1154
      }
1155
    }
1156

1157
    if (code == 0){
1,006!
1158
      code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
1,006✔
1159
      if (code != 0) {
1,006✔
1160
        mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code))
246!
1161
      }
1162
    }
1163

1164
    clearRebOutput(&rebOutput);
1,006✔
1165
  }
1166

1167
  if (taosHashGetSize(rebSubHash) > 0) {
27,302✔
1168
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish")
889!
1169
  }
1170

1171
END:
26,413✔
1172
  taosHashCancelIterate(rebSubHash, pIter);
27,302✔
1173
  taosHashCleanup(rebSubHash);
27,302✔
1174
  mndRebCntDec();
27,302✔
1175

1176
  PRINT_LOG_END(code);
27,302!
1177
  TAOS_RETURN(code);
27,302✔
1178
}
1179

1180
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
231✔
1181
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
231!
1182
    return TSDB_CODE_INVALID_PARA;
×
1183
  }
1184
  void   *pIter = NULL;
231✔
1185
  SVgObj *pVgObj = NULL;
231✔
1186
  int32_t code = 0;
231✔
1187
  while (1) {
1,531✔
1188
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
1,762✔
1189
    if (pIter == NULL) {
1,762✔
1190
      break;
231✔
1191
    }
1192
    if (pVgObj->mountVgId) {
1,531!
1193
      sdbRelease(pMnode->pSdb, pVgObj);
×
1194
      continue;
754✔
1195
    }
1196

1197
    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
1,531✔
1198
      sdbRelease(pMnode->pSdb, pVgObj);
754✔
1199
      continue;
754✔
1200
    }
1201
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
777!
1202
    MND_TMQ_NULL_CHECK(pReq);
777!
1203
    pReq->head.vgId = htonl(pVgObj->vgId);
777✔
1204
    pReq->vgId = pVgObj->vgId;
777✔
1205
    pReq->consumerId = -1;
777✔
1206
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
777✔
1207

1208
    STransAction action = {0};
777✔
1209
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
777✔
1210
    action.pCont = pReq;
777✔
1211
    action.contLen = sizeof(SMqVDeleteReq);
777✔
1212
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
777✔
1213
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;
777✔
1214

1215
    sdbRelease(pMnode->pSdb, pVgObj);
777✔
1216
    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
777!
1217
  }
1218

1219
END:
231✔
1220
  sdbRelease(pMnode->pSdb, pVgObj);
231✔
1221
  sdbCancelFetch(pMnode->pSdb, pIter);
231✔
1222
  return code;
231✔
1223
}
1224

1225
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
1✔
1226
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
1!
1227
    return TSDB_CODE_INVALID_PARA;
×
1228
  }
1229
  void           *pIter = NULL;
1✔
1230
  SMqConsumerObj *pConsumer = NULL;
1✔
1231
  SMqConsumerObj *pConsumerNew = NULL;
1✔
1232
  int             code = 0;
1✔
1233
  while (1) {
1234
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
2✔
1235
    if (pIter == NULL) {
2✔
1236
      break;
1✔
1237
    }
1238

1239
    if (strcmp(cgroup, pConsumer->cgroup) != 0) {
1!
1240
      sdbRelease(pMnode->pSdb, pConsumer);
×
1241
      continue;
×
1242
    }
1243

1244
    if (deleteConsumer) {
1!
1245
      MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
×
1246
      MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
×
1247
      tDeleteSMqConsumerObj(pConsumerNew);
×
1248
      pConsumerNew = NULL;
×
1249
    } else {
1250
      bool found = checkTopic(pConsumer->assignedTopics, topic);
1✔
1251
      if (found){
1!
1252
        mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
×
1253
               topic, pConsumer->consumerId, pConsumer->cgroup);
1254
        code = TSDB_CODE_MND_CGROUP_USED;
×
1255
        goto END;
×
1256
      }
1257
    }
1258

1259

1260
    sdbRelease(pMnode->pSdb, pConsumer);
1✔
1261
  }
1262

1263
END:
1✔
1264
  tDeleteSMqConsumerObj(pConsumerNew);
1✔
1265
  sdbRelease(pMnode->pSdb, pConsumer);
1✔
1266
  sdbCancelFetch(pMnode->pSdb, pIter);
1✔
1267
  return code;
1✔
1268
}
1269

1270
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
1✔
1271
  if (pMsg == NULL) {
1!
1272
    return TSDB_CODE_INVALID_PARA;
×
1273
  }
1274
  SMnode         *pMnode = pMsg->info.node;
1✔
1275
  SMDropCgroupReq dropReq = {0};
1✔
1276
  STrans         *pTrans = NULL;
1✔
1277
  int32_t         code = TSDB_CODE_ACTION_IN_PROGRESS;
1✔
1278
  SMqSubscribeObj *pSub = NULL;
1✔
1279

1280
  PRINT_LOG_START
1!
1281
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
1!
1282
  char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
1✔
1283
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
1✔
1284
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
1✔
1285
  if (code != 0) {
1!
1286
    if (dropReq.igNotExists) {
×
1287
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
×
1288
      return 0;
×
1289
    } else {
1290
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
×
1291
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
×
1292
      return code;
×
1293
    }
1294
  }
1295

1296
  taosWLockLatch(&pSub->lock);
1✔
1297
  if (!dropReq.force && taosHashGetSize(pSub->consumerHash) != 0) {
1!
1298
    code = TSDB_CODE_MND_CGROUP_USED;
×
1299
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
×
1300
    goto END;
×
1301
  }
1302

1303
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
1✔
1304
  MND_TMQ_NULL_CHECK(pTrans);
1!
1305
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
1!
1306
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
1✔
1307
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
1!
1308
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
1!
1309
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic, dropReq.force));
1!
1310
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
1!
1311
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
1!
1312

1313
END:
1✔
1314
  taosWUnLockLatch(&pSub->lock);
1✔
1315
  mndReleaseSubscribe(pMnode, pSub);
1✔
1316
  mndTransDrop(pTrans);
1✔
1317
  PRINT_LOG_END(code);
1!
1318

1319
  if (code != 0) {
1!
1320
    TAOS_RETURN(code);
×
1321
  }
1322
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
1✔
1323
}
1324

1325
void mndCleanupSubscribe(SMnode *pMnode) {}
1,923✔
1326

1327
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
1,278✔
1328
  if (pSub == NULL) {
1,278!
1329
    return NULL;
×
1330
  }
1331
  int32_t code = 0;
1,278✔
1332
  int32_t lino = 0;
1,278✔
1333
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,278✔
1334
  void   *buf = NULL;
1,278✔
1335
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
1,278✔
1336
  if (tlen <= 0) goto SUB_ENCODE_OVER;
1,278!
1337
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
1,278✔
1338

1339
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
1,278✔
1340
  if (pRaw == NULL) goto SUB_ENCODE_OVER;
1,278!
1341

1342
  buf = taosMemoryMalloc(tlen);
1,278!
1343
  if (buf == NULL) goto SUB_ENCODE_OVER;
1,278!
1344

1345
  void *abuf = buf;
1,278✔
1346
  if (tEncodeSubscribeObj(&abuf, pSub) < 0){
1,278!
1347
    goto SUB_ENCODE_OVER;
×
1348
  }
1349

1350
  int32_t dataPos = 0;
1,278✔
1351
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
1,278!
1352
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
1,278!
1353
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
1,278!
1354
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
1,278!
1355

1356
  terrno = TSDB_CODE_SUCCESS;
1,278✔
1357

1358
SUB_ENCODE_OVER:
1,278✔
1359
  taosMemoryFreeClear(buf);
1,278!
1360
  if (terrno != TSDB_CODE_SUCCESS) {
1,278!
1361
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
×
1362
    sdbFreeRaw(pRaw);
×
1363
    return NULL;
×
1364
  }
1365

1366
  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
1,278!
1367
  return pRaw;
1,278✔
1368
}
1369

1370
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
1,025✔
1371
  if (pRaw == NULL) {
1,025!
1372
    return NULL;
×
1373
  }
1374
  int32_t code = 0;
1,025✔
1375
  int32_t lino = 0;
1,025✔
1376
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,025✔
1377
  SSdbRow         *pRow = NULL;
1,025✔
1378
  SMqSubscribeObj *pSub = NULL;
1,025✔
1379
  void            *buf = NULL;
1,025✔
1380

1381
  int8_t sver = 0;
1,025✔
1382
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
1,025!
1383

1384
  if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
1,025!
1385
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
1386
    goto SUB_DECODE_OVER;
×
1387
  }
1388

1389
  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
1,025✔
1390
  if (pRow == NULL) goto SUB_DECODE_OVER;
1,025!
1391

1392
  pSub = sdbGetRowObj(pRow);
1,025✔
1393
  if (pSub == NULL) goto SUB_DECODE_OVER;
1,025!
1394

1395
  int32_t dataPos = 0;
1,025✔
1396
  int32_t tlen;
1397
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
1,025!
1398
  buf = taosMemoryMalloc(tlen);
1,025!
1399
  if (buf == NULL) goto SUB_DECODE_OVER;
1,025!
1400
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
1,025!
1401
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
1,025!
1402

1403
  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
1,025!
1404
    goto SUB_DECODE_OVER;
×
1405
  }
1406

1407
  // update epset saved in mnode
1408
  if (pSub->unassignedVgs != NULL) {
1,025!
1409
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
1,025✔
1410
    for (int32_t i = 0; i < size; ++i) {
2,799✔
1411
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, i);
1,774✔
1412
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
1,774✔
1413
    }
1414
  }
1415
  if (pSub->consumerHash != NULL) {
1,025!
1416
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
1,025✔
1417
    while (pIter) {
1,519✔
1418
      SMqConsumerEp *pConsumerEp = pIter;
494✔
1419
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
494✔
1420
      for (int32_t i = 0; i < size; ++i) {
1,710✔
1421
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
1,216✔
1422
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
1,216✔
1423
      }
1424
      pIter = taosHashIterate(pSub->consumerHash, pIter);
494✔
1425
    }
1426
  }
1427

1428
  terrno = TSDB_CODE_SUCCESS;
1,025✔
1429

1430
SUB_DECODE_OVER:
1,025✔
1431
  taosMemoryFreeClear(buf);
1,025!
1432
  if (terrno != TSDB_CODE_SUCCESS) {
1,025!
1433
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
×
1434
    taosMemoryFreeClear(pRow);
×
1435
    return NULL;
×
1436
  }
1437

1438
  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
1,025!
1439
  return pRow;
1,025✔
1440
}
1441

1442
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
402✔
1443
  mTrace("subscribe:%s, perform insert action", pSub != NULL ? pSub->key : "null");
402!
1444
  return 0;
402✔
1445
}
1446

1447
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
1,025✔
1448
  mTrace("subscribe:%s, perform delete action", pSub != NULL ? pSub->key : "null");
1,025!
1449
  tDeleteSubscribeObj(pSub);
1,025✔
1450
  return 0;
1,025✔
1451
}
1452

1453
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
392✔
1454
  if (pOldSub == NULL || pNewSub == NULL) return -1;
392!
1455
  mTrace("subscribe:%s, perform update action", pOldSub->key);
392!
1456
  taosWLockLatch(&pOldSub->lock);
392✔
1457

1458
  SHashObj *tmp = pOldSub->consumerHash;
392✔
1459
  pOldSub->consumerHash = pNewSub->consumerHash;
392✔
1460
  pNewSub->consumerHash = tmp;
392✔
1461

1462
  SArray *tmp1 = pOldSub->unassignedVgs;
392✔
1463
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
392✔
1464
  pNewSub->unassignedVgs = tmp1;
392✔
1465

1466
  SArray *tmp2 = pOldSub->offsetRows;
392✔
1467
  pOldSub->offsetRows = pNewSub->offsetRows;
392✔
1468
  pNewSub->offsetRows = tmp2;
392✔
1469

1470
  taosWUnLockLatch(&pOldSub->lock);
392✔
1471
  return 0;
392✔
1472
}
1473

1474
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub) {
9,916✔
1475
  if (pMnode == NULL || key == NULL || pSub == NULL){
9,916!
1476
    return TSDB_CODE_INVALID_PARA;
×
1477
  }
1478
  SSdb            *pSdb = pMnode->pSdb;
9,916✔
1479
  *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
9,916✔
1480
  if (*pSub == NULL) {
9,916✔
1481
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
746✔
1482
  }
1483
  return 0;
9,170✔
1484
}
1485

1486
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
454✔
1487
  if (pMnode == NULL || topicName == NULL) return 0;
454!
1488
  int32_t num = 0;
454✔
1489
  SSdb   *pSdb = pMnode->pSdb;
454✔
1490

1491
  void            *pIter = NULL;
454✔
1492
  SMqSubscribeObj *pSub = NULL;
454✔
1493
  while (1) {
507✔
1494
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
961✔
1495
    if (pIter == NULL) break;
961✔
1496

1497
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
507✔
1498
    char cgroup[TSDB_CGROUP_LEN] = {0};
507✔
1499
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
507✔
1500
    if (strcmp(topic, topicName) != 0) {
507✔
1501
      sdbRelease(pSdb, pSub);
240✔
1502
      continue;
240✔
1503
    }
1504

1505
    num++;
267✔
1506
    sdbRelease(pSdb, pSub);
267✔
1507
  }
1508

1509
  return num;
454✔
1510
}
1511

1512
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
9,170✔
1513
  if (pMnode == NULL || pSub == NULL) return;
9,170!
1514
  SSdb *pSdb = pMnode->pSdb;
9,170✔
1515
  sdbRelease(pSdb, pSub);
9,170✔
1516
}
1517

1518
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
231✔
1519
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) return TSDB_CODE_INVALID_PARA;
231!
1520
  int32_t  code = 0;
231✔
1521
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
231✔
1522
  MND_TMQ_NULL_CHECK(pCommitRaw);
231!
1523
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
231✔
1524
  if (code != 0){
231!
1525
    sdbFreeRaw(pCommitRaw);
×
1526
    goto END;
×
1527
  }
1528
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
231✔
1529
END:
231✔
1530
  return code;
231✔
1531
}
1532

1533
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
313✔
1534
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
313!
1535
  SSdb            *pSdb = pMnode->pSdb;
313✔
1536
  int32_t          code = 0;
313✔
1537
  void            *pIter = NULL;
313✔
1538
  SMqSubscribeObj *pSub = NULL;
313✔
1539
  while (1) {
433✔
1540
    sdbRelease(pSdb, pSub);
746✔
1541
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
746✔
1542
    if (pIter == NULL) break;
746✔
1543

1544
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
443✔
1545
    char cgroup[TSDB_CGROUP_LEN] = {0};
443✔
1546
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
443✔
1547
    if (strcmp(topic, topicName) != 0) {
443✔
1548
      continue;
203✔
1549
    }
1550

1551
    // iter all vnode to delete handle
1552
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
240✔
1553
      code = TSDB_CODE_MND_IN_REBALANCE;
10✔
1554
      goto END;
10✔
1555
    }
1556

1557
    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
230!
1558
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
230!
1559
  }
1560

1561
END:
313✔
1562
  sdbRelease(pSdb, pSub);
313✔
1563
  sdbCancelFetch(pSdb, pIter);
313✔
1564

1565
  TAOS_RETURN(code);
313✔
1566
}
1567

1568
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
583✔
1569
                           const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
1570
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL){
583!
1571
    return TSDB_CODE_INVALID_PARA;
×
1572
  }
1573
  int32_t code = 0;
583✔
1574
  int32_t sz = taosArrayGetSize(vgs);
583✔
1575
  for (int32_t j = 0; j < sz; j++) {
1,075✔
1576
    SMqVgEp *pVgEp = taosArrayGetP(vgs, j);
492✔
1577
    MND_TMQ_NULL_CHECK(pVgEp);
492!
1578

1579
    SColumnInfoData *pColInfo = NULL;
492✔
1580
    int32_t          cols = 0;
492✔
1581

1582
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
1583
    MND_TMQ_NULL_CHECK(pColInfo);
492!
1584
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false));
492!
1585

1586
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
1587
    MND_TMQ_NULL_CHECK(pColInfo);
492!
1588
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false));
492!
1589

1590
    // vg id
1591
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
1592
    MND_TMQ_NULL_CHECK(pColInfo);
492!
1593
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));
492!
1594

1595
    // consumer id
1596
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
492✔
1597
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
492✔
1598
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
492✔
1599

1600
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
1601
    MND_TMQ_NULL_CHECK(pColInfo);
492!
1602
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));
492!
1603

1604
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
492✔
1605
    if (user) STR_TO_VARSTR(userStr, user);
492✔
1606
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
1607
    MND_TMQ_NULL_CHECK(pColInfo);
492!
1608
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));
492!
1609

1610
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
492✔
1611
    if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
492✔
1612
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
492✔
1613
    MND_TMQ_NULL_CHECK(pColInfo);
492!
1614
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));
492!
1615

1616
    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
492!
1617
          varDataVal(cgroup), pVgEp->vgId);
1618

1619
    // offset
1620
    OffsetRows *data = NULL;
492✔
1621
    for (int i = 0; i < taosArrayGetSize(offsetRows); i++) {
1,327✔
1622
      OffsetRows *tmp = taosArrayGet(offsetRows, i);
835✔
1623
      MND_TMQ_NULL_CHECK(tmp);
835!
1624
      if (tmp->vgId != pVgEp->vgId) {
835✔
1625
        // mInfo("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", tmp->vgId, pVgEp->vgId);
1626
        continue;
616✔
1627
      }
1628
      data = tmp;
219✔
1629
    }
1630
    if (data) {
492✔
1631
      // vg id
1632
      char buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
219✔
1633
      (void)tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
219✔
1634
      (void)snprintf(varDataVal(buf) + strlen(varDataVal(buf)),
219✔
1635
                     sizeof(buf) - VARSTR_HEADER_SIZE - strlen(varDataVal(buf)), "/%" PRId64, data->ever);
219✔
1636
      varDataSetLen(buf, strlen(varDataVal(buf)));
219✔
1637
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
219✔
1638
      MND_TMQ_NULL_CHECK(pColInfo);
219!
1639
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false));
219!
1640
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
219✔
1641
      MND_TMQ_NULL_CHECK(pColInfo);
219!
1642
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false));
219!
1643
    } else {
1644
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
273✔
1645
      MND_TMQ_NULL_CHECK(pColInfo);
273!
1646
      colDataSetNULL(pColInfo, *numOfRows);
273!
1647
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
273✔
1648
      MND_TMQ_NULL_CHECK(pColInfo);
273!
1649
      colDataSetNULL(pColInfo, *numOfRows);
273!
1650
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
273!
1651
    }
1652
    (*numOfRows)++;
492✔
1653
  }
1654
  return 0;
583✔
1655
END:
×
1656
  return code;
×
1657
}
1658

1659
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
83✔
1660
  if (pReq == NULL || pShow == NULL || pBlock == NULL){
83!
1661
    return TSDB_CODE_INVALID_PARA;
×
1662
  }
1663
  SMnode          *pMnode = pReq->info.node;
83✔
1664
  SSdb            *pSdb = pMnode->pSdb;
83✔
1665
  int32_t          numOfRows = 0;
83✔
1666
  SMqSubscribeObj *pSub = NULL;
83✔
1667
  int32_t          code = 0;
83✔
1668

1669
  mInfo("mnd show subscriptions begin");
83!
1670

1671
  while (numOfRows < rowsCapacity) {
405!
1672
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
405✔
1673
    if (pShow->pIter == NULL) {
405✔
1674
      break;
83✔
1675
    }
1676

1677
    taosRLockLatch(&pSub->lock);
322✔
1678

1679
    if (numOfRows + pSub->vgNum > rowsCapacity) {
322!
1680
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum))  ;
×
1681
    }
1682

1683
    // topic and cgroup
1684
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
322✔
1685
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
322✔
1686
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
322✔
1687
    varDataSetLen(topic, strlen(varDataVal(topic)));
322✔
1688
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
322✔
1689

1690
    SMqConsumerEp *pConsumerEp = NULL;
322✔
1691
    void          *pIter = NULL;
322✔
1692

1693
    while (1) {
261✔
1694
      pIter = taosHashIterate(pSub->consumerHash, pIter);
583✔
1695
      if (pIter == NULL) break;
583✔
1696
      pConsumerEp = (SMqConsumerEp *)pIter;
261✔
1697

1698
      char          *user = NULL;
261✔
1699
      char          *fqdn = NULL;
261✔
1700
      SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
261✔
1701
      if (pConsumer != NULL) {
261!
1702
        user = pConsumer->user;
261✔
1703
        fqdn = pConsumer->fqdn;
261✔
1704
        sdbRelease(pSdb, pConsumer);
261✔
1705
      }
1706
      MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
261!
1707
                  pConsumerEp->offsetRows));
1708
    }
1709

1710
    MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
322!
1711

1712
    pBlock->info.rows = numOfRows;
322✔
1713

1714
    taosRUnLockLatch(&pSub->lock);
322✔
1715
    sdbRelease(pSdb, pSub);
322✔
1716
  }
1717

1718
  mInfo("mnd end show subscriptions");
83!
1719

1720
  pShow->numOfRows += numOfRows;
83✔
1721
  return numOfRows;
83✔
1722

1723
END:
×
1724
  taosRUnLockLatch(&pSub->lock);
×
1725
  sdbRelease(pSdb, pSub);
×
1726

1727
  return code;
×
1728
}
1729

1730
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
×
1731
  if (pMnode == NULL) {
×
1732
    return;
×
1733
  }
1734
  SSdb *pSdb = pMnode->pSdb;
×
1735
  sdbCancelFetchByType(pSdb, pIter, SDB_SUBSCRIBE);
×
1736
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc