• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.62
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndShow.h"
20
#include "mndTopic.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "tcompare.h"
24
#include "tname.h"
25

26
#define MND_SUBSCRIBE_VER_NUMBER   3
27
#define MND_SUBSCRIBE_RESERVE_SIZE 64
28

29
//#define MND_CONSUMER_LOST_HB_CNT          6
30

31
static int32_t mqRebInExecCnt = 0;
32

33
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
34
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
35
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
36
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
38
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
39
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
40
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
42
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
43

44
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
24,928✔
45
  if (pTrans == NULL || pSub == NULL) {
24,928!
46
    return TSDB_CODE_INVALID_PARA;
×
47
  }
48
  int32_t  code = 0;
24,928✔
49
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
24,928✔
50
  MND_TMQ_NULL_CHECK(pCommitRaw);
24,928!
51
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
24,928✔
52
  if (code != 0) {
24,928!
53
    sdbFreeRaw(pCommitRaw);
×
54
    goto END;
×
55
  }
56
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
24,928✔
57

58
END:
24,928✔
59
  return code;
24,928✔
60
}
61

62
int32_t mndInitSubscribe(SMnode *pMnode) {
122,046✔
63
  SSdbTable table = {
122,046✔
64
      .sdbType = SDB_SUBSCRIBE,
65
      .keyType = SDB_KEY_BINARY,
66
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
67
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
68
      .insertFp = (SdbInsertFp)mndSubActionInsert,
69
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
70
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
71
  };
72

73
  if (pMnode == NULL) {
122,046!
74
    return TSDB_CODE_INVALID_PARA;
×
75
  }
76
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
122,046✔
77
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
122,046✔
78
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
122,046✔
79
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
122,046✔
80
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);
122,046✔
81

82
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
122,046✔
83
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
122,046✔
84

85
  return sdbSetTable(pMnode->pSdb, table);
122,046✔
86
}
87

88
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
10,322✔
89
  int32_t     code = 0;
10,322✔
90
  SSdb*       pSdb = pMnode->pSdb;
10,322✔
91
  SVgObj*     pVgroup = NULL;
10,322✔
92
  SQueryPlan* pPlan = NULL;
10,322✔
93
  SSubplan*   pSubplan = NULL;
10,322✔
94

95
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
10,322✔
96
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
7,949✔
97
    if (pPlan == NULL) {
7,949!
98
      return TSDB_CODE_QRY_INVALID_INPUT;
×
99
    }
100
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
2,373!
UNCOV
101
    SNode* pAst = NULL;
×
UNCOV
102
    code = nodesStringToNode(pTopic->ast, &pAst);
×
UNCOV
103
    if (code != 0) {
×
104
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
×
105
      return code;
×
106
    }
107

UNCOV
108
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
×
UNCOV
109
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
×
UNCOV
110
    if (code != 0) {
×
111
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
×
112
      nodesDestroyNode(pAst);
×
113
      return code;
×
114
    }
UNCOV
115
    nodesDestroyNode(pAst);
×
116
  }
117

118
  if (pPlan) {
10,322✔
119
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
7,949!
120
    if (levelNum != 1) {
7,949!
121
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
122
      goto END;
×
123
    }
124

125
    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
7,949✔
126
    if (pNodeListNode == NULL){
7,949!
127
      code = TSDB_CODE_OUT_OF_MEMORY;
×
128
      goto END;
×
129
    }
130
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
7,949!
131
    if (opNum != 1) {
7,949!
132
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
133
      goto END;
×
134
    }
135

136
    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
7,949✔
137
  }
138

139
  void* pIter = NULL;
10,322✔
140
  while (1) {
51,280✔
141
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
61,602✔
142
    if (pIter == NULL) {
61,602✔
143
      break;
10,322✔
144
    }
145

146
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
51,280✔
147
      sdbRelease(pSdb, pVgroup);
19,450✔
148
      continue;
19,450✔
149
    }
150

151
    pSub->vgNum++;
31,830✔
152

153
    SMqVgEp pVgEp = {0};
31,830✔
154
    pVgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
31,830✔
155
    pVgEp.vgId = pVgroup->vgId;
31,830✔
156
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL){
63,660!
157
      code = terrno;
×
158
      sdbRelease(pSdb, pVgroup);
×
159
      goto END;
×
160
    }
161
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp.vgId);
31,830!
162
    sdbRelease(pSdb, pVgroup);
31,830✔
163
  }
164

165
  if (pSubplan) {
10,322✔
166
    int32_t msgLen = 0;
7,949✔
167
    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
7,949!
168
      code = TSDB_CODE_QRY_INVALID_INPUT;
×
169
      goto END;
×
170
    }
171
  } else {
172
    pSub->qmsg = taosStrdup("");
2,373!
173
    if (pSub->qmsg == NULL) {
2,373!
174
      code = terrno;
×
175
      goto END;
×
176
    }
177
  }
178

179
END:
10,322✔
180
  qDestroyQueryPlan(pPlan);
10,322✔
181
  return code;
10,322✔
182
}
183

184

185
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey, SMqSubscribeObj** pSub) {
10,322✔
186
  if(pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
10,322!
187
    return TSDB_CODE_INVALID_PARA;
×
188
  }
189
  int32_t code = 0;
10,322✔
190
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
10,322!
191
  (*pSub)->dbUid = pTopic->dbUid;
10,322✔
192
  (*pSub)->stbUid = pTopic->stbUid;
10,322✔
193
  (*pSub)->subType = pTopic->subType;
10,322✔
194
  (*pSub)->withMeta = pTopic->withMeta;
10,322✔
195

196
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
10,322!
197

198
END:
10,322✔
199
  return code;
10,322✔
200
}
201

202
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
71,681✔
203
                                    SSubplan *pPlan) {
204
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
71,681!
205
    return TSDB_CODE_INVALID_PARA;
×
206
  }
207
  SMqRebVgReq req = {0};
71,681✔
208
  int32_t     code = 0;
71,681✔
209
  SEncoder encoder = {0};
71,681✔
210

211
  req.oldConsumerId = pRebVg->oldConsumerId;
71,681✔
212
  req.newConsumerId = pRebVg->newConsumerId;
71,681✔
213
  req.vgId = pRebVg->pVgEp.vgId;
71,681✔
214
  if (pPlan) {
71,681✔
215
    pPlan->execNode.epSet = pRebVg->pVgEp.epSet;
57,369✔
216
    pPlan->execNode.nodeId = pRebVg->pVgEp.vgId;
57,369✔
217
    int32_t msgLen = 0;
57,369✔
218
    MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
57,369!
219
  } else {
220
    req.qmsg = taosStrdup("");
14,312!
221
    MND_TMQ_NULL_CHECK(req.qmsg);
14,312!
222
  }
223
  req.subType = pSub->subType;
71,681✔
224
  req.withMeta = pSub->withMeta;
71,681✔
225
  req.suid = pSub->stbUid;
71,681✔
226
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
71,681!
227

228
  int32_t tlen = 0;
71,681✔
229
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
71,681!
230
  if (code < 0) {
71,681!
231
    goto END;
×
232
  }
233

234
  tlen += sizeof(SMsgHead);
71,681✔
235
  void *buf = taosMemoryMalloc(tlen);
71,681!
236
  MND_TMQ_NULL_CHECK(buf);
71,681!
237
  SMsgHead *pMsgHead = (SMsgHead *)buf;
71,681✔
238
  pMsgHead->contLen = htonl(tlen);
71,681✔
239
  pMsgHead->vgId = htonl(pRebVg->pVgEp.vgId);
71,681✔
240

241
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
71,681✔
242
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
71,681!
243
  *pBuf = buf;
71,681✔
244
  *pLen = tlen;
71,681✔
245

246
END:
71,681✔
247
  tEncoderClear(&encoder);
71,681✔
248
  taosMemoryFree(req.qmsg);
71,681!
249
  return code;
71,681✔
250
}
251

252
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
71,681✔
253
                                        const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
254
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
71,681!
255
    return TSDB_CODE_INVALID_PARA;
×
256
  }
257
  int32_t code = 0;
71,681✔
258
  void   *buf  = NULL;
71,681✔
259

260
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
71,681!
261
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
262
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
263
    goto END;
×
264
  }
265

266
  int32_t tlen = 0;
71,681✔
267
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan));
71,681!
268
  int32_t vgId = pRebVg->pVgEp.vgId;
71,681✔
269
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
71,681✔
270
  if (pVgObj == NULL) {
71,681!
271
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
272
    goto END;
×
273
  }
274

275
  STransAction action = {0};
71,681✔
276
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
71,681✔
277
  action.pCont = buf;
71,681✔
278
  action.contLen = tlen;
71,681✔
279
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
71,681✔
280

281
  mndReleaseVgroup(pMnode, pVgObj);
71,681✔
282
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
71,681!
283
  return code;
71,681✔
284

285
END:
×
286
  taosMemoryFree(buf);
×
287
  return code;
×
288
}
289

290
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
51,616✔
291
  if (key == NULL || topic == NULL || cgroup == NULL) {
51,616!
292
    return;
×
293
  }
294
  int32_t i = 0;
51,616✔
295
  while (key[i] != TMQ_SEPARATOR_CHAR) {
318,731✔
296
    i++;
267,115✔
297
  }
298
  (void)memcpy(cgroup, key, i);
51,616!
299
  cgroup[i] = 0;
51,616✔
300
  if (fullName) {
51,616✔
301
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
48,104!
302
  } else {
303
    while (key[i] != '.') {
10,536✔
304
      i++;
7,024✔
305
    }
306
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
3,512!
307
  }
308
}
309

310
static void freeRebalanceItem(void *param) {
24,928✔
311
  if (param == NULL) return;
24,928!
312
  SMqRebInfo *pInfo = param;
24,928✔
313
  taosArrayDestroy(pInfo->newConsumers);
24,928✔
314
  taosArrayDestroy(pInfo->removedConsumers);
24,928✔
315
}
316

317
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
25,803✔
318
  if (pHash == NULL || key == NULL) {
25,803!
319
    return TSDB_CODE_INVALID_PARA;
×
320
  }
321
  int32_t code = 0;
25,803✔
322
  SMqRebInfo* pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
25,803!
323
  if (pRebInfo == NULL) {
25,803✔
324
    pRebInfo = tNewSMqRebSubscribe(key);
24,928✔
325
    if (pRebInfo == NULL) {
24,928!
326
      code = terrno;
×
327
      goto END;
×
328
    }
329
    code = taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
24,928!
330
    if (code != 0) {
24,928!
331
      freeRebalanceItem(pRebInfo);
×
332
      taosMemoryFreeClear(pRebInfo);
×
333
      goto END;
×
334
    }
335
    taosMemoryFreeClear(pRebInfo);
24,928!
336

337
    pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
24,928!
338
    MND_TMQ_NULL_CHECK(pRebInfo);
24,928!
339
  }
340
  if (pReb){
25,803!
341
    *pReb = pRebInfo;
25,803✔
342
  }
343

UNCOV
344
END:
×
345
  return code;
25,803✔
346
}
347

348
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
71,681✔
349
  if (vgs == NULL || pHash == NULL || key == NULL) {
71,681!
350
    return TSDB_CODE_INVALID_PARA;
×
351
  }
352
  int32_t         code = 0;
71,681✔
353
  SMqVgEp       *pVgEp = (SMqVgEp *)taosArrayPop(vgs);
71,681✔
354
  MND_TMQ_NULL_CHECK(pVgEp);
71,681!
355
  SMqRebOutputVg outputVg = {consumerId, -1, *pVgEp};
71,681✔
356
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)));
71,681!
357
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, pVgEp->vgId, consumerId);
71,681!
358
END:
71,681✔
359
  return code;
71,681✔
360
}
361

362
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
24,928✔
363
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
24,928!
364
    return TSDB_CODE_INVALID_PARA;
×
365
  }
366
  int32_t code = 0;
24,928✔
367
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
24,928✔
368
  int32_t actualRemoved = 0;
24,928✔
369
  for (int32_t i = 0; i < numOfRemoved; i++) {
37,824✔
370
    int64_t*      consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
12,896✔
371
    MND_TMQ_NULL_CHECK(consumerId);
12,896!
372
    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t));
12,896✔
373
    if (pConsumerEp == NULL) {
12,896!
374
      continue;
×
375
    }
376

377
    int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
12,896✔
378
    for (int32_t j = 0; j < consumerVgNum; j++) {
49,510✔
379
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *consumerId, pOutput->pSub->key));
36,614!
380
    }
381

382
    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t)));
12,896!
383
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, consumerId));
25,792!
384
    actualRemoved++;
12,896✔
385
  }
386

387
  if (numOfRemoved != actualRemoved) {
24,928!
388
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved,
×
389
           actualRemoved);
390
  } else {
391
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
24,928!
392
  }
393
END:
×
394
  return code;
24,928✔
395
}
396

397
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
24,928✔
398
  if (pOutput == NULL || pInput == NULL) {
24,928!
399
    return TSDB_CODE_INVALID_PARA;
×
400
  }
401
  int32_t code = 0;
24,928✔
402
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);
24,928✔
403

404
  for (int32_t i = 0; i < numOfNewConsumers; i++) {
37,835✔
405
    int64_t* consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
12,907✔
406
    MND_TMQ_NULL_CHECK(consumerId);
12,907!
407
    SMqConsumerEp newConsumerEp = {0};
12,907✔
408
    newConsumerEp.consumerId = *consumerId;
12,907✔
409
    newConsumerEp.vgs = taosArrayInit(0, sizeof(SMqVgEp));
12,907✔
410
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);
12,907!
411
    if (taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)) != 0) {
12,907!
412
      freeSMqConsumerEp(&newConsumerEp);
×
413
      code = terrno;
×
414
      goto END;
×
415
    }
416
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
25,814!
417
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
12,907!
418
  }
419
END:
24,928✔
420
  return code;
24,928✔
421
}
422

423
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
24,928✔
424
  if (pOutput == NULL || pHash == NULL) {
24,928!
425
    return TSDB_CODE_INVALID_PARA;
×
426
  }
427
  int32_t code = 0;
24,928✔
428
  int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
24,928✔
429
  for (int32_t i = 0; i < numOfVgroups; i++) {
59,119✔
430
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
34,191!
431
  }
432
END:
24,928✔
433
  return code;
24,928✔
434
}
435

436
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
24,928✔
437
                                     int32_t remainderVgCnt) {
438
  if (pOutput == NULL || pHash == NULL) {
24,928!
439
    return TSDB_CODE_INVALID_PARA;
×
440
  }
441
  int32_t code = 0;
24,928✔
442
  int32_t cnt = 0;
24,928✔
443
  void   *pIter = NULL;
24,928✔
444

445
  while (1) {
1,751✔
446
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
26,679✔
447
    if (pIter == NULL) {
26,679✔
448
      break;
24,928✔
449
    }
450

451
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
1,751✔
452
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
1,751✔
453

454
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
3,502!
455
    if (consumerVgNum > minVgCnt) {
1,751✔
456
      if (cnt < remainderVgCnt) {
438!
UNCOV
457
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {  // pop until equal minVg + 1
×
458
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
×
459
        }
UNCOV
460
        cnt++;
×
461
      } else {
462
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
1,314✔
463
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
876!
464
        }
465
      }
466
    }
467
  }
468
END:
24,928✔
469
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
24,928✔
470
  return code;
24,928✔
471
}
472

473
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
24,928✔
474
  if (pMnode == NULL || pOutput == NULL) {
24,928!
475
    return TSDB_CODE_INVALID_PARA;
×
476
  }
477
  int32_t code = 0;
24,928✔
478
  int32_t totalVgNum = 0;
24,928✔
479
  SVgObj *pVgroup = NULL;
24,928✔
480
  void   *pIter = NULL;
24,928✔
481
  SArray *newVgs = taosArrayInit(0, sizeof(SMqVgEp));
24,928✔
482
  MND_TMQ_NULL_CHECK(newVgs);
24,928!
483
  while (1) {
123,523✔
484
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
148,451✔
485
    if (pIter == NULL) {
148,451✔
486
      break;
24,928✔
487
    }
488
    if (pVgroup->mountVgId) {
123,523!
489
      sdbRelease(pMnode->pSdb, pVgroup);
×
490
      continue;
×
491
    }
492

493
    if (!mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
123,523✔
494
      sdbRelease(pMnode->pSdb, pVgroup);
48,702✔
495
      continue;
48,702✔
496
    }
497

498
    totalVgNum++;
74,821✔
499
    SMqVgEp pVgEp = {0};
74,821✔
500
    pVgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
74,821✔
501
    pVgEp.vgId = pVgroup->vgId;
74,821✔
502
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
74,821!
503
    sdbRelease(pMnode->pSdb, pVgroup);
74,821✔
504
  }
505

506
  pIter = NULL;
24,928✔
507
  while (1) {
14,647✔
508
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
39,575✔
509
    if (pIter == NULL) break;
39,575✔
510
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
14,647✔
511
    int32_t j = 0;
14,647✔
512
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
55,277✔
513
      SMqVgEp *pVgEpTmp = taosArrayGet(pConsumerEp->vgs, j);
40,630✔
514
      MND_TMQ_NULL_CHECK(pVgEpTmp);
40,630!
515
      bool     find = false;
40,630✔
516
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
44,134!
517
        SMqVgEp *pnewVgEp = taosArrayGet(newVgs, k);
44,134✔
518
        MND_TMQ_NULL_CHECK(pnewVgEp);
44,134!
519
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
44,134✔
520
          taosArrayRemove(newVgs, k);
40,630✔
521
          find = true;
40,630✔
522
          break;
40,630✔
523
        }
524
      }
525
      if (!find) {
40,630!
UNCOV
526
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
×
UNCOV
527
        taosArrayRemove(pConsumerEp->vgs, j);
×
UNCOV
528
        continue;
×
529
      }
530
      j++;
40,630✔
531
    }
532
  }
533

534
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
24,928!
UNCOV
535
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
×
UNCOV
536
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
×
537
  }
538
  taosArrayDestroy(newVgs);
24,928✔
539

540
  return totalVgNum;
24,928✔
541

542
END:
×
543
  sdbRelease(pMnode->pSdb, pVgroup);
×
544
  taosArrayDestroy(newVgs);
×
545
  return code;
×
546
}
547

548
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
24,928✔
549
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
24,928!
550
    return TSDB_CODE_INVALID_PARA;
×
551
  }
552
  void *pIter = NULL;
24,928✔
553
  SMqSubscribeObj *pSub = NULL;
24,928✔
554
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
24,928✔
555
  if( code != 0){
24,928✔
556
    return 0;
10,322✔
557
  }
558
  taosRLockLatch(&pSub->lock);
14,606✔
559
  if (pOutput->pSub->offsetRows == NULL) {
14,606✔
560
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
10,673✔
561
    MND_TMQ_NULL_CHECK(pOutput->pSub->offsetRows);
10,673!
562
  }
563
  while (1) {
14,647✔
564
    pIter = taosHashIterate(pSub->consumerHash, pIter);
29,253✔
565
    if (pIter == NULL) break;
29,253✔
566
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
14,647✔
567
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
14,647✔
568

569
    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
53,856✔
570
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
39,209✔
571
      MND_TMQ_NULL_CHECK(d1);
39,209!
572
      bool        jump = false;
39,209✔
573
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
42,531✔
574
        SMqVgEp *pVgEp = taosArrayGet(pConsumerEpNew->vgs, i);
6,462✔
575
        MND_TMQ_NULL_CHECK(pVgEp);
6,462!
576
        if (pVgEp->vgId == d1->vgId) {
6,462✔
577
          jump = true;
3,140✔
578
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
3,140!
579
                pConsumerEp->consumerId, pVgEp->vgId);
580
          break;
3,140✔
581
        }
582
      }
583
      if (jump) continue;
39,209✔
584
      bool find = false;
36,069✔
585
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
81,920✔
586
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
50,112✔
587
        MND_TMQ_NULL_CHECK(d2);
50,112!
588
        if (d1->vgId == d2->vgId) {
50,112✔
589
          d2->rows += d1->rows;
4,261✔
590
          d2->offset = d1->offset;
4,261✔
591
          d2->ever = d1->ever;
4,261✔
592
          find = true;
4,261✔
593
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
4,261!
594
          break;
4,261✔
595
        }
596
      }
597
      if (!find) {
36,069✔
598
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
63,616!
599
      }
600
    }
601
  }
602

603
END:
14,606✔
604
  taosRUnLockLatch(&pSub->lock);
14,606✔
605
  taosHashCancelIterate(pSub->consumerHash, pIter);  
14,606✔
606
  mndReleaseSubscribe(pMnode, pSub);
14,606✔
607

608
  return code;
14,606✔
609
}
610

611
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
24,928✔
612
  if (pOutput == NULL) return;
24,928!
613
  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
24,928!
614
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
96,609✔
615
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
71,681✔
616
    if (pOutputRebVg == NULL) continue;
71,681!
617
    mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
71,681!
618
          pOutputRebVg->pVgEp.vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
619
  }
620

621
  void *pIter = NULL;
24,928✔
622
  while (1) {
14,658✔
623
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
39,586✔
624
    if (pIter == NULL) break;
39,586✔
625
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
14,658✔
626
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
14,658✔
627
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
14,658!
628
          pConsumerEp->consumerId, sz);
629
    for (int32_t i = 0; i < sz; i++) {
55,310✔
630
      SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, i);
40,652✔
631
      if (pVgEp == NULL) continue;
40,652!
632
      mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
40,652!
633
            pConsumerEp->consumerId);
634
    }
635
  }
636
}
637

638
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
24,928✔
639
                           int32_t *remainderVgCnt) {
640
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
24,928!
641
    return;
×
642
  }
643
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
24,928✔
644
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
24,928✔
645
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
24,928✔
646

647
  // calc num
648
  if (numOfFinal != 0) {
24,928✔
649
    *minVgCnt = totalVgNum / numOfFinal;
13,345!
650
    *remainderVgCnt = totalVgNum % numOfFinal;
13,345!
651
  } else {
652
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
11,583!
653
  }
654
  mInfo(
24,928!
655
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
656
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
657
}
658

659
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
24,928✔
660
  if (pOutput == NULL || pHash == NULL) {
24,928!
661
    return TSDB_CODE_INVALID_PARA;
×
662
  }
663
  SMqRebOutputVg *pRebVg = NULL;
24,928✔
664
  void           *pAssignIter = NULL;
24,928✔
665
  void           *pIter = NULL;
24,928✔
666
  int32_t         code = 0;
24,928✔
667

668
  while (1) {
14,658✔
669
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
39,586✔
670
    if (pIter == NULL) {
39,586✔
671
      break;
24,928✔
672
    }
673
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
14,658✔
674
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
51,989✔
675
      pAssignIter = taosHashIterate(pHash, pAssignIter);
37,331✔
676
      if (pAssignIter == NULL) {
37,331!
677
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
×
678
        break;
×
679
      }
680

681
      pRebVg = (SMqRebOutputVg *)pAssignIter;
37,331✔
682
      pRebVg->newConsumerId = pConsumerEp->consumerId;
37,331✔
683
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
74,662!
684
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp.vgId,
37,331!
685
            pConsumerEp->consumerId);
686
    }
687
  }
688

689
  while (1) {
181✔
690
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
25,109✔
691
    if (pIter == NULL) {
25,109✔
692
      break;
11,583✔
693
    }
694
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
13,526✔
695
    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
13,526!
696
      pAssignIter = taosHashIterate(pHash, pAssignIter);
13,526✔
697
      if (pAssignIter == NULL) {
13,526✔
698
        mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
13,345!
699
        break;
13,345✔
700
      }
701

702
      pRebVg = (SMqRebOutputVg *)pAssignIter;
181✔
703
      pRebVg->newConsumerId = pConsumerEp->consumerId;
181✔
704
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
362!
705
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp.vgId,
181!
706
            pConsumerEp->consumerId);
707
    }
708
  }
709

710
  if (pAssignIter != NULL) {
24,928!
711
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
×
712
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
713
    goto END;
×
714
  }
715
  while (1) {
71,681✔
716
    pAssignIter = taosHashIterate(pHash, pAssignIter);
96,609✔
717
    if (pAssignIter == NULL) {
96,609✔
718
      break;
24,928✔
719
    }
720

721
    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
71,681✔
722
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
143,362!
723
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {            // if all consumer is removed
71,681✔
724
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
68,338!
725
    }
726
  }
727

728
END:
24,928✔
729
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
24,928✔
730
  taosHashCancelIterate(pHash, pAssignIter);
24,928✔
731
  return code;
24,928✔
732
}
733

734
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
24,928✔
735
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
24,928!
736
    return TSDB_CODE_INVALID_PARA;
×
737
  }
738
  int32_t     totalVgNum = processRemoveAddVgs(pMnode, pOutput);
24,928✔
739
  if (totalVgNum < 0){
24,928!
740
    return totalVgNum;
×
741
  }
742
  const char *pSubKey = pOutput->pSub->key;
24,928✔
743
  int32_t     minVgCnt = 0;
24,928✔
744
  int32_t     remainderVgCnt = 0;
24,928✔
745
  int32_t     code = 0;
24,928✔
746
  SHashObj   *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
24,928✔
747
  MND_TMQ_NULL_CHECK(pHash);
24,928!
748
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
24,928!
749
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
24,928!
750
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
24,928✔
751
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
24,928!
752
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
24,928!
753
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
24,928!
754
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
24,928!
755
  printRebalanceLog(pOutput);
24,928✔
756

757
END:
24,928✔
758
  taosHashCleanup(pHash);
24,928✔
759
  return code;
24,928✔
760
}
761

762
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
74,784✔
763
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
74,784!
764
    return TSDB_CODE_INVALID_PARA;
×
765
  }
766
  int32_t         code = 0;
74,784✔
767
  SMqConsumerObj *pConsumerNew = NULL;
74,784✔
768
  int32_t         consumerNum = taosArrayGetSize(consumers);
74,784✔
769
  for (int32_t i = 0; i < consumerNum; i++) {
102,338✔
770
    int64_t* consumerId = (int64_t *)taosArrayGet(consumers, i);
27,554✔
771
    MND_TMQ_NULL_CHECK(consumerId);
27,554!
772
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
27,554!
773
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
27,554!
774
    tDeleteSMqConsumerObj(pConsumerNew);
27,554✔
775
  }
776
  pConsumerNew = NULL;
74,784✔
777

778
END:
74,784✔
779
  tDeleteSMqConsumerObj(pConsumerNew);
74,784✔
780
  return code;
74,784✔
781
}
782

783
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
24,928✔
784
  if (pTrans == NULL || pOutput == NULL || cgroup == NULL || topic == NULL) {
24,928!
785
    return TSDB_CODE_INVALID_PARA;
×
786
  }
787
  int32_t code = 0;
24,928✔
788
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL));
24,928!
789
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic));
24,928!
790
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic));
24,928!
791
END:
24,928✔
792
  return code;
24,928✔
793
}
794

795
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
24,928✔
796
  if (pMnode == NULL || pMsg == NULL || pOutput == NULL) {
24,928!
797
    return TSDB_CODE_INVALID_PARA;
×
798
  }
799
  struct SSubplan *pPlan = NULL;
24,928✔
800
  int32_t          code = 0;
24,928✔
801
  STrans          *pTrans = NULL;
24,928✔
802

803
  if (strcmp(pOutput->pSub->qmsg, "") != 0) {
24,928✔
804
    MND_TMQ_RETURN_CHECK(qStringToSubplan(pOutput->pSub->qmsg, &pPlan));
20,187!
805
  }
806

807
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
24,928✔
808
  char cgroup[TSDB_CGROUP_LEN] = {0};
24,928✔
809
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
24,928✔
810

811
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
24,928✔
812
  if (pTrans == NULL) {
24,928!
813
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
814
    if (terrno != 0) code = terrno;
×
815
    goto END;
×
816
  }
817

818
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
24,928✔
819
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
24,928!
820

821
  // 1. redo action: action to all vg
822
  const SArray *rebVgs = pOutput->rebVgs;
24,928✔
823
  int32_t       vgNum = taosArrayGetSize(rebVgs);
24,928✔
824
  for (int32_t i = 0; i < vgNum; i++) {
96,609✔
825
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
71,681✔
826
    MND_TMQ_NULL_CHECK(pRebVg);
71,681!
827
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan));
71,681!
828
  }
829

830
  // 2. commit log: subscribe and vg assignment
831
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));
24,928!
832

833
  // 3. commit log: consumer to update status and epoch
834
  MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
24,928!
835

836
  // 4. set cb
837
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
24,928✔
838

839
  // 5. execution
840
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
24,928!
841

842
END:
24,928✔
843
  nodesDestroyNode((SNode *)pPlan);
24,928✔
844
  mndTransDrop(pTrans);
24,928✔
845
  TAOS_RETURN(code);
24,928✔
846
}
847

848
// type = 0 remove  type = 1 add
849
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
49,061✔
850
  if (rebSubHash == NULL || topicList == NULL) {
49,061!
851
    return TSDB_CODE_INVALID_PARA;
×
852
  }
853
  taosRLockLatch(&pConsumer->lock);
49,061✔
854
  int32_t code = 0;
49,061✔
855
  int32_t topicNum = taosArrayGetSize(topicList);
49,061✔
856
  for (int32_t i = 0; i < topicNum; i++) {
74,864✔
857
    char *removedTopic = taosArrayGetP(topicList, i);
25,803✔
858
    MND_TMQ_NULL_CHECK(removedTopic);
25,803!
859
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
25,803✔
860
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
25,803✔
861
    SMqRebInfo *pRebSub = NULL;
25,803✔
862
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
25,803!
863
    if (type == 0)
25,803✔
864
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
25,792!
865
    else if (type == 1)
12,907!
866
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
25,814!
867
  }
868

869
END:
49,061✔
870
  taosRUnLockLatch(&pConsumer->lock);
49,061✔
871
  return code;
49,061✔
872
}
873

874
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
88,013✔
875
  if (pMnode == NULL || pConsumer == NULL || rebSubHash == NULL) {
88,013!
876
    return;
×
877
  }
878
  int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics);
88,013✔
879
  for (int32_t i = 0; i < newTopicNum; i++) {
177,376✔
880
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
89,363✔
881
    if (topic == NULL){
89,363!
882
      continue;
×
883
    }
884
    SMqSubscribeObj *pSub = NULL;
89,363✔
885
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
89,363✔
886
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
89,363!
887
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
89,363✔
888
    if (code != 0) {
89,363!
889
      continue;
×
890
    }
891
    taosRLockLatch(&pSub->lock);
89,363✔
892

893
    // iterate all vg assigned to the consumer of that topic
894
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
89,363✔
895
    if (pConsumerEp == NULL){
89,363!
896
      taosRUnLockLatch(&pSub->lock);
×
897
      mndReleaseSubscribe(pMnode, pSub);
×
898
      continue;
×
899
    }
900
    int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
89,363✔
901
    for (int32_t j = 0; j < vgNum; j++) {
357,261✔
902
      SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, j);
267,898✔
903
      if (pVgEp == NULL) {
267,898!
904
        continue;
×
905
      }
906
      SVgObj  *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
267,898✔
907
      if (!pVgroup) {
267,898!
UNCOV
908
        code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
×
UNCOV
909
        if (code != 0){
×
910
          mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code))
×
911
        }else{
UNCOV
912
          mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
×
913
        }
914
      }
915
      mndReleaseVgroup(pMnode, pVgroup);
267,898✔
916
    }
917
    taosRUnLockLatch(&pSub->lock);
89,363✔
918
    mndReleaseSubscribe(pMnode, pSub);
89,363✔
919
  }
920
}
921

922
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
3,017,935✔
923
  if (pMsg == NULL || rebSubHash == NULL) {
3,017,935!
924
    return TSDB_CODE_INVALID_PARA;
×
925
  }
926
  SMnode         *pMnode = pMsg->info.node;
3,017,935✔
927
  SSdb           *pSdb = pMnode->pSdb;
3,017,935✔
928
  SMqConsumerObj *pConsumer = NULL;
3,017,935✔
929
  void           *pIter = NULL;
3,017,935✔
930
  int32_t         code = 0;
3,017,935✔
931

932
  // iterate all consumers, find all modification
933
  while (1) {
125,363✔
934
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
3,143,298✔
935
    if (pIter == NULL) {
3,143,298✔
936
      break;
3,017,935✔
937
    }
938

939
    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
125,363✔
940
    int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
125,363✔
941
    int32_t status = atomic_load_32(&pConsumer->status);
125,363✔
942

943
    mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
125,363!
944
           ", hbstatus:%d, pollStatus:%d",
945
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
946
           pConsumer->createTime, hbStatus, pollStatus);
947

948
    if (status == MQ_CONSUMER_STATUS_READY) {
125,363✔
949
      if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
101,333✔
950
        MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
12,319!
951
      } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
89,014✔
952
                 pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
88,651✔
953
        mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
1,001!
954
           ", hb lost cnt:%d, or long time no poll cnt:%d",
955
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
956
           pConsumer->createTime, hbStatus, pollStatus);
957
        MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
1,001!
958
      } else {
959
        checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
88,013✔
960
      }
961
    } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
24,030!
962
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
24,030!
963
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
24,030!
964
    } else {
965
      MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
×
966
    }
967

968
    mndReleaseConsumer(pMnode, pConsumer);
125,363✔
969
    pConsumer = NULL;
125,363✔
970
  }
971
END:
3,017,935✔
972
  mndReleaseConsumer(pMnode, pConsumer);
3,017,935✔
973
  return code;
3,017,935✔
974
}
975

976
bool mndRebTryStart() {
3,024,302✔
977
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
3,024,302✔
978
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
3,024,302!
979
}
980

981
void mndRebCntInc() {
24,928✔
982
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
24,928✔
983
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
24,928!
984
}
24,928✔
985

986
void mndRebCntDec() {
3,042,863✔
987
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
3,042,863✔
988
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
3,042,863!
989
}
3,042,863✔
990

991
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
24,928✔
992
  if (rebOutput == NULL) {
24,928!
993
    return;
×
994
  }
995
  taosArrayDestroy(rebOutput->newConsumers);
24,928✔
996
  rebOutput->newConsumers = NULL;
24,928✔
997
  taosArrayDestroy(rebOutput->modifyConsumers);
24,928✔
998
  rebOutput->modifyConsumers = NULL;
24,928✔
999
  taosArrayDestroy(rebOutput->removedConsumers);
24,928✔
1000
  rebOutput->removedConsumers = NULL;
24,928✔
1001
  taosArrayDestroy(rebOutput->rebVgs);
24,928✔
1002
  rebOutput->rebVgs = NULL;
24,928✔
1003
  tDeleteSubscribeObj(rebOutput->pSub);
24,928✔
1004
  taosMemoryFree(rebOutput->pSub);
24,928!
1005
  rebOutput->pSub = NULL;
24,928✔
1006
}
1007

1008
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
24,928✔
1009
  if (rebOutput == NULL) {
24,928!
1010
    return TSDB_CODE_INVALID_PARA;
×
1011
  }
1012
  int32_t code = 0;
24,928✔
1013
  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
24,928✔
1014
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);
24,928!
1015
  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
24,928✔
1016
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);
24,928!
1017
  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
24,928✔
1018
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);
24,928!
1019
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
24,928✔
1020
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);
24,928!
1021
  return code;
24,928✔
1022

1023
END:
×
1024
  clearRebOutput(rebOutput);
×
1025
  return code;
×
1026
}
1027

1028
// This function only works when there are dirty consumers
1029
// static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
1030
//   if (pMnode == NULL || pSub == NULL) {
1031
//     return TSDB_CODE_INVALID_PARA;
1032
//   }
1033
//   int32_t code = 0;
1034
//   void   *pIter = NULL;
1035
//   while (1) {
1036
//     pIter = taosHashIterate(pSub->consumerHash, pIter);
1037
//     if (pIter == NULL) {
1038
//       break;
1039
//     }
1040

1041
//     SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
1042
//     SMqConsumerObj *pConsumer = NULL;
1043
//     code = mndAcquireConsumer(pMnode, pConsumerEp->consumerId, &pConsumer);
1044
//     if (code == 0) {
1045
//       mndReleaseConsumer(pMnode, pConsumer);
1046
//       continue;
1047
//     }
1048
//     mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
1049
//     MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));
1050

1051
//     MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
1052
//     pIter = NULL;  // restart iterate since hash changed
1053
//   }
1054
// END:
1055
//   taosHashCancelIterate(pSub->consumerHash, pIter);
1056
//   return code;
1057
// }
1058

1059
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
24,928✔
1060
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
24,928!
1061
    return TSDB_CODE_INVALID_PARA;
×
1062
  }
1063
  const char      *key = rebInput->pRebInfo->key;
24,928✔
1064
  SMqSubscribeObj *pSub = NULL;
24,928✔
1065
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
24,928✔
1066

1067
  if (code != 0) {
24,928✔
1068
    // split sub key and extract topic
1069
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
10,322✔
1070
    char cgroup[TSDB_CGROUP_LEN] = {0};
10,322✔
1071
    mndSplitSubscribeKey(key, topic, cgroup, true);
10,322✔
1072
    SMqTopicObj *pTopic = NULL;
10,322✔
1073
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
10,322!
1074
    taosRLockLatch(&pTopic->lock);
10,322✔
1075

1076
    rebInput->oldConsumerNum = 0;
10,322✔
1077
    code = mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub);
10,322✔
1078
    if (code != 0) {
10,322!
1079
      mError("tmq rebalance mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
×
1080
      taosRUnLockLatch(&pTopic->lock);
×
1081
      mndReleaseTopic(pMnode, pTopic);
×
1082
      return code;
×
1083
    }
1084

1085
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
10,322!
1086
    taosRUnLockLatch(&pTopic->lock);
10,322✔
1087
    mndReleaseTopic(pMnode, pTopic);
10,322✔
1088

1089
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
10,322!
1090
  } else {
1091
    taosRLockLatch(&pSub->lock);
14,606✔
1092
    code = tCloneSubscribeObj(pSub, &rebOutput->pSub);
14,606✔
1093
    if(code != 0){
14,606!
1094
      taosRUnLockLatch(&pSub->lock);
×
1095
      goto END;
×
1096
    }
1097
    // code = checkConsumer(pMnode, rebOutput->pSub);
1098
    // if(code != 0){
1099
    //   taosRUnLockLatch(&pSub->lock);
1100
    //   goto END;
1101
    // }
1102
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
14,606✔
1103
    taosRUnLockLatch(&pSub->lock);
14,606✔
1104

1105
    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key, taosHashGetSize(rebOutput->pSub->consumerHash));
14,606!
1106
  }
1107

1108
END:
24,928✔
1109
  mndReleaseSubscribe(pMnode, pSub);
24,928✔
1110
  return code;
24,928✔
1111
}
1112

1113
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
3,024,302✔
1114
  if (pMsg == NULL) {
3,024,302!
1115
    return TSDB_CODE_INVALID_PARA;
×
1116
  }
1117
  int     code = 0;
3,024,302✔
1118
  void   *pIter = NULL;
3,024,302✔
1119
  SMnode *pMnode = pMsg->info.node;
3,024,302✔
1120
  PRINT_LOG_START;
3,024,302✔
1121
  if (!mndRebTryStart()) {
3,024,302✔
1122
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
6,367!
1123
    return code;
6,367✔
1124
  }
1125

1126
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
3,017,935✔
1127
  MND_TMQ_NULL_CHECK(rebSubHash);
3,017,935!
1128

1129
  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
3,017,935✔
1130

1131
  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
3,017,935!
1132
  if (taosHashGetSize(rebSubHash) > 0) {
3,017,935✔
1133
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
23,262!
1134
  }
1135

1136
  while (1) {
24,928✔
1137
    pIter = taosHashIterate(rebSubHash, pIter);
3,042,863✔
1138
    if (pIter == NULL) {
3,042,863✔
1139
      break;
3,017,935✔
1140
    }
1141

1142
    SMqRebInputObj  rebInput = {0};
24,928✔
1143
    SMqRebOutputObj rebOutput = {0};
24,928✔
1144
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
24,928!
1145
    rebInput.pRebInfo = (SMqRebInfo *)pIter;
24,928✔
1146
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
24,928✔
1147
    if (code != 0) {
24,928!
1148
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code))
×
1149
    }
1150

1151
    if (code == 0){
24,928!
1152
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
24,928✔
1153
      if (code != 0) {
24,928!
1154
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code))
×
1155
      }
1156
    }
1157

1158
    if (code == 0){
24,928!
1159
      code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
24,928✔
1160
      if (code != 0) {
24,928!
UNCOV
1161
        mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code))
×
1162
      }
1163
    }
1164

1165
    clearRebOutput(&rebOutput);
24,928✔
1166
  }
1167

1168
  if (taosHashGetSize(rebSubHash) > 0) {
3,017,935✔
1169
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish")
23,262!
1170
  }
1171

1172
END:
2,994,673✔
1173
  taosHashCancelIterate(rebSubHash, pIter);
3,017,935✔
1174
  taosHashCleanup(rebSubHash);
3,017,935✔
1175
  mndRebCntDec();
3,017,935✔
1176

1177
  PRINT_LOG_END(code);
3,017,935!
1178
  TAOS_RETURN(code);
3,017,935✔
1179
}
1180

1181
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
8,594✔
1182
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
8,594!
1183
    return TSDB_CODE_INVALID_PARA;
×
1184
  }
1185
  void   *pIter = NULL;
8,594✔
1186
  SVgObj *pVgObj = NULL;
8,594✔
1187
  int32_t code = 0;
8,594✔
1188
  while (1) {
44,376✔
1189
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
52,970✔
1190
    if (pIter == NULL) {
52,970✔
1191
      break;
8,594✔
1192
    }
1193
    if (pVgObj->mountVgId) {
44,376!
1194
      sdbRelease(pMnode->pSdb, pVgObj);
×
1195
      continue;
×
1196
    }
1197

1198
    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
44,376✔
1199
      sdbRelease(pMnode->pSdb, pVgObj);
17,544✔
1200
      continue;
17,544✔
1201
    }
1202
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
26,832!
1203
    MND_TMQ_NULL_CHECK(pReq);
26,832!
1204
    pReq->head.vgId = htonl(pVgObj->vgId);
26,832✔
1205
    pReq->vgId = pVgObj->vgId;
26,832✔
1206
    pReq->consumerId = -1;
26,832✔
1207
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
26,832!
1208

1209
    STransAction action = {0};
26,832✔
1210
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
26,832✔
1211
    action.pCont = pReq;
26,832✔
1212
    action.contLen = sizeof(SMqVDeleteReq);
26,832✔
1213
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
26,832✔
1214
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;
26,832✔
1215

1216
    sdbRelease(pMnode->pSdb, pVgObj);
26,832✔
1217
    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
26,832!
1218
  }
1219

1220
END:
8,594✔
1221
  sdbRelease(pMnode->pSdb, pVgObj);
8,594✔
1222
  sdbCancelFetch(pMnode->pSdb, pIter);
8,594✔
1223
  return code;
8,594✔
1224
}
1225

UNCOV
1226
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
×
UNCOV
1227
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
×
1228
    return TSDB_CODE_INVALID_PARA;
×
1229
  }
UNCOV
1230
  void           *pIter = NULL;
×
UNCOV
1231
  SMqConsumerObj *pConsumer = NULL;
×
UNCOV
1232
  SMqConsumerObj *pConsumerNew = NULL;
×
UNCOV
1233
  int             code = 0;
×
UNCOV
1234
  while (1) {
×
UNCOV
1235
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
×
UNCOV
1236
    if (pIter == NULL) {
×
UNCOV
1237
      break;
×
1238
    }
1239

UNCOV
1240
    if (strcmp(cgroup, pConsumer->cgroup) != 0) {
×
1241
      sdbRelease(pMnode->pSdb, pConsumer);
×
1242
      continue;
×
1243
    }
1244

UNCOV
1245
    bool found1 = checkTopic(pConsumer->assignedTopics, topic);
×
UNCOV
1246
    bool found2 = checkTopic(pConsumer->rebRemovedTopics, topic);
×
UNCOV
1247
    bool found3 = checkTopic(pConsumer->rebNewTopics, topic);
×
UNCOV
1248
    if (found1 || found2 || found3) {
×
UNCOV
1249
      if (deleteConsumer) {
×
UNCOV
1250
        MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
×
UNCOV
1251
        MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
×
UNCOV
1252
        tDeleteSMqConsumerObj(pConsumerNew);
×
UNCOV
1253
        pConsumerNew = NULL;
×
1254
      } else {
1255
        mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
×
1256
              topic, pConsumer->consumerId, pConsumer->cgroup);
1257
        code = TSDB_CODE_MND_CGROUP_USED;
×
1258
        goto END;
×
1259
      }
1260
    }
1261

UNCOV
1262
    sdbRelease(pMnode->pSdb, pConsumer);
×
1263
  }
1264

UNCOV
1265
END:
×
UNCOV
1266
  tDeleteSMqConsumerObj(pConsumerNew);
×
UNCOV
1267
  sdbRelease(pMnode->pSdb, pConsumer);
×
UNCOV
1268
  sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
1269
  return code;
×
1270
}
1271

UNCOV
1272
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
×
UNCOV
1273
  if (pMsg == NULL) {
×
1274
    return TSDB_CODE_INVALID_PARA;
×
1275
  }
UNCOV
1276
  SMnode         *pMnode = pMsg->info.node;
×
UNCOV
1277
  SMDropCgroupReq dropReq = {0};
×
UNCOV
1278
  STrans         *pTrans = NULL;
×
UNCOV
1279
  int32_t         code = TSDB_CODE_ACTION_IN_PROGRESS;
×
UNCOV
1280
  SMqSubscribeObj *pSub = NULL;
×
1281

UNCOV
1282
  PRINT_LOG_START
×
UNCOV
1283
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
×
UNCOV
1284
  char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
×
UNCOV
1285
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
×
UNCOV
1286
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
×
UNCOV
1287
  if (code != 0) {
×
1288
    if (dropReq.igNotExists) {
×
1289
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
×
1290
      return 0;
×
1291
    } else {
1292
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
×
1293
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
×
1294
      return code;
×
1295
    }
1296
  }
1297

UNCOV
1298
  taosWLockLatch(&pSub->lock);
×
UNCOV
1299
  if (!dropReq.force && taosHashGetSize(pSub->consumerHash) != 0) {
×
1300
    code = TSDB_CODE_MND_CGROUP_USED;
×
1301
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
×
1302
    goto END;
×
1303
  }
1304

UNCOV
1305
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
×
UNCOV
1306
  MND_TMQ_NULL_CHECK(pTrans);
×
UNCOV
1307
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
×
UNCOV
1308
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
×
UNCOV
1309
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
×
UNCOV
1310
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
×
UNCOV
1311
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic, dropReq.force));
×
UNCOV
1312
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
×
UNCOV
1313
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
×
1314

UNCOV
1315
END:
×
UNCOV
1316
  taosWUnLockLatch(&pSub->lock);
×
UNCOV
1317
  mndReleaseSubscribe(pMnode, pSub);
×
UNCOV
1318
  mndTransDrop(pTrans);
×
UNCOV
1319
  PRINT_LOG_END(code);
×
1320

UNCOV
1321
  if (code != 0) {
×
1322
    TAOS_RETURN(code);
×
1323
  }
UNCOV
1324
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
×
1325
}
1326

1327
void mndCleanupSubscribe(SMnode *pMnode) {}
121,976✔
1328

1329
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
35,972✔
1330
  if (pSub == NULL) {
35,972!
1331
    return NULL;
×
1332
  }
1333
  int32_t code = 0;
35,972✔
1334
  int32_t lino = 0;
35,972✔
1335
  terrno = TSDB_CODE_OUT_OF_MEMORY;
35,972✔
1336
  void   *buf = NULL;
35,972✔
1337
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
35,972✔
1338
  if (tlen <= 0) goto SUB_ENCODE_OVER;
35,972!
1339
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
35,972✔
1340

1341
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
35,972✔
1342
  if (pRaw == NULL) goto SUB_ENCODE_OVER;
35,972!
1343

1344
  buf = taosMemoryMalloc(tlen);
35,972!
1345
  if (buf == NULL) goto SUB_ENCODE_OVER;
35,972!
1346

1347
  void *abuf = buf;
35,972✔
1348
  if (tEncodeSubscribeObj(&abuf, pSub) < 0){
35,972!
1349
    goto SUB_ENCODE_OVER;
×
1350
  }
1351

1352
  int32_t dataPos = 0;
35,972✔
1353
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
35,972!
1354
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
35,972!
1355
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
35,972!
1356
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
35,972!
1357

1358
  terrno = TSDB_CODE_SUCCESS;
35,972✔
1359

1360
SUB_ENCODE_OVER:
35,972✔
1361
  taosMemoryFreeClear(buf);
35,972!
1362
  if (terrno != TSDB_CODE_SUCCESS) {
35,972!
1363
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
×
1364
    sdbFreeRaw(pRaw);
×
1365
    return NULL;
×
1366
  }
1367

1368
  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
35,972!
1369
  return pRaw;
35,972✔
1370
}
1371

1372
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
34,244✔
1373
  if (pRaw == NULL) {
34,244!
1374
    return NULL;
×
1375
  }
1376
  int32_t code = 0;
34,244✔
1377
  int32_t lino = 0;
34,244✔
1378
  terrno = TSDB_CODE_OUT_OF_MEMORY;
34,244✔
1379
  SSdbRow         *pRow = NULL;
34,244✔
1380
  SMqSubscribeObj *pSub = NULL;
34,244✔
1381
  void            *buf = NULL;
34,244✔
1382

1383
  int8_t sver = 0;
34,244✔
1384
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
34,244!
1385

1386
  if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
34,244!
1387
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
1388
    goto SUB_DECODE_OVER;
×
1389
  }
1390

1391
  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
34,244✔
1392
  if (pRow == NULL) goto SUB_DECODE_OVER;
34,244!
1393

1394
  pSub = sdbGetRowObj(pRow);
34,244✔
1395
  if (pSub == NULL) goto SUB_DECODE_OVER;
34,244!
1396

1397
  int32_t dataPos = 0;
34,244✔
1398
  int32_t tlen;
34,244✔
1399
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
34,244!
1400
  buf = taosMemoryMalloc(tlen);
34,244!
1401
  if (buf == NULL) goto SUB_DECODE_OVER;
34,244!
1402
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
34,244!
1403
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
34,244!
1404

1405
  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
34,244!
1406
    goto SUB_DECODE_OVER;
×
1407
  }
1408

1409
  // update epset saved in mnode
1410
  if (pSub->unassignedVgs != NULL) {
34,244!
1411
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
34,244✔
1412
    for (int32_t i = 0; i < size; ++i) {
95,245✔
1413
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGet(pSub->unassignedVgs, i);
61,001✔
1414
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
61,001✔
1415
    }
1416
  }
1417
  if (pSub->consumerHash != NULL) {
34,244!
1418
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
34,244✔
1419
    while (pIter) {
49,624✔
1420
      SMqConsumerEp *pConsumerEp = pIter;
15,380✔
1421
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
15,380✔
1422
      for (int32_t i = 0; i < size; ++i) {
57,476✔
1423
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGet(pConsumerEp->vgs, i);
42,096✔
1424
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
42,096✔
1425
      }
1426
      pIter = taosHashIterate(pSub->consumerHash, pIter);
15,380✔
1427
    }
1428
  }
1429

1430
  terrno = TSDB_CODE_SUCCESS;
34,244✔
1431

1432
SUB_DECODE_OVER:
34,244✔
1433
  taosMemoryFreeClear(buf);
34,244!
1434
  if (terrno != TSDB_CODE_SUCCESS) {
34,244!
1435
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
×
1436
    taosMemoryFreeClear(pRow);
×
1437
    return NULL;
×
1438
  }
1439

1440
  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
34,244!
1441
  return pRow;
34,244✔
1442
}
1443

1444
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
11,044✔
1445
  mTrace("subscribe:%s, perform insert action", pSub != NULL ? pSub->key : "null");
11,044!
1446
  return 0;
11,044✔
1447
}
1448

1449
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
34,244✔
1450
  mTrace("subscribe:%s, perform delete action", pSub != NULL ? pSub->key : "null");
34,244!
1451
  tDeleteSubscribeObj(pSub);
34,244✔
1452
  return 0;
34,244✔
1453
}
1454

1455
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
14,606✔
1456
  if (pOldSub == NULL || pNewSub == NULL) return -1;
14,606!
1457
  mTrace("subscribe:%s, perform update action", pOldSub->key);
14,606!
1458
  taosWLockLatch(&pOldSub->lock);
14,606✔
1459

1460
  SHashObj *tmp = pOldSub->consumerHash;
14,606✔
1461
  pOldSub->consumerHash = pNewSub->consumerHash;
14,606✔
1462
  pNewSub->consumerHash = tmp;
14,606✔
1463

1464
  SArray *tmp1 = pOldSub->unassignedVgs;
14,606✔
1465
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
14,606✔
1466
  pNewSub->unassignedVgs = tmp1;
14,606✔
1467

1468
  SArray *tmp2 = pOldSub->offsetRows;
14,606✔
1469
  pOldSub->offsetRows = pNewSub->offsetRows;
14,606✔
1470
  pNewSub->offsetRows = tmp2;
14,606✔
1471

1472
  taosWUnLockLatch(&pOldSub->lock);
14,606✔
1473
  return 0;
14,606✔
1474
}
1475

1476
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub) {
236,646✔
1477
  if (pMnode == NULL || key == NULL || pSub == NULL){
236,646!
1478
    return TSDB_CODE_INVALID_PARA;
×
1479
  }
1480
  SSdb            *pSdb = pMnode->pSdb;
236,646✔
1481
  *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
236,646✔
1482
  if (*pSub == NULL) {
236,646✔
1483
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
20,644✔
1484
  }
1485
  return 0;
216,002✔
1486
}
1487

1488
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
12,907✔
1489
  if (pMnode == NULL || topicName == NULL) return 0;
12,907!
1490
  int32_t num = 0;
12,907✔
1491
  SSdb   *pSdb = pMnode->pSdb;
12,907✔
1492

1493
  void            *pIter = NULL;
12,907✔
1494
  SMqSubscribeObj *pSub = NULL;
12,907✔
1495
  while (1) {
3,304✔
1496
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
16,211✔
1497
    if (pIter == NULL) break;
16,211✔
1498

1499
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
3,304✔
1500
    char cgroup[TSDB_CGROUP_LEN] = {0};
3,304✔
1501
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
3,304✔
1502
    if (strcmp(topic, topicName) != 0) {
3,304!
1503
      sdbRelease(pSdb, pSub);
1,082✔
1504
      continue;
1,082✔
1505
    }
1506

1507
    num++;
2,222✔
1508
    sdbRelease(pSdb, pSub);
2,222✔
1509
  }
1510

1511
  return num;
12,907✔
1512
}
1513

1514
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
226,324✔
1515
  if (pMnode == NULL || pSub == NULL) return;
226,324!
1516
  SSdb *pSdb = pMnode->pSdb;
216,002✔
1517
  sdbRelease(pSdb, pSub);
216,002✔
1518
}
1519

1520
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
8,594✔
1521
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) return TSDB_CODE_INVALID_PARA;
8,594!
1522
  int32_t  code = 0;
8,594✔
1523
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
8,594✔
1524
  MND_TMQ_NULL_CHECK(pCommitRaw);
8,594!
1525
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
8,594✔
1526
  if (code != 0){
8,594!
1527
    sdbFreeRaw(pCommitRaw);
×
1528
    goto END;
×
1529
  }
1530
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
8,594✔
1531
END:
8,594✔
1532
  return code;
8,594✔
1533
}
1534

1535
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
10,442✔
1536
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
10,442!
1537
  SSdb            *pSdb = pMnode->pSdb;
10,442✔
1538
  int32_t          code = 0;
10,442✔
1539
  void            *pIter = NULL;
10,442✔
1540
  SMqSubscribeObj *pSub = NULL;
10,442✔
1541
  while (1) {
9,550✔
1542
    sdbRelease(pSdb, pSub);
19,992✔
1543
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
19,992✔
1544
    if (pIter == NULL) break;
19,992✔
1545

1546
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
9,550✔
1547
    char cgroup[TSDB_CGROUP_LEN] = {0};
9,550✔
1548
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
9,550✔
1549
    if (strcmp(topic, topicName) != 0) {
9,550!
1550
      continue;
956✔
1551
    }
1552

1553
    // iter all vnode to delete handle
1554
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
8,594!
1555
      code = TSDB_CODE_MND_IN_REBALANCE;
×
1556
      goto END;
×
1557
    }
1558

1559
    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
8,594!
1560
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
8,594!
1561
  }
1562

1563
END:
10,442✔
1564
  sdbRelease(pSdb, pSub);
10,442✔
1565
  sdbCancelFetch(pSdb, pIter);
10,442✔
1566

1567
  TAOS_RETURN(code);
10,442✔
1568
}
1569

1570
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
3,512✔
1571
                           const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
1572
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL){
3,512!
1573
    return TSDB_CODE_INVALID_PARA;
×
1574
  }
1575
  int32_t code = 0;
3,512✔
1576
  int32_t sz = taosArrayGetSize(vgs);
3,512✔
1577
  for (int32_t j = 0; j < sz; j++) {
14,672✔
1578
    SMqVgEp *pVgEp = taosArrayGet(vgs, j);
11,160✔
1579
    MND_TMQ_NULL_CHECK(pVgEp);
11,160!
1580

1581
    SColumnInfoData *pColInfo = NULL;
11,160✔
1582
    int32_t          cols = 0;
11,160✔
1583

1584
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
11,160✔
1585
    MND_TMQ_NULL_CHECK(pColInfo);
11,160!
1586
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false));
11,160!
1587

1588
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
11,160✔
1589
    MND_TMQ_NULL_CHECK(pColInfo);
11,160!
1590
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false));
11,160!
1591

1592
    // vg id
1593
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
11,160✔
1594
    MND_TMQ_NULL_CHECK(pColInfo);
11,160!
1595
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));
11,160!
1596

1597
    // consumer id
1598
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
11,160✔
1599
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
11,160✔
1600
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
11,160!
1601

1602
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
11,160✔
1603
    MND_TMQ_NULL_CHECK(pColInfo);
11,160!
1604
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));
11,160!
1605

1606
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
11,160✔
1607
    if (user) STR_TO_VARSTR(userStr, user);
11,160!
1608
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
11,160✔
1609
    MND_TMQ_NULL_CHECK(pColInfo);
11,160!
1610
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));
11,160!
1611

1612
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
11,160✔
1613
    if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
11,160!
1614
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
11,160✔
1615
    MND_TMQ_NULL_CHECK(pColInfo);
11,160!
1616
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));
11,160!
1617

1618
    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
11,160!
1619
          varDataVal(cgroup), pVgEp->vgId);
1620

1621
    // offset
1622
    OffsetRows *data = NULL;
11,160✔
1623
    for (int i = 0; i < taosArrayGetSize(offsetRows); i++) {
50,024✔
1624
      OffsetRows *tmp = taosArrayGet(offsetRows, i);
38,864✔
1625
      MND_TMQ_NULL_CHECK(tmp);
38,864!
1626
      if (tmp->vgId != pVgEp->vgId) {
38,864✔
1627
        // mInfo("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", tmp->vgId, pVgEp->vgId);
1628
        continue;
27,704✔
1629
      }
1630
      data = tmp;
11,160✔
1631
    }
1632
    if (data) {
11,160!
1633
      // vg id
1634
      char buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
11,160✔
1635
      (void)tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
11,160✔
1636
      (void)snprintf(varDataVal(buf) + strlen(varDataVal(buf)),
22,320!
1637
                     sizeof(buf) - VARSTR_HEADER_SIZE - strlen(varDataVal(buf)), "/%" PRId64, data->ever);
11,160!
1638
      varDataSetLen(buf, strlen(varDataVal(buf)));
11,160!
1639
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
11,160✔
1640
      MND_TMQ_NULL_CHECK(pColInfo);
11,160!
1641
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false));
11,160!
1642
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
11,160✔
1643
      MND_TMQ_NULL_CHECK(pColInfo);
11,160!
1644
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false));
11,160!
1645
    } else {
UNCOV
1646
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1647
      MND_TMQ_NULL_CHECK(pColInfo);
×
UNCOV
1648
      colDataSetNULL(pColInfo, *numOfRows);
×
UNCOV
1649
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
1650
      MND_TMQ_NULL_CHECK(pColInfo);
×
UNCOV
1651
      colDataSetNULL(pColInfo, *numOfRows);
×
UNCOV
1652
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
×
1653
    }
1654
    (*numOfRows)++;
11,160✔
1655
  }
1656
  return 0;
3,512✔
1657
END:
×
1658
  return code;
×
1659
}
1660

1661
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
3,512✔
1662
  if (pReq == NULL || pShow == NULL || pBlock == NULL){
3,512!
1663
    return TSDB_CODE_INVALID_PARA;
×
1664
  }
1665
  SMnode          *pMnode = pReq->info.node;
3,512✔
1666
  SSdb            *pSdb = pMnode->pSdb;
3,512✔
1667
  int32_t          numOfRows = 0;
3,512✔
1668
  SMqSubscribeObj *pSub = NULL;
3,512✔
1669
  int32_t          code = 0;
3,512✔
1670

1671
  mInfo("mnd show subscriptions begin");
3,512!
1672

1673
  while (numOfRows < rowsCapacity) {
7,024!
1674
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
7,024✔
1675
    if (pShow->pIter == NULL) {
7,024✔
1676
      break;
3,512✔
1677
    }
1678

1679
    taosRLockLatch(&pSub->lock);
3,512✔
1680

1681
    if (numOfRows + pSub->vgNum > rowsCapacity) {
3,512!
1682
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum))  ;
×
1683
    }
1684

1685
    // topic and cgroup
1686
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
3,512✔
1687
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
3,512✔
1688
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
3,512✔
1689
    varDataSetLen(topic, strlen(varDataVal(topic)));
3,512!
1690
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
3,512!
1691

1692
    SMqConsumerEp *pConsumerEp = NULL;
3,512✔
1693
    void          *pIter = NULL;
3,512✔
1694

UNCOV
1695
    while (1) {
×
1696
      pIter = taosHashIterate(pSub->consumerHash, pIter);
3,512✔
1697
      if (pIter == NULL) break;
3,512!
UNCOV
1698
      pConsumerEp = (SMqConsumerEp *)pIter;
×
1699

UNCOV
1700
      char          *user = NULL;
×
UNCOV
1701
      char          *fqdn = NULL;
×
UNCOV
1702
      SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
×
UNCOV
1703
      if (pConsumer != NULL) {
×
UNCOV
1704
        user = pConsumer->user;
×
UNCOV
1705
        fqdn = pConsumer->fqdn;
×
UNCOV
1706
        sdbRelease(pSdb, pConsumer);
×
1707
      }
UNCOV
1708
      MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
×
1709
                  pConsumerEp->offsetRows));
1710
    }
1711

1712
    MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
3,512!
1713

1714
    pBlock->info.rows = numOfRows;
3,512✔
1715

1716
    taosRUnLockLatch(&pSub->lock);
3,512✔
1717
    sdbRelease(pSdb, pSub);
3,512✔
1718
  }
1719

1720
  mInfo("mnd end show subscriptions");
3,512!
1721

1722
  pShow->numOfRows += numOfRows;
3,512✔
1723
  return numOfRows;
3,512✔
1724

1725
END:
×
1726
  taosRUnLockLatch(&pSub->lock);
×
1727
  sdbRelease(pSdb, pSub);
×
1728

1729
  return code;
×
1730
}
1731

1732
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
×
1733
  if (pMnode == NULL) {
×
1734
    return;
×
1735
  }
1736
  SSdb *pSdb = pMnode->pSdb;
×
1737
  sdbCancelFetchByType(pSdb, pIter, SDB_SUBSCRIBE);
×
1738
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc