• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.89
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndShow.h"
20
#include "mndTopic.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "tcompare.h"
24
#include "tname.h"
25

26
#define MND_SUBSCRIBE_VER_NUMBER   3
27
#define MND_SUBSCRIBE_RESERVE_SIZE 64
28

29
//#define MND_CONSUMER_LOST_HB_CNT          6
30

31
static int32_t mqRebInExecCnt = 0;
32

33
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
34
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
35
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
36
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
38
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
39
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
40
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
42
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
43

44
// Encode pSub and append the encoded row to pTrans as a commit log, then mark
// the row SDB_STATUS_READY.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments; otherwise the code of the
// first step that reports non-zero (0 when every step succeeds).
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (!pTrans || !pSub) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    // the append failed, so the encoded row is released here
    sdbFreeRaw(pRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
64

65
// Register the subscribe SDB table with pMnode and install the TMQ
// subscribe / rebalance / drop-cgroup message handlers together with the
// "show subscriptions" retrieve and iterator-cancel callbacks.
// Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise whatever
// sdbSetTable() returns.
int32_t mndInitSubscribe(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  // The cancel callback is registered under the same show-table id as the
  // retrieve callback above. It was previously registered under
  // TSDB_MGMT_TABLE_TOPICS, which is not the table this module retrieves.
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
90

91
// Populate pSub->unassignedVgs with one SMqVgEp (vgId + epSet) for every
// vgroup that belongs to the topic's database, counting them in pSub->vgNum.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or terrno
// when appending to the array fails.
//
// NOTE(review): processRemoveAddVgs() in this file skips vgroups whose
// mountVgId is set; this function does not. Confirm whether that is intended.
int32_t mndSchedInitSubEp(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTopic == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp pVgEp = {0};
    pVgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp.vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL) {
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      // leaving mid-iteration, so the fetch cursor is cancelled explicitly
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    // count the vgroup only once it is actually stored in the array
    pSub->vgNum++;
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp.vgId);
    sdbRelease(pSdb, pVgroup);
  }

  return code;
}
126

127
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey,
116,247✔
128
                                     SMqSubscribeObj **pSub) {
129
  if (pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
116,247✔
130
    return TSDB_CODE_INVALID_PARA;
×
131
  }
132
  int32_t lino = 0;
116,247✔
133
  int32_t code = 0;
116,247✔
134
  PRINT_LOG_START
116,247✔
135
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
116,247✔
136
  (*pSub)->dbUid = pTopic->dbUid;
116,247✔
137
  (*pSub)->stbUid = pTopic->stbUid;
116,247✔
138
  (*pSub)->subType = pTopic->subType;
116,247✔
139
  (*pSub)->withMeta = pTopic->withMeta;
116,247✔
140

141
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
116,247✔
142

143
END:
116,247✔
144
  PRINT_LOG_END
116,247✔
145
  return code;
116,247✔
146
}
147

148
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
1,666,364✔
149
  if (key == NULL || topic == NULL || cgroup == NULL) {
1,666,364✔
150
    return;
×
151
  }
152
  int32_t i = 0;
1,666,364✔
153
  while (key[i] != TMQ_SEPARATOR_CHAR) {
11,887,958✔
154
    i++;
10,221,594✔
155
  }
156
  (void)memcpy(cgroup, key, i);
1,666,364✔
157
  cgroup[i] = 0;
1,666,364✔
158
  if (fullName) {
1,666,364✔
159
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
1,545,254✔
160
  } else {
161
    while (key[i] != '.') {
363,330✔
162
      i++;
242,220✔
163
    }
164
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
121,110✔
165
  }
166
}
167

168
static int32_t mndBuildSubChangeReq(SMnode *pMnode, void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
768,152✔
169
                                    const SMqRebOutputVg *pRebVg) {
170
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
768,152✔
171
    return TSDB_CODE_INVALID_PARA;
×
172
  }
173
  SMqRebVgReq  req = {0};
768,152✔
174
  int32_t      code = 0;
768,152✔
175
  int32_t      lino = 0;
768,152✔
176
  SEncoder     encoder = {0};
768,152✔
177
  SMqTopicObj *pTopic = NULL;
768,152✔
178
  void *       buf = NULL;
768,152✔
179

180
  PRINT_LOG_START
768,152✔
181
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
768,152✔
182
  char cgroup[TSDB_CGROUP_LEN] = {0};
768,152✔
183
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
768,152✔
184
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
768,152✔
185
  taosRLockLatch(&pTopic->lock);
768,152✔
186
  req.oldConsumerId = pRebVg->oldConsumerId;
768,152✔
187
  req.newConsumerId = pRebVg->newConsumerId;
768,152✔
188
  req.vgId = pRebVg->pVgEp.vgId;
768,152✔
189
  req.qmsg = pTopic->physicalPlan;
768,152✔
190
  req.schema = pTopic->schema;
768,152✔
191
  req.subType = pSub->subType;
768,152✔
192
  req.withMeta = pSub->withMeta;
768,152✔
193
  req.suid = pSub->stbUid;
768,152✔
194
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
768,152✔
195

196
  int32_t tlen = 0;
768,152✔
197
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
768,152✔
198
  if (code < 0) {
768,152✔
199
    goto END;
×
200
  }
201

202
  tlen += sizeof(SMsgHead);
768,152✔
203
  buf = taosMemoryMalloc(tlen);
768,152✔
204
  MND_TMQ_NULL_CHECK(buf);
768,152✔
205
  SMsgHead *pMsgHead = (SMsgHead *)buf;
768,152✔
206
  pMsgHead->contLen = htonl(tlen);
768,152✔
207
  pMsgHead->vgId = htonl(pRebVg->pVgEp.vgId);
768,152✔
208

209
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
768,152✔
210
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
768,152✔
211
  *pBuf = buf;
768,152✔
212
  buf = NULL;
768,152✔
213
  *pLen = tlen;
768,152✔
214

215
END:
768,152✔
216
  PRINT_LOG_END
768,152✔
217
  taosMemoryFree(buf);
768,152✔
218
  if (pTopic != NULL) {
768,152✔
219
    taosRUnLockLatch(&pTopic->lock);
768,152✔
220
  }
221
  mndReleaseTopic(pMnode, pTopic);
768,152✔
222
  tEncoderClear(&encoder);
768,152✔
223
  return code;
768,152✔
224
}
225

226
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
768,152✔
227
                                        const SMqRebOutputVg *pRebVg) {
228
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
768,152✔
229
    return TSDB_CODE_INVALID_PARA;
×
230
  }
231
  int32_t code = 0;
768,152✔
232
  int32_t lino = 0;
768,152✔
233
  void *  buf = NULL;
768,152✔
234
  PRINT_LOG_START
768,152✔
235
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
768,152✔
236
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
237
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
238
    goto END;
×
239
  }
240

241
  int32_t tlen = 0;
768,152✔
242
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(pMnode, &buf, &tlen, pSub, pRebVg));
768,152✔
243
  int32_t vgId = pRebVg->pVgEp.vgId;
768,152✔
244
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
768,152✔
245
  if (pVgObj == NULL) {
768,152✔
246
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
247
    goto END;
×
248
  }
249

250
  STransAction action = {0};
768,152✔
251
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
768,152✔
252
  action.pCont = buf;
768,152✔
253
  buf = NULL;
768,152✔
254
  action.contLen = tlen;
768,152✔
255
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
768,152✔
256

257
  mndReleaseVgroup(pMnode, pVgObj);
768,152✔
258
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
768,152✔
259

260
END:
768,152✔
261
  PRINT_LOG_END
768,152✔
262
  taosMemoryFree(buf);
768,152✔
263
  return code;
768,152✔
264
}
265

266
static void freeRebalanceItem(void *param) {
313,467✔
267
  if (param == NULL) return;
313,467✔
268
  SMqRebInfo *pInfo = param;
313,467✔
269
  taosArrayDestroy(pInfo->newConsumers);
313,467✔
270
  taosArrayDestroy(pInfo->removedConsumers);
313,467✔
271
}
272

273
// Look up the SMqRebInfo stored in pHash under key (the key length includes
// the terminating NUL); when absent, create one with tNewSMqRebSubscribe()
// and insert a copy of it. If pReb is non-NULL it receives the entry that
// lives inside the hash.
// Returns TSDB_CODE_INVALID_PARA when pHash or key is NULL, otherwise 0 or
// the code of the failing step.
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (!pHash || !key) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SMqRebInfo *pEntry = taosHashGet(pHash, key, strlen(key) + 1);
  if (pEntry == NULL) {
    SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
    if (pFresh == NULL) {
      code = terrno;
      goto END;
    }

    code = taosHashPut(pHash, key, strlen(key) + 1, pFresh, sizeof(SMqRebInfo));
    if (code != 0) {
      // nothing was stored: release the arrays as well as the shell
      freeRebalanceItem(pFresh);
      taosMemoryFreeClear(pFresh);
      goto END;
    }
    // the hash holds a copy of the struct; only the temporary shell is freed
    taosMemoryFreeClear(pFresh);

    pEntry = taosHashGet(pHash, key, strlen(key) + 1);
    MND_TMQ_NULL_CHECK(pEntry);
  }

  if (pReb != NULL) {
    *pReb = pEntry;
  }

END:
  PRINT_LOG_END
  return code;
}
306

307
// Pop the last SMqVgEp from vgs and record it in pHash (keyed by vgId) as an
// SMqRebOutputVg initialised with {consumerId, -1, <popped vg>}.
// key is only used for the log line.
// Returns TSDB_CODE_INVALID_PARA when vgs, pHash or key is NULL, otherwise 0
// or the code of the failing step.
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (!vgs || !pHash || !key) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SMqVgEp *pPopped = (SMqVgEp *)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(pPopped);

  SMqRebOutputVg rebVg = {consumerId, -1, *pPopped};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &pPopped->vgId, sizeof(int32_t), &rebVg, sizeof(SMqRebOutputVg)));
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, pPopped->vgId, consumerId);

END:
  PRINT_LOG_END
  return code;
}
324

325
// For every consumer id listed in pInput->pRebInfo->removedConsumers that is
// still present in pOutput->pSub->consumerHash: move all of its vgroups into
// pHash, drop the consumer from consumerHash and append its id to
// pOutput->removedConsumers. Ids not found in consumerHash are skipped; a
// mismatch between requested and actual removals is logged but is not an
// error.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the code
// of the failing step.
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (!pHash || !pOutput || !pInput) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t actualRemoved = 0;
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);

  for (int32_t idx = 0; idx < numOfRemoved; idx++) {
    int64_t *pId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    MND_TMQ_NULL_CHECK(pId);

    SMqConsumerEp *pEp = taosHashGet(pOutput->pSub->consumerHash, pId, sizeof(int64_t));
    if (pEp == NULL) continue;

    // every vgroup owned by this consumer goes back into the pool
    int32_t owned = taosArrayGetSize(pEp->vgs);
    for (int32_t left = owned; left > 0; left--) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, *pId, pOutput->pSub->key));
    }

    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, pId, sizeof(int64_t)));
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, pId));
    actualRemoved++;
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
  } else {
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key,
           numOfRemoved, actualRemoved);
  }

END:
  PRINT_LOG_END
  return code;
}
362

363
// Register every consumer id in pInput->pRebInfo->newConsumers in
// pOutput->pSub->consumerHash with an empty vgroup list, and append the id
// to pOutput->newConsumers.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the code
// of the failing step.
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);

    // Use the code returned by taosHashPut (as mndGetOrCreateRebSub does)
    // rather than reading terrno after the cleanup call, which could have
    // changed it or left it at 0.
    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp,
                       sizeof(SMqConsumerEp));
    if (code != 0) {
      freeSMqConsumerEp(&newConsumerEp);
      goto END;
    }
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }
END:
  PRINT_LOG_END
  return code;
}
392

393
// Move every vgroup currently in pOutput->pSub->unassignedVgs into pHash,
// recorded with consumer id -1.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the code
// reported by pushVgDataToHash().
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (!pOutput || !pHash) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // the count is taken once up front; each call below pops one element
  int32_t pending = taosArrayGetSize(pOutput->pSub->unassignedVgs);
  for (int32_t done = 0; done < pending; done++) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
  }

END:
  PRINT_LOG_END
  return code;
}
408

409
// Walk every consumer remaining in pOutput->pSub->consumerHash, record its id
// in pOutput->modifyConsumers, and trim consumers that own more than minVgCnt
// vgroups: the first remainderVgCnt such consumers keep minVgCnt + 1, the
// rest keep minVgCnt. Trimmed vgroups are moved into pHash.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the code
// of the failing step.
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t remainderVgCnt) {
  if (!pOutput || !pHash) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t cnt = 0;  // consumers that were allowed to keep minVgCnt + 1
  void   *pIter = NULL;
  PRINT_LOG_START

  for (;;) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;

    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        owned = taosArrayGetSize(pEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pEp->consumerId));
    if (owned <= minVgCnt) continue;

    if (cnt < remainderVgCnt) {
      // pop until equal minVg + 1
      while (taosArrayGetSize(pEp->vgs) > minVgCnt + 1) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, pEp->consumerId, pOutput->pSub->key));
      }
      cnt++;
    } else {
      while (taosArrayGetSize(pEp->vgs) > minVgCnt) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, pEp->consumerId, pOutput->pSub->key));
      }
    }
  }

END:
  PRINT_LOG_END
  // pIter is still non-NULL only when the walk was left early
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  return code;
}
448

449
// Reconcile the subscription's vgroup assignment with the vgroups that
// currently exist for its database:
//   1. collect every vgroup of the database (vgroups with mountVgId set are
//      skipped) into a scratch list;
//   2. for each consumer, drop owned vgroups that are not in that list, and
//      strike the ones that are from the list;
//   3. whatever is left in the list is added to unassignedVgs, but only when
//      unassignedVgs is currently empty.
// Returns the number of vgroups found for the database on success, or the
// non-zero error code on failure (the caller treats a negative value as an
// error).
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pOutput) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t totalVgNum = 0;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  void   *pIterHash = NULL;
  SArray *newVgs = taosArrayInit(0, sizeof(SMqVgEp));
  MND_TMQ_NULL_CHECK(newVgs);

  // step 1: snapshot the vgroups of this database
  for (;;) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    if (pVgroup->mountVgId) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }
    if (!mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }

    totalVgNum++;
    SMqVgEp vgEp = {0};
    vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    vgEp.vgId = pVgroup->vgId;
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &vgEp));
    sdbRelease(pMnode->pSdb, pVgroup);
  }

  // step 2: reconcile each consumer's vgroups against the snapshot
  for (;;) {
    pIterHash = taosHashIterate(pOutput->pSub->consumerHash, pIterHash);
    if (pIterHash == NULL) break;

    SMqConsumerEp *pEp = (SMqConsumerEp *)pIterHash;
    int32_t        j = 0;
    while (j < taosArrayGetSize(pEp->vgs)) {
      SMqVgEp *pOwned = taosArrayGet(pEp->vgs, j);
      MND_TMQ_NULL_CHECK(pOwned);

      int32_t matchIdx = -1;
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
        SMqVgEp *pCandidate = taosArrayGet(newVgs, k);
        MND_TMQ_NULL_CHECK(pCandidate);
        if (pOwned->vgId == pCandidate->vgId) {
          matchIdx = k;
          break;
        }
      }

      if (matchIdx >= 0) {
        // still exists and already owned: not a new vgroup
        taosArrayRemove(newVgs, matchIdx);
        j++;
      } else {
        // vgroup no longer exists; j stays put because the array shifts down
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pOwned->vgId);
        taosArrayRemove(pEp->vgs, j);
      }
    }
  }

  // step 3: publish the vgroups nobody owns yet
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
  }

END:
  sdbRelease(pMnode->pSdb, pVgroup);
  sdbCancelFetch(pMnode->pSdb, pIter);
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIterHash);
  taosArrayDestroy(newVgs);
  if (code != 0) {
    mError("tmq rebalance processRemoveAddVgs failed, code:%d", code);
    return code;
  }
  mInfo("tmq rebalance processRemoveAddVgs completed, total vg num:%d", totalVgNum);
  return totalVgNum;
}
529

530
// Fold the per-consumer offset rows of the currently stored subscription
// (looked up by pInput->pRebInfo->key) into pOutput->pSub->offsetRows.
// A row is skipped when the same consumer still owns that vgroup in the new
// assignment; otherwise it is merged into an existing entry with the same
// vgId (rows accumulated, offset/ever overwritten) or appended.
// Returns 0 when the stored subscription cannot be acquired,
// TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the code of the
// failing step.
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pInput || !pOutput) return TSDB_CODE_INVALID_PARA;

  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if (code != 0) {
    // no stored subscription to read from: nothing to carry over
    return 0;
  }

  taosRLockLatch(&pSub->lock);
  PRINT_LOG_START
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    MND_TMQ_NULL_CHECK(pOutput->pSub->offsetRows);
  }

  for (;;) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;

    SMqConsumerEp *pOldEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pNewEp = taosHashGet(pOutput->pSub->consumerHash, &pOldEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pOldEp->offsetRows); j++) {
      OffsetRows *pSrc = taosArrayGet(pOldEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(pSrc);

      // does the same consumer keep this vgroup after the rebalance?
      bool unchanged = false;
      if (pNewEp != NULL) {
        for (int i = 0; i < taosArrayGetSize(pNewEp->vgs); i++) {
          SMqVgEp *pVgEp = taosArrayGet(pNewEp->vgs, i);
          MND_TMQ_NULL_CHECK(pVgEp);
          if (pVgEp->vgId != pSrc->vgId) continue;
          unchanged = true;
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                pOldEp->consumerId, pVgEp->vgId);
          break;
        }
      }
      if (unchanged) continue;

      // merge into an existing entry for this vgroup, if there is one
      bool merged = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *pDst = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(pDst);
        if (pSrc->vgId != pDst->vgId) continue;
        pDst->rows += pSrc->rows;
        pDst->offset = pSrc->offset;
        pDst->ever = pSrc->ever;
        merged = true;
        mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, pDst->vgId, pDst->rows, pSrc->rows);
        break;
      }
      if (merged) continue;

      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, pSrc));
    }
  }

END:
  taosRUnLockLatch(&pSub->lock);
  taosHashCancelIterate(pSub->consumerHash, pIter);
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  return code;
}
594

595
// Log the outcome of a rebalance: first every moved vgroup recorded in
// pOutput->rebVgs, then the final vgroup list of every consumer in
// pOutput->pSub->consumerHash. Does nothing when pOutput is NULL.
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (pOutput == NULL) return;

  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
  for (int32_t idx = 0; idx < taosArrayGetSize(pOutput->rebVgs); idx++) {
    SMqRebOutputVg *pMoved = taosArrayGet(pOutput->rebVgs, idx);
    if (pMoved != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
            pMoved->pVgEp.vgId, pMoved->oldConsumerId, pMoved->newConsumerId);
    }
  }

  void *pIter = NULL;
  for (;;) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;

    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        vgCount = taosArrayGetSize(pEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
          pEp->consumerId, vgCount);

    for (int32_t v = 0; v < vgCount; v++) {
      SMqVgEp *pVgEp = taosArrayGet(pEp->vgs, v);
      if (pVgEp != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
              pEp->consumerId);
      }
    }
  }
}
621

622
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
313,467✔
623
                           int32_t *remainderVgCnt) {
624
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
313,467✔
625
    return;
×
626
  }
627
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
313,467✔
628
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
313,467✔
629
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
313,467✔
630

631
  // calc num
632
  if (numOfFinal != 0) {
313,467✔
633
    *minVgCnt = totalVgNum / numOfFinal;
202,778✔
634
    *remainderVgCnt = totalVgNum % numOfFinal;
202,778✔
635
  } else {
636
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
110,689✔
637
  }
638
  mInfo(
313,467✔
639
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d "
640
      "remainderVg:%d",
641
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
642
}
643

644
// Hand the pooled vgroups in pHash out to the consumers in
// pOutput->pSub->consumerHash:
//   pass 1  tops every consumer up to minVgCnt vgroups;
//   pass 2  gives one extra vgroup to consumers sitting exactly at minVgCnt
//           until the pool iterator is exhausted;
//   pass 3  copies every pool entry into pOutput->rebVgs and, when no
//           consumer is left at all, also into pOutput->pSub->unassignedVgs.
// Both consumer passes advance one shared cursor over pHash, so the order of
// the statements below matters.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments,
// TSDB_CODE_PAR_INTERNAL_ERROR when pool entries remain after pass 2,
// otherwise 0 or the code of the failing step.
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (!pOutput || !pHash) return TSDB_CODE_INVALID_PARA;

  SMqRebOutputVg *pRebVg = NULL;
  void           *pAssignIter = NULL;  // cursor over the vgroup pool (pHash)
  void           *pIter = NULL;        // cursor over the consumers
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  // pass 1: bring every consumer up to the minimum
  for (;;) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;

    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    while (taosArrayGetSize(pEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp.vgId,
            pEp->consumerId);
    }
  }

  // pass 2: distribute the remainder, one extra vgroup per consumer
  for (;;) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) break;

    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    if (taosArrayGetSize(pEp->vgs) != minVgCnt) continue;

    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
      break;
    }

    pRebVg = (SMqRebOutputVg *)pAssignIter;
    pRebVg->newConsumerId = pEp->consumerId;
    MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pRebVg->pVgEp));
    mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp.vgId,
          pEp->consumerId);
  }

  if (pAssignIter != NULL) {
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    goto END;
  }

  // pass 3: export every pooled vgroup as a rebalance result
  for (;;) {
    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) break;

    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed
      MND_TMQ_NULL_CHECK(
          taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
    }
  }

END:
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  taosHashCancelIterate(pHash, pAssignIter);
  PRINT_LOG_END
  return code;
}
722

723
/**
 * Compute the new vgroup-to-consumer assignment for one subscription.
 *
 * The rebalance stages are executed in a fixed order against pOutput->pSub.
 * A temporary hash is created for the stages that move vgroups around and is
 * released before returning.
 *
 * @param pMnode  mnode handle, forwarded to the stages that need it
 * @param pInput  rebalance request for this subscription
 * @param pOutput in/out rebalance state; pOutput->pSub is read and updated
 * @return 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, the negative
 *         value returned by processRemoveAddVgs, or the first failing stage's code
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pInput || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  // a negative count is an error code and is handed straight back to the caller
  int32_t vgTotal = processRemoveAddVgs(pMnode, pOutput);
  if (vgTotal < 0) {
    return vgTotal;
  }

  int32_t     code = 0;
  int32_t     lino = 0;
  int32_t     vgPerConsumer = 0;
  int32_t     vgLeftOver = 0;
  const char *subKey = pOutput->pSub->key;
  PRINT_LOG_START

  SHashObj *pVgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pVgHash);

  // the stage order below is significant; do not reorder
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pVgHash, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pVgHash));
  calcVgroupsCnt(pInput, vgTotal, subKey, &vgPerConsumer, &vgLeftOver);
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pVgHash, vgPerConsumer, vgLeftOver));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pVgHash, vgPerConsumer));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
  printRebalanceLog(pOutput);

END:
  taosHashCleanup(pVgHash);
  PRINT_LOG_END
  return code;
}
753

754
/**
 * Append one consumer commit log per id in `consumers` to the transaction.
 *
 * For every consumer id a temporary consumer object of the given update `type`
 * is built, written to the transaction's commit logs, and destroyed again.
 *
 * @param pTrans    transaction receiving the commit logs
 * @param consumers array of int64_t consumer ids
 * @param type      consumer update type passed to tNewSMqConsumerObj
 * @param cgroup    consumer group name
 * @param topic     topic name; may be NULL (it is not validated here)
 * @return 0 on success, TSDB_CODE_INVALID_PARA on a NULL pTrans/consumers/cgroup,
 *         otherwise the code of the first failing step
 */
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Reset right after the delete: a failure in a later iteration jumps to END,
    // which must not delete the already-destroyed object a second time.
    pConsumerNew = NULL;
  }

END:
  PRINT_LOG_END
  // only non-NULL when a step failed between creation and the in-loop delete
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
777

778
/**
 * Write the consumer-side commit logs of a rebalance result into pTrans.
 *
 * Three consumer lists of pOutput are persisted in order: modified, new and
 * removed. Modified consumers are written without a topic; the other two
 * carry `topic`.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise
 *         the code of the first list that failed to persist
 */
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (!pTrans || !pOutput || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // consumers whose vgroup set changed
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL));
  // consumers that joined the subscription
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic));
  // consumers that left the subscription
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic));

END:
  PRINT_LOG_END
  return code;
}
792

793
/**
 * Persist a rebalance result through a "tmq-reb" transaction.
 *
 * Steps: create the transaction and check it for conflicts, add one redo
 * action per rebalanced vgroup, add the subscription commit log, add the
 * consumer commit logs (skipped when pOutput->isReload is set), install the
 * rebalance start/stop callbacks and prepare the transaction.
 *
 * The transaction handle is dropped before returning on every path.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise
 *         the code of the first failing step
 */
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (!pMnode || !pMsg || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  STrans *pTrans = NULL;
  PRINT_LOG_START

  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char cgroup[TSDB_CGROUP_LEN] = {0};
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    // prefer the thread error code when one was recorded
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // 1. redo actions: one change request per rebalanced vgroup
  int32_t vgCnt = taosArrayGetSize(pOutput->rebVgs);
  for (int32_t idx = 0; idx < vgCnt; ++idx) {
    SMqRebOutputVg *pVg = taosArrayGet(pOutput->rebVgs, idx);
    MND_TMQ_NULL_CHECK(pVg);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pVg));
  }

  // 2. commit log: subscription object with the new vgroup assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // 3. commit logs: consumer status/epoch updates, not needed for a reload
  if (!pOutput->isReload) {
    MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
  }

  // 4. callbacks invoked when the transaction starts and stops
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);

  // 5. hand the transaction over for execution
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  mndTransDrop(pTrans);
  PRINT_LOG_END
  TAOS_RETURN(code);
}
844

845
// type: 0 = record the consumer as removed, 1 = record the consumer as newly added
846
/**
 * Register a consumer in the pending-rebalance entry of every listed topic.
 *
 * For each topic name in `topicList` the subscribe key "<cgroup><sep><topic>"
 * is built, the matching SMqRebInfo is fetched or created in `rebSubHash`, and
 * the consumer id is appended to its removed list (type 0) or its new list
 * (type 1). Any other `type` value leaves the entry untouched.
 *
 * @param rebSubHash hash of subscribe key -> SMqRebInfo
 * @param topicList  array of topic-name pointers
 * @param type       0 = consumer removed, 1 = consumer added
 * @param pConsumer  consumer whose id and cgroup are used
 * @return 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise
 *         the code of the first failing step
 */
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
  // pConsumer is dereferenced below, so it is validated together with the other inputs
  if (rebSubHash == NULL || topicList == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t topicNum = taosArrayGetSize(topicList);
  for (int32_t i = 0; i < topicNum; i++) {
    char *topic = taosArrayGetP(topicList, i);
    MND_TMQ_NULL_CHECK(topic);

    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

    SMqRebInfo *pRebSub = NULL;
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));

    // braces keep the macro expansions from interfering with the if/else pairing
    if (type == 0) {
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
    } else if (type == 1) {
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
    }
  }

END:
  PRINT_LOG_END
  return code;
}
871

872
/**
 * Check whether any vgroup assigned to `pConsumer` for `topic` has vanished.
 *
 * The subscription "<cgroup><sep><topic>" is acquired and read-locked, then
 * every vgroup in the consumer's assignment is looked up. When a vgroup can no
 * longer be acquired, a rebalance entry for the subscription is created in
 * `rebSubHash`. Failures are only logged; nothing is returned.
 */
static void checkOneTopic(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash, const char *topic) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqSubscribeObj *pSub = NULL;
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

  MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, key, &pSub));
  taosRLockLatch(&pSub->lock);

  // the consumer's endpoint entry lists the vgroups currently assigned to it
  SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pEp);

  int32_t vgCnt = taosArrayGetSize(pEp->vgs);
  for (int32_t idx = 0; idx < vgCnt; ++idx) {
    SMqVgEp *pVgEp = taosArrayGet(pEp->vgs, idx);
    if (pVgEp == NULL) {
      continue;
    }

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
    if (pVgroup == NULL) {
      // vgroup is gone: make sure this subscription gets rebalanced
      code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
      if (code == 0) {
        mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
      } else {
        mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code));
      }
    }
    mndReleaseVgroup(pMnode, pVgroup);
  }

END:
  // the latch is only held when the subscription was acquired
  if (pSub != NULL) {
    taosRUnLockLatch(&pSub->lock);
  }
  mndReleaseSubscribe(pMnode, pSub);
}
992,795✔
908

909
/**
 * Run checkOneTopic for every topic in the consumer's currentTopics list.
 * NULL topic entries are skipped; the call is a no-op on any NULL argument.
 */
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (!pMnode || !pConsumer || !rebSubHash) {
    return;
  }

  int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t idx = 0; idx < topicCnt; ++idx) {
    char *topicName = taosArrayGetP(pConsumer->currentTopics, idx);
    if (topicName != NULL) {
      checkOneTopic(pMnode, pConsumer, rebSubHash, topicName);
    }
  }
}
922

923
/**
 * Decide whether a consumer has been silent for too long.
 *
 * Each counter is multiplied by tsMqRebalanceInterval * 1000 and compared with
 * the consumer's millisecond limits (presumably the interval is in seconds —
 * TODO confirm against its definition).
 *
 * @param hbStatus   counter compared against pConsumer->sessionTimeoutMs
 * @param pollStatus counter compared against pConsumer->maxPollIntervalMs
 * @param pConsumer  consumer providing the two limits
 * @return true when either limit is reached or exceeded
 */
static bool isOffLine(int32_t hbStatus, int32_t pollStatus, SMqConsumerObj *pConsumer) {
  // do the arithmetic in 64 bits: counter * interval * 1000 can exceed INT32_MAX,
  // and signed 32-bit overflow is undefined behavior
  const int64_t tickMs = (int64_t)tsMqRebalanceInterval * 1000;
  return (int64_t)hbStatus * tickMs >= pConsumer->sessionTimeoutMs ||
         (int64_t)pollStatus * tickMs >= pConsumer->maxPollIntervalMs;
}
927

928
/**
 * Inspect one consumer during a rebalance tick and record what has to change.
 *
 * The consumer's hbStatus and pollStatus counters are incremented, then:
 *  - READY with no current topics          -> send a LOST_CONSUMER_CLEAR message
 *  - READY and offline (see isOffLine)     -> register removal from all current topics
 *  - READY otherwise                       -> check its vgroups for splits
 *  - REBALANCE and not offline             -> register its pending new/removed topics
 *  - anything else                         -> send a LOST_CONSUMER_CLEAR message
 *
 * The consumer's read latch is held for the whole check.
 *
 * @return 0 on success, otherwise the code of the first failing step
 */
static int32_t checkOneConsumer(SMqConsumerObj *pConsumer, SMnode *pMnode, SRpcMsg *pMsg, SHashObj *rebSubHash) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    sendClear = false;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);

  int32_t hbCnt = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
  int32_t pollCnt = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
  int32_t status = atomic_load_32(&pConsumer->status);

  mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
         ", hbstatus:%d, pollStatus:%d",
         pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
         hbCnt, pollCnt);

  if (status == MQ_CONSUMER_STATUS_READY) {
    if (taosArrayGetSize(pConsumer->currentTopics) == 0) {
      // unsubscribe or close
      sendClear = true;
    } else if (isOffLine(hbCnt, pollCnt, pConsumer)) {
      mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
            ", hb lost cnt:%d, or long time no poll cnt:%d",
            pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
            pConsumer->createTime, hbCnt, pollCnt);
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
    } else {
      checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
    }
  } else if (status == MQ_CONSUMER_STATUS_REBALANCE && !isOffLine(hbCnt, pollCnt, pConsumer)) {
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
  } else {
    sendClear = true;
  }

  if (sendClear) {
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
969

970
/**
 * Walk all consumers in the sdb and collect the subscriptions needing a rebalance.
 *
 * Every fetched consumer is passed to checkOneConsumer, which fills
 * `rebSubHash`; the walk stops at the first failure.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise
 *         the code returned by the failing checkOneConsumer call
 */
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (!pMsg || !rebSubHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t         code = 0;
  int32_t         lino = 0;
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  void           *pIter = NULL;
  PRINT_LOG_START

  // iterate all consumers, find all modification
  while ((pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer)) != NULL) {
    MND_TMQ_RETURN_CHECK(checkOneConsumer(pConsumer, pMnode, pMsg, rebSubHash));
    mndReleaseConsumer(pMnode, pConsumer);
  }

END:
  PRINT_LOG_END
  // on the error path the iterator and the current consumer are still held
  sdbCancelFetch(pSdb, pIter);
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}
997

998
bool mndRebTryStart() {
13,544,322✔
999
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
13,544,322✔
1000
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
13,544,322✔
1001
}
1002

1003
void mndRebCntInc() {
256,664✔
1004
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
256,664✔
1005
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
256,664✔
1006
}
256,664✔
1007

1008
void mndRebCntDec() {
13,800,986✔
1009
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
13,800,986✔
1010
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
13,800,986✔
1011
}
13,800,986✔
1012

1013
/**
 * Release everything owned by a rebalance output object and reset its fields.
 *
 * Destroys the four arrays, deletes and frees the subscription object, and
 * sets every released pointer to NULL. A NULL `rebOutput` is ignored.
 * Callers that do not own pSub must set it to NULL before calling.
 */
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (!rebOutput) {
    return;
  }

  // consumer id lists
  taosArrayDestroy(rebOutput->newConsumers);
  taosArrayDestroy(rebOutput->modifyConsumers);
  taosArrayDestroy(rebOutput->removedConsumers);
  rebOutput->newConsumers = NULL;
  rebOutput->modifyConsumers = NULL;
  rebOutput->removedConsumers = NULL;

  // rebalanced vgroup list
  taosArrayDestroy(rebOutput->rebVgs);
  rebOutput->rebVgs = NULL;

  // subscription object: contents first, then the object itself
  tDeleteSubscribeObj(rebOutput->pSub);
  taosMemoryFree(rebOutput->pSub);
  rebOutput->pSub = NULL;
}
1029

1030
/**
 * Allocate the four empty arrays of a rebalance output object.
 *
 * When any allocation fails, everything in `rebOutput` is released again via
 * clearRebOutput before the error is returned.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on NULL input, otherwise the
 *         code set by the failing allocation check
 */
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (!rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  SMqRebOutputObj *pUndo = rebOutput;  // what END has to clean up; NULL once init succeeded
  PRINT_LOG_START

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  pUndo = NULL;

END:
  PRINT_LOG_END
  clearRebOutput(pUndo);
  return code;
}
1052

1053
/**
 * Fill rebOutput->pSub with the subscription object the rebalance works on.
 *
 * When the subscription named by rebInput->pRebInfo->key already exists it is
 * cloned and rebInput->oldConsumerNum is set to its consumer count. Otherwise
 * a new subscription is created from the topic (held under its read latch
 * while it is used) and oldConsumerNum is set to 0.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise
 *         the code of the failing step
 */
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (!pMnode || !rebInput || !rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  const char      *key = rebInput->pRebInfo->key;
  SMqSubscribeObj *pSub = NULL;
  SMqTopicObj     *pTopic = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  int32_t          lino = 0;
  PRINT_LOG_START

  if (code == 0) {
    // existing subscription: work on a private copy
    MND_TMQ_RETURN_CHECK(tCloneSubscribeObj(pSub, &rebOutput->pSub));
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);

    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key,
          taosHashGetSize(rebOutput->pSub->consumerHash));
  } else {
    // no subscription yet: split sub key, extract topic and create one from it
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(key, topic, cgroup, true);

    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);

    rebInput->oldConsumerNum = 0;
    MND_TMQ_RETURN_CHECK(mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub));
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
  }

END:
  PRINT_LOG_END
  // the topic latch is only held when the topic was acquired
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
1092

1093
/**
 * Build rebOutput->rebVgs from the current assignment of a subscription.
 *
 * One entry is appended per vgroup of every consumer in pSub->consumerHash,
 * with oldConsumerId = -1 and newConsumerId = the owning consumer, i.e. each
 * vgroup stays on its consumer.
 *
 * @return 0 on success, otherwise the code set by the failing NULL check
 */
static int32_t collectVgs(SMqRebOutputObj *rebOutput, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *pIterConsumer = NULL;

  PRINT_LOG_START
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  while ((pIterConsumer = taosHashIterate(pSub->consumerHash, pIterConsumer)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIterConsumer;

    for (int32_t idx = 0; idx < taosArrayGetSize(pEp->vgs); ++idx) {
      SMqVgEp *pSrcVg = taosArrayGet(pEp->vgs, idx);
      MND_TMQ_NULL_CHECK(pSrcVg);

      SMqRebOutputVg *pDstVg = taosArrayReserve(rebOutput->rebVgs, 1);
      MND_TMQ_NULL_CHECK(pDstVg);

      pDstVg->pVgEp = *pSrcVg;
      pDstVg->oldConsumerId = -1;
      pDstVg->newConsumerId = pEp->consumerId;
      mDebug("sub:%s reload rebalance vgId:%d remains on consumer:0x%" PRIx64, pSub->key, pDstVg->pVgEp.vgId,
             pDstVg->newConsumerId);
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pSub->consumerHash, pIterConsumer);
  return code;
}
1127

1128
/**
 * Re-persist the current assignment of one subscription ("reload" rebalance).
 *
 * Only subscriptions whose topic is present in topicsToReload are processed.
 * The subscription is used in place (not cloned) under its read latch, so
 * rebOutput.pSub is reset to NULL before the output is cleared.
 *
 * @return 0 on success or when the topic needs no reload, otherwise the code
 *         of the failing step
 */
static int32_t rebalanceOneSub(SRpcMsg *pMsg, SMqSubscribeObj *pSub) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMqRebOutputObj rebOutput = {0};
  SMnode         *pMnode = pMsg->info.node;

  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  // topic and cgroup
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char cgroup[TSDB_CGROUP_LEN] = {0};
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

  if (taosHashGet(topicsToReload, topic, strlen(topic)) != NULL) {
    rebOutput.pSub = pSub;
    rebOutput.isReload = true;
    MND_TMQ_RETURN_CHECK(collectVgs(&rebOutput, pSub));

    code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
    if (code != 0) {
      mError("%s error,msg:%s", __func__, tstrerror(code));
    }
  } else {
    mDebug("%s topic:%s no need reload rebalance", __func__, topic);
  }

END:
  taosRUnLockLatch(&pSub->lock);
  rebOutput.pSub = NULL;  // avoid double free
  clearRebOutput(&rebOutput);
  PRINT_LOG_END
  return code;
}
1162

1163
/**
 * Run a reload rebalance over every subscription in the sdb.
 *
 * Each subscription is handed to rebalanceOneSub; the walk stops at the first
 * failure. topicsToReload is cleared only when all subscriptions succeeded.
 *
 * @return 0 on success, otherwise the code of the failing subscription
 */
static int32_t reloadRebalance(SRpcMsg *pMsg) {
  SMnode          *pMnode = pMsg->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;

  PRINT_LOG_START
  mInfo("reloadRebalance start, total topics to reload:%d", taosHashGetSize(topicsToReload));

  void *pIter = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    MND_TMQ_RETURN_CHECK(rebalanceOneSub(pMsg, pSub));
    sdbRelease(pSdb, pSub);
  }
  taosHashClear(topicsToReload);

END:
  // on the error path the iterator and the current subscription are still held
  sdbCancelFetch(pSdb, pIter);
  sdbRelease(pSdb, pSub);
  PRINT_LOG_END

  return code;
}
1191

1192
/**
 * Run a regular rebalance round.
 *
 * All consumers are checked first to collect the subscriptions that need a
 * rebalance; each of them is then processed as build output -> compute new
 * assignment -> persist. A failure inside one subscription is logged and the
 * loop continues with the next one; the returned code is the one left by the
 * last processed subscription (or by an earlier fatal step).
 */
static int32_t normalRebalance(SRpcMsg *pMsg) {
  int     code = 0;
  int32_t lino = 0;

  void   *pIter = NULL;
  SMnode *pMnode = pMsg->info.node;

  PRINT_LOG_START
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);

  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
  }

  while ((pIter = taosHashIterate(rebSubHash, pIter)) != NULL) {
    SMqRebInputObj  rebInput = {0};
    SMqRebOutputObj rebOutput = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
    rebInput.pRebInfo = (SMqRebInfo *)pIter;

    // each stage only runs when the previous one succeeded
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else {
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
      if (code != 0) {
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
      } else {
        code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
        if (code != 0) {
          mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
        }
      }
    }

    clearRebOutput(&rebOutput);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish");
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(rebSubHash, pIter);
  taosHashCleanup(rebSubHash);
  return code;
}
1252

1253
/**
 * Handler of the rebalance request.
 *
 * Nothing is done when another rebalance already holds the slot (see
 * mndRebTryStart). Otherwise a reload rebalance is run when topicsToReload is
 * non-empty, a normal rebalance if it is empty, and the slot counter is
 * decremented afterwards.
 *
 * @return TSDB_CODE_INVALID_PARA on NULL input, 0 when skipped, otherwise the
 *         code of the executed rebalance
 */
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (!pMsg) {
    return TSDB_CODE_INVALID_PARA;
  }

  int     code = 0;
  int32_t lino = 0;

  void   *pIter = NULL;
  SMnode *pMnode = pMsg->info.node;
  PRINT_LOG_START;

  if (mndRebTryStart()) {
    if (taosHashGetSize(topicsToReload) > 0) {
      code = reloadRebalance(pMsg);
    } else {
      code = normalRebalance(pMsg);
    }
    // give the slot back that mndRebTryStart claimed
    mndRebCntDec();
  } else {
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
  }

  PRINT_LOG_END
  TAOS_RETURN(code);
}
1280

1281
/**
 * Append a TDMT_VND_TMQ_DELETE_SUB redo action for every vgroup of the
 * subscription's database.
 *
 * All vgroups in the sdb are visited; those with a non-zero mountVgId or not
 * belonging to pSub->dbUid are skipped. For the remaining ones a delete
 * request carrying the subscribe key is attached to the transaction.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise
 *         the code of the first failing step
 */
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }

    // skip mounted vgroups and vgroups of other databases
    if (pVgObj->mountVgId || !mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }

    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // Forget the released vgroup: if appending the action fails we jump to END,
    // which must not release the same object a second time.
    pVgObj = NULL;

    // NOTE(review): if mndTransAppendRedoAction does not take ownership of
    // action.pCont on failure, pReq leaks on this error path — confirm and free it here.
    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
  }

END:
  PRINT_LOG_END
  // non-NULL only when a step failed while the current vgroup was still held
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1328

1329
/**
 * Handle one consumer while a consumer group is being dropped from a topic.
 *
 * Consumers of other groups are ignored. When the consumer references
 * `topicName` in its assigned, pending-removed or pending-new topic lists:
 *  - with deleteConsumer set, a drop log for the consumer is added to pTrans;
 *  - otherwise the drop is refused with TSDB_CODE_MND_CGROUP_USED.
 *
 * The consumer's read latch is held for the duration of the call.
 */
static int32_t checkoutOneConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName,
                                   char *cgroup) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMqConsumerObj *pConsumerNew = NULL;

  taosRLockLatch(&pConsumer->lock);

  if (strcmp(cgroup, pConsumer->cgroup) != 0) {
    goto END;
  }

  bool inAssigned = checkTopic(pConsumer->assignedTopics, topicName);
  bool inRemoved = checkTopic(pConsumer->rebRemovedTopics, topicName);
  bool inNew = checkTopic(pConsumer->rebNewTopics, topicName);
  if (!(inAssigned || inRemoved || inNew)) {
    goto END;
  }

  if (!deleteConsumer) {
    mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
           pConsumer->consumerId, pConsumer->cgroup);
    code = TSDB_CODE_MND_CGROUP_USED;
    goto END;
  }

  MND_TMQ_RETURN_CHECK(
      tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
  tDeleteSMqConsumerObj(pConsumerNew);
  pConsumerNew = NULL;

END:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
1363

1364
/**
 * Apply checkoutOneConsumer to every consumer in the sdb.
 *
 * The walk stops at the first consumer for which the check fails.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise
 *         the code of the failing check
 */
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
  if (!pMnode || !pTrans || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }

  int             code = 0;
  int32_t         lino = 0;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  PRINT_LOG_START

  while ((pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer)) != NULL) {
    MND_TMQ_RETURN_CHECK(checkoutOneConsumer(pTrans, pConsumer, deleteConsumer, topic, cgroup));
    sdbRelease(pMnode->pSdb, pConsumer);
  }

END:
  // on the error path the current consumer and the iterator are still held
  sdbRelease(pMnode->pSdb, pConsumer);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1387

1388
/**
 * Drop a consumer group's subscription through a "drop-cgroup" transaction.
 *
 * Unless dropReq->force is set, the drop is refused with
 * TSDB_CODE_MND_CGROUP_USED while the subscription still has consumers.
 * The subscription's read latch is held for the whole operation and the
 * transaction handle is dropped before returning.
 *
 * @return 0 on success, otherwise the code of the first failing step
 */
static int32_t dropCgroup(SMqSubscribeObj *pSub, SRpcMsg *pMsg, SMDropCgroupReq *dropReq) {
  int32_t code = 0;
  int32_t lino = 0;
  STrans *pTrans = NULL;
  SMnode *pMnode = pMsg->info.node;
  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  bool stillInUse = !dropReq->force && taosHashGetSize(pSub->consumerHash) != 0;
  if (stillInUse) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq->cgroup, dropReq->topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq->cgroup, dropReq->topic);
  mndTransSetDbName(pTrans, pSub->dbName, NULL);

  // conflict check, vnode-side deletes, consumer logs, subscription drop log, then prepare
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq->cgroup, dropReq->topic, dropReq->force));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  taosRUnLockLatch(&pSub->lock);
  mndTransDrop(pTrans);
  PRINT_LOG_END
  return code;
}
1417

1418
/**
 * Handler of the "drop consumer group" request.
 *
 * The request is deserialized, the subscription "<cgroup><sep><topic>" is
 * looked up and dropCgroup is run on it. A missing subscription yields 0 when
 * igNotExists is set and TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST otherwise.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the drop transaction was prepared,
 *         0 for an ignored missing subscription, otherwise an error code
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (!pMsg) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMnode          *pMnode = pMsg->info.node;
  SMDropCgroupReq  dropReq = {0};
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));

  char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);

  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code != 0) {
    if (!dropReq.igNotExists) {
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      goto END;
    }
    mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
    mndReleaseSubscribe(pMnode, pSub);
    return 0;
  }

  MND_TMQ_RETURN_CHECK(dropCgroup(pSub, pMsg, &dropReq));

END:
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END

  if (code != 0) {
    TAOS_RETURN(code);
  }
  // the prepared transaction completes the drop
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
}
1455

1456
/** Cleanup hook of the subscribe module; intentionally does nothing. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;  // no resources to release
}
396,618✔
1457

1458
// Serializes a subscribe object into a newly allocated SDB raw record.
// Layout written: int32 payload length, the tEncodeSubscribeObj() payload, then
// MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
// Returns the raw record on success. Returns NULL when pSub is NULL (terrno untouched) or on
// any failure; on failure terrno holds the reason and the partially built record is freed.
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  // pRaw and buf are initialised before the first goto: the SUB_ENCODE_OVER path logs pRaw and
  // passes it to sdbFreeRaw(), so it must never be read while indeterminate.
  SSdbRaw *pRaw = NULL;
  void    *buf = NULL;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // The encoder advances the cursor it is given, so encode through a copy and keep buf intact.
  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mDebug("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1500

1501
// Rebuilds a subscribe object from an SDB raw record.
// Accepts soft versions 1..MND_SUBSCRIBE_VER_NUMBER, reads the length-prefixed payload plus the
// reserved area, decodes it, then passes every stored vgroup epSet (unassigned and
// per-consumer) through tmsgUpdateDnodeEpSet().
// Returns the new row, or NULL with terrno set on failure (also NULL for a NULL pRaw).
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) return NULL;

  int32_t          code = 0;
  int32_t          lino = 0;
  int8_t           sver = 0;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;
  void            *buf = NULL;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;

  // Pessimistic default: any early exit below reports out-of-memory unless overridden.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
    goto SUB_DECODE_OVER;
  }

  // update epset saved in mnode: vgroups not assigned to any consumer
  if (pSub->unassignedVgs != NULL) {
    int32_t unassignedCnt = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t idx = 0; idx < unassignedCnt; ++idx) {
      SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGet(pSub->unassignedVgs, idx);
      tmsgUpdateDnodeEpSet(&pVgEp->epSet);
    }
  }

  // update epset saved in mnode: vgroups held by each consumer
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pEp = pIter;
      int32_t        vgCnt = (int32_t)taosArrayGetSize(pEp->vgs);
      for (int32_t idx = 0; idx < vgCnt; ++idx) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGet(pEp->vgs, idx);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mDebug("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}
1572

1573
// SDB insert callback for subscribe objects: only logs the key and always returns 0.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform insert action", subKey);
  return 0;
}
1577

1578
// SDB delete callback for subscribe objects: logs the key, hands the object to
// tDeleteSubscribeObj() and always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform delete action", subKey);
  tDeleteSubscribeObj(pSub);
  return 0;
}
1583

1584
// SDB update callback: moves the new state into the existing object by swapping the
// consumerHash, unassignedVgs and offsetRows members while holding the old object's write latch.
// After the swap pNewSub carries the previous values of those three members.
// Returns TSDB_CODE_INVALID_PARA if either object is NULL, otherwise 0.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (pOldSub == NULL) return TSDB_CODE_INVALID_PARA;
  if (pNewSub == NULL) return TSDB_CODE_INVALID_PARA;

  mDebug("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);
  {
    TSWAP(pOldSub->consumerHash, pNewSub->consumerHash);
    TSWAP(pOldSub->unassignedVgs, pNewSub->unassignedVgs);
    TSWAP(pOldSub->offsetRows, pNewSub->offsetRows);
  }
  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}
1596

1597
// Looks up a subscription by its key via sdbAcquire() and stores the result in *pSub.
// Returns 0 when found, TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when sdbAcquire() yields NULL
// (*pSub is then NULL), and TSDB_CODE_INVALID_PARA when any argument is NULL (*pSub untouched).
// A found object is handed back through mndReleaseSubscribe() by callers in this file.
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj **pSub) {
  if (pMnode == NULL || key == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  *pSub = pFound;

  return (pFound != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1608

1609
// Counts the subscriptions whose key resolves to the given topic name.
// Walks every SDB_SUBSCRIBE row, splits its key into topic and consumer group under the row's
// read latch, and counts the rows whose topic equals topicName.
// Returns 0 when pMnode or topicName is NULL.
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (pMnode == NULL || topicName == NULL) return 0;

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          groupCnt = 0;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};

    taosRLockLatch(&pSub->lock);
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    taosRUnLockLatch(&pSub->lock);

    if (strcmp(topic, topicName) == 0) {
      groupCnt++;
    }
    sdbRelease(pSdb, pSub);
  }

  return groupCnt;
}
1636

1637
// Releases a subscription previously obtained from the SDB; a NULL pMnode or pSub is a no-op.
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1642

1643
// Encodes pSub and appends it to the transaction's commit log, then marks the raw record
// SDB_STATUS_DROPPED.
// If appending fails, the raw record is freed here and the append error is returned.
// Returns TSDB_CODE_INVALID_PARA when any argument is NULL, otherwise 0 or the failing code.
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pCommitRaw);

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  } else {
    sdbFreeRaw(pCommitRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
1660

1661
// Drops one subscription if it belongs to topicName; subscriptions of other topics are left
// untouched and 0 is returned.
// Without `force`, a subscription that still has entries in consumerHash is refused with
// TSDB_CODE_MND_IN_REBALANCE. Otherwise the delete is sent to the vnodes and the drop commit
// log is added to pTrans. The subscription's read latch is held for the whole call.
static int32_t dropOneSub(SMqSubscribeObj *pSub, SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  int32_t code = 0;
  int32_t lino = 0;
  char    topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char    cgroup[TSDB_CGROUP_LEN] = {0};

  taosRLockLatch(&pSub->lock);
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

  if (strcmp(topic, topicName) == 0) {
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
      code = TSDB_CODE_MND_IN_REBALANCE;
      goto END;
    }

    // iter all vnode to delete handle
    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  }

END:
  taosRUnLockLatch(&pSub->lock);

  return code;
}
1686

1687
// Applies dropOneSub() to every SDB_SUBSCRIBE row, so that all subscriptions of topicName are
// added to pTrans for dropping. Stops at the first failure.
// On exit the current row (if any) is released and the fetch iterator is cancelled.
// Returns TSDB_CODE_INVALID_PARA when pMnode, pTrans or topicName is NULL, otherwise 0 or the
// first failing code.
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;

  int32_t          code = 0;
  int32_t          lino = 0;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;

  PRINT_LOG_START
  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) {
      break;
    }

    MND_TMQ_RETURN_CHECK(dropOneSub(pSub, pMnode, pTrans, topicName, force));
    sdbRelease(pSdb, pSub);
  }

END:
  PRINT_LOG_END
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1710

1711
// Appends one "show subscriptions" row to pBlock for every vgroup in `vgs`.
// Columns are filled in this order: topic, consumer group, vgroup id, consumer id as a hex
// string (NULL when consumerId == -1), user (NULL when user == NULL), fqdn (NULL when
// fqdn == NULL), offset text "<offset>/<ever>" and row count (both NULL when offsetRows has no
// entry for the vgroup).
// `topic` and `cgroup` are passed to colDataSetVal() unchanged and are read through
// varDataVal() for logging.
// *numOfRows is the row index being written and is incremented once per vgroup.
// Returns TSDB_CODE_INVALID_PARA when pBlock, numOfRows, topic or cgroup is NULL; otherwise 0
// or whatever code the MND_TMQ_* check macros leave in `code`.
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *user,
                           const char *fqdn, const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t sz = taosArrayGetSize(vgs);
  // offsetRows is not modified in this function, so its size is read once instead of on every
  // iteration of the lookup loop below.
  int32_t offsetCnt = (int32_t)taosArrayGetSize(offsetRows);

  for (int32_t j = 0; j < sz; j++) {
    SMqVgEp *pVgEp = taosArrayGet(vgs, j);
    MND_TMQ_NULL_CHECK(pVgEp);

    SColumnInfoData *pColInfo = NULL;
    int32_t          cols = 0;

    // topic
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false));

    // consumer group
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false));

    // vg id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));

    // consumer id
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));

    // user
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user) STR_TO_VARSTR(userStr, user);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));

    // fqdn
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // offset: find the entry for this vgroup; the whole array is scanned, so if several entries
    // share the vgId the last one is used.
    OffsetRows *data = NULL;
    for (int32_t i = 0; i < offsetCnt; i++) {
      OffsetRows *tmp = taosArrayGet(offsetRows, i);
      MND_TMQ_NULL_CHECK(tmp);
      if (tmp->vgId == pVgEp->vgId) {
        data = tmp;
      }
    }

    if (data) {
      // offset text: formatted offset followed by "/<ever>"
      char buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
      size_t offLen = strlen(varDataVal(buf));
      (void)snprintf(varDataVal(buf) + offLen, sizeof(buf) - VARSTR_HEADER_SIZE - offLen, "/%" PRId64, data->ever);
      varDataSetLen(buf, strlen(varDataVal(buf)));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false));

      // rows
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false));
    } else {
      // no offset information for this vgroup: both columns are NULL
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      colDataSetNULL(pColInfo, *numOfRows);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      colDataSetNULL(pColInfo, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    }
    (*numOfRows)++;
  }

END:
  PRINT_LOG_END
  return code;
}
1804

1805
// Emits the "show subscriptions" rows of one subscription into pBlock.
// Under the subscription's read latch it grows the block if *numOfRows + vgNum exceeds
// rowsCapacity, splits the key into topic / consumer group, writes the rows of every consumer
// in consumerHash and finally the rows of the unassigned vgroups (consumer id -1, no user or
// fqdn). On success pBlock->info.rows is set to *numOfRows.
// Returns 0 or the first failing code.
static int32_t retrieveSub(SRpcMsg *pReq, SMqSubscribeObj *pSub, SSDataBlock *pBlock, int32_t *numOfRows,
                           int32_t rowsCapacity) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SMnode *       pMnode = pReq->info.node;
  SSdb *         pSdb = pMnode->pSdb;
  SMqConsumerEp *pConsumerEp = NULL;
  void *         pIter = NULL;
  char           topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  char           cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  PRINT_LOG_START

  taosRLockLatch(&pSub->lock);
  if (*numOfRows + pSub->vgNum > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + pSub->vgNum));
  }

  // topic and cgroup
  mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
  varDataSetLen(topic, strlen(varDataVal(topic)));
  varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;

    char *          user = NULL;
    char *          fqdn = NULL;
    SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
    if (pConsumer != NULL) {
      user = pConsumer->user;
      fqdn = pConsumer->fqdn;
    }

    // user/fqdn point into pConsumer, so the consumer stays acquired until buildResult() has
    // copied them into the block; it is released on both the success and the failure path.
    code = buildResult(pBlock, numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
                       pConsumerEp->offsetRows);
    if (pConsumer != NULL) {
      sdbRelease(pSdb, pConsumer);
    }
    if (code != 0) {
      lino = __LINE__;
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(
      buildResult(pBlock, numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

  pBlock->info.rows = *numOfRows;

END:
  // Cancel the iteration while the read latch is still held: mndSubActionUpdate() swaps
  // pSub->consumerHash under the write latch, so the pointer must not be read after unlocking.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  taosRUnLockLatch(&pSub->lock);
  PRINT_LOG_END
  return code;
}
1855

1856
// Retrieve callback for "show subscriptions".
// Continues the SDB_SUBSCRIBE iteration stored in pShow->pIter and emits the rows of each
// subscription into pBlock until the iteration ends or numOfRows reaches rowsCapacity.
// Returns the number of rows produced on success (pShow->numOfRows is increased by the same
// amount), TSDB_CODE_INVALID_PARA when pReq, pShow or pBlock is NULL, or the failing code.
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode *         pMnode = pReq->info.node;
  SSdb *           pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;

  mInfo("mnd show subscriptions begin");

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    MND_TMQ_RETURN_CHECK(retrieveSub(pReq, pSub, pBlock, &numOfRows, rowsCapacity));

    sdbRelease(pSdb, pSub);
    pSub = NULL;
  }
  mInfo("mnd end show subscriptions");
  pShow->numOfRows += numOfRows;

END:
  if (code != 0) {
    // Only an aborted iteration is cancelled. On success pShow->pIter is either NULL (finished)
    // or a live iterator the next retrieve call resumes from, so it must be left untouched.
    // NOTE(review): clearing pShow->pIter assumes the show-object cleanup would otherwise cancel
    // the same iterator again through mndCancelGetNextSubscribe() - confirm against mndShow.
    sdbCancelFetch(pSdb, pShow->pIter);
    pShow->pIter = NULL;
    sdbRelease(pSdb, pSub);
    mError("mnd show subscriptions failed, msg:%s", tstrerror(code));
    TAOS_RETURN(code);
  }

  mDebug("mnd show subscriptions success, rows:%d", numOfRows);
  return numOfRows;
}
1895

1896
// Cancels an in-progress SDB_SUBSCRIBE fetch iterator; does nothing when pMnode is NULL.
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc