• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4667

14 Aug 2025 01:04PM UTC coverage: 59.532% (-0.6%) from 60.112%
#4667

push

travis-ci

web-flow
fix(query): fix order by column check of union operator (#32524)

136437 of 292055 branches covered (46.72%)

Branch coverage included in aggregate %.

1 of 13 new or added lines in 1 file covered. (7.69%)

2683 existing lines in 164 files now uncovered.

206730 of 284385 relevant lines covered (72.69%)

4603587.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.41
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndShow.h"
20
#include "mndTopic.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "tcompare.h"
24
#include "tname.h"
25

26
#define MND_SUBSCRIBE_VER_NUMBER   3
27
#define MND_SUBSCRIBE_RESERVE_SIZE 64
28

29
//#define MND_CONSUMER_LOST_HB_CNT          6
30

31
static int32_t mqRebInExecCnt = 0;
32

33
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
34
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
35
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
36
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
38
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
39
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
40
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
42
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
43

44
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
995✔
45
  if (pTrans == NULL || pSub == NULL) {
995!
46
    return TSDB_CODE_INVALID_PARA;
×
47
  }
48
  int32_t  code = 0;
995✔
49
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
995✔
50
  MND_TMQ_NULL_CHECK(pCommitRaw);
995!
51
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
995✔
52
  if (code != 0) {
995!
53
    sdbFreeRaw(pCommitRaw);
×
54
    goto END;
×
55
  }
56
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
995✔
57

58
END:
995✔
59
  return code;
995✔
60
}
61

62
int32_t mndInitSubscribe(SMnode *pMnode) {
2,353✔
63
  SSdbTable table = {
2,353✔
64
      .sdbType = SDB_SUBSCRIBE,
65
      .keyType = SDB_KEY_BINARY,
66
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
67
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
68
      .insertFp = (SdbInsertFp)mndSubActionInsert,
69
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
70
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
71
  };
72

73
  if (pMnode == NULL) {
2,353!
74
    return TSDB_CODE_INVALID_PARA;
×
75
  }
76
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
2,353✔
77
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
2,353✔
78
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
2,353✔
79
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
2,353✔
80
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);
2,353✔
81

82
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
2,353✔
83
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
2,353✔
84

85
  return sdbSetTable(pMnode->pSdb, table);
2,353✔
86
}
87

88
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
476✔
89
  int32_t     code = 0;
476✔
90
  SSdb*       pSdb = pMnode->pSdb;
476✔
91
  SVgObj*     pVgroup = NULL;
476✔
92
  SQueryPlan* pPlan = NULL;
476✔
93
  SSubplan*   pSubplan = NULL;
476✔
94

95
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
476✔
96
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
399✔
97
    if (pPlan == NULL) {
399!
98
      return TSDB_CODE_QRY_INVALID_INPUT;
×
99
    }
100
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
77✔
101
    SNode* pAst = NULL;
6✔
102
    code = nodesStringToNode(pTopic->ast, &pAst);
6✔
103
    if (code != 0) {
6!
104
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
×
105
      return code;
×
106
    }
107

108
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
6✔
109
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
6✔
110
    if (code != 0) {
6!
111
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
×
112
      nodesDestroyNode(pAst);
×
113
      return code;
×
114
    }
115
    nodesDestroyNode(pAst);
6✔
116
  }
117

118
  if (pPlan) {
476✔
119
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
405!
120
    if (levelNum != 1) {
405!
121
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
122
      goto END;
×
123
    }
124

125
    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
405✔
126
    if (pNodeListNode == NULL){
405!
127
      code = TSDB_CODE_OUT_OF_MEMORY;
×
128
      goto END;
×
129
    }
130
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
405!
131
    if (opNum != 1) {
405!
132
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
133
      goto END;
×
134
    }
135

136
    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
405✔
137
  }
138

139
  void* pIter = NULL;
476✔
140
  while (1) {
2,469✔
141
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
2,945✔
142
    if (pIter == NULL) {
2,945✔
143
      break;
476✔
144
    }
145

146
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
2,469✔
147
      sdbRelease(pSdb, pVgroup);
1,286✔
148
      continue;
1,286✔
149
    }
150

151
    pSub->vgNum++;
1,183✔
152

153
    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
1,183!
154
    if (pVgEp == NULL){
1,183!
155
      code = terrno;
×
156
      goto END;
×
157
    }
158
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
1,183✔
159
    pVgEp->vgId = pVgroup->vgId;
1,183✔
160
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL){
2,366!
161
      code = terrno;
×
162
      taosMemoryFree(pVgEp);
×
163
      goto END;
×
164
    }
165
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
1,183!
166
    sdbRelease(pSdb, pVgroup);
1,183✔
167
  }
168

169
  if (pSubplan) {
476✔
170
    int32_t msgLen;
171

172
    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
405!
173
      code = TSDB_CODE_QRY_INVALID_INPUT;
×
174
      goto END;
×
175
    }
176
  } else {
177
    pSub->qmsg = taosStrdup("");
71!
178
  }
179

180
END:
476✔
181
  qDestroyQueryPlan(pPlan);
476✔
182
  return code;
476✔
183
}
184

185

186
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey, SMqSubscribeObj** pSub) {
476✔
187
  if(pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
476!
188
    return TSDB_CODE_INVALID_PARA;
×
189
  }
190
  int32_t code = 0;
476✔
191
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
476!
192
  (*pSub)->dbUid = pTopic->dbUid;
476✔
193
  (*pSub)->stbUid = pTopic->stbUid;
476✔
194
  (*pSub)->subType = pTopic->subType;
476✔
195
  (*pSub)->withMeta = pTopic->withMeta;
476✔
196

197
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
476!
198
  return code;
476✔
199

200
END:
×
201
  tDeleteSubscribeObj(*pSub);
×
202
  taosMemoryFree(*pSub);
×
203
  return code;
×
204
}
205

206
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
2,601✔
207
                                    SSubplan *pPlan) {
208
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
2,601!
209
    return TSDB_CODE_INVALID_PARA;
×
210
  }
211
  SMqRebVgReq req = {0};
2,601✔
212
  int32_t     code = 0;
2,601✔
213
  SEncoder encoder = {0};
2,601✔
214

215
  req.oldConsumerId = pRebVg->oldConsumerId;
2,601✔
216
  req.newConsumerId = pRebVg->newConsumerId;
2,601✔
217
  req.vgId = pRebVg->pVgEp->vgId;
2,601✔
218
  if (pPlan) {
2,601✔
219
    pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
2,146✔
220
    pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
2,146✔
221
    int32_t msgLen = 0;
2,146✔
222
    MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
2,146!
223
  } else {
224
    req.qmsg = taosStrdup("");
455!
225
    MND_TMQ_NULL_CHECK(req.qmsg);
455!
226
  }
227
  req.subType = pSub->subType;
2,601✔
228
  req.withMeta = pSub->withMeta;
2,601✔
229
  req.suid = pSub->stbUid;
2,601✔
230
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
2,601✔
231

232
  int32_t tlen = 0;
2,601✔
233
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
2,601!
234
  if (code < 0) {
2,601!
235
    goto END;
×
236
  }
237

238
  tlen += sizeof(SMsgHead);
2,601✔
239
  void *buf = taosMemoryMalloc(tlen);
2,601!
240
  MND_TMQ_NULL_CHECK(buf);
2,601!
241
  SMsgHead *pMsgHead = (SMsgHead *)buf;
2,601✔
242
  pMsgHead->contLen = htonl(tlen);
2,601✔
243
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
2,601✔
244

245
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
2,601✔
246
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
2,601!
247
  *pBuf = buf;
2,601✔
248
  *pLen = tlen;
2,601✔
249

250
END:
2,601✔
251
  tEncoderClear(&encoder);
2,601✔
252
  taosMemoryFree(req.qmsg);
2,601!
253
  return code;
2,601✔
254
}
255

256
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
2,601✔
257
                                        const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
258
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
2,601!
259
    return TSDB_CODE_INVALID_PARA;
×
260
  }
261
  int32_t code = 0;
2,601✔
262
  void   *buf  = NULL;
2,601✔
263

264
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
2,601!
265
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
266
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
267
    goto END;
×
268
  }
269

270
  int32_t tlen = 0;
2,601✔
271
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan));
2,601!
272
  int32_t vgId = pRebVg->pVgEp->vgId;
2,601✔
273
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
2,601✔
274
  if (pVgObj == NULL) {
2,601!
275
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
276
    goto END;
×
277
  }
278

279
  STransAction action = {0};
2,601✔
280
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
2,601✔
281
  action.pCont = buf;
2,601✔
282
  action.contLen = tlen;
2,601✔
283
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
2,601✔
284

285
  mndReleaseVgroup(pMnode, pVgObj);
2,601✔
286
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
2,601!
287
  return code;
2,601✔
288

289
END:
×
290
  taosMemoryFree(buf);
×
291
  return code;
×
292
}
293

294
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
3,535✔
295
  if (key == NULL || topic == NULL || cgroup == NULL) {
3,535!
296
    return;
×
297
  }
298
  int32_t i = 0;
3,535✔
299
  while (key[i] != TMQ_SEPARATOR_CHAR) {
27,394✔
300
    i++;
23,859✔
301
  }
302
  (void)memcpy(cgroup, key, i);
3,535✔
303
  cgroup[i] = 0;
3,535✔
304
  if (fullName) {
3,535✔
305
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
3,136✔
306
  } else {
307
    while (key[i] != '.') {
1,197✔
308
      i++;
798✔
309
    }
310
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
399✔
311
  }
312
}
313

314
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
1,288✔
315
  if (pHash == NULL || key == NULL) {
1,288!
316
    return TSDB_CODE_INVALID_PARA;
×
317
  }
318
  int32_t code = 0;
1,288✔
319
  SMqRebInfo* pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
1,288✔
320
  if (pRebInfo == NULL) {
1,288✔
321
    pRebInfo = tNewSMqRebSubscribe(key);
1,216✔
322
    if (pRebInfo == NULL) {
1,216!
323
      code = terrno;
×
324
      goto END;
×
325
    }
326
    code = taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
1,216✔
327
    taosMemoryFreeClear(pRebInfo);
1,216!
328
    if (code != 0) {
1,216!
329
      goto END;
×
330
    }
331
    pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
1,216✔
332
    MND_TMQ_NULL_CHECK(pRebInfo);
1,216!
333
  }
334
  if (pReb){
1,288✔
335
    *pReb = pRebInfo;
1,065✔
336
  }
337

338
END:
223✔
339
  return code;
1,288✔
340
}
341

342
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
3,034✔
343
  if (vgs == NULL || pHash == NULL || key == NULL) {
3,034!
344
    return TSDB_CODE_INVALID_PARA;
×
345
  }
346
  int32_t         code = 0;
3,034✔
347
  SMqVgEp       **pVgEp = (SMqVgEp **)taosArrayPop(vgs);
3,034✔
348
  MND_TMQ_NULL_CHECK(pVgEp);
3,034!
349
  SMqRebOutputVg outputVg = {consumerId, -1, *pVgEp};
3,034✔
350
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &(*pVgEp)->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)));
3,034!
351
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, (*pVgEp)->vgId, consumerId);
3,034!
352
END:
×
353
  return code;
3,034✔
354
}
355

356
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
1,216✔
357
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
1,216!
358
    return TSDB_CODE_INVALID_PARA;
×
359
  }
360
  int32_t code = 0;
1,216✔
361
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
1,216✔
362
  int32_t actualRemoved = 0;
1,216✔
363
  for (int32_t i = 0; i < numOfRemoved; i++) {
1,702✔
364
    int64_t*      consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
486✔
365
    MND_TMQ_NULL_CHECK(consumerId);
486!
366
    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t));
486✔
367
    if (pConsumerEp == NULL) {
486!
368
      continue;
×
369
    }
370

371
    int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
486✔
372
    for (int32_t j = 0; j < consumerVgNum; j++) {
1,769✔
373
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *consumerId, pOutput->pSub->key));
1,283!
374
    }
375

376
    taosArrayDestroy(pConsumerEp->vgs);
486✔
377
    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t)));
486!
378
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, consumerId));
972!
379
    actualRemoved++;
486✔
380
  }
381

382
  if (numOfRemoved != actualRemoved) {
1,216!
383
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved,
×
384
           actualRemoved);
385
  } else {
386
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
1,216!
387
  }
388
END:
×
389
  return code;
1,216✔
390
}
391

392
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
1,216✔
393
  if (pOutput == NULL || pInput == NULL) {
1,216!
394
    return TSDB_CODE_INVALID_PARA;
×
395
  }
396
  int32_t code = 0;
1,216✔
397
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);
1,216✔
398

399
  for (int32_t i = 0; i < numOfNewConsumers; i++) {
1,795✔
400
    int64_t* consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
579✔
401
    MND_TMQ_NULL_CHECK(consumerId);
579!
402
    SMqConsumerEp newConsumerEp = {0};
579✔
403
    newConsumerEp.consumerId = *consumerId;
579✔
404
    newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
579✔
405
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);
579!
406
    MND_TMQ_RETURN_CHECK(taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)));
579!
407
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
1,158!
408
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
579!
409
  }
410
END:
1,216✔
411
  return code;
1,216✔
412
}
413

414
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
1,216✔
415
  if (pOutput == NULL || pHash == NULL) {
1,216!
416
    return TSDB_CODE_INVALID_PARA;
×
417
  }
418
  int32_t code = 0;
1,216✔
419
  int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
1,216✔
420
  for (int32_t i = 0; i < numOfVgroups; i++) {
2,959✔
421
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
1,743!
422
  }
423
END:
1,216✔
424
  return code;
1,216✔
425
}
426

427
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
1,216✔
428
                                     int32_t remainderVgCnt) {
429
  if (pOutput == NULL || pHash == NULL) {
1,216!
430
    return TSDB_CODE_INVALID_PARA;
×
431
  }
432
  int32_t code = 0;
1,216✔
433
  int32_t cnt = 0;
1,216✔
434
  void   *pIter = NULL;
1,216✔
435

436
  while (1) {
276✔
437
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,492✔
438
    if (pIter == NULL) {
1,492✔
439
      break;
1,216✔
440
    }
441

442
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
276✔
443
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
276✔
444

445
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
552!
446
    if (consumerVgNum > minVgCnt) {
276✔
447
      if (cnt < remainderVgCnt) {
6✔
448
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {  // pop until equal minVg + 1
2!
449
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
×
450
        }
451
        cnt++;
2✔
452
      } else {
453
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
12✔
454
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
8!
455
        }
456
      }
457
    }
458
  }
459
END:
1,216✔
460
  return code;
1,216✔
461
}
462

463
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
1,216✔
464
  if (pMnode == NULL || pOutput == NULL) {
1,216!
465
    return TSDB_CODE_INVALID_PARA;
×
466
  }
467
  int32_t code = 0;
1,216✔
468
  int32_t totalVgNum = 0;
1,216✔
469
  SVgObj *pVgroup = NULL;
1,216✔
470
  SMqVgEp *pVgEp = NULL;
1,216✔
471
  void   *pIter = NULL;
1,216✔
472
  SArray *newVgs = taosArrayInit(0, POINTER_BYTES);
1,216✔
473
  MND_TMQ_NULL_CHECK(newVgs);
1,216!
474
  while (1) {
475
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
7,480✔
476
    if (pIter == NULL) {
7,480✔
477
      break;
1,216✔
478
    }
479
    if (pVgroup->mountVgId) {
6,264!
480
      sdbRelease(pMnode->pSdb, pVgroup);
×
481
      continue;
×
482
    }
483

484
    if (!mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
6,264✔
485
      sdbRelease(pMnode->pSdb, pVgroup);
3,179✔
486
      continue;
3,179✔
487
    }
488

489
    totalVgNum++;
3,085✔
490
    pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
3,085!
491
    MND_TMQ_NULL_CHECK(pVgEp);
3,085!
492
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
3,085✔
493
    pVgEp->vgId = pVgroup->vgId;
3,085✔
494
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
3,085!
495
    pVgEp = NULL;
3,085✔
496
    sdbRelease(pMnode->pSdb, pVgroup);
3,085✔
497
  }
498

499
  pIter = NULL;
1,216✔
500
  while (1) {
762✔
501
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,978✔
502
    if (pIter == NULL) break;
1,978✔
503
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
762✔
504
    int32_t j = 0;
762✔
505
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
2,327✔
506
      SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
1,565✔
507
      MND_TMQ_NULL_CHECK(pVgEpTmp);
1,565!
508
      bool     find = false;
1,565✔
509
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
2,068✔
510
        SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
1,845✔
511
        MND_TMQ_NULL_CHECK(pnewVgEp);
1,845!
512
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
1,845✔
513
          tDeleteSMqVgEp(pnewVgEp);
1,342✔
514
          taosArrayRemove(newVgs, k);
1,342✔
515
          find = true;
1,342✔
516
          break;
1,342✔
517
        }
518
      }
519
      if (!find) {
1,565✔
520
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
223!
521
        tDeleteSMqVgEp(pVgEpTmp);
223✔
522
        taosArrayRemove(pConsumerEp->vgs, j);
223✔
523
        continue;
223✔
524
      }
525
      j++;
1,342✔
526
    }
527
  }
528

529
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
1,216✔
530
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
223!
531
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
223!
532
    taosArrayDestroy(newVgs);
223✔
533
  } else {
534
    taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
993✔
535
  }
536
  return totalVgNum;
1,216✔
537

538
END:
×
539
  sdbRelease(pMnode->pSdb, pVgroup);
×
540
  taosMemoryFree(pVgEp);
×
541
  taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
×
542
  return code;
×
543
}
544

545
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
1,216✔
546
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
1,216!
547
    return TSDB_CODE_INVALID_PARA;
×
548
  }
549
  SMqSubscribeObj *pSub = NULL;
1,216✔
550
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
1,216✔
551
  if( code != 0){
1,216✔
552
    return 0;
476✔
553
  }
554
  taosRLockLatch(&pSub->lock);
740✔
555
  if (pOutput->pSub->offsetRows == NULL) {
740✔
556
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
608✔
557
    if(pOutput->pSub->offsetRows == NULL) {
608!
558
      taosRUnLockLatch(&pSub->lock);
×
559
      code = terrno;
×
560
      goto END;
×
561
    }
562
  }
563
  void *pIter = NULL;
740✔
564
  while (1) {
762✔
565
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,502✔
566
    if (pIter == NULL) break;
1,502✔
567
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
762✔
568
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
762✔
569

570
    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
2,281✔
571
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
1,519✔
572
      MND_TMQ_NULL_CHECK(d1);
1,519!
573
      bool        jump = false;
1,519✔
574
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
1,980✔
575
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
492✔
576
        MND_TMQ_NULL_CHECK(pVgEp);
492!
577
        if (pVgEp->vgId == d1->vgId) {
492✔
578
          jump = true;
31✔
579
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
31!
580
                pConsumerEp->consumerId, pVgEp->vgId);
581
          break;
31✔
582
        }
583
      }
584
      if (jump) continue;
1,519✔
585
      bool find = false;
1,488✔
586
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
3,488✔
587
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
2,169✔
588
        MND_TMQ_NULL_CHECK(d2);
2,169!
589
        if (d1->vgId == d2->vgId) {
2,169✔
590
          d2->rows += d1->rows;
169✔
591
          d2->offset = d1->offset;
169✔
592
          d2->ever = d1->ever;
169✔
593
          find = true;
169✔
594
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
169!
595
          break;
169✔
596
        }
597
      }
598
      if (!find) {
1,488✔
599
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
2,638!
600
      }
601
    }
602
  }
603
  taosRUnLockLatch(&pSub->lock);
740✔
604
  mndReleaseSubscribe(pMnode, pSub);
740✔
605

606
END:
740✔
607
  return code;
740✔
608
}
609

610
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
1,216✔
611
  if (pOutput == NULL) return;
1,216!
612
  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
1,216!
613
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
4,250✔
614
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
3,034✔
615
    if (pOutputRebVg == NULL) continue;
3,034!
616
    mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
3,034!
617
          pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
618
  }
619

620
  void *pIter = NULL;
1,216✔
621
  while (1) {
855✔
622
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
2,071✔
623
    if (pIter == NULL) break;
2,071✔
624
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
855✔
625
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
855✔
626
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
855!
627
          pConsumerEp->consumerId, sz);
628
    for (int32_t i = 0; i < sz; i++) {
2,728✔
629
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
1,873✔
630
      if (pVgEp == NULL) continue;
1,873!
631
      mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
1,873!
632
            pConsumerEp->consumerId);
633
    }
634
  }
635
}
636

637
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
1,216✔
638
                           int32_t *remainderVgCnt) {
639
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
1,216!
640
    return;
×
641
  }
642
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
1,216✔
643
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
1,216✔
644
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
1,216✔
645

646
  // calc num
647
  if (numOfFinal != 0) {
1,216✔
648
    *minVgCnt = totalVgNum / numOfFinal;
793✔
649
    *remainderVgCnt = totalVgNum % numOfFinal;
793✔
650
  } else {
651
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
423!
652
  }
653
  mInfo(
1,216!
654
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
655
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
656
}
657

658
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
1,216✔
659
  if (pOutput == NULL || pHash == NULL) {
1,216!
660
    return TSDB_CODE_INVALID_PARA;
×
661
  }
662
  SMqRebOutputVg *pRebVg = NULL;
1,216✔
663
  void           *pAssignIter = NULL;
1,216✔
664
  void           *pIter = NULL;
1,216✔
665
  int32_t         code = 0;
1,216✔
666

667
  while (1) {
855✔
668
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
2,071✔
669
    if (pIter == NULL) {
2,071✔
670
      break;
1,216✔
671
    }
672
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
855✔
673
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
2,650✔
674
      pAssignIter = taosHashIterate(pHash, pAssignIter);
1,795✔
675
      if (pAssignIter == NULL) {
1,795!
676
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
×
677
        break;
×
678
      }
679

680
      pRebVg = (SMqRebOutputVg *)pAssignIter;
1,795✔
681
      pRebVg->newConsumerId = pConsumerEp->consumerId;
1,795✔
682
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
3,590!
683
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId,
1,795!
684
            pConsumerEp->consumerId);
685
    }
686
  }
687

688
  while (1) {
28✔
689
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,244✔
690
    if (pIter == NULL) {
1,244✔
691
      break;
423✔
692
    }
693
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
821✔
694
    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
821✔
695
      pAssignIter = taosHashIterate(pHash, pAssignIter);
820✔
696
      if (pAssignIter == NULL) {
820✔
697
        mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
793!
698
        break;
793✔
699
      }
700

701
      pRebVg = (SMqRebOutputVg *)pAssignIter;
27✔
702
      pRebVg->newConsumerId = pConsumerEp->consumerId;
27✔
703
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
54!
704
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId,
27!
705
            pConsumerEp->consumerId);
706
    }
707
  }
708

709
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
1,216✔
710
  if (pAssignIter != NULL) {
1,216!
711
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
×
712
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
713
    goto END;
×
714
  }
715
  while (1) {
3,034✔
716
    pAssignIter = taosHashIterate(pHash, pAssignIter);
4,250✔
717
    if (pAssignIter == NULL) {
4,250✔
718
      break;
1,216✔
719
    }
720

721
    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
3,034✔
722
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
6,068!
723
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {            // if all consumer is removed
3,034✔
724
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
2,424!
725
    }
726
  }
727

728
END:
1,216✔
729
  return code;
1,216✔
730
}
731

732
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
1,216✔
733
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
1,216!
734
    return TSDB_CODE_INVALID_PARA;
×
735
  }
736
  int32_t     totalVgNum = processRemoveAddVgs(pMnode, pOutput);
1,216✔
737
  if (totalVgNum < 0){
1,216!
738
    return totalVgNum;
×
739
  }
740
  const char *pSubKey = pOutput->pSub->key;
1,216✔
741
  int32_t     minVgCnt = 0;
1,216✔
742
  int32_t     remainderVgCnt = 0;
1,216✔
743
  int32_t     code = 0;
1,216✔
744
  SHashObj   *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
1,216✔
745
  MND_TMQ_NULL_CHECK(pHash);
1,216!
746
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
1,216!
747
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
1,216!
748
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
1,216✔
749
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
1,216!
750
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
1,216!
751
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
1,216!
752
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
1,216!
753
  printRebalanceLog(pOutput);
1,216✔
754
  taosHashCleanup(pHash);
1,216✔
755

756
END:
1,216✔
757
  return code;
1,216✔
758
}
759

760
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
2,985✔
761
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
2,985!
762
    return TSDB_CODE_INVALID_PARA;
×
763
  }
764
  int32_t         code = 0;
2,985✔
765
  SMqConsumerObj *pConsumerNew = NULL;
2,985✔
766
  int32_t         consumerNum = taosArrayGetSize(consumers);
2,985✔
767
  for (int32_t i = 0; i < consumerNum; i++) {
4,105✔
768
    int64_t* consumerId = (int64_t *)taosArrayGet(consumers, i);
1,120✔
769
    MND_TMQ_NULL_CHECK(consumerId);
1,120!
770
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
1,120!
771
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
1,120!
772
    tDeleteSMqConsumerObj(pConsumerNew);
1,120✔
773
  }
774
  pConsumerNew = NULL;
2,985✔
775

776
END:
2,985✔
777
  tDeleteSMqConsumerObj(pConsumerNew);
2,985✔
778
  return code;
2,985✔
779
}
780

781
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
995✔
782
  if (pTrans == NULL || pOutput == NULL || cgroup == NULL || topic == NULL) {
995!
783
    return TSDB_CODE_INVALID_PARA;
×
784
  }
785
  int32_t code = 0;
995✔
786
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL));
995!
787
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic));
995!
788
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic));
995!
789
END:
995✔
790
  return code;
995✔
791
}
792

793
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
1,216✔
794
  if (pMnode == NULL || pMsg == NULL || pOutput == NULL) {
1,216!
795
    return TSDB_CODE_INVALID_PARA;
×
796
  }
797
  struct SSubplan *pPlan = NULL;
1,216✔
798
  int32_t          code = 0;
1,216✔
799
  STrans          *pTrans = NULL;
1,216✔
800

801
  if (strcmp(pOutput->pSub->qmsg, "") != 0) {
1,216✔
802
    MND_TMQ_RETURN_CHECK(qStringToSubplan(pOutput->pSub->qmsg, &pPlan));
974!
803
  }
804

805
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
1,216✔
806
  char cgroup[TSDB_CGROUP_LEN] = {0};
1,216✔
807
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
1,216✔
808

809
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
1,216✔
810
  if (pTrans == NULL) {
1,216!
811
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
812
    if (terrno != 0) code = terrno;
×
813
    goto END;
×
814
  }
815

816
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
1,216✔
817
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
1,216✔
818

819
  // 1. redo action: action to all vg
820
  const SArray *rebVgs = pOutput->rebVgs;
995✔
821
  int32_t       vgNum = taosArrayGetSize(rebVgs);
995✔
822
  for (int32_t i = 0; i < vgNum; i++) {
3,596✔
823
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
2,601✔
824
    MND_TMQ_NULL_CHECK(pRebVg);
2,601!
825
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan));
2,601!
826
  }
827

828
  // 2. commit log: subscribe and vg assignment
829
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));
995!
830

831
  // 3. commit log: consumer to update status and epoch
832
  MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
995!
833

834
  // 4. set cb
835
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
995✔
836

837
  // 5. execution
838
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
995!
839

840
END:
995✔
841
  nodesDestroyNode((SNode *)pPlan);
1,216✔
842
  mndTransDrop(pTrans);
1,216✔
843
  TAOS_RETURN(code);
1,216✔
844
}
845

846
static void freeRebalanceItem(void *param) {
1,216✔
847
  if (param == NULL) return;
1,216!
848
  SMqRebInfo *pInfo = param;
1,216✔
849
  taosArrayDestroy(pInfo->newConsumers);
1,216✔
850
  taosArrayDestroy(pInfo->removedConsumers);
1,216✔
851
}
852

853
// type = 0 remove  type = 1 add
854
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
1,899✔
855
  if (rebSubHash == NULL || topicList == NULL) {
1,899!
856
    return TSDB_CODE_INVALID_PARA;
×
857
  }
858
  taosRLockLatch(&pConsumer->lock);
1,899✔
859
  int32_t code = 0;
1,899✔
860
  int32_t topicNum = taosArrayGetSize(topicList);
1,899✔
861
  for (int32_t i = 0; i < topicNum; i++) {
2,964✔
862
    char *removedTopic = taosArrayGetP(topicList, i);
1,065✔
863
    MND_TMQ_NULL_CHECK(removedTopic);
1,065!
864
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
1,065✔
865
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
1,065✔
866
    SMqRebInfo *pRebSub = NULL;
1,065✔
867
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
1,065!
868
    if (type == 0)
1,065✔
869
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
972!
870
    else if (type == 1)
579!
871
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
1,158!
872
  }
873

874
END:
1,899✔
875
  taosRUnLockLatch(&pConsumer->lock);
1,899✔
876
  return code;
1,899✔
877
}
878

879
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
2,643✔
880
  if (pMnode == NULL || pConsumer == NULL || rebSubHash == NULL) {
2,643!
881
    return;
×
882
  }
883
  int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics);
2,643✔
884
  for (int32_t i = 0; i < newTopicNum; i++) {
5,430✔
885
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
2,787✔
886
    if (topic == NULL){
2,787!
887
      continue;
×
888
    }
889
    SMqSubscribeObj *pSub = NULL;
2,787✔
890
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
2,787✔
891
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
2,787✔
892
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
2,787✔
893
    if (code != 0) {
2,787!
894
      continue;
×
895
    }
896
    taosRLockLatch(&pSub->lock);
2,787✔
897

898
    // iterate all vg assigned to the consumer of that topic
899
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
2,787✔
900
    if (pConsumerEp == NULL){
2,787!
901
      taosRUnLockLatch(&pSub->lock);
×
902
      mndReleaseSubscribe(pMnode, pSub);
×
903
      continue;
×
904
    }
905
    int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
2,787✔
906
    for (int32_t j = 0; j < vgNum; j++) {
8,380✔
907
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
5,593✔
908
      if (pVgEp == NULL) {
5,593!
909
        continue;
×
910
      }
911
      SVgObj  *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
5,593✔
912
      if (!pVgroup) {
5,593✔
913
        code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
223✔
914
        if (code != 0){
223!
915
          mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code))
×
916
        }else{
917
          mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
223!
918
        }
919
      }
920
      mndReleaseVgroup(pMnode, pVgroup);
5,593✔
921
    }
922
    taosRUnLockLatch(&pSub->lock);
2,787✔
923
    mndReleaseSubscribe(pMnode, pSub);
2,787✔
924
  }
925
}
926

927
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
29,734✔
928
  if (pMsg == NULL || rebSubHash == NULL) {
29,734!
929
    return TSDB_CODE_INVALID_PARA;
×
930
  }
931
  SMnode         *pMnode = pMsg->info.node;
29,734✔
932
  SSdb           *pSdb = pMnode->pSdb;
29,734✔
933
  SMqConsumerObj *pConsumer = NULL;
29,734✔
934
  void           *pIter = NULL;
29,734✔
935
  int32_t         code = 0;
29,734✔
936

937
  // iterate all consumers, find all modification
938
  while (1) {
3,986✔
939
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
33,720✔
940
    if (pIter == NULL) {
33,720✔
941
      break;
29,734✔
942
    }
943

944
    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
3,986✔
945
    int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
3,986✔
946
    int32_t status = atomic_load_32(&pConsumer->status);
3,986✔
947

948
    mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
3,986!
949
           ", hbstatus:%d, pollStatus:%d",
950
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
951
           pConsumer->createTime, hbStatus, pollStatus);
952

953
    if (status == MQ_CONSUMER_STATUS_READY) {
3,986✔
954
      if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
3,040✔
955
        MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
390!
956
      } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
2,650✔
957
                 pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
2,644✔
958
        mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
7!
959
           ", hb lost cnt:%d, or long time no poll cnt:%d",
960
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
961
           pConsumer->createTime, hbStatus, pollStatus);
962
        MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
7!
963
      } else {
964
        checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
2,643✔
965
      }
966
    } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
946!
967
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
946!
968
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
946!
969
    } else {
970
      MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
×
971
    }
972

973
    mndReleaseConsumer(pMnode, pConsumer);
3,986✔
974
  }
975
END:
29,734✔
976
  return code;
29,734✔
977
}
978

979
bool mndRebTryStart() {
29,734✔
980
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
29,734✔
981
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
29,734!
982
}
983

984
void mndRebCntInc() {
1,005✔
985
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
1,005✔
986
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
1,005!
987
}
1,005✔
988

989
void mndRebCntDec() {
30,739✔
990
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
30,739✔
991
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
30,739!
992
}
30,739✔
993

994
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
1,216✔
995
  if (rebOutput == NULL) {
1,216!
996
    return;
×
997
  }
998
  taosArrayDestroy(rebOutput->newConsumers);
1,216✔
999
  taosArrayDestroy(rebOutput->modifyConsumers);
1,216✔
1000
  taosArrayDestroy(rebOutput->removedConsumers);
1,216✔
1001
  taosArrayDestroy(rebOutput->rebVgs);
1,216✔
1002
  tDeleteSubscribeObj(rebOutput->pSub);
1,216✔
1003
  taosMemoryFree(rebOutput->pSub);
1,216!
1004
}
1005

1006
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
1,216✔
1007
  if (rebOutput == NULL) {
1,216!
1008
    return TSDB_CODE_INVALID_PARA;
×
1009
  }
1010
  int32_t code = 0;
1,216✔
1011
  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
1,216✔
1012
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);
1,216!
1013
  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
1,216✔
1014
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);
1,216!
1015
  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
1,216✔
1016
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);
1,216!
1017
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
1,216✔
1018
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);
1,216!
1019
  return code;
1,216✔
1020

1021
END:
×
1022
  clearRebOutput(rebOutput);
×
1023
  return code;
×
1024
}
1025

1026
// This function only works when there are dirty consumers
1027
static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
740✔
1028
  if (pMnode == NULL || pSub == NULL) {
740!
1029
    return TSDB_CODE_INVALID_PARA;
×
1030
  }
1031
  int32_t code = 0;
740✔
1032
  void   *pIter = NULL;
740✔
1033
  while (1) {
762✔
1034
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,502✔
1035
    if (pIter == NULL) {
1,502✔
1036
      break;
740✔
1037
    }
1038

1039
    SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
762✔
1040
    SMqConsumerObj *pConsumer = NULL;
762✔
1041
    code = mndAcquireConsumer(pMnode, pConsumerEp->consumerId, &pConsumer);
762✔
1042
    if (code == 0) {
762!
1043
      mndReleaseConsumer(pMnode, pConsumer);
762✔
1044
      continue;
762✔
1045
    }
1046
    mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
×
1047
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));
×
1048

1049
    taosArrayDestroy(pConsumerEp->vgs);
×
1050
    MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
×
1051
  }
1052
END:
740✔
1053
  return code;
740✔
1054
}
1055

1056
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
1,216✔
1057
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
1,216!
1058
    return TSDB_CODE_INVALID_PARA;
×
1059
  }
1060
  const char      *key = rebInput->pRebInfo->key;
1,216✔
1061
  SMqSubscribeObj *pSub = NULL;
1,216✔
1062
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
1,216✔
1063

1064
  if (code != 0) {
1,216✔
1065
    // split sub key and extract topic
1066
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
476✔
1067
    char cgroup[TSDB_CGROUP_LEN] = {0};
476✔
1068
    mndSplitSubscribeKey(key, topic, cgroup, true);
476✔
1069
    SMqTopicObj *pTopic = NULL;
476✔
1070
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
476!
1071
    taosRLockLatch(&pTopic->lock);
476✔
1072

1073
    rebInput->oldConsumerNum = 0;
476✔
1074
    code = mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub);
476✔
1075
    if (code != 0) {
476!
1076
      mError("tmq rebalance mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
×
1077
      taosRUnLockLatch(&pTopic->lock);
×
1078
      mndReleaseTopic(pMnode, pTopic);
×
1079
      return code;
×
1080
    }
1081

1082
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
476✔
1083
    taosRUnLockLatch(&pTopic->lock);
476✔
1084
    mndReleaseTopic(pMnode, pTopic);
476✔
1085

1086
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
476!
1087
  } else {
1088
    taosRLockLatch(&pSub->lock);
740✔
1089
    code = tCloneSubscribeObj(pSub, &rebOutput->pSub);
740✔
1090
    if(code != 0){
740!
1091
      taosRUnLockLatch(&pSub->lock);
×
1092
      goto END;
×
1093
    }
1094
    code = checkConsumer(pMnode, rebOutput->pSub);
740✔
1095
    if(code != 0){
740!
1096
      taosRUnLockLatch(&pSub->lock);
×
1097
      goto END;
×
1098
    }
1099
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
740✔
1100
    taosRUnLockLatch(&pSub->lock);
740✔
1101

1102
    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
740!
1103
    mndReleaseSubscribe(pMnode, pSub);
740✔
1104
  }
1105

1106
END:
1,216✔
1107
  return code;
1,216✔
1108
}
1109

1110
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
29,734✔
1111
  if (pMsg == NULL) {
29,734!
1112
    return TSDB_CODE_INVALID_PARA;
×
1113
  }
1114
  int     code = 0;
29,734✔
1115
  void   *pIter = NULL;
29,734✔
1116
  SMnode *pMnode = pMsg->info.node;
29,734✔
1117
  PRINT_LOG_START;
29,734✔
1118
  if (!mndRebTryStart()) {
29,734!
1119
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
×
1120
    return code;
×
1121
  }
1122

1123
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
29,734✔
1124
  MND_TMQ_NULL_CHECK(rebSubHash);
29,734!
1125

1126
  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
29,734✔
1127

1128
  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
29,734!
1129
  if (taosHashGetSize(rebSubHash) > 0) {
29,734✔
1130
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
1,013!
1131
  }
1132

1133
  while (1) {
1,216✔
1134
    pIter = taosHashIterate(rebSubHash, pIter);
30,950✔
1135
    if (pIter == NULL) {
30,950✔
1136
      break;
29,734✔
1137
    }
1138

1139
    SMqRebInputObj  rebInput = {0};
1,216✔
1140
    SMqRebOutputObj rebOutput = {0};
1,216✔
1141
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
1,216!
1142
    rebInput.pRebInfo = (SMqRebInfo *)pIter;
1,216✔
1143
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
1,216✔
1144
    if (code != 0) {
1,216!
1145
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code))
×
1146
    }
1147

1148
    if (code == 0){
1,216!
1149
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
1,216✔
1150
      if (code != 0) {
1,216!
1151
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code))
×
1152
      }
1153
    }
1154

1155
    if (code == 0){
1,216!
1156
      code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
1,216✔
1157
      if (code != 0) {
1,216✔
1158
        mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code))
221!
1159
      }
1160
    }
1161

1162
    clearRebOutput(&rebOutput);
1,216✔
1163
  }
1164

1165
  if (taosHashGetSize(rebSubHash) > 0) {
29,734✔
1166
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish")
1,013!
1167
  }
1168

1169
END:
28,721✔
1170
  taosHashCancelIterate(rebSubHash, pIter);
29,734✔
1171
  taosHashCleanup(rebSubHash);
29,734✔
1172
  mndRebCntDec();
29,734✔
1173

1174
  PRINT_LOG_END(code);
29,734!
1175
  TAOS_RETURN(code);
29,734✔
1176
}
1177

1178
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
235✔
1179
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
235!
1180
    return TSDB_CODE_INVALID_PARA;
×
1181
  }
1182
  void   *pIter = NULL;
235✔
1183
  SVgObj *pVgObj = NULL;
235✔
1184
  int32_t code = 0;
235✔
1185
  while (1) {
1,591✔
1186
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
1,826✔
1187
    if (pIter == NULL) {
1,826✔
1188
      break;
235✔
1189
    }
1190
    if (pVgObj->mountVgId) {
1,591!
1191
      sdbRelease(pMnode->pSdb, pVgObj);
×
1192
      continue;
798✔
1193
    }
1194

1195
    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
1,591✔
1196
      sdbRelease(pMnode->pSdb, pVgObj);
798✔
1197
      continue;
798✔
1198
    }
1199
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
793!
1200
    MND_TMQ_NULL_CHECK(pReq);
793!
1201
    pReq->head.vgId = htonl(pVgObj->vgId);
793✔
1202
    pReq->vgId = pVgObj->vgId;
793✔
1203
    pReq->consumerId = -1;
793✔
1204
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
793✔
1205

1206
    STransAction action = {0};
793✔
1207
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
793✔
1208
    action.pCont = pReq;
793✔
1209
    action.contLen = sizeof(SMqVDeleteReq);
793✔
1210
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
793✔
1211
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;
793✔
1212

1213
    sdbRelease(pMnode->pSdb, pVgObj);
793✔
1214
    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
793!
1215
  }
1216

1217
END:
235✔
1218
  sdbRelease(pMnode->pSdb, pVgObj);
235✔
1219
  sdbCancelFetch(pMnode->pSdb, pIter);
235✔
1220
  return code;
235✔
1221
}
1222

1223
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
1✔
1224
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
1!
1225
    return TSDB_CODE_INVALID_PARA;
×
1226
  }
1227
  void           *pIter = NULL;
1✔
1228
  SMqConsumerObj *pConsumer = NULL;
1✔
1229
  SMqConsumerObj *pConsumerNew = NULL;
1✔
1230
  int             code = 0;
1✔
1231
  while (1) {
1232
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
2✔
1233
    if (pIter == NULL) {
2✔
1234
      break;
1✔
1235
    }
1236

1237
    if (strcmp(cgroup, pConsumer->cgroup) != 0) {
1!
1238
      sdbRelease(pMnode->pSdb, pConsumer);
×
1239
      continue;
×
1240
    }
1241

1242
    if (deleteConsumer) {
1!
1243
      MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
×
1244
      MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
×
1245
      tDeleteSMqConsumerObj(pConsumerNew);
×
1246
      pConsumerNew = NULL;
×
1247
    } else {
1248
      bool found = checkTopic(pConsumer->assignedTopics, topic);
1✔
1249
      if (found){
1!
1250
        mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
×
1251
               topic, pConsumer->consumerId, pConsumer->cgroup);
1252
        code = TSDB_CODE_MND_CGROUP_USED;
×
1253
        goto END;
×
1254
      }
1255
    }
1256

1257

1258
    sdbRelease(pMnode->pSdb, pConsumer);
1✔
1259
  }
1260

1261
END:
1✔
1262
  tDeleteSMqConsumerObj(pConsumerNew);
1✔
1263
  sdbRelease(pMnode->pSdb, pConsumer);
1✔
1264
  sdbCancelFetch(pMnode->pSdb, pIter);
1✔
1265
  return code;
1✔
1266
}
1267

1268
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
1✔
1269
  if (pMsg == NULL) {
1!
1270
    return TSDB_CODE_INVALID_PARA;
×
1271
  }
1272
  SMnode         *pMnode = pMsg->info.node;
1✔
1273
  SMDropCgroupReq dropReq = {0};
1✔
1274
  STrans         *pTrans = NULL;
1✔
1275
  int32_t         code = TSDB_CODE_ACTION_IN_PROGRESS;
1✔
1276
  SMqSubscribeObj *pSub = NULL;
1✔
1277

1278
  PRINT_LOG_START
1!
1279
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
1!
1280
  char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
1✔
1281
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
1✔
1282
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
1✔
1283
  if (code != 0) {
1!
1284
    if (dropReq.igNotExists) {
×
1285
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
×
1286
      return 0;
×
1287
    } else {
1288
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
×
1289
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
×
1290
      return code;
×
1291
    }
1292
  }
1293

1294
  taosWLockLatch(&pSub->lock);
1✔
1295
  if (!dropReq.force && taosHashGetSize(pSub->consumerHash) != 0) {
1!
1296
    code = TSDB_CODE_MND_CGROUP_USED;
×
1297
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
×
1298
    goto END;
×
1299
  }
1300

1301
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
1✔
1302
  MND_TMQ_NULL_CHECK(pTrans);
1!
1303
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
1!
1304
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
1✔
1305
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
1!
1306
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
1!
1307
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic, dropReq.force));
1!
1308
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
1!
1309
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
1!
1310

1311
END:
1✔
1312
  taosWUnLockLatch(&pSub->lock);
1✔
1313
  mndReleaseSubscribe(pMnode, pSub);
1✔
1314
  mndTransDrop(pTrans);
1✔
1315
  PRINT_LOG_END(code);
1!
1316

1317
  if (code != 0) {
1!
1318
    TAOS_RETURN(code);
×
1319
  }
1320
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
1✔
1321
}
1322

1323
void mndCleanupSubscribe(SMnode *pMnode) {}
2,353✔
1324

1325
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
1,633✔
1326
  if (pSub == NULL) {
1,633!
1327
    return NULL;
×
1328
  }
1329
  int32_t code = 0;
1,633✔
1330
  int32_t lino = 0;
1,633✔
1331
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,633✔
1332
  void   *buf = NULL;
1,633✔
1333
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
1,633✔
1334
  if (tlen <= 0) goto SUB_ENCODE_OVER;
1,633!
1335
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
1,633✔
1336

1337
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
1,633✔
1338
  if (pRaw == NULL) goto SUB_ENCODE_OVER;
1,633!
1339

1340
  buf = taosMemoryMalloc(tlen);
1,633!
1341
  if (buf == NULL) goto SUB_ENCODE_OVER;
1,633!
1342

1343
  void *abuf = buf;
1,633✔
1344
  if (tEncodeSubscribeObj(&abuf, pSub) < 0){
1,633!
1345
    goto SUB_ENCODE_OVER;
×
1346
  }
1347

1348
  int32_t dataPos = 0;
1,633✔
1349
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
1,633!
1350
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
1,633!
1351
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
1,633!
1352
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
1,633!
1353

1354
  terrno = TSDB_CODE_SUCCESS;
1,633✔
1355

1356
SUB_ENCODE_OVER:
1,633✔
1357
  taosMemoryFreeClear(buf);
1,633!
1358
  if (terrno != TSDB_CODE_SUCCESS) {
1,633!
1359
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
×
1360
    sdbFreeRaw(pRaw);
×
1361
    return NULL;
×
1362
  }
1363

1364
  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
1,633!
1365
  return pRaw;
1,633✔
1366
}
1367

1368
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
1,263✔
1369
  if (pRaw == NULL) {
1,263!
1370
    return NULL;
×
1371
  }
1372
  int32_t code = 0;
1,263✔
1373
  int32_t lino = 0;
1,263✔
1374
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,263✔
1375
  SSdbRow         *pRow = NULL;
1,263✔
1376
  SMqSubscribeObj *pSub = NULL;
1,263✔
1377
  void            *buf = NULL;
1,263✔
1378

1379
  int8_t sver = 0;
1,263✔
1380
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
1,263!
1381

1382
  if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
1,263!
1383
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
1384
    goto SUB_DECODE_OVER;
×
1385
  }
1386

1387
  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
1,263✔
1388
  if (pRow == NULL) goto SUB_DECODE_OVER;
1,263!
1389

1390
  pSub = sdbGetRowObj(pRow);
1,263✔
1391
  if (pSub == NULL) goto SUB_DECODE_OVER;
1,263!
1392

1393
  int32_t dataPos = 0;
1,263✔
1394
  int32_t tlen;
1395
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
1,263!
1396
  buf = taosMemoryMalloc(tlen);
1,263!
1397
  if (buf == NULL) goto SUB_DECODE_OVER;
1,263!
1398
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
1,263!
1399
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
1,263!
1400

1401
  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
1,263!
1402
    goto SUB_DECODE_OVER;
×
1403
  }
1404

1405
  // update epset saved in mnode
1406
  if (pSub->unassignedVgs != NULL) {
1,263!
1407
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
1,263✔
1408
    for (int32_t i = 0; i < size; ++i) {
3,270✔
1409
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, i);
2,007✔
1410
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
2,007✔
1411
    }
1412
  }
1413
  if (pSub->consumerHash != NULL) {
1,263!
1414
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
1,263✔
1415
    while (pIter) {
1,933✔
1416
      SMqConsumerEp *pConsumerEp = pIter;
670✔
1417
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
670✔
1418
      for (int32_t i = 0; i < size; ++i) {
2,168✔
1419
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
1,498✔
1420
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
1,498✔
1421
      }
1422
      pIter = taosHashIterate(pSub->consumerHash, pIter);
670✔
1423
    }
1424
  }
1425

1426
  terrno = TSDB_CODE_SUCCESS;
1,263✔
1427

1428
SUB_DECODE_OVER:
1,263✔
1429
  taosMemoryFreeClear(buf);
1,263!
1430
  if (terrno != TSDB_CODE_SUCCESS) {
1,263!
1431
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
×
1432
    taosMemoryFreeClear(pRow);
×
1433
    return NULL;
×
1434
  }
1435

1436
  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
1,263!
1437
  return pRow;
1,263✔
1438
}
1439

1440
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
505✔
1441
  mTrace("subscribe:%s, perform insert action", pSub != NULL ? pSub->key : "null");
505!
1442
  return 0;
505✔
1443
}
1444

1445
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
1,263✔
1446
  mTrace("subscribe:%s, perform delete action", pSub != NULL ? pSub->key : "null");
1,263!
1447
  tDeleteSubscribeObj(pSub);
1,263✔
1448
  return 0;
1,263✔
1449
}
1450

1451
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
523✔
1452
  if (pOldSub == NULL || pNewSub == NULL) return -1;
523!
1453
  mTrace("subscribe:%s, perform update action", pOldSub->key);
523!
1454
  taosWLockLatch(&pOldSub->lock);
523✔
1455

1456
  SHashObj *tmp = pOldSub->consumerHash;
523✔
1457
  pOldSub->consumerHash = pNewSub->consumerHash;
523✔
1458
  pNewSub->consumerHash = tmp;
523✔
1459

1460
  SArray *tmp1 = pOldSub->unassignedVgs;
523✔
1461
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
523✔
1462
  pNewSub->unassignedVgs = tmp1;
523✔
1463

1464
  SArray *tmp2 = pOldSub->offsetRows;
523✔
1465
  pOldSub->offsetRows = pNewSub->offsetRows;
523✔
1466
  pNewSub->offsetRows = tmp2;
523✔
1467

1468
  taosWUnLockLatch(&pOldSub->lock);
523✔
1469
  return 0;
523✔
1470
}
1471

1472
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub) {
11,147✔
1473
  if (pMnode == NULL || key == NULL || pSub == NULL){
11,147!
1474
    return TSDB_CODE_INVALID_PARA;
×
1475
  }
1476
  SSdb            *pSdb = pMnode->pSdb;
11,147✔
1477
  *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
11,147✔
1478
  if (*pSub == NULL) {
11,147✔
1479
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
952✔
1480
  }
1481
  return 0;
10,195✔
1482
}
1483

1484
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
604✔
1485
  if (pMnode == NULL || topicName == NULL) return 0;
604!
1486
  int32_t num = 0;
604✔
1487
  SSdb   *pSdb = pMnode->pSdb;
604✔
1488

1489
  void            *pIter = NULL;
604✔
1490
  SMqSubscribeObj *pSub = NULL;
604✔
1491
  while (1) {
996✔
1492
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
1,600✔
1493
    if (pIter == NULL) break;
1,600✔
1494

1495
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
996✔
1496
    char cgroup[TSDB_CGROUP_LEN] = {0};
996✔
1497
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
996✔
1498
    if (strcmp(topic, topicName) != 0) {
996✔
1499
      sdbRelease(pSdb, pSub);
726✔
1500
      continue;
726✔
1501
    }
1502

1503
    num++;
270✔
1504
    sdbRelease(pSdb, pSub);
270✔
1505
  }
1506

1507
  return num;
604✔
1508
}
1509

1510
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
10,195✔
1511
  if (pMnode == NULL || pSub == NULL) return;
10,195!
1512
  SSdb *pSdb = pMnode->pSdb;
10,195✔
1513
  sdbRelease(pSdb, pSub);
10,195✔
1514
}
1515

1516
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
235✔
1517
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) return TSDB_CODE_INVALID_PARA;
235!
1518
  int32_t  code = 0;
235✔
1519
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
235✔
1520
  MND_TMQ_NULL_CHECK(pCommitRaw);
235!
1521
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
235✔
1522
  if (code != 0){
235!
1523
    sdbFreeRaw(pCommitRaw);
×
1524
    goto END;
×
1525
  }
1526
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
235✔
1527
END:
235✔
1528
  return code;
235✔
1529
}
1530

1531
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
321✔
1532
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
321!
1533
  SSdb            *pSdb = pMnode->pSdb;
321✔
1534
  int32_t          code = 0;
321✔
1535
  void            *pIter = NULL;
321✔
1536
  SMqSubscribeObj *pSub = NULL;
321✔
1537
  while (1) {
439✔
1538
    sdbRelease(pSdb, pSub);
760✔
1539
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
760✔
1540
    if (pIter == NULL) break;
760✔
1541

1542
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
448✔
1543
    char cgroup[TSDB_CGROUP_LEN] = {0};
448✔
1544
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
448✔
1545
    if (strcmp(topic, topicName) != 0) {
448✔
1546
      continue;
205✔
1547
    }
1548

1549
    // iter all vnode to delete handle
1550
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
243✔
1551
      code = TSDB_CODE_MND_IN_REBALANCE;
9✔
1552
      goto END;
9✔
1553
    }
1554

1555
    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
234!
1556
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
234!
1557
  }
1558

1559
END:
321✔
1560
  sdbRelease(pSdb, pSub);
321✔
1561
  sdbCancelFetch(pSdb, pIter);
321✔
1562

1563
  TAOS_RETURN(code);
321✔
1564
}
1565

1566
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
737✔
1567
                           const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
1568
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL){
737!
1569
    return TSDB_CODE_INVALID_PARA;
×
1570
  }
1571
  int32_t code = 0;
737✔
1572
  int32_t sz = taosArrayGetSize(vgs);
737✔
1573
  for (int32_t j = 0; j < sz; j++) {
1,306✔
1574
    SMqVgEp *pVgEp = taosArrayGetP(vgs, j);
569✔
1575
    MND_TMQ_NULL_CHECK(pVgEp);
569!
1576

1577
    SColumnInfoData *pColInfo = NULL;
569✔
1578
    int32_t          cols = 0;
569✔
1579

1580
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
569✔
1581
    MND_TMQ_NULL_CHECK(pColInfo);
569!
1582
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false));
569!
1583

1584
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
569✔
1585
    MND_TMQ_NULL_CHECK(pColInfo);
569!
1586
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false));
569!
1587

1588
    // vg id
1589
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
569✔
1590
    MND_TMQ_NULL_CHECK(pColInfo);
569!
1591
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));
569!
1592

1593
    // consumer id
1594
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
569✔
1595
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
569✔
1596
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
569✔
1597

1598
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
569✔
1599
    MND_TMQ_NULL_CHECK(pColInfo);
569!
1600
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));
569!
1601

1602
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
569✔
1603
    if (user) STR_TO_VARSTR(userStr, user);
569✔
1604
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
569✔
1605
    MND_TMQ_NULL_CHECK(pColInfo);
569!
1606
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));
569!
1607

1608
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
569✔
1609
    if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
569✔
1610
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
569✔
1611
    MND_TMQ_NULL_CHECK(pColInfo);
569!
1612
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));
569!
1613

1614
    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
569!
1615
          varDataVal(cgroup), pVgEp->vgId);
1616

1617
    // offset
1618
    OffsetRows *data = NULL;
569✔
1619
    for (int i = 0; i < taosArrayGetSize(offsetRows); i++) {
1,503✔
1620
      OffsetRows *tmp = taosArrayGet(offsetRows, i);
934✔
1621
      MND_TMQ_NULL_CHECK(tmp);
934!
1622
      if (tmp->vgId != pVgEp->vgId) {
934✔
1623
        // mInfo("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", tmp->vgId, pVgEp->vgId);
1624
        continue;
664✔
1625
      }
1626
      data = tmp;
270✔
1627
    }
1628
    if (data) {
569✔
1629
      // vg id
1630
      char buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
270✔
1631
      (void)tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
270✔
1632
      (void)snprintf(varDataVal(buf) + strlen(varDataVal(buf)),
270✔
1633
                     sizeof(buf) - VARSTR_HEADER_SIZE - strlen(varDataVal(buf)), "/%" PRId64, data->ever);
270✔
1634
      varDataSetLen(buf, strlen(varDataVal(buf)));
270✔
1635
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
270✔
1636
      MND_TMQ_NULL_CHECK(pColInfo);
270!
1637
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false));
270!
1638
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
270✔
1639
      MND_TMQ_NULL_CHECK(pColInfo);
270!
1640
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false));
270!
1641
    } else {
1642
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
299✔
1643
      MND_TMQ_NULL_CHECK(pColInfo);
299!
1644
      colDataSetNULL(pColInfo, *numOfRows);
299!
1645
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
299✔
1646
      MND_TMQ_NULL_CHECK(pColInfo);
299!
1647
      colDataSetNULL(pColInfo, *numOfRows);
299!
1648
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
299!
1649
    }
1650
    (*numOfRows)++;
569✔
1651
  }
1652
  return 0;
737✔
1653
END:
×
1654
  return code;
×
1655
}
1656

1657
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
99✔
1658
  if (pReq == NULL || pShow == NULL || pBlock == NULL){
99!
1659
    return TSDB_CODE_INVALID_PARA;
×
1660
  }
1661
  SMnode          *pMnode = pReq->info.node;
99✔
1662
  SSdb            *pSdb = pMnode->pSdb;
99✔
1663
  int32_t          numOfRows = 0;
99✔
1664
  SMqSubscribeObj *pSub = NULL;
99✔
1665
  int32_t          code = 0;
99✔
1666

1667
  mInfo("mnd show subscriptions begin");
99!
1668

1669
  while (numOfRows < rowsCapacity) {
498!
1670
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
498✔
1671
    if (pShow->pIter == NULL) {
498✔
1672
      break;
99✔
1673
    }
1674

1675
    taosRLockLatch(&pSub->lock);
399✔
1676

1677
    if (numOfRows + pSub->vgNum > rowsCapacity) {
399!
1678
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum))  ;
×
1679
    }
1680

1681
    // topic and cgroup
1682
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
399✔
1683
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
399✔
1684
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
399✔
1685
    varDataSetLen(topic, strlen(varDataVal(topic)));
399✔
1686
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
399✔
1687

1688
    SMqConsumerEp *pConsumerEp = NULL;
399✔
1689
    void          *pIter = NULL;
399✔
1690

1691
    while (1) {
338✔
1692
      pIter = taosHashIterate(pSub->consumerHash, pIter);
737✔
1693
      if (pIter == NULL) break;
737✔
1694
      pConsumerEp = (SMqConsumerEp *)pIter;
338✔
1695

1696
      char          *user = NULL;
338✔
1697
      char          *fqdn = NULL;
338✔
1698
      SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
338✔
1699
      if (pConsumer != NULL) {
338!
1700
        user = pConsumer->user;
338✔
1701
        fqdn = pConsumer->fqdn;
338✔
1702
        sdbRelease(pSdb, pConsumer);
338✔
1703
      }
1704
      MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
338!
1705
                  pConsumerEp->offsetRows));
1706
    }
1707

1708
    MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
399!
1709

1710
    pBlock->info.rows = numOfRows;
399✔
1711

1712
    taosRUnLockLatch(&pSub->lock);
399✔
1713
    sdbRelease(pSdb, pSub);
399✔
1714
  }
1715

1716
  mInfo("mnd end show subscriptions");
99!
1717

1718
  pShow->numOfRows += numOfRows;
99✔
1719
  return numOfRows;
99✔
1720

1721
END:
×
1722
  taosRUnLockLatch(&pSub->lock);
×
1723
  sdbRelease(pSdb, pSub);
×
1724

UNCOV
1725
  return code;
×
1726
}
1727

1728
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
×
1729
  if (pMnode == NULL) {
×
1730
    return;
×
1731
  }
1732
  SSdb *pSdb = pMnode->pSdb;
×
1733
  sdbCancelFetchByType(pSdb, pIter, SDB_SUBSCRIBE);
×
1734
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc