• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5061

17 May 2026 01:15AM UTC coverage: 73.408% (-0.02%) from 73.425%
#5061

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281737 of 383795 relevant lines covered (73.41%)

134677323.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.32
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_SUBSCRIBE_VER_NUMBER   4
29
#define MND_SUBSCRIBE_RESERVE_SIZE 64
30

31
//#define MND_CONSUMER_LOST_HB_CNT          6
32

33
static int32_t mqRebInExecCnt = 0;
34

35
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
36
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
37
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
39
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
40
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
41
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
42
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
44
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
45

46
// Encodes pSub as an sdb raw record and appends it to pTrans as a commit log with status
// SDB_STATUS_READY.
//
// Returns TSDB_CODE_INVALID_PARA when either argument is NULL; otherwise the code produced by
// the encode / append / set-status steps.
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (!pTrans || !pSub) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    // The append failed, so the raw record is released here.
    sdbFreeRaw(pRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
66

67
// Registers the subscribe sdb table, the TMQ message handlers and the show-subscriptions
// callbacks on pMnode.
//
// Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise the result of sdbSetTable().
int32_t mndInitSubscribe(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  // The iterator-cancel callback belongs to the same show table as the retrieve callback above.
  // It used to be registered under TSDB_MGMT_TABLE_TOPICS, which attached the subscribe
  // iterator cleanup to a different table.
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
92

93
// Walks every vgroup in the sdb and records the id of each vgroup that belongs to the topic's
// database (pTopic->dbUid) in pSub->unassignedVgs, counting them in pSub->vgNum.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, terrno when an id cannot be pushed,
// and 0 otherwise.
int32_t mndSchedInitSubEp(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  // Guard the arguments like the other functions in this file do; pMnode is dereferenced below.
  if (pMnode == NULL || pTopic == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    int32_t vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &vgId) == NULL) {
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      // The iteration is abandoned midway, so the fetch cursor is cancelled explicitly.
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    // Count the vgroup only once it is actually stored, so vgNum never exceeds the array size.
    pSub->vgNum++;
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, vgId);
    sdbRelease(pSdb, pVgroup);
  }

  return code;
}
126

127
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey,
156,617✔
128
                                     SMqSubscribeObj **pSub) {
129
  if (pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
156,617✔
130
    return TSDB_CODE_INVALID_PARA;
×
131
  }
132
  int32_t lino = 0;
156,617✔
133
  int32_t code = 0;
156,617✔
134
  PRINT_LOG_START
156,617✔
135
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
156,617✔
136
  (*pSub)->dbUid = pTopic->dbUid;
156,617✔
137
  (*pSub)->stbUid = pTopic->stbUid;
156,617✔
138
  (*pSub)->subType = pTopic->subType;
156,617✔
139
  (*pSub)->withMeta = pTopic->withMeta;
156,617✔
140

141
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
156,617✔
142

143
END:
156,617✔
144
  PRINT_LOG_END
156,617✔
145
  return code;
156,617✔
146
}
147

148
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
2,249,683✔
149
  if (key == NULL || topic == NULL || cgroup == NULL) {
2,249,683✔
150
    return;
×
151
  }
152
  int32_t i = 0;
2,249,683✔
153
  while (key[i] != TMQ_SEPARATOR_CHAR) {
16,501,503✔
154
    i++;
14,251,820✔
155
  }
156
  (void)memcpy(cgroup, key, i);
2,249,683✔
157
  cgroup[i] = 0;
2,249,683✔
158
  if (fullName) {
2,249,683✔
159
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
2,031,122✔
160
  } else {
161
    while (key[i] != '.') {
655,683✔
162
      i++;
437,122✔
163
    }
164
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
218,561✔
165
  }
166
}
167

168
static int32_t mndBuildSubChangeReq(SMnode *pMnode, void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
960,584✔
169
                                    const SMqRebOutputVg *pRebVg) {
170
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
960,584✔
171
    return TSDB_CODE_INVALID_PARA;
×
172
  }
173
  SMqRebVgReq  req = {0};
960,584✔
174
  int32_t      code = 0;
960,584✔
175
  int32_t      lino = 0;
960,584✔
176
  SEncoder     encoder = {0};
960,584✔
177
  SMqTopicObj *pTopic = NULL;
960,584✔
178
  void *       buf = NULL;
960,584✔
179

180
  PRINT_LOG_START
960,584✔
181
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
960,584✔
182
  char cgroup[TSDB_CGROUP_LEN] = {0};
960,584✔
183
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
960,584✔
184
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
960,584✔
185
  taosRLockLatch(&pTopic->lock);
960,584✔
186
  req.oldConsumerId = pRebVg->oldConsumerId;
960,584✔
187
  req.newConsumerId = pRebVg->newConsumerId;
960,584✔
188
  req.vgId = pRebVg->vgId;
960,584✔
189
  req.qmsg = pTopic->physicalPlan;
960,584✔
190
  req.schema = pTopic->schema;
960,584✔
191
  req.subType = pSub->subType;
960,584✔
192
  req.withMeta = pSub->withMeta;
960,584✔
193
  req.suid = pSub->stbUid;
960,584✔
194
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
960,584✔
195

196
  int32_t tlen = 0;
960,584✔
197
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
960,584✔
198
  if (code < 0) {
960,584✔
199
    goto END;
×
200
  }
201

202
  tlen += sizeof(SMsgHead);
960,584✔
203
  buf = taosMemoryMalloc(tlen);
960,584✔
204
  MND_TMQ_NULL_CHECK(buf);
960,584✔
205
  SMsgHead *pMsgHead = (SMsgHead *)buf;
960,584✔
206
  pMsgHead->contLen = htonl(tlen);
960,584✔
207
  pMsgHead->vgId = htonl(pRebVg->vgId);
960,584✔
208

209
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
960,584✔
210
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
960,584✔
211
  *pBuf = buf;
960,584✔
212
  buf = NULL;
960,584✔
213
  *pLen = tlen;
960,584✔
214

215
END:
960,584✔
216
  PRINT_LOG_END
960,584✔
217
  taosMemoryFree(buf);
960,584✔
218
  if (pTopic != NULL) {
960,584✔
219
    taosRUnLockLatch(&pTopic->lock);
960,584✔
220
  }
221
  mndReleaseTopic(pMnode, pTopic);
960,584✔
222
  tEncoderClear(&encoder);
960,584✔
223
  return code;
960,584✔
224
}
225

226
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
960,584✔
227
                                        const SMqRebOutputVg *pRebVg) {
228
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
960,584✔
229
    return TSDB_CODE_INVALID_PARA;
×
230
  }
231
  int32_t code = 0;
960,584✔
232
  int32_t lino = 0;
960,584✔
233
  void *  buf = NULL;
960,584✔
234
  PRINT_LOG_START
960,584✔
235
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
960,584✔
236
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
237
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
238
    goto END;
×
239
  }
240

241
  int32_t tlen = 0;
960,584✔
242
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(pMnode, &buf, &tlen, pSub, pRebVg));
960,584✔
243
  int32_t vgId = pRebVg->vgId;
960,584✔
244
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
960,584✔
245
  if (pVgObj == NULL) {
960,584✔
246
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
247
    goto END;
×
248
  }
249

250
  STransAction action = {0};
960,584✔
251
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
960,584✔
252
  action.pCont = buf;
960,584✔
253
  buf = NULL;
960,584✔
254
  action.contLen = tlen;
960,584✔
255
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
960,584✔
256

257
  mndReleaseVgroup(pMnode, pVgObj);
960,584✔
258
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
960,584✔
259

260
END:
960,584✔
261
  PRINT_LOG_END
960,584✔
262
  taosMemoryFree(buf);
960,584✔
263
  return code;
960,584✔
264
}
265

266
static void freeRebalanceItem(void *param) {
443,411✔
267
  if (param == NULL) return;
443,411✔
268
  SMqRebInfo *pInfo = param;
443,411✔
269
  taosArrayDestroy(pInfo->newConsumers);
443,411✔
270
  taosArrayDestroy(pInfo->removedConsumers);
443,411✔
271
}
272

273
// Looks up the rebalance info stored in pHash under key (the key length includes the NUL
// terminator) and creates and inserts a new entry when none exists.
//
//   pReb - optional; when non-NULL it receives the pointer to the entry held by the hash
//
// Returns TSDB_CODE_INVALID_PARA when pHash or key is NULL, otherwise the code of the failing
// step or 0.
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (!pHash || !key) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SMqRebInfo *pEntry = taosHashGet(pHash, key, strlen(key) + 1);
  if (pEntry == NULL) {
    SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
    if (pFresh == NULL) {
      code = terrno;
      goto END;
    }

    code = taosHashPut(pHash, key, strlen(key) + 1, pFresh, sizeof(SMqRebInfo));
    if (code != 0) {
      // Insert failed: release the arrays as well as the structure.
      freeRebalanceItem(pFresh);
      taosMemoryFreeClear(pFresh);
      goto END;
    }

    // Only the local structure is freed here (its arrays are left alone); the entry that is
    // handed out is re-read from the hash.
    taosMemoryFreeClear(pFresh);

    pEntry = taosHashGet(pHash, key, strlen(key) + 1);
    MND_TMQ_NULL_CHECK(pEntry);
  }

  if (pReb != NULL) {
    *pReb = pEntry;
  }

END:
  PRINT_LOG_END
  return code;
}
306

307
// Pops the last vgroup id from vgs and stores an SMqRebOutputVg for it in pHash, keyed by that
// vgroup id. The record is initialised with {consumerId, -1, vgId}.
//
//   key - subscribe key, used for logging only
//
// Returns TSDB_CODE_INVALID_PARA when vgs, pHash or key is NULL, otherwise the code of the
// failing step or 0.
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (!vgs || !pHash || !key) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t *pPoppedVgId = (int32_t *)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(pPoppedVgId);

  SMqRebOutputVg rebVg = {consumerId, -1, *pPoppedVgId};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, pPoppedVgId, sizeof(int32_t), &rebVg, sizeof(SMqRebOutputVg)));
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, *pPoppedVgId, consumerId);

END:
  PRINT_LOG_END
  return code;
}
324

325
// For every consumer listed in pInput->pRebInfo->removedConsumers that is present in the
// subscription's consumer hash: moves all of its vgroups into pHash, removes the consumer from
// the hash and records its id in pOutput->removedConsumers.
//
// Consumers that are not found in the hash are skipped; a mismatch between requested and actual
// removals is only logged. Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise
// the code of the failing step or 0.
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (!pHash || !pOutput || !pInput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t requested = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t removed = 0;

  for (int32_t idx = 0; idx < requested; idx++) {
    int64_t *pConsumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    MND_TMQ_NULL_CHECK(pConsumerId);

    SMqConsumerEp *pEp = taosHashGet(pOutput->pSub->consumerHash, pConsumerId, sizeof(int64_t));
    if (pEp == NULL) {
      continue;
    }

    // Each call pops one vgroup, so the count is taken once up front.
    int32_t pending = taosArrayGetSize(pEp->vgs);
    while (pending > 0) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, *pConsumerId, pOutput->pSub->key));
      pending--;
    }

    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, pConsumerId, sizeof(int64_t)));
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, pConsumerId));
    removed++;
  }

  if (requested == removed) {
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, requested);
  } else {
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key,
           requested, removed);
  }

END:
  PRINT_LOG_END
  return code;
}
362

363
// Adds every consumer listed in pInput->pRebInfo->newConsumers to the subscription's consumer
// hash with an empty vgroup list, and records its id in pOutput->newConsumers.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise the code of the failing
// step or 0.
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);

    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(int32_t));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);

    // Use the put's own return value as the error code, as the other taosHashPut callers in
    // this file do. The original read terrno after the cleanup call, which could report
    // success for a failed insert if terrno had been reset in between.
    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp,
                       sizeof(SMqConsumerEp));
    if (code != 0) {
      freeSMqConsumerEp(&newConsumerEp);
      goto END;
    }

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }

END:
  PRINT_LOG_END
  return code;
}
392

393
// Moves every vgroup id currently in pOutput->pSub->unassignedVgs into pHash, using -1 as the
// owning consumer id.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise the code of the failing
// step or 0.
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (!pOutput || !pHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // Each call pops one entry, so the loop runs once per element present at entry.
  int32_t pending = taosArrayGetSize(pOutput->pSub->unassignedVgs);
  while (pending > 0) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
    pending--;
  }

END:
  PRINT_LOG_END
  return code;
}
408

409
// Records every consumer remaining in the subscription's consumer hash in
// pOutput->modifyConsumers and trims consumers that hold more than minVgCnt vgroups. The first
// remainderVgCnt over-full consumers are trimmed to minVgCnt + 1 vgroups, the others to
// minVgCnt. Removed vgroups are moved into pHash.
//
// Returns TSDB_CODE_INVALID_PARA when pOutput or pHash is NULL, otherwise the code of the
// failing step or 0.
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t remainderVgCnt) {
  if (!pOutput || !pHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  int32_t extraTaken = 0;  // number of consumers already allowed to keep minVgCnt + 1
  void   *pIter = NULL;
  PRINT_LOG_START

  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        ownedAtStart = taosArrayGetSize(pEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pEp->consumerId));
    if (ownedAtStart <= minVgCnt) {
      continue;
    }

    bool    keepsExtra = extraTaken < remainderVgCnt;
    int32_t keep = keepsExtra ? minVgCnt + 1 : minVgCnt;
    while (taosArrayGetSize(pEp->vgs) > keep) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, pEp->consumerId, pOutput->pSub->key));
    }
    if (keepsExtra) {
      extraTaken++;
    }
  }

END:
  PRINT_LOG_END
  // pIter is non-NULL only when the loop was left through an error.
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  return code;
}
448

449
// Reconciles the subscription's consumers with the vgroups that currently exist for the
// subscription's database:
//   1. collects the ids of all vgroups in the database (vgroups with a non-zero mountVgId are
//      skipped);
//   2. removes from every consumer the vgroup ids that are not in that set, and removes from
//      the set the ids already owned by a consumer;
//   3. if the subscription has no unassigned vgroups, appends the ids left in the set to
//      pOutput->pSub->unassignedVgs.
//
// Returns the number of vgroups found in step 1 on success, or the error code on failure
// (the caller treats a negative value as an error).
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  int32_t totalVgNum = 0;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  void   *pIterHash = NULL;
  SArray *dbVgIds = taosArrayInit(0, sizeof(int32_t));
  MND_TMQ_NULL_CHECK(dbVgIds);

  // Step 1: ids of all vgroups belonging to the subscription's database.
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (pVgroup->mountVgId || !mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }

    totalVgNum++;
    int32_t vgId = pVgroup->vgId;
    MND_TMQ_NULL_CHECK(taosArrayPush(dbVgIds, &vgId));
    sdbRelease(pMnode->pSdb, pVgroup);
  }

  // Step 2: drop stale ids from the consumers; ids still owned are taken out of dbVgIds.
  while ((pIterHash = taosHashIterate(pOutput->pSub->consumerHash, pIterHash)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIterHash;
    int32_t        pos = 0;
    while (pos < taosArrayGetSize(pEp->vgs)) {
      int32_t *pOwnedId = taosArrayGet(pEp->vgs, pos);
      MND_TMQ_NULL_CHECK(pOwnedId);

      bool stillExists = false;
      for (int32_t k = 0; k < taosArrayGetSize(dbVgIds); k++) {
        int32_t *pDbId = taosArrayGet(dbVgIds, k);
        MND_TMQ_NULL_CHECK(pDbId);
        if (*pOwnedId == *pDbId) {
          taosArrayRemove(dbVgIds, k);
          stillExists = true;
          break;
        }
      }

      if (stillExists) {
        pos++;
      } else {
        // The element at pos is removed, so pos is not advanced.
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", *pOwnedId);
        taosArrayRemove(pEp->vgs, pos);
      }
    }
  }

  // Step 3: whatever is left in dbVgIds is owned by nobody.
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(dbVgIds) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, dbVgIds));
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(dbVgIds));
  }

END:
  sdbRelease(pMnode->pSdb, pVgroup);
  sdbCancelFetch(pMnode->pSdb, pIter);
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIterHash);
  taosArrayDestroy(dbVgIds);
  if (code != 0) {
    mError("tmq rebalance processRemoveAddVgs failed, code:%d", code);
    return code;
  }
  mInfo("tmq rebalance processRemoveAddVgs completed, total vg num:%d", totalVgNum);
  return totalVgNum;
}
527

528
// Folds the per-consumer offset rows of the currently stored subscription into
// pOutput->pSub->offsetRows.
//
// For each offset row of each stored consumer: the row is skipped when the same consumer still
// owns that vgroup in pOutput; otherwise it is merged into the output row with the same vgId
// (rows accumulated, offset and ever overwritten) or appended when no such row exists.
//
// Returns 0 without doing anything when the stored subscription cannot be acquired,
// TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise the code of the failing step or 0.
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pInput || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if (code != 0) {
    return 0;
  }

  taosRLockLatch(&pSub->lock);
  PRINT_LOG_START
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    MND_TMQ_NULL_CHECK(pOutput->pSub->offsetRows);
  }

  while ((pIter = taosHashIterate(pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pStoredEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pOutputEp = taosHashGet(pOutput->pSub->consumerHash, &pStoredEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pStoredEp->offsetRows); j++) {
      OffsetRows *pSrc = taosArrayGet(pStoredEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(pSrc);

      // Skip rows whose vgroup stays with the same consumer.
      bool unchanged = false;
      if (pOutputEp != NULL) {
        for (int i = 0; i < taosArrayGetSize(pOutputEp->vgs); i++) {
          int32_t *pVgId = taosArrayGet(pOutputEp->vgs, i);
          MND_TMQ_NULL_CHECK(pVgId);
          if (*pVgId == pSrc->vgId) {
            unchanged = true;
            mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                  pStoredEp->consumerId, *pVgId);
            break;
          }
        }
      }
      if (unchanged) {
        continue;
      }

      // Merge into an existing output row for the same vgroup, if there is one.
      bool merged = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *pDst = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(pDst);
        if (pSrc->vgId != pDst->vgId) {
          continue;
        }
        pDst->rows += pSrc->rows;
        pDst->offset = pSrc->offset;
        pDst->ever = pSrc->ever;
        merged = true;
        mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, pDst->vgId, pDst->rows, pSrc->rows);
        break;
      }
      if (!merged) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, pSrc));
      }
    }
  }

END:
  taosRUnLockLatch(&pSub->lock);
  taosHashCancelIterate(pSub->consumerHash, pIter);
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  return code;
}
592

593
// Logs the outcome of a rebalance: every moved vgroup in pOutput->rebVgs, followed by the final
// vgroup list of every consumer in the subscription's consumer hash. Does nothing when pOutput
// is NULL.
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (pOutput == NULL) {
    return;
  }

  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
  for (int32_t idx = 0; idx < taosArrayGetSize(pOutput->rebVgs); idx++) {
    SMqRebOutputVg *pMoved = taosArrayGet(pOutput->rebVgs, idx);
    if (pMoved != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
            pMoved->vgId, pMoved->oldConsumerId, pMoved->newConsumerId);
    }
  }

  void *pIter = NULL;
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        vgCount = taosArrayGetSize(pEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
          pEp->consumerId, vgCount);
    for (int32_t idx = 0; idx < vgCount; idx++) {
      int32_t *pVgId = taosArrayGet(pEp->vgs, idx);
      if (pVgId != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, *pVgId,
              pEp->consumerId);
      }
    }
  }
}
619

620
// Computes how many vgroups each consumer should hold after the rebalance.
//
// The final consumer count is oldConsumerNum + added - removed. When it is non-zero,
// *minVgCnt receives totalVgNum / count and *remainderVgCnt receives totalVgNum % count; when
// it is zero both outputs are left untouched. Returns without doing anything when a pointer
// argument is NULL.
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
                           int32_t *remainderVgCnt) {
  if (!pInput || !pSubKey || !minVgCnt || !remainderVgCnt) {
    return;
  }

  int32_t removedCnt = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t addedCnt = taosArrayGetSize(pInput->pRebInfo->newConsumers);
  int32_t finalCnt = pInput->oldConsumerNum + addedCnt - removedCnt;

  // calc num
  if (finalCnt == 0) {
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
  } else {
    *minVgCnt = totalVgNum / finalCnt;
    *remainderVgCnt = totalVgNum % finalCnt;
  }

  mInfo(
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d "
      "remainderVg:%d",
      pSubKey, totalVgNum, pInput->oldConsumerNum, addedCnt, removedCnt, *minVgCnt, *remainderVgCnt);
}
641

642
// Hands the vgroups collected in pHash out to the consumers of the subscription.
//
//   pass 1: every consumer is filled up to minVgCnt vgroups;
//   pass 2: consumers holding exactly minVgCnt receive one more vgroup each until the pool in
//           pHash is used up;
//   pass 3: every entry of pHash is appended to pOutput->rebVgs; when no consumer is left, the
//           vgroup ids are also put back into pOutput->pSub->unassignedVgs.
//
// Returns TSDB_CODE_INVALID_PARA when pOutput or pHash is NULL, TSDB_CODE_MND_INTERNAL_ERROR
// when vgroups remain in the pool after pass 2, otherwise the code of the failing step or 0.
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (!pOutput || !pHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqRebOutputVg *pRebVg = NULL;
  void           *pAssignIter = NULL;  // cursor over the pool of movable vgroups
  void           *pIter = NULL;        // cursor over the consumers
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  // Pass 1: bring every consumer up to minVgCnt.
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    while (taosArrayGetSize(pEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        // NOTE(review): this only leaves the inner loop; for the next consumer the pool
        // iteration is started again with a NULL cursor. Confirm that is intended.
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pRebVg->vgId));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->vgId,
            pEp->consumerId);
    }
  }

  // Pass 2: distribute the remainder, one extra vgroup per consumer.
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    if (taosArrayGetSize(pEp->vgs) != minVgCnt) {
      continue;
    }

    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
      break;
    }

    pRebVg = (SMqRebOutputVg *)pAssignIter;
    pRebVg->newConsumerId = pEp->consumerId;
    MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pRebVg->vgId));
    mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->vgId,
          pEp->consumerId);
  }

  // A non-NULL pool cursor here means pass 2 ran out of consumers before the pool was empty.
  if (pAssignIter != NULL) {
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    goto END;
  }

  // Pass 3: publish every pool entry as a rebalance result.
  while ((pAssignIter = taosHashIterate(pHash, pAssignIter)) != NULL) {
    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed
      MND_TMQ_NULL_CHECK(
          taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->vgId));  // put all vg into unassigned
    }
  }

END:
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  taosHashCancelIterate(pHash, pAssignIter);
  PRINT_LOG_END
  return code;
}
720

721
/*
 * Run one rebalance pass for the subscription held in pOutput.
 *
 * The steps run in a fixed order: reconcile added/removed vgroups, collect
 * vgroups from removed consumers and from the unassigned list into a
 * temporary hash, compute the per-consumer quota, trim existing consumers,
 * register new consumers, hand out the collected vgroups, and finally
 * process the offset rows.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL argument, the (negative) value
 * from processRemoveAddVgs when that step fails, otherwise the code of the
 * first failing step or 0.
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pInput || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* A negative count is an error code from the vgroup reconciliation step. */
  int32_t totalVgNum = processRemoveAddVgs(pMnode, pOutput);
  if (totalVgNum < 0) {
    return totalVgNum;
  }

  int32_t     code = 0;
  int32_t     lino = 0;
  int32_t     remainderVgCnt = 0;
  int32_t     minVgCnt = 0;
  const char *pSubKey = pOutput->pSub->key;
  PRINT_LOG_START

  /* Scratch table of vgroups waiting for a (new) owner; freed at END. */
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pHash);

  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));

  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);

  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));

  printRebalanceLog(pOutput);

END:
  taosHashCleanup(pHash);
  PRINT_LOG_END
  return code;
}
751

752
/*
 * Append a consumer commit log to pTrans for every consumer id in
 * `consumers`, building a temporary consumer object of update type `type`
 * for each one.
 *
 * pTrans, consumers and cgroup must be non-NULL (TSDB_CODE_INVALID_PARA
 * otherwise); `topic` is forwarded as-is and may be NULL.
 *
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    /*
     * Reset after every free: if a later iteration jumps to END before a
     * new object has been stored, END must not free the stale pointer again.
     */
    pConsumerNew = NULL;
  }

END:
  PRINT_LOG_END
  /* Only non-NULL here when a step failed while an object was live. */
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
775

776
/*
 * Add consumer commit logs to pTrans for the three consumer lists of a
 * rebalance result, in this order: modified, new, removed.
 *
 * The modified list is persisted without a topic (NULL); the new and removed
 * lists carry `topic`.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL argument, otherwise the code of
 * the first failing list or 0.
 */
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (!pTrans || !pOutput || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START

  /* One row per list; the rows are processed strictly top to bottom. */
  const struct {
    SArray *list;
    int8_t  updateType;
    char   *topicArg;
  } steps[] = {
      {pOutput->modifyConsumers, CONSUMER_UPDATE_REB, NULL},
      {pOutput->newConsumers, CONSUMER_ADD_REB, topic},
      {pOutput->removedConsumers, CONSUMER_REMOVE_REB, topic},
  };

  for (size_t s = 0; s < sizeof(steps) / sizeof(steps[0]); ++s) {
    MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, steps[s].list, steps[s].updateType, cgroup, steps[s].topicArg));
  }

END:
  PRINT_LOG_END
  return code;
}
790

791
/*
 * Turn a rebalance result into a "tmq-reb" transaction and prepare it.
 *
 * Steps: create the transaction, bind it to the subscription's db/key and
 * check for conflicts, add one redo action per entry of pOutput->rebVgs,
 * add the subscription commit log, add the consumer commit logs (skipped
 * when pOutput->isReload is set), install the MQ_REB start/stop callbacks
 * and prepare the transaction.
 *
 * The transaction handle is dropped on every path before returning.
 * Returns TSDB_CODE_INVALID_PARA for a NULL argument, otherwise 0 or the
 * code of the first failing step.
 */
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (!pMnode || !pMsg || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  STrans *pTrans = NULL;
  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START

  char cgroup[TSDB_CGROUP_LEN] = {0};
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    /* Prefer the thread error code when one was recorded. */
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // 1. redo action: action to all vg
  const SArray *rebVgs = pOutput->rebVgs;
  int32_t       numVgs = taosArrayGetSize(rebVgs);
  for (int32_t idx = 0; idx < numVgs; ++idx) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, idx);
    MND_TMQ_NULL_CHECK(pRebVg);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg));
  }

  // 2. commit log: subscribe and vg assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // 3. commit log: consumer to update status and epoch (not for reloads)
  if (!pOutput->isReload) {
    MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
  }

  // 4. set cb
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);

  // 5. execution
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  mndTransDrop(pTrans);
  PRINT_LOG_END
  TAOS_RETURN(code);
}
842

843
// type = 0 remove  type = 1 add
844
/*
 * Record pConsumer in the rebalance info of every topic in topicList.
 *
 * For each topic the subscribe key "<cgroup><TMQ_SEPARATOR><topic>" is built
 * and looked up (or created) in rebSubHash. The consumer id is then appended
 * to the entry's removedConsumers (type == 0) or newConsumers (type == 1);
 * any other type only ensures the entry exists.
 *
 * Returns TSDB_CODE_INVALID_PARA when rebSubHash or topicList is NULL,
 * otherwise 0 or the code of the first failing step.
 */
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
  if (!rebSubHash || !topicList) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START

  const int32_t numTopics = taosArrayGetSize(topicList);
  for (int32_t idx = 0; idx < numTopics; ++idx) {
    char *topicName = taosArrayGetP(topicList, idx);
    MND_TMQ_NULL_CHECK(topicName);

    char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topicName);

    SMqRebInfo *pRebSub = NULL;
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, subKey, &pRebSub));

    switch (type) {
      case 0: /* consumer leaves this topic */
        MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
        break;
      case 1: /* consumer joins this topic */
        MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
        break;
      default:
        break;
    }
  }

END:
  PRINT_LOG_END
  return code;
}
869

870
/*
 * For one topic of a consumer, check that every vgroup assigned to the
 * consumer can still be acquired. When a vgroup cannot be acquired, an
 * entry for the subscription is requested in rebSubHash so that a
 * rebalance is scheduled for it.
 *
 * The subscription is read-locked while its consumer table is inspected and
 * is unlocked and released before returning. Errors are logged, not
 * returned.
 */
static void checkOneTopic(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash, const char *topic) {
  int32_t          lino = 0;
  int32_t          code = 0;
  SMqSubscribeObj *pSub = NULL;
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

  MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, key, &pSub));
  taosRLockLatch(&pSub->lock);

  // iterate all vg assigned to the consumer of that topic
  SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pConsumerEp);

  const int32_t numVgs = taosArrayGetSize(pConsumerEp->vgs);
  for (int32_t idx = 0; idx < numVgs; ++idx) {
    int32_t *pVgId = taosArrayGet(pConsumerEp->vgs, idx);
    if (pVgId == NULL) {
      continue;
    }

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, *pVgId);
    if (pVgroup == NULL) {
      /* Vgroup is gone: make sure this subscription gets rebalanced. */
      code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
      if (code == 0) {
        mInfo("vnode splitted, vgId:%d rebalance will be triggered", *pVgId);
      } else {
        mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", *pVgId, tstrerror(code));
      }
    }
    mndReleaseVgroup(pMnode, pVgroup);
  }

END:
  /* pSub stays NULL when the acquire failed, in which case no lock is held. */
  if (pSub != NULL) {
    taosRUnLockLatch(&pSub->lock);
  }
  mndReleaseSubscribe(pMnode, pSub);
}
1,770,484✔
906

907
/*
 * Run checkOneTopic for every non-NULL entry of pConsumer->currentTopics.
 * Does nothing when any argument is NULL.
 */
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (!pMnode || !pConsumer || !rebSubHash) {
    return;
  }

  const int32_t numTopics = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t idx = 0; idx < numTopics; ++idx) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, idx);
    if (topic != NULL) {
      checkOneTopic(pMnode, pConsumer, rebSubHash, topic);
    }
  }
}
920

921
/*
 * Decide whether a consumer should be treated as offline.
 *
 * hbStatus and pollStatus are tick counters; each tick is converted to
 * milliseconds as tsMqRebalanceInterval * 1000. The consumer is offline
 * when the heartbeat time reaches pConsumer->sessionTimeoutMs or the poll
 * time reaches pConsumer->maxPollIntervalMs.
 *
 * The products are computed in 64 bits so that a large tick count cannot
 * overflow signed 32-bit arithmetic.
 */
static bool isOffLine(int32_t hbStatus, int32_t pollStatus, SMqConsumerObj *pConsumer) {
  const int64_t tickMs = (int64_t)tsMqRebalanceInterval * 1000;
  const int64_t hbElapsedMs = (int64_t)hbStatus * tickMs;
  const int64_t pollElapsedMs = (int64_t)pollStatus * tickMs;

  return hbElapsedMs >= (int64_t)pConsumer->sessionTimeoutMs ||
         pollElapsedMs >= (int64_t)pConsumer->maxPollIntervalMs;
}
925

926
/*
 * Inspect one consumer during a rebalance tick and record what has to be
 * done for it.
 *
 * The heartbeat and poll counters are incremented atomically, then:
 *  - READY with no current topics: send TDMT_MND_TMQ_LOST_CONSUMER_CLEAR;
 *  - READY and offline: register removal from all current topics;
 *  - READY otherwise: check its topics for vanished vgroups;
 *  - REBALANCE and not offline: register its pending new/removed topics;
 *  - anything else: send TDMT_MND_TMQ_LOST_CONSUMER_CLEAR.
 *
 * The consumer is read-locked for the duration of the call.
 * Returns 0 or the code of the first failing step.
 */
static int32_t checkOneConsumer(SMqConsumerObj *pConsumer, SMnode *pMnode, SRpcMsg *pMsg, SHashObj *rebSubHash) {
  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);

  const int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
  const int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
  const int32_t status = atomic_load_32(&pConsumer->status);

  mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
         ", hbstatus:%d, pollStatus:%d",
         pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
         hbStatus, pollStatus);

  if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
    // unsubscribe or close
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  } else if (status == MQ_CONSUMER_STATUS_READY && isOffLine(hbStatus, pollStatus, pConsumer)) {
    mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
          ", hb lost cnt:%d, or long time no poll cnt:%d",
          pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
          pConsumer->createTime, hbStatus, pollStatus);
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
  } else if (status == MQ_CONSUMER_STATUS_READY) {
    checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
  } else if (status == MQ_CONSUMER_STATUS_REBALANCE && !isOffLine(hbStatus, pollStatus, pConsumer)) {
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
  } else {
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
967

968
/*
 * Walk every SDB_CONSUMER row and run checkOneConsumer on it, filling
 * rebSubHash with the subscriptions that need a rebalance.
 *
 * The walk stops at the first failing consumer. The fetch cursor is
 * cancelled and the consumer held at that point is released before
 * returning.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMsg or rebSubHash is NULL, otherwise
 * 0 or the first error from checkOneConsumer.
 */
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (!pMsg || !rebSubHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t         lino = 0;
  int32_t         code = 0;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  PRINT_LOG_START

  // iterate all consumers, find all modification
  while ((pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer)) != NULL) {
    MND_TMQ_RETURN_CHECK(checkOneConsumer(pConsumer, pMnode, pMsg, rebSubHash));
    mndReleaseConsumer(pMnode, pConsumer);
  }

END:
  PRINT_LOG_END
  sdbCancelFetch(pSdb, pIter);
  /* NOTE(review): relies on sdbFetch leaving pConsumer NULL at end of scan - confirm. */
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}
995

996
bool mndRebTryStart() {
18,552,199✔
997
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
18,552,199✔
998
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
18,552,199✔
999
}
1000

1001
void mndRebCntInc() {
341,833✔
1002
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
341,833✔
1003
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
341,833✔
1004
}
341,833✔
1005

1006
void mndRebCntDec() {
18,889,632✔
1007
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
18,889,632✔
1008
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
18,889,632✔
1009
}
18,889,632✔
1010

1011
/*
 * Release everything owned by a rebalance output object and reset its
 * pointers to NULL: the four consumer/vgroup arrays and the subscription
 * object (contents and the object itself). A NULL rebOutput is ignored.
 */
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (!rebOutput) {
    return;
  }

  /* Arrays first, in the same order as before. */
  taosArrayDestroy(rebOutput->newConsumers);
  taosArrayDestroy(rebOutput->modifyConsumers);
  taosArrayDestroy(rebOutput->removedConsumers);
  taosArrayDestroy(rebOutput->rebVgs);

  /* Then the subscription: its contents, followed by the object itself. */
  tDeleteSubscribeObj(rebOutput->pSub);
  taosMemoryFree(rebOutput->pSub);

  rebOutput->newConsumers = NULL;
  rebOutput->modifyConsumers = NULL;
  rebOutput->removedConsumers = NULL;
  rebOutput->rebVgs = NULL;
  rebOutput->pSub = NULL;
}
1027

1028
/*
 * Allocate the four empty arrays of a rebalance output object
 * (newConsumers, removedConsumers, modifyConsumers, rebVgs).
 *
 * If any allocation fails, everything already stored in rebOutput is
 * released through clearRebOutput before returning.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL rebOutput, otherwise 0 or the
 * allocation error.
 */
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (!rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  /* Object to roll back at END; cleared once every allocation succeeded. */
  SMqRebOutputObj *pRollback = rebOutput;
  PRINT_LOG_START

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  pRollback = NULL;

END:
  PRINT_LOG_END
  clearRebOutput(pRollback);
  return code;
}
1050

1051
/*
 * Fill rebOutput->pSub with the subscription the rebalance will work on and
 * set rebInput->oldConsumerNum.
 *
 * If a subscription already exists for the key it is cloned and the old
 * consumer count is the size of the clone's consumer hash. Otherwise the
 * topic is acquired and read-locked, a fresh subscription is created from
 * it, the topic's db name is copied in and the old consumer count is 0.
 *
 * The acquired topic/subscription handles are released before returning.
 * Returns TSDB_CODE_INVALID_PARA for a NULL argument, otherwise 0 or the
 * code of the first failing step.
 */
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (!pMnode || !rebInput || !rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqTopicObj     *pTopic = NULL;
  SMqSubscribeObj *pSub = NULL;
  const char      *key = rebInput->pRebInfo->key;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  int32_t          lino = 0;
  PRINT_LOG_START

  if (code == 0) {
    /* Existing subscription: work on a private copy. */
    MND_TMQ_RETURN_CHECK(tCloneSubscribeObj(pSub, &rebOutput->pSub));
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);

    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key,
          taosHashGetSize(rebOutput->pSub->consumerHash));
  } else {
    // split sub key and extract topic
    char cgroup[TSDB_CGROUP_LEN] = {0};
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    mndSplitSubscribeKey(key, topic, cgroup, true);

    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);

    rebInput->oldConsumerNum = 0;
    MND_TMQ_RETURN_CHECK(mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub));
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
  }

END:
  PRINT_LOG_END
  /* pTopic is only non-NULL after a successful acquire, i.e. while locked. */
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
1090

1091
/*
 * Build rebOutput->rebVgs from the current assignment in pSub: one entry
 * per (consumer, vgroup) pair with oldConsumerId = -1 and newConsumerId set
 * to the consumer that already holds the vgroup.
 *
 * Returns 0 or the code of the first failing allocation/lookup; the hash
 * iterator is cancelled on every path.
 */
static int32_t collectVgs(SMqRebOutputObj *rebOutput, SMqSubscribeObj *pSub) {
  int32_t lino = 0;
  int32_t code = 0;
  void   *pIterConsumer = NULL;

  PRINT_LOG_START
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  while ((pIterConsumer = taosHashIterate(pSub->consumerHash, pIterConsumer)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIterConsumer;

    for (int32_t idx = 0; idx < taosArrayGetSize(pConsumerEp->vgs); ++idx) {
      int32_t *pVgId = taosArrayGet(pConsumerEp->vgs, idx);
      MND_TMQ_NULL_CHECK(pVgId);

      /* Reserve one slot at the tail and fill it in place. */
      SMqRebOutputVg *pSlot = taosArrayReserve(rebOutput->rebVgs, 1);
      MND_TMQ_NULL_CHECK(pSlot);

      pSlot->vgId = *pVgId;
      pSlot->oldConsumerId = -1;
      pSlot->newConsumerId = pConsumerEp->consumerId;
      mDebug("sub:%s reload rebalance vgId:%d remains on consumer:0x%" PRIx64, pSub->key, pSlot->vgId,
             pSlot->newConsumerId);
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pSub->consumerHash, pIterConsumer);
  return code;
}
1125

1126
/*
 * Reload-rebalance a single subscription if its topic is listed in
 * topicsToReload.
 *
 * When the topic is listed, the current vgroup assignment is collected into
 * a reload-flagged output object and persisted via mndPersistRebResult.
 * The subscription itself is borrowed, not owned: it is detached from the
 * output object before the output is cleared.
 *
 * pSub is read-locked for the duration of the call.
 * Returns 0 or the failing step's code (persist failures are also logged).
 */
static int32_t rebalanceOneSub(SRpcMsg *pMsg, SMqSubscribeObj *pSub) {
  int32_t         lino = 0;
  int32_t         code = 0;
  SMqRebOutputObj rebOutput = {0};
  SMnode         *pMnode = pMsg->info.node;

  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  // topic and cgroup
  char cgroup[TSDB_CGROUP_LEN] = {0};
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

  const bool needReload = (taosHashGet(topicsToReload, topic, strlen(topic)) != NULL);
  if (!needReload) {
    mDebug("%s topic:%s no need reload rebalance", __func__, topic);
    goto END;
  }

  rebOutput.isReload = true;
  rebOutput.pSub = pSub;
  MND_TMQ_RETURN_CHECK(collectVgs(&rebOutput, pSub));

  code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
  if (code != 0) {
    mError("%s error,msg:%s", __func__, tstrerror(code));
  }

END:
  taosRUnLockLatch(&pSub->lock);
  rebOutput.pSub = NULL;  // avoid double free
  clearRebOutput(&rebOutput);
  PRINT_LOG_END
  return code;
}
1160

1161
/*
 * Run rebalanceOneSub over every SDB_SUBSCRIBE row, then clear
 * topicsToReload.
 *
 * The walk stops at the first failure; in that case topicsToReload is left
 * untouched. The fetch cursor is cancelled and the subscription held at
 * that point is released before returning.
 */
static int32_t reloadRebalance(SRpcMsg *pMsg) {
  int32_t          lino = 0;
  int32_t          code = 0;
  SMqSubscribeObj *pSub = NULL;
  SMnode          *pMnode = pMsg->info.node;
  SSdb            *pSdb = pMnode->pSdb;

  PRINT_LOG_START
  mInfo("reloadRebalance start, total topics to reload:%d", taosHashGetSize(topicsToReload));

  void *pIter = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    MND_TMQ_RETURN_CHECK(rebalanceOneSub(pMsg, pSub));
    sdbRelease(pSdb, pSub);
  }
  taosHashClear(topicsToReload);

END:
  sdbCancelFetch(pSdb, pIter);
  /* NOTE(review): relies on sdbFetch leaving pSub NULL at end of scan - confirm. */
  sdbRelease(pSdb, pSub);
  PRINT_LOG_END

  return code;
}
1189

1190
/*
 * Regular rebalance pass.
 *
 * Collects the subscriptions that need work into a temporary hash via
 * mndCheckConsumer, then for each entry builds the output object, runs the
 * rebalance and persists the result. A failure in one of those three steps
 * is logged and the loop moves on to the next entry; a failure to
 * initialise the output object aborts the loop.
 *
 * The returned code is whatever the last executed step left behind.
 */
static int32_t normalRebalance(SRpcMsg *pMsg) {
  int     code = 0;
  int32_t lino = 0;
  SMnode *pMnode = pMsg->info.node;
  void   *pIter = NULL;

  PRINT_LOG_START
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);

  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
  }

  while ((pIter = taosHashIterate(rebSubHash, pIter)) != NULL) {
    SMqRebOutputObj rebOutput = {0};
    SMqRebInputObj  rebInput = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
    rebInput.pRebInfo = (SMqRebInfo *)pIter;

    /* build -> rebalance -> persist; each stage runs only if the previous one succeeded */
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else {
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
      if (code != 0) {
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
      } else {
        code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
        if (code != 0) {
          mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
        }
      }
    }

    clearRebOutput(&rebOutput);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish");
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(rebSubHash, pIter);
  taosHashCleanup(rebSubHash);
  return code;
}
1250

1251
/*
 * Entry point for a rebalance request.
 *
 * Does nothing (returns 0) when mndRebTryStart reports that a rebalance is
 * already running. Otherwise runs a reload rebalance if topicsToReload is
 * non-empty, else a normal rebalance, and then decrements the in-progress
 * counter.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL message, otherwise the code of
 * the rebalance that ran.
 */
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (!pMsg) {
    return TSDB_CODE_INVALID_PARA;
  }

  int     code = 0;
  int32_t lino = 0;
  SMnode *pMnode = pMsg->info.node;
  void   *pIter = NULL;
  PRINT_LOG_START;

  if (!mndRebTryStart()) {
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
    goto END;
  }

  const bool reloadPending = (taosHashGetSize(topicsToReload) > 0);
  if (reloadPending) {
    code = reloadRebalance(pMsg);
  } else {
    code = normalRebalance(pMsg);
  }

  mndRebCntDec();

END:
  PRINT_LOG_END
  TAOS_RETURN(code);
}
1278

1279
/*
 * Append one TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every vgroup
 * that belongs to the subscription's database (pSub->dbUid). Vgroups with a
 * non-zero mountVgId are skipped.
 *
 * Each request carries the vgroup id, consumerId = -1 and the subscribe key.
 * TSDB_CODE_MND_VGROUP_NOT_EXIST is set as an acceptable result code for
 * the action.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL argument, otherwise 0 or the
 * code of the first failing step. The fetch cursor is cancelled and any
 * vgroup still held is released before returning.
 */
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }

    /* Skip mounted vgroups and vgroups of other databases. */
    if (pVgObj->mountVgId || !mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }

    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    /*
     * The vgroup is no longer needed. Clear the pointer so that the release
     * at END cannot drop the same reference a second time if the append
     * below fails.
     */
    sdbRelease(pMnode->pSdb, pVgObj);
    pVgObj = NULL;

    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      /* The action was not stored, so the request buffer is still ours. */
      taosMemoryFree(pReq);
      goto END;
    }
  }

END:
  PRINT_LOG_END
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1326

1327
/*
 * Handle one consumer while a consumer group is being dropped from a topic.
 *
 * Consumers of another group are ignored. If the consumer references
 * topicName in its assigned, pending-removed or pending-new topic lists:
 *  - with deleteConsumer set, a CONSUMER_CLEAR object is built and its drop
 *    logs are added to pTrans;
 *  - otherwise the call fails with TSDB_CODE_MND_CGROUP_USED.
 *
 * The consumer is read-locked for the duration of the call.
 */
static int32_t checkoutOneConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName,
                                   char *cgroup) {
  int32_t         lino = 0;
  int32_t         code = 0;
  SMqConsumerObj *pConsumerNew = NULL;

  taosRLockLatch(&pConsumer->lock);

  if (strcmp(cgroup, pConsumer->cgroup) == 0) {
    /* All three lists are checked unconditionally. */
    bool inAssigned = checkTopic(pConsumer->assignedTopics, topicName);
    bool inRemoved = checkTopic(pConsumer->rebRemovedTopics, topicName);
    bool inNew = checkTopic(pConsumer->rebNewTopics, topicName);
    bool referenced = inAssigned || inRemoved || inNew;

    if (referenced && !deleteConsumer) {
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
             pConsumer->consumerId, pConsumer->cgroup);
      code = TSDB_CODE_MND_CGROUP_USED;
      goto END;
    }

    if (referenced) {
      MND_TMQ_RETURN_CHECK(
          tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
      MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
      tDeleteSMqConsumerObj(pConsumerNew);
      pConsumerNew = NULL;
    }
  }

END:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
1361

1362
/*
 * Run checkoutOneConsumer over every SDB_CONSUMER row for the given
 * group/topic pair, stopping at the first failure.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL pointer argument, otherwise 0
 * or the first error. The fetch cursor is cancelled and the consumer held
 * at that point is released before returning.
 */
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
  if (!pMnode || !pTrans || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }

  int             code = 0;
  int32_t         lino = 0;
  SMqConsumerObj *pConsumer = NULL;
  void           *pIter = NULL;
  PRINT_LOG_START

  while ((pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer)) != NULL) {
    MND_TMQ_RETURN_CHECK(checkoutOneConsumer(pTrans, pConsumer, deleteConsumer, topic, cgroup));
    sdbRelease(pMnode->pSdb, pConsumer);
  }

END:
  /* NOTE(review): relies on sdbFetch leaving pConsumer NULL at end of scan - confirm. */
  sdbRelease(pMnode->pSdb, pConsumer);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1385

1386
/*
 * Build and prepare the "drop-cgroup" transaction for one subscription.
 *
 * Without dropReq->force the drop is refused with TSDB_CODE_MND_CGROUP_USED
 * while the subscription still has consumers. Otherwise the transaction
 * gets: delete-sub actions for the vnodes, the per-consumer checks/drop
 * logs, and the subscription drop commit logs, and is then prepared.
 *
 * pSub is read-locked for the duration of the call and the transaction
 * handle is dropped before returning.
 */
static int32_t dropCgroup(SMqSubscribeObj *pSub, SRpcMsg *pMsg, SMDropCgroupReq *dropReq) {
  int32_t lino = 0;
  int32_t code = 0;
  SMnode *pMnode = pMsg->info.node;
  STrans *pTrans = NULL;
  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  const bool stillInUse = !dropReq->force && taosHashGetSize(pSub->consumerHash) != 0;
  if (stillInUse) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq->cgroup, dropReq->topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq->cgroup, dropReq->topic);

  mndTransSetDbName(pTrans, pSub->dbName, NULL);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  /* Order matters: vnode actions, consumer logs, subscription logs, prepare. */
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq->cgroup, dropReq->topic, dropReq->force));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  taosRUnLockLatch(&pSub->lock);
  mndTransDrop(pTrans);
  PRINT_LOG_END
  return code;
}
1415

1416
/*
 * Handle a "drop consumer group" request.
 *
 * The request is deserialized, the subscription for
 * "<cgroup><TMQ_SEPARATOR><topic>" is looked up and dropCgroup is run on it.
 * When the subscription cannot be acquired the call returns 0 if
 * igNotExists is set and TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST otherwise.
 *
 * On success the result is TSDB_CODE_ACTION_IN_PROGRESS; on failure the
 * failing step's code. TSDB_CODE_INVALID_PARA is returned for a NULL
 * message.
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (!pMsg) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          lino = 0;
  int32_t          code = 0;
  SMqSubscribeObj *pSub = NULL;
  SMDropCgroupReq  dropReq = {0};
  SMnode          *pMnode = pMsg->info.node;
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);

  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code != 0 && dropReq.igNotExists) {
    /* Missing subscription is tolerated on request; this path skips END. */
    mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
    mndReleaseSubscribe(pMnode, pSub);
    return 0;
  }
  if (code != 0) {
    code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
    goto END;
  }

  MND_TMQ_RETURN_CHECK(dropCgroup(pSub, pMsg, &dropReq));

END:
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END

  if (code != 0) {
    TAOS_RETURN(code);
  }
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
}
1453

1454
/* Cleanup hook for the subscribe module; there is currently nothing to release. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}
531,971✔
1455

1456
// Serialize a subscription object into a newly allocated SDB raw record.
// Layout written to the record: int32 payload length, the encoded payload,
// then MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
// Returns the raw record on success. On failure returns NULL, with terrno left
// at TSDB_CODE_OUT_OF_MEMORY unless a failing step changed it.
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  // code/lino are kept for the SDB_SET_* helper macros (assumed to reference them — confirm).
  int32_t  code = 0;
  int32_t  lino = 0;
  void    *buf = NULL;
  // pRaw must be initialised before the first goto: the cleanup path below
  // logs and frees it, and it was previously indeterminate when tlen <= 0.
  SSdbRaw *pRaw = NULL;

  // Pessimistic default; reset to success only after every step has passed.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // A NULL buffer makes the encoder report the required length only.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // The encoder receives a cursor copy so that buf keeps pointing at the start.
  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mDebug("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1498

1499
// Rebuild a subscription row from an SDB raw record.
// Reads the int32 payload length, the payload and the reserved bytes, then
// decodes the payload into the row's SMqSubscribeObj using the record's
// soft version. Returns the row on success, or NULL with terrno set.
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *payload = NULL;
  int8_t           softVer = 0;
  int32_t          dataPos = 0;
  int32_t          payloadLen = 0;

  // Pessimistic default; reset to success only after every step has passed.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &softVer) != 0) goto SUB_DECODE_OVER;

  if (softVer < 1 || softVer > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &payloadLen, SUB_DECODE_OVER);
  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, payload, payloadLen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(payload, pSub, softVer) == NULL) {
    goto SUB_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(payload);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mDebug("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}
1549

1550
// SDB insert callback for subscriptions: only logs the key, always returns 0.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform insert action", subKey);
  return 0;
}
1554

1555
// SDB delete callback for subscriptions: logs the key, then hands the object
// to tDeleteSubscribeObj(). Always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform delete action", subKey);
  tDeleteSubscribeObj(pSub);
  return 0;
}
1560

1561
// SDB update callback for subscriptions.
// Exchanges consumerHash, unassignedVgs and offsetRows between the stored
// object and the incoming one while holding the stored object's write latch.
// Returns TSDB_CODE_INVALID_PARA when either object is NULL, otherwise 0.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (pOldSub == NULL || pNewSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  mDebug("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);
  {
    TSWAP(pOldSub->consumerHash, pNewSub->consumerHash);
    TSWAP(pOldSub->unassignedVgs, pNewSub->unassignedVgs);
    TSWAP(pOldSub->offsetRows, pNewSub->offsetRows);
  }
  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}
1573

1574
// Acquire the subscription stored under `key`.
// On return *pSub holds the result of sdbAcquire(), which is NULL when nothing was found.
// Returns 0 on success, TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when nothing was
// found, or TSDB_CODE_INVALID_PARA when an argument is NULL (in which case
// *pSub is left untouched). A successful acquire is paired with
// mndReleaseSubscribe().
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj **pSub) {
  if (pMnode == NULL || key == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  *pSub = pFound;
  return (pFound != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1585

1586
// Count the subscriptions whose key carries the given topic name.
// Walks every SDB_SUBSCRIBE row, splits its key under the row's read latch,
// and compares the topic part with topicName. Returns 0 when either argument
// is NULL.
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (pMnode == NULL || topicName == NULL) return 0;

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          matched = 0;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};

    // Only the key split happens under the latch; the comparison uses the local copy.
    taosRLockLatch(&pSub->lock);
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    taosRUnLockLatch(&pSub->lock);

    if (strcmp(topic, topicName) == 0) {
      matched++;
    }
    sdbRelease(pSdb, pSub);
  }

  return matched;
}
1613

1614
// Release a subscription obtained from the SDB; a NULL mnode or
// subscription makes this a no-op.
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1619

1620
// Append a commit log to pTrans that marks the subscription as dropped.
// The subscription is encoded into a raw record, appended to the
// transaction's commit log and then flagged SDB_STATUS_DROPPED. If the
// append fails the raw record is freed here.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// failing step's error code.
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pCommitRaw);

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  } else {
    sdbFreeRaw(pCommitRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
1637

1638
// Drop a single subscription if it belongs to topicName.
// Subscriptions of other topics are left alone and 0 is returned. For a
// matching subscription, a non-empty consumerHash without `force` yields
// TSDB_CODE_MND_IN_REBALANCE; otherwise the delete is sent to the vnodes and
// the drop commit log is appended to pTrans. The subscription's read latch is
// held for the whole call.
static int32_t dropOneSub(SMqSubscribeObj *pSub, SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  int32_t code = 0;
  int32_t lino = 0;
  char    topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char    cgroup[TSDB_CGROUP_LEN] = {0};

  taosRLockLatch(&pSub->lock);
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

  if (strcmp(topic, topicName) == 0) {
    // Refuse to drop while consumers are still attached, unless forced.
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
      code = TSDB_CODE_MND_IN_REBALANCE;
      goto END;
    }

    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  }

END:
  taosRUnLockLatch(&pSub->lock);

  return code;
}
1663

1664
// Drop every subscription that belongs to topicName as part of pTrans.
// Iterates all SDB_SUBSCRIBE rows and delegates each one to dropOneSub(),
// which ignores rows of other topics. Stops at the first failure.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// first error reported by dropOneSub().
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          code = 0;
  int32_t          lino = 0;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    MND_TMQ_RETURN_CHECK(dropOneSub(pSub, pMnode, pTrans, topicName, force));
    sdbRelease(pSdb, pSub);
    // Forget the released row so the release at END can never act on it a
    // second time, regardless of what sdbFetch does to pSub when it finishes.
    pSub = NULL;
  }

END:
  PRINT_LOG_END
  // pSub is only non-NULL here when dropOneSub() failed for the current row.
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1687

1688
// Append one result row per vgroup id in `vgs` to pBlock.
// Rows are written starting at index *numOfRows, and *numOfRows is
// incremented after each row. Columns are filled in this order: topic,
// cgroup, vgroup id, consumer id as a hex string (NULL when consumerId is
// -1), user (NULL when `user` is NULL), fqdn (NULL when `fqdn` is NULL),
// offset text "<offset>/<ever>" and row count. The last two are NULL when
// `offsetRows` has no entry for the vgroup.
// NOTE(review): topic and cgroup are passed through varDataVal() for logging,
// so they look like var-string buffers rather than plain C strings — confirm.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL required arguments,
// or the first failing step's error code.
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *user,
                           const char *fqdn, const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t vgCount = taosArrayGetSize(vgs);
  for (int32_t vgIdx = 0; vgIdx < vgCount; vgIdx++) {
    int32_t *pVgId = taosArrayGet(vgs, vgIdx);
    MND_TMQ_NULL_CHECK(pVgId);

    int32_t          colIdx = 0;
    SColumnInfoData *pCol = NULL;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)cgroup, false));

    // vg id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)pVgId, false));

    // consumer id, rendered as hex into a var-string
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)consumerIdHex, consumerId == -1));

    // user
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user) STR_TO_VARSTR(userStr, user);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, userStr, user == NULL));

    // fqdn
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), *pVgId);

    // Find the offset entry for this vgroup; the last matching entry wins.
    OffsetRows *pMatch = NULL;
    for (int k = 0; k < taosArrayGetSize(offsetRows); k++) {
      OffsetRows *pEntry = taosArrayGet(offsetRows, k);
      MND_TMQ_NULL_CHECK(pEntry);
      if (pEntry->vgId == *pVgId) {
        pMatch = pEntry;
      }
    }

    if (pMatch == NULL) {
      // No offset information: both the offset and the rows columns are NULL.
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", *pVgId);
    } else {
      // offset, formatted as "<offset>/<ever>"
      char offsetStr[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(offsetStr), TSDB_OFFSET_LEN, &pMatch->offset);
      (void)snprintf(varDataVal(offsetStr) + strlen(varDataVal(offsetStr)),
                     sizeof(offsetStr) - VARSTR_HEADER_SIZE - strlen(varDataVal(offsetStr)), "/%" PRId64,
                     pMatch->ever);
      varDataSetLen(offsetStr, strlen(varDataVal(offsetStr)));
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)offsetStr, false));

      // rows
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pMatch->rows, false));
    }

    (*numOfRows)++;
  }

END:
  PRINT_LOG_END
  return code;
}
1781

1782
// Emit the "show subscriptions" rows for one subscription.
// Under the subscription's read latch this grows pBlock if the extra
// pSub->vgNum rows would exceed rowsCapacity, then writes rows for every
// consumer in consumerHash and finally for the unassigned vgroups.
// When showAll is false, a consumer's rows are written only if the operating
// user passes the topic privilege check or owns the consumer. The rows for
// unassigned vgroups are always written.
// On success pBlock->info.rows is set to *numOfRows.
// Returns 0 on success or the first failing step's error code.
static int32_t retrieveSub(SRpcMsg *pReq, SMqSubscribeObj *pSub, SUserObj *pOperUser, bool showAll, SSDataBlock *pBlock,
                           int32_t *numOfRows, int32_t rowsCapacity) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMqConsumerEp *pConsumerEp = NULL;
  SMqTopicObj   *pTopic = NULL;
  void          *pIter = NULL;
  bool           showTopic = false;
  char           topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  char           cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  PRINT_LOG_START

  taosRLockLatch(&pSub->lock);
  if (*numOfRows + pSub->vgNum > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + pSub->vgNum));
  }

  // topic and cgroup, stored as var-strings
  mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
  varDataSetLen(topic, strlen(varDataVal(topic)));
  varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

  if (!showAll) {
    char topicFName[TSDB_TOPIC_FNAME_LEN + 1] = {0};
    (void)snprintf(topicFName, sizeof(topicFName), "%d.%s", pOperUser->acctId, varDataVal(topic));
    // A missing topic simply leaves showTopic false.
    (void)mndAcquireTopic(pMnode, topicFName, &pTopic);
    if (pTopic) {
      SName name = {0};  // 1.topic1
      if (0 == tNameFromString(&name, pTopic->name, T_NAME_ACCT | T_NAME_DB)) {
        if (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_SUBSCRIPTION_SHOW, PRIV_OBJ_TOPIC, pTopic->ownerId,
                                          pTopic->db, name.dbname)) {
          showTopic = true;
        }
      }
    }
  }

  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;

    char           *user = NULL;
    char           *fqdn = NULL;
    bool            subscribeOwner = false;
    SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
    if (pConsumer != NULL) {
      user = pConsumer->user;
      fqdn = pConsumer->fqdn;
      if (strncmp(pConsumer->user, pOperUser->name, TSDB_USER_LEN) == 0) {
        subscribeOwner = true;
      }
    }

    if (showAll || showTopic || subscribeOwner) {
      code = buildResult(pBlock, numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
                         pConsumerEp->offsetRows);
    }

    // user/fqdn point into pConsumer, so the consumer must stay acquired
    // until buildResult() has finished reading them.
    if (pConsumer != NULL) {
      sdbRelease(pSdb, pConsumer);
    }
    if (code != 0) {
      lino = __LINE__;
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(
      buildResult(pBlock, numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

  pBlock->info.rows = *numOfRows;

END:
  mndReleaseTopic(pMnode, pTopic);
  // Cancel the iterator while the read latch is still held: mndSubActionUpdate
  // swaps pSub->consumerHash under the write latch, so after unlocking the
  // field may no longer be the table pIter belongs to.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  taosRUnLockLatch(&pSub->lock);
  PRINT_LOG_END
  return code;
}
1857

1858
// Retrieve callback for "show subscriptions".
// Resolves the requesting user, decides whether that user may see every
// subscription, then pulls subscriptions from the SDB through pShow->pIter
// and fills pBlock until rowsCapacity rows have been produced or the
// iteration ends.
// Returns the number of rows written on success (also added to
// pShow->numOfRows), or an error code on failure.
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  int32_t          numOfRows = 0;
  bool             showAll = false;
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  SUserObj        *pOperUser = NULL;
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};

  mInfo("mnd show subscriptions begin");
  MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));

  // A system-level show privilege on "<acctId>.*" lets the user see everything.
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pOperUser->acctId);
  showAll = (0 == mndCheckSysObjPrivilege(pMnode, pOperUser, RPC_MSG_TOKEN(pReq), PRIV_SUBSCRIPTION_SHOW,
                                          PRIV_OBJ_TOPIC, 0, objFName, "*"));

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    MND_TMQ_RETURN_CHECK(retrieveSub(pReq, pSub, pOperUser, showAll, pBlock, &numOfRows, rowsCapacity));

    // Released here; cleared so the release at END does not repeat it.
    sdbRelease(pSdb, pSub);
    pSub = NULL;
  }
  mInfo("mnd end show subscriptions");
  pShow->numOfRows += numOfRows;

END:
  sdbCancelFetch(pSdb, pShow->pIter);
  sdbRelease(pSdb, pSub);
  mndReleaseUser(pMnode, pOperUser);

  if (code == 0) {
    mDebug("mnd show subscriptions success, rows:%d", numOfRows);
    return numOfRows;
  }

  mError("mnd show subscriptions failed, msg:%s", tstrerror(code));
  TAOS_RETURN(code);
}
1905

1906
// Cancel an in-progress SDB_SUBSCRIBE iteration; does nothing when pMnode is NULL.
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc