• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4936

23 Jan 2026 09:40AM UTC coverage: 66.746% (+0.04%) from 66.708%
#4936

push

travis-ci

web-flow
fix: case failure caused by the modification of the error description (#34391)

204023 of 305671 relevant lines covered (66.75%)

124768167.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.89
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndShow.h"
20
#include "mndTopic.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "tcompare.h"
24
#include "tname.h"
25

26
#define MND_SUBSCRIBE_VER_NUMBER   3
27
#define MND_SUBSCRIBE_RESERVE_SIZE 64
28

29
//#define MND_CONSUMER_LOST_HB_CNT          6
30

31
static int32_t mqRebInExecCnt = 0;
32

33
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
34
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
35
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
36
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
38
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
39
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
40
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
42
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
43

44
/*
 * Serialize pSub and attach it to pTrans as a commit log whose raw status is READY.
 *
 * If the commit log cannot be appended, the encoded raw row is freed here. Once it has been
 * appended it is not freed by this function.
 * Returns 0, TSDB_CODE_INVALID_PARA for NULL arguments, or the first failing call's code.
 */
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pTrans == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    sdbFreeRaw(pRaw);  // not attached to the transaction, so release it here
  }

END:
  PRINT_LOG_END
  return code;
}
64

65
/*
 * Register the subscribe SDB table, the subscribe-related message handlers and the SHOW
 * SUBSCRIPTIONS handlers with pMnode.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL mnode, otherwise the result of sdbSetTable.
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // Both SHOW handlers must be registered for the same system table: mndRetrieveSubscribe produces the
  // SUBSCRIPTIONS rows and mndCancelGetNextSubscribe is its free-iterator handler. It used to be
  // registered under TSDB_MGMT_TABLE_TOPICS, which left SUBSCRIPTIONS without a cancel handler and
  // could replace the TOPICS table's own handler.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
90

91
/*
 * Seed a fresh subscription with every vgroup of the topic's database.
 *
 * Each matching vgroup increments pSub->vgNum and contributes an SMqVgEp (vgId + epset) to
 * pSub->unassignedVgs. Returns 0, or terrno if appending to unassignedVgs fails; in that case the
 * SDB iteration is cancelled before returning.
 */
int32_t mndSchedInitSubEp(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;

    SMqVgEp vgEp = {0};
    vgEp.vgId = pVgroup->vgId;
    vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    if (taosArrayPush(pSub->unassignedVgs, &vgEp) == NULL) {
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, vgEp.vgId);
    sdbRelease(pSdb, pVgroup);
  }

  return code;
}
126

127
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey,
116,782✔
128
                                     SMqSubscribeObj **pSub) {
129
  if (pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
116,782✔
130
    return TSDB_CODE_INVALID_PARA;
×
131
  }
132
  int32_t lino = 0;
116,782✔
133
  int32_t code = 0;
116,782✔
134
  PRINT_LOG_START
116,782✔
135
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
116,782✔
136
  (*pSub)->dbUid = pTopic->dbUid;
116,782✔
137
  (*pSub)->stbUid = pTopic->stbUid;
116,782✔
138
  (*pSub)->subType = pTopic->subType;
116,782✔
139
  (*pSub)->withMeta = pTopic->withMeta;
116,782✔
140

141
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
116,782✔
142

143
END:
116,782✔
144
  PRINT_LOG_END
116,782✔
145
  return code;
116,782✔
146
}
147

148
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
1,663,066✔
149
  if (key == NULL || topic == NULL || cgroup == NULL) {
1,663,066✔
150
    return;
×
151
  }
152
  int32_t i = 0;
1,663,066✔
153
  while (key[i] != TMQ_SEPARATOR_CHAR) {
11,825,598✔
154
    i++;
10,162,532✔
155
  }
156
  (void)memcpy(cgroup, key, i);
1,663,066✔
157
  cgroup[i] = 0;
1,663,066✔
158
  if (fullName) {
1,663,066✔
159
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
1,552,179✔
160
  } else {
161
    while (key[i] != '.') {
332,661✔
162
      i++;
221,774✔
163
    }
164
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
110,887✔
165
  }
166
}
167

168
static int32_t mndBuildSubChangeReq(SMnode *pMnode, void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
780,022✔
169
                                    const SMqRebOutputVg *pRebVg) {
170
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
780,022✔
171
    return TSDB_CODE_INVALID_PARA;
×
172
  }
173
  SMqRebVgReq  req = {0};
780,022✔
174
  int32_t      code = 0;
780,022✔
175
  int32_t      lino = 0;
780,022✔
176
  SEncoder     encoder = {0};
780,022✔
177
  SMqTopicObj *pTopic = NULL;
780,022✔
178
  void *       buf = NULL;
780,022✔
179

180
  PRINT_LOG_START
780,022✔
181
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
780,022✔
182
  char cgroup[TSDB_CGROUP_LEN] = {0};
780,022✔
183
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
780,022✔
184
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
780,022✔
185
  taosRLockLatch(&pTopic->lock);
780,022✔
186
  req.oldConsumerId = pRebVg->oldConsumerId;
780,022✔
187
  req.newConsumerId = pRebVg->newConsumerId;
780,022✔
188
  req.vgId = pRebVg->pVgEp.vgId;
780,022✔
189
  req.qmsg = pTopic->physicalPlan;
780,022✔
190
  req.schema = pTopic->schema;
780,022✔
191
  req.subType = pSub->subType;
780,022✔
192
  req.withMeta = pSub->withMeta;
780,022✔
193
  req.suid = pSub->stbUid;
780,022✔
194
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
780,022✔
195

196
  int32_t tlen = 0;
780,022✔
197
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
780,022✔
198
  if (code < 0) {
780,022✔
199
    goto END;
×
200
  }
201

202
  tlen += sizeof(SMsgHead);
780,022✔
203
  buf = taosMemoryMalloc(tlen);
780,022✔
204
  MND_TMQ_NULL_CHECK(buf);
780,022✔
205
  SMsgHead *pMsgHead = (SMsgHead *)buf;
780,022✔
206
  pMsgHead->contLen = htonl(tlen);
780,022✔
207
  pMsgHead->vgId = htonl(pRebVg->pVgEp.vgId);
780,022✔
208

209
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
780,022✔
210
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
780,022✔
211
  *pBuf = buf;
780,022✔
212
  buf = NULL;
780,022✔
213
  *pLen = tlen;
780,022✔
214

215
END:
780,022✔
216
  PRINT_LOG_END
780,022✔
217
  taosMemoryFree(buf);
780,022✔
218
  if (pTopic != NULL) {
780,022✔
219
    taosRUnLockLatch(&pTopic->lock);
780,022✔
220
  }
221
  mndReleaseTopic(pMnode, pTopic);
780,022✔
222
  tEncoderClear(&encoder);
780,022✔
223
  return code;
780,022✔
224
}
225

226
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
780,022✔
227
                                        const SMqRebOutputVg *pRebVg) {
228
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
780,022✔
229
    return TSDB_CODE_INVALID_PARA;
×
230
  }
231
  int32_t code = 0;
780,022✔
232
  int32_t lino = 0;
780,022✔
233
  void *  buf = NULL;
780,022✔
234
  PRINT_LOG_START
780,022✔
235
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
780,022✔
236
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
237
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
238
    goto END;
×
239
  }
240

241
  int32_t tlen = 0;
780,022✔
242
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(pMnode, &buf, &tlen, pSub, pRebVg));
780,022✔
243
  int32_t vgId = pRebVg->pVgEp.vgId;
780,022✔
244
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
780,022✔
245
  if (pVgObj == NULL) {
780,022✔
246
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
247
    goto END;
×
248
  }
249

250
  STransAction action = {0};
780,022✔
251
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
780,022✔
252
  action.pCont = buf;
780,022✔
253
  buf = NULL;
780,022✔
254
  action.contLen = tlen;
780,022✔
255
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
780,022✔
256

257
  mndReleaseVgroup(pMnode, pVgObj);
780,022✔
258
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
780,022✔
259

260
END:
780,022✔
261
  PRINT_LOG_END
780,022✔
262
  taosMemoryFree(buf);
780,022✔
263
  return code;
780,022✔
264
}
265

266
static void freeRebalanceItem(void *param) {
322,287✔
267
  if (param == NULL) return;
322,287✔
268
  SMqRebInfo *pInfo = param;
322,287✔
269
  taosArrayDestroy(pInfo->newConsumers);
322,287✔
270
  taosArrayDestroy(pInfo->removedConsumers);
322,287✔
271
}
272

273
/*
 * Look up the SMqRebInfo for key in pHash, inserting a freshly created one if absent.
 *
 * Keys are stored including their terminating NUL. The hash keeps a shallow copy of the created
 * struct, so only the temporary allocation itself is freed after a successful insert.
 * When pReb is non-NULL it receives a pointer to the entry stored inside pHash.
 */
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pEntry = taosHashGet(pHash, key, keyLen);
  if (pEntry == NULL) {
    SMqRebInfo *pTmp = tNewSMqRebSubscribe(key);
    if (pTmp == NULL) {
      code = terrno;
      goto END;
    }
    code = taosHashPut(pHash, key, keyLen, pTmp, sizeof(SMqRebInfo));
    if (code != 0) {
      freeRebalanceItem(pTmp);  // the hash did not take the arrays, so release them too
      taosMemoryFreeClear(pTmp);
      goto END;
    }
    taosMemoryFreeClear(pTmp);

    pEntry = taosHashGet(pHash, key, keyLen);
    MND_TMQ_NULL_CHECK(pEntry);
  }
  if (pReb != NULL) {
    *pReb = pEntry;
  }

END:
  PRINT_LOG_END
  return code;
}
306

307
/*
 * Pop the last SMqVgEp from vgs and record it in pHash, keyed by vgId, as an SMqRebOutputVg.
 *
 * The record is initialised with consumerId first and -1 second, presumably the old and new consumer
 * ids (field order is defined in the header). key is only used for logging.
 */
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (vgs == NULL || pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqVgEp *pTail = (SMqVgEp *)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(pTail);

  int32_t        vgId = pTail->vgId;
  SMqRebOutputVg rebVg = {consumerId, -1, *pTail};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &vgId, sizeof(int32_t), &rebVg, sizeof(SMqRebOutputVg)));
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, vgId, consumerId);

END:
  PRINT_LOG_END
  return code;
}
324

325
/*
 * Detach every consumer listed in pInput->pRebInfo->removedConsumers from the subscription.
 *
 * All vgroups of a removed consumer are moved into pHash (via pushVgDataToHash), the consumer is
 * deleted from pOutput->pSub->consumerHash and its id is appended to pOutput->removedConsumers.
 * Ids that are not present in consumerHash are skipped. A mismatch between requested and actual
 * removals is logged as an error, but it does not fail the call.
 */
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t actualRemoved = 0;

  for (int32_t idx = 0; idx < numOfRemoved; idx++) {
    int64_t *pId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    MND_TMQ_NULL_CHECK(pId);
    SMqConsumerEp *pEp = taosHashGet(pOutput->pSub->consumerHash, pId, sizeof(int64_t));
    if (pEp == NULL) {
      continue;
    }

    // Release all of the consumer's vgroups before its hash entry (and the vgs array) goes away.
    for (int32_t left = taosArrayGetSize(pEp->vgs); left > 0; left--) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, *pId, pOutput->pSub->key));
    }

    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, pId, sizeof(int64_t)));
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, pId));
    actualRemoved++;
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
  } else {
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key,
           numOfRemoved, actualRemoved);
  }

END:
  PRINT_LOG_END
  return code;
}
362

363
/*
 * Register each id in pInput->pRebInfo->newConsumers in pOutput->pSub->consumerHash.
 *
 * Each new consumer gets an empty vgroup list, and its id is also appended to pOutput->newConsumers.
 * If the hash insert fails, the freshly built SMqConsumerEp is freed and terrno is returned.
 */
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t idx = 0; idx < numOfNewConsumers; idx++) {
    int64_t *pId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, idx);
    MND_TMQ_NULL_CHECK(pId);

    SMqConsumerEp ep = {0};
    ep.consumerId = *pId;
    ep.vgs = taosArrayInit(0, sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(ep.vgs);

    int32_t putRet = taosHashPut(pOutput->pSub->consumerHash, pId, sizeof(int64_t), &ep, sizeof(SMqConsumerEp));
    if (putRet != 0) {
      freeSMqConsumerEp(&ep);
      code = terrno;
      goto END;
    }

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, pId));
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *pId);
  }

END:
  PRINT_LOG_END
  return code;
}
392

393
/*
 * Drain pOutput->pSub->unassignedVgs into pHash.
 *
 * Every entry is recorded with consumer id -1 (not owned by any consumer). The list is empty
 * afterwards unless an error interrupts the drain.
 */
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t remaining = taosArrayGetSize(pOutput->pSub->unassignedVgs);
  while (remaining-- > 0) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
  }

END:
  PRINT_LOG_END
  return code;
}
408

409
/*
 * Trim over-loaded existing consumers down to their fair share and collect the surplus in pHash.
 *
 * Every consumer id is appended to pOutput->modifyConsumers. Among consumers holding more than
 * minVgCnt vgroups, the first remainderVgCnt encountered may keep minVgCnt + 1. All others are
 * trimmed to minVgCnt. Trimmed vgroups are popped from the tail of the consumer's list.
 */
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t remainderVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  int32_t keptExtra = 0;  // consumers so far allowed to keep minVgCnt + 1 vgroups
  void   *pIter = NULL;
  PRINT_LOG_START

  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
    if (consumerVgNum <= minVgCnt) {
      continue;
    }

    bool    keepExtra = keptExtra < remainderVgCnt;
    int32_t target = keepExtra ? minVgCnt + 1 : minVgCnt;
    while (taosArrayGetSize(pConsumerEp->vgs) > target) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
    }
    if (keepExtra) {
      keptExtra++;
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  return code;
}
448

449
/*
 * Reconcile the subscription's vgroup ownership with the vgroups currently present in its database.
 *
 *  1. Collect the database's vgroups (skipping those with a non-zero mountVgId) into a scratch list.
 *  2. For every consumer, drop owned vgroups that are not in the scratch list (logged as "old vgId"),
 *     and cross off the scratch list those that are still owned. A vgId already crossed off is
 *     treated as gone for any later consumer that also lists it.
 *  3. If pSub->unassignedVgs is empty, append the remaining (unowned) vgroups to it.
 *
 * Returns the number of vgroups collected in step 1 on success, or the error code on failure
 * (the caller treats negative values as failure).
 */
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  int32_t totalVgNum = 0;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  void   *pIterHash = NULL;
  SArray *newVgs = taosArrayInit(0, sizeof(SMqVgEp));
  MND_TMQ_NULL_CHECK(newVgs);

  // Step 1: current vgroups of the subscribed database.
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (pVgroup->mountVgId || !mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }

    totalVgNum++;
    SMqVgEp vgEp = {0};
    vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    vgEp.vgId = pVgroup->vgId;
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &vgEp));
    sdbRelease(pMnode->pSdb, pVgroup);
  }

  // Step 2: reconcile each consumer's list against the scratch list.
  while ((pIterHash = taosHashIterate(pOutput->pSub->consumerHash, pIterHash)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIterHash;
    int32_t        j = 0;
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
      SMqVgEp *pOwned = taosArrayGet(pConsumerEp->vgs, j);
      MND_TMQ_NULL_CHECK(pOwned);

      int32_t matchIdx = -1;
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
        SMqVgEp *pCandidate = taosArrayGet(newVgs, k);
        MND_TMQ_NULL_CHECK(pCandidate);
        if (pCandidate->vgId == pOwned->vgId) {
          matchIdx = k;
          break;
        }
      }

      if (matchIdx >= 0) {
        taosArrayRemove(newVgs, matchIdx);
        j++;
      } else {
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pOwned->vgId);
        taosArrayRemove(pConsumerEp->vgs, j);  // next element shifts into slot j
      }
    }
  }

  // Step 3: park unowned vgroups when nothing is currently unassigned.
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
  }

END:
  sdbRelease(pMnode->pSdb, pVgroup);
  sdbCancelFetch(pMnode->pSdb, pIter);
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIterHash);
  taosArrayDestroy(newVgs);
  if (code != 0) {
    mError("tmq rebalance processRemoveAddVgs failed, code:%d", code);
    return code;
  }
  mInfo("tmq rebalance processRemoveAddVgs completed, total vg num:%d", totalVgNum);
  return totalVgNum;
}
529

530
/*
 * Merge the per-vgroup offset/row statistics of the persisted subscription (looked up by
 * pInput->pRebInfo->key) into pOutput->pSub->offsetRows.
 *
 * An entry is skipped when its consumer still owns the same vgroup in the rebalanced output
 * (logged as "not change"). Other entries are merged by vgId: rows are summed, and offset/ever
 * take the value of the entry being merged. Entries with an unseen vgId are appended.
 * If the subscription cannot be acquired (e.g. it is brand new), 0 is returned without merging.
 *
 * The persisted subscription's consumerHash is iterated under its read latch. Any unfinished
 * iteration is cancelled before the latch is released.
 */
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if (code != 0) {
    return 0;
  }
  taosRLockLatch(&pSub->lock);
  PRINT_LOG_START
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    MND_TMQ_NULL_CHECK(pOutput->pSub->offsetRows);
  }
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(d1);
      bool jump = false;
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
        SMqVgEp *pVgEp = taosArrayGet(pConsumerEpNew->vgs, i);
        MND_TMQ_NULL_CHECK(pVgEp);
        if (pVgEp->vgId == d1->vgId) {
          jump = true;
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                pConsumerEp->consumerId, pVgEp->vgId);
          break;
        }
      }
      if (jump) continue;
      bool find = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(d2);
        if (d1->vgId == d2->vgId) {
          d2->rows += d1->rows;
          d2->offset = d1->offset;
          d2->ever = d1->ever;
          find = true;
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
          break;
        }
      }
      if (!find) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
      }
    }
  }

END:
  // Cancel a pending iterator while the read latch still guards pSub->consumerHash. Doing it after the
  // unlock would let a concurrent writer modify (or replace) the hash while the iterator is released.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  taosRUnLockLatch(&pSub->lock);
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  return code;
}
594

595
/*
 * Log the outcome of a rebalance round.
 *
 * Logs every moved vgroup (old -> new consumer) from pOutput->rebVgs, then the final vgroup list of
 * each consumer in pOutput->pSub->consumerHash. Does nothing for a NULL pOutput.
 */
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (pOutput == NULL) return;

  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
  int32_t numOfRebVgs = taosArrayGetSize(pOutput->rebVgs);
  for (int32_t i = 0; i < numOfRebVgs; i++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(pOutput->rebVgs, i);
    if (pRebVg != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
            pRebVg->pVgEp.vgId, pRebVg->oldConsumerId, pRebVg->newConsumerId);
    }
  }

  void *pIter = NULL;
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        numOfVgs = taosArrayGetSize(pConsumerEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
          pConsumerEp->consumerId, numOfVgs);
    for (int32_t i = 0; i < numOfVgs; i++) {
      SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, i);
      if (pVgEp != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
              pConsumerEp->consumerId);
      }
    }
  }
}
621

622
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
322,287✔
623
                           int32_t *remainderVgCnt) {
624
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
322,287✔
625
    return;
×
626
  }
627
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
322,287✔
628
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
322,287✔
629
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
322,287✔
630

631
  // calc num
632
  if (numOfFinal != 0) {
322,287✔
633
    *minVgCnt = totalVgNum / numOfFinal;
211,301✔
634
    *remainderVgCnt = totalVgNum % numOfFinal;
211,301✔
635
  } else {
636
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
110,986✔
637
  }
638
  mInfo(
322,287✔
639
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d "
640
      "remainderVg:%d",
641
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
642
}
643

644
/*
 * Distribute the vgroups collected in pHash (vgId -> SMqRebOutputVg) among the consumers of pOutput->pSub.
 *
 * Pass 1 tops every consumer up to minVgCnt vgroups. Pass 2 gives one more vgroup to each consumer
 * holding exactly minVgCnt, until pHash runs out. Both passes share a single cursor over pHash.
 * If that cursor has not reached the end after pass 2, the call fails with TSDB_CODE_MND_INTERNAL_ERROR.
 * Finally every pHash entry is appended to pOutput->rebVgs. When the subscription has no consumers,
 * the entry's endpoint is also appended to pSub->unassignedVgs.
 */
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SHashObj       *pConsumerHash = pOutput->pSub->consumerHash;
  void           *pConsumerIter = NULL;
  void           *pAssignIter = NULL;
  SMqRebOutputVg *pRebVg = NULL;
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  // Pass 1: give every consumer at least minVgCnt vgroups.
  while ((pConsumerIter = taosHashIterate(pConsumerHash, pConsumerIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pConsumerIter;
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;
      }
      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp.vgId,
            pConsumerEp->consumerId);
    }
  }

  // Pass 2: hand out the remainder, one extra vgroup per consumer sitting at exactly minVgCnt.
  while ((pConsumerIter = taosHashIterate(pConsumerHash, pConsumerIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pConsumerIter;
    if (taosArrayGetSize(pConsumerEp->vgs) != minVgCnt) {
      continue;
    }
    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
      break;  // pConsumerIter stays live and is cancelled at END
    }
    pRebVg = (SMqRebOutputVg *)pAssignIter;
    pRebVg->newConsumerId = pConsumerEp->consumerId;
    MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
    mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp.vgId,
          pConsumerEp->consumerId);
  }

  if (pAssignIter != NULL) {
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    goto END;
  }

  // Pass 3: publish every moved vgroup; with no consumers left they all become unassigned.
  while ((pAssignIter = taosHashIterate(pHash, pAssignIter)) != NULL) {
    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pConsumerHash) == 0) {
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));
    }
  }

END:
  taosHashCancelIterate(pConsumerHash, pConsumerIter);
  taosHashCancelIterate(pHash, pAssignIter);
  PRINT_LOG_END
  return code;
}
722

723
/*
 * Compute the new vgroup -> consumer assignment for one subscription.
 *
 * Mutates pOutput->pSub and fills the change lists of pOutput by running, in order: removed consumers,
 * unassigned vgroups, per-consumer quota calculation, modified consumers, new consumers, vgroup assignment
 * and offset rows.
 *
 * Returns 0 on success, the negative value produced by processRemoveAddVgs() if that step fails, or the
 * error code of the first failing step.
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t vgTotal = processRemoveAddVgs(pMnode, pOutput);
  if (vgTotal < 0) {
    return vgTotal;
  }

  const char *subKey = pOutput->pSub->key;
  int32_t     vgPerConsumer = 0;
  int32_t     vgLeftover = 0;
  SHashObj   *pendingVgs = NULL;
  int32_t     code = 0;
  int32_t     lino = 0;
  PRINT_LOG_START

  // Scratch table shared by the steps below; keyed with the INT hash function
  // (presumably vgroup ids -- confirm against the process* helpers).
  pendingVgs = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pendingVgs);

  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pendingVgs, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pendingVgs));
  calcVgroupsCnt(pInput, vgTotal, subKey, &vgPerConsumer, &vgLeftover);
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pendingVgs, vgPerConsumer, vgLeftover));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pendingVgs, vgPerConsumer));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
  printRebalanceLog(pOutput);

END:
  taosHashCleanup(pendingVgs);
  PRINT_LOG_END
  return code;
}
753

754
/*
 * Append one consumer commit log to pTrans for every consumer id (int64_t) stored in `consumers`.
 * Each log carries an SMqConsumerObj built with update type `type`, consumer group `cgroup` and
 * `topic` (which may be NULL).
 *
 * Returns 0 on success or the first error encountered. The temporary consumer object is always freed.
 */
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Must be reset here. A failure in a later iteration jumps to END, which would otherwise
    // free this already-deleted object a second time.
    pConsumerNew = NULL;
  }

END:
  PRINT_LOG_END
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
777

778
/*
 * Queue the consumer commit logs that describe a rebalance result, in this order:
 *   - modified consumers -> CONSUMER_UPDATE_REB (no topic),
 *   - new consumers      -> CONSUMER_ADD_REB    (tagged with `topic`),
 *   - removed consumers  -> CONSUMER_REMOVE_REB (tagged with `topic`).
 * Stops at the first failing group and returns its error code.
 */
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (pTrans == NULL || pOutput == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  struct {
    SArray *ids;
    int8_t  updateType;
    char   *topicName;
  } groups[] = {
      {pOutput->modifyConsumers, CONSUMER_UPDATE_REB, NULL},
      {pOutput->newConsumers, CONSUMER_ADD_REB, topic},
      {pOutput->removedConsumers, CONSUMER_REMOVE_REB, topic},
  };

  for (size_t g = 0; g < sizeof(groups) / sizeof(groups[0]); g++) {
    MND_TMQ_RETURN_CHECK(
        presistConsumerByType(pTrans, groups[g].ids, groups[g].updateType, cgroup, groups[g].topicName));
  }

END:
  PRINT_LOG_END
  return code;
}
792

793
/*
 * Build and prepare the "tmq-reb" transaction that applies a rebalance result:
 *   1. one redo action per entry of pOutput->rebVgs (mndPersistSubChangeVgReq),
 *   2. a commit log for the updated subscription object,
 *   3. unless pOutput->isReload, commit logs for the affected consumers (mndPresistConsumer),
 *   4. the TRANS_START_FUNC_MQ_REB / TRANS_STOP_FUNC_MQ_REB callbacks.
 * The local transaction handle is always dropped before returning.
 */
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pMsg == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  STrans *pRebTrans = NULL;
  PRINT_LOG_START

  char topicName[TSDB_TOPIC_FNAME_LEN] = {0};
  char groupName[TSDB_CGROUP_LEN] = {0};
  mndSplitSubscribeKey(pOutput->pSub->key, topicName, groupName, true);

  pRebTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pRebTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pRebTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pRebTrans));

  // redo actions: one change request per affected vgroup
  const SArray *changedVgs = pOutput->rebVgs;
  int32_t       changedCnt = taosArrayGetSize(changedVgs);
  for (int32_t idx = 0; idx < changedCnt; idx++) {
    SMqRebOutputVg *pChange = taosArrayGet(changedVgs, idx);
    MND_TMQ_NULL_CHECK(pChange);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pRebTrans, pOutput->pSub, pChange));
  }

  // commit log: subscription and vgroup assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pRebTrans, pOutput->pSub));

  // commit logs: consumer status and epoch updates (not written for a reload)
  if (!pOutput->isReload) {
    MND_TMQ_RETURN_CHECK(mndPresistConsumer(pRebTrans, pOutput, groupName, topicName));
  }

  mndTransSetCb(pRebTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pRebTrans));

END:
  mndTransDrop(pRebTrans);
  PRINT_LOG_END
  TAOS_RETURN(code);
}
844

845
// type = 0 remove  type = 1 add
846
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
514,015✔
847
  if (rebSubHash == NULL || topicList == NULL) {
514,015✔
848
    return TSDB_CODE_INVALID_PARA;
×
849
  }
850
  int32_t code = 0;
514,015✔
851
  int32_t lino = 0;
514,015✔
852
  PRINT_LOG_START
514,015✔
853
  int32_t topicNum = taosArrayGetSize(topicList);
514,015✔
854
  for (int32_t i = 0; i < topicNum; i++) {
777,779✔
855
    char *removedTopic = taosArrayGetP(topicList, i);
263,764✔
856
    MND_TMQ_NULL_CHECK(removedTopic);
263,764✔
857
    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
263,764✔
858
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
263,764✔
859
    SMqRebInfo *pRebSub = NULL;
263,764✔
860
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
263,764✔
861
    if (type == 0)
263,764✔
862
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
234,330✔
863
    else if (type == 1)
146,599✔
864
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
293,198✔
865
  }
866

867
END:
514,015✔
868
  PRINT_LOG_END
514,015✔
869
  return code;
514,015✔
870
}
871

872
/*
 * For subscription "<cgroup><TMQ_SEPARATOR><topic>", look at every vgroup currently assigned to pConsumer.
 * If any of them can no longer be acquired from the mnode (the log calls this a vnode split), ensure
 * rebSubHash holds a rebalance entry for that subscription.
 * Best-effort: errors are logged or ignored and nothing is returned. The subscription is read-locked while
 * its consumer map is inspected.
 */
static void checkOneTopic(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash, const char *topic) {
  char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;

  MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, subKey, &pSub));
  taosRLockLatch(&pSub->lock);

  SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pEp);

  int32_t epVgCnt = taosArrayGetSize(pEp->vgs);
  for (int32_t k = 0; k < epVgCnt; k++) {
    SMqVgEp *pVg = taosArrayGet(pEp->vgs, k);
    if (pVg == NULL) {
      continue;
    }
    SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVg->vgId);
    if (pVgObj == NULL) {
      code = mndGetOrCreateRebSub(rebSubHash, subKey, NULL);
      if (code == 0) {
        mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVg->vgId);
      } else {
        mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVg->vgId, tstrerror(code));
      }
    }
    mndReleaseVgroup(pMnode, pVgObj);
  }

END:
  if (pSub != NULL) {
    taosRUnLockLatch(&pSub->lock);
  }
  mndReleaseSubscribe(pMnode, pSub);
}
976,012✔
908

909
/*
 * Run checkOneTopic() for each non-NULL entry of pConsumer->currentTopics.
 * Does nothing if any argument is NULL.
 */
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (pMnode == NULL || pConsumer == NULL || rebSubHash == NULL) {
    return;
  }
  int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t idx = 0; idx < topicCnt; ++idx) {
    char *topicName = taosArrayGetP(pConsumer->currentTopics, idx);
    if (topicName != NULL) {
      checkOneTopic(pMnode, pConsumer, rebSubHash, topicName);
    }
  }
}
922

923
/*
 * A consumer counts as offline when the time implied by its silence counters reaches a limit:
 *   hbStatus   * rebalance interval (ms) >= sessionTimeoutMs,   or
 *   pollStatus * rebalance interval (ms) >= maxPollIntervalMs.
 * tsMqRebalanceInterval is multiplied by 1000, so it is treated as seconds.
 */
static bool isOffLine(int32_t hbStatus, int32_t pollStatus, SMqConsumerObj *pConsumer) {
  // Widen before multiplying. Both counters are incremented on every rebalance check (checkOneConsumer),
  // and the product in 32-bit signed arithmetic could overflow, which is undefined behavior.
  int64_t intervalMs = (int64_t)tsMqRebalanceInterval * 1000;
  int64_t hbSilentMs = (int64_t)hbStatus * intervalMs;
  int64_t pollSilentMs = (int64_t)pollStatus * intervalMs;
  return hbSilentMs >= pConsumer->sessionTimeoutMs || pollSilentMs >= pConsumer->maxPollIntervalMs;
}
927

928
/*
 * Examine one consumer during a rebalance scan. Its hbStatus and pollStatus counters are bumped by one,
 * then the action depends on its status:
 *   - READY, no current topics        -> send TDMT_MND_TMQ_LOST_CONSUMER_CLEAR;
 *   - READY, offline (isOffLine)       -> record it as removed from all current topics;
 *   - READY, otherwise                 -> checkForVgroupSplit();
 *   - REBALANCE, online                -> record its pending new / removed topics;
 *   - any other case (including REBALANCE while offline) -> send TDMT_MND_TMQ_LOST_CONSUMER_CLEAR.
 * The consumer is read-locked for the duration.
 */
static int32_t checkOneConsumer(SMqConsumerObj *pConsumer, SMnode *pMnode, SRpcMsg *pMsg, SHashObj *rebSubHash) {
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);

  int32_t hbCnt = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
  int32_t pollCnt = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
  int32_t state = atomic_load_32(&pConsumer->status);

  mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
         ", hbstatus:%d, pollStatus:%d",
         pConsumer->consumerId, state, mndConsumerStatusName(state), pConsumer->subscribeTime, pConsumer->createTime,
         hbCnt, pollCnt);

  bool sendClear = false;
  if (state == MQ_CONSUMER_STATUS_READY) {
    if (taosArrayGetSize(pConsumer->currentTopics) == 0) {
      sendClear = true;  // unsubscribe or close
    } else if (isOffLine(hbCnt, pollCnt, pConsumer)) {
      mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
            ", hb lost cnt:%d, or long time no poll cnt:%d",
            pConsumer->consumerId, state, mndConsumerStatusName(state), pConsumer->subscribeTime,
            pConsumer->createTime, hbCnt, pollCnt);
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
    } else {
      checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
    }
  } else if (state == MQ_CONSUMER_STATUS_REBALANCE && !isOffLine(hbCnt, pollCnt, pConsumer)) {
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
  } else {
    sendClear = true;
  }

  if (sendClear) {
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
969

970
/*
 * Walk every SDB_CONSUMER row and run checkOneConsumer() on it. Subscriptions needing a rebalance are
 * accumulated into rebSubHash. The scan stops at the first consumer whose check fails; the sdb iterator
 * and the current row are released on every path.
 */
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (pMsg == NULL || rebSubHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pObj = NULL;
  void           *iter = NULL;
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  while ((iter = sdbFetch(pSdb, SDB_CONSUMER, iter, (void **)&pObj)) != NULL) {
    MND_TMQ_RETURN_CHECK(checkOneConsumer(pObj, pMnode, pMsg, rebSubHash));
    mndReleaseConsumer(pMnode, pObj);
  }

END:
  PRINT_LOG_END
  sdbCancelFetch(pSdb, iter);
  mndReleaseConsumer(pMnode, pObj);
  return code;
}
997

998
bool mndRebTryStart() {
12,613,490✔
999
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
12,613,490✔
1000
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
12,613,490✔
1001
}
1002

1003
void mndRebCntInc() {
258,395✔
1004
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
258,395✔
1005
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
258,395✔
1006
}
258,395✔
1007

1008
void mndRebCntDec() {
12,871,885✔
1009
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
12,871,885✔
1010
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
12,871,885✔
1011
}
12,871,885✔
1012

1013
/*
 * Release everything a rebalance output owns: the newConsumers, modifyConsumers and removedConsumers
 * arrays, the rebVgs array, and the subscription object in pSub. Each released field is reset to NULL.
 * A NULL argument is ignored.
 */
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput != NULL) {
    SArray **ownedArrays[] = {&rebOutput->newConsumers, &rebOutput->modifyConsumers, &rebOutput->removedConsumers,
                              &rebOutput->rebVgs};
    for (size_t k = 0; k < sizeof(ownedArrays) / sizeof(ownedArrays[0]); ++k) {
      taosArrayDestroy(*ownedArrays[k]);
      *ownedArrays[k] = NULL;
    }
    tDeleteSubscribeObj(rebOutput->pSub);
    taosMemoryFree(rebOutput->pSub);
    rebOutput->pSub = NULL;
  }
}
1029

1030
/*
 * Allocate the empty arrays of a rebalance output: newConsumers, removedConsumers and modifyConsumers
 * (int64_t ids) and rebVgs (SMqRebOutputVg).
 * If any allocation fails, the whole output is released again with clearRebOutput() before returning
 * the error.
 */
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqRebOutputObj *pRollback = rebOutput;  // cleared once every allocation has succeeded
  PRINT_LOG_START

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);
  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);
  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);
  pRollback = NULL;

END:
  PRINT_LOG_END
  clearRebOutput(pRollback);
  return code;
}
1052

1053
/*
 * Populate rebOutput->pSub for the subscription named by rebInput->pRebInfo->key.
 *   - If the subscription can be acquired, pSub becomes a clone of it and rebInput->oldConsumerNum is set
 *     to its consumer count.
 *   - Otherwise a new subscription is created from the topic parsed out of the key. The topic is
 *     read-locked while in use, its db name is copied into the new subscription, and oldConsumerNum is 0.
 * Acquired topic/subscription references are released before returning.
 */
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  const char      *subKey = rebInput->pRebInfo->key;
  SMqSubscribeObj *pExisting = NULL;
  SMqTopicObj     *pTopicObj = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, subKey, &pExisting);
  int32_t          lino = 0;
  PRINT_LOG_START

  if (code == 0) {
    MND_TMQ_RETURN_CHECK(tCloneSubscribeObj(pExisting, &rebOutput->pSub));
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", subKey,
          taosHashGetSize(rebOutput->pSub->consumerHash));
  } else {
    // `code` still holds the acquire failure here. NOTE(review): it is expected to be overwritten by the
    // MND_TMQ_RETURN_CHECK calls below on success -- relies on that macro assigning `code`; confirm.
    char topicName[TSDB_TOPIC_FNAME_LEN] = {0};
    char groupName[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(subKey, topicName, groupName, true);
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topicName, &pTopicObj));
    taosRLockLatch(&pTopicObj->lock);
    rebInput->oldConsumerNum = 0;
    MND_TMQ_RETURN_CHECK(mndCreateSubscription(pMnode, pTopicObj, subKey, &rebOutput->pSub));
    (void)memcpy(rebOutput->pSub->dbName, pTopicObj->db, TSDB_DB_FNAME_LEN);
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", subKey);
  }

END:
  PRINT_LOG_END
  if (pTopicObj != NULL) {
    taosRUnLockLatch(&pTopicObj->lock);
  }
  mndReleaseTopic(pMnode, pTopicObj);
  mndReleaseSubscribe(pMnode, pExisting);
  return code;
}
1092

1093
/*
 * Reload helper: allocate rebOutput->rebVgs and add one SMqRebOutputVg for every vgroup currently owned by
 * each consumer of pSub. Each entry keeps the vgroup on the same consumer
 * (oldConsumerId = -1, newConsumerId = owner).
 * On failure the partially filled array stays in rebOutput for the caller to release.
 */
static int32_t collectVgs(SMqRebOutputObj *rebOutput, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *iter = NULL;

  PRINT_LOG_START
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  while ((iter = taosHashIterate(pSub->consumerHash, iter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)iter;
    size_t         ownedCnt = taosArrayGetSize(pEp->vgs);
    for (size_t k = 0; k < ownedCnt; k++) {
      SMqVgEp *pOwned = taosArrayGet(pEp->vgs, k);
      MND_TMQ_NULL_CHECK(pOwned);
      SMqRebOutputVg *pOut = taosArrayReserve(rebOutput->rebVgs, 1);
      MND_TMQ_NULL_CHECK(pOut);
      pOut->pVgEp = *pOwned;
      pOut->oldConsumerId = -1;
      pOut->newConsumerId = pEp->consumerId;
      mDebug("sub:%s reload rebalance vgId:%d remains on consumer:0x%" PRIx64, pSub->key, pOut->pVgEp.vgId,
             pOut->newConsumerId);
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pSub->consumerHash, iter);
  return code;
}
1127

1128
/*
 * If pSub's topic is listed in topicsToReload, persist a "reload" rebalance for it: every vgroup stays with
 * its current consumer (collectVgs), and isReload = true makes mndPersistRebResult skip the consumer commit
 * logs. pSub remains owned by the caller and is read-locked for the duration.
 * A persist failure is logged and returned.
 */
static int32_t rebalanceOneSub(SRpcMsg *pMsg, SMqSubscribeObj *pSub) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMnode         *pMnode = pMsg->info.node;
  SMqRebOutputObj output = {0};

  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  char topicName[TSDB_TOPIC_FNAME_LEN] = {0};
  char groupName[TSDB_CGROUP_LEN] = {0};
  mndSplitSubscribeKey(pSub->key, topicName, groupName, true);

  if (taosHashGet(topicsToReload, topicName, strlen(topicName)) != NULL) {
    output.pSub = pSub;
    output.isReload = true;
    MND_TMQ_RETURN_CHECK(collectVgs(&output, pSub));
    code = mndPersistRebResult(pMnode, pMsg, &output);
    if (code != 0) {
      mError("%s error,msg:%s", __func__, tstrerror(code));
    }
  } else {
    mDebug("%s topic:%s no need reload rebalance", __func__, topicName);
  }

END:
  taosRUnLockLatch(&pSub->lock);
  output.pSub = NULL;  // pSub belongs to the caller; keep clearRebOutput from freeing it
  clearRebOutput(&output);
  PRINT_LOG_END
  return code;
}
1162

1163
/*
 * Reload pass: run rebalanceOneSub() on every SDB_SUBSCRIBE row, then empty topicsToReload.
 * Stops at the first failing subscription; in that case topicsToReload is not cleared.
 */
static int32_t reloadRebalance(SRpcMsg *pMsg) {
  SMnode          *pMnode = pMsg->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSubObj = NULL;
  void            *iter = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;

  PRINT_LOG_START
  mInfo("reloadRebalance start, total topics to reload:%d", taosHashGetSize(topicsToReload));
  while ((iter = sdbFetch(pSdb, SDB_SUBSCRIBE, iter, (void **)&pSubObj)) != NULL) {
    MND_TMQ_RETURN_CHECK(rebalanceOneSub(pMsg, pSubObj));
    sdbRelease(pSdb, pSubObj);
  }
  taosHashClear(topicsToReload);

END:
  sdbCancelFetch(pSdb, iter);
  sdbRelease(pSdb, pSubObj);
  PRINT_LOG_END

  return code;
}
1191

1192
/*
 * Regular rebalance round.
 * 1. Scan all consumers (mndCheckConsumer) to collect the subscriptions that need rebalancing.
 * 2. For each collected subscription: build the output, compute the new assignment, and persist it as a
 *    transaction.
 * A failure in build / rebalance / persist for one subscription is logged and the loop moves on.
 * A failure to initialise an output, or of the scan itself, aborts the round.
 */
static int32_t normalRebalance(SRpcMsg *pMsg) {
  int32_t   code = 0;
  int32_t   lino = 0;
  void     *iter = NULL;
  SMnode   *pMnode = pMsg->info.node;
  SHashObj *pendingSubs = NULL;

  PRINT_LOG_START
  pendingSubs = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pendingSubs);
  taosHashSetFreeFp(pendingSubs, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, pendingSubs));
  if (taosHashGetSize(pendingSubs) > 0) {
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(pendingSubs));
  }

  while ((iter = taosHashIterate(pendingSubs, iter)) != NULL) {
    SMqRebInputObj  input = {0};
    SMqRebOutputObj output = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&output));
    input.pRebInfo = (SMqRebInfo *)iter;

    code = buildRebOutput(pMnode, &input, &output);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else {
      code = mndDoRebalance(pMnode, &input, &output);
      if (code != 0) {
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
      } else {
        code = mndPersistRebResult(pMnode, pMsg, &output);
        if (code != 0) {
          mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
        }
      }
    }

    clearRebOutput(&output);
  }

  if (taosHashGetSize(pendingSubs) > 0) {
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish");
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pendingSubs, iter);
  taosHashCleanup(pendingSubs);
  return code;
}
1252

1253
/*
 * Handler for a tmq rebalance request.
 * - If mndRebTryStart() cannot claim the rebalance counter, the request is a no-op.
 * - Otherwise a pending reload (topicsToReload non-empty) is processed instead of a normal rebalance.
 * - The counter is released with mndRebCntDec() before returning.
 * Returns the code of the pass that ran (0 when nothing ran).
 */
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // The unused `pIter` / `pMnode` locals of the previous version have been removed.
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  if (!mndRebTryStart()) {
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
    goto END;
  }

  if (taosHashGetSize(topicsToReload) > 0) {
    code = reloadRebalance(pMsg);
  } else {
    code = normalRebalance(pMsg);
  }

  mndRebCntDec();

END:
  PRINT_LOG_END
  TAOS_RETURN(code);
}
1280

1281
/*
 * Append one TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every non-mounted vgroup of pSub's database
 * (mndVgroupInDb with pSub->dbUid). Each action carries an SMqVDeleteReq for pSub->key with consumerId -1
 * and tolerates TSDB_CODE_MND_VGROUP_NOT_EXIST.
 * Returns 0 or the first error; the sdb iterator and any held vgroup are released on every path.
 */
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }
    // Mounted vgroups and vgroups outside the subscription's database get no delete request.
    if (pVgObj->mountVgId || !mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      continue;
    }

    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);  // pVgObj is still held here; END releases it
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // Already released: without this reset, a failure below would make END release it a second time.
    pVgObj = NULL;

    int32_t appendCode = mndTransAppendRedoAction(pTrans, &action);
    if (appendCode != 0) {
      // NOTE(review): assumes mndTransAppendRedoAction does not keep action.pCont when it fails
      // (the usual caller convention is to free the body on failure) -- confirm in mndTrans.c.
      taosMemoryFree(pReq);
    }
    MND_TMQ_RETURN_CHECK(appendCode);
  }

END:
  PRINT_LOG_END
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1328

1329
/*
 * Handle one consumer while dropping consumer group `cgroup` on topic `topicName`.
 * If the consumer belongs to `cgroup` and references the topic (assigned, pending removal, or pending
 * addition):
 *   - with deleteConsumer, a CONSUMER_CLEAR drop log is added to pTrans;
 *   - otherwise TSDB_CODE_MND_CGROUP_USED is returned.
 * Consumers of other groups are ignored. The consumer is read-locked for the duration.
 */
static int32_t checkoutOneConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName,
                                   char *cgroup) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMqConsumerObj *pClearObj = NULL;

  taosRLockLatch(&pConsumer->lock);

  if (strcmp(cgroup, pConsumer->cgroup) == 0) {
    bool inAssigned = checkTopic(pConsumer->assignedTopics, topicName);
    bool inRemoving = checkTopic(pConsumer->rebRemovedTopics, topicName);
    bool inAdding = checkTopic(pConsumer->rebNewTopics, topicName);

    if (inAssigned || inRemoving || inAdding) {
      if (!deleteConsumer) {
        mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
               pConsumer->consumerId, pConsumer->cgroup);
        code = TSDB_CODE_MND_CGROUP_USED;
        goto END;
      }
      MND_TMQ_RETURN_CHECK(
          tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pClearObj));
      MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pClearObj));
      tDeleteSMqConsumerObj(pClearObj);
      pClearObj = NULL;
    }
  }

END:
  tDeleteSMqConsumerObj(pClearObj);
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
1363

1364
/*
 * Run checkoutOneConsumer() for every SDB_CONSUMER row against (cgroup, topic), stopping at the first error.
 * The sdb iterator and the current row are released on every path.
 */
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SSdb           *pSdb = pMnode->pSdb;
  void           *iter = NULL;
  SMqConsumerObj *pObj = NULL;
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  for (;;) {
    iter = sdbFetch(pSdb, SDB_CONSUMER, iter, (void **)&pObj);
    if (iter == NULL) {
      break;
    }
    MND_TMQ_RETURN_CHECK(checkoutOneConsumer(pTrans, pObj, deleteConsumer, topic, cgroup));
    sdbRelease(pSdb, pObj);
  }

END:
  sdbRelease(pSdb, pObj);
  sdbCancelFetch(pSdb, iter);
  return code;
}
1387

1388
/*
 * Drop the consumer group behind pSub in one "drop-cgroup" transaction. The transaction contains:
 *   - delete requests to the vnodes (sendDeleteSubToVnode),
 *   - consumer clean-up (mndCheckConsumerByGroup),
 *   - the subscription drop commit logs.
 * Without dropReq->force, fails with TSDB_CODE_MND_CGROUP_USED while the subscription still has consumers.
 * pSub is read-locked for the duration; the local transaction handle is always dropped.
 */
static int32_t dropCgroup(SMqSubscribeObj *pSub, SRpcMsg *pMsg, SMDropCgroupReq *dropReq) {
  SMnode *pMnode = pMsg->info.node;
  STrans *pDropTrans = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  taosRLockLatch(&pSub->lock);
  if (!dropReq->force && taosHashGetSize(pSub->consumerHash) != 0) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq->cgroup, dropReq->topic, tstrerror(code));
    goto END;
  }

  pDropTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pDropTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pDropTrans->id, dropReq->cgroup, dropReq->topic);
  mndTransSetDbName(pDropTrans, pSub->dbName, NULL);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pDropTrans));
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pDropTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pDropTrans, dropReq->cgroup, dropReq->topic, dropReq->force));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pDropTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pDropTrans));

END:
  taosRUnLockLatch(&pSub->lock);
  mndTransDrop(pDropTrans);
  PRINT_LOG_END
  return code;
}
1417

1418
/*
 * Handler for a drop-consumer-group request.
 * - Deserializes the SMDropCgroupReq and acquires subscription "<cgroup><TMQ_SEPARATOR><topic>".
 * - If the subscription is missing: returns 0 when igNotExists is set, else
 *   TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST.
 * - Otherwise starts dropCgroup(): returns TSDB_CODE_ACTION_IN_PROGRESS on success, or the error code.
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pMsg->info.node;
  SMDropCgroupReq  dropReq = {0};
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));

  char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
  code = mndAcquireSubscribeByKey(pMnode, subKey, &pSub);
  if (code != 0 && dropReq.igNotExists) {
    mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
    mndReleaseSubscribe(pMnode, pSub);
    return 0;
  }
  if (code != 0) {
    code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
    goto END;
  }
  MND_TMQ_RETURN_CHECK(dropCgroup(pSub, pMsg, &dropReq));

END:
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END

  if (code != 0) {
    TAOS_RETURN(code);
  }
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
}
1455

1456
// Module teardown hook for the subscribe module; it currently has nothing to release.
void mndCleanupSubscribe(SMnode *pMnode) { (void)pMnode; }
397,500✔
1457

1458
/*
 * Serializes a subscription object into a newly allocated SDB raw record.
 *
 * Record layout (type SDB_SUBSCRIBE, version MND_SUBSCRIBE_VER_NUMBER):
 *   [int32 tlen][tlen bytes produced by tEncodeSubscribeObj][MND_SUBSCRIBE_RESERVE_SIZE reserved bytes]
 *
 * Returns the raw record on success, or NULL with terrno left non-success on failure.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  void    *buf = NULL;
  // Must be initialised before the first goto: the cleanup path logs and frees pRaw,
  // and the "tlen <= 0" exit reaches it before any allocation happens.
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;  // default error for every early exit below

  // First pass with a NULL buffer yields the encoded length used to size the allocations.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;

  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // The encoder takes the cursor by address, so hand it a copy and keep buf for freeing.
  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);  // pRaw is NULL when allocation never happened
    return NULL;
  }

  mDebug("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1500

1501
/*
 * Rebuilds an SSdbRow holding an SMqSubscribeObj from an SDB raw record.
 *
 * Accepts soft versions 1..MND_SUBSCRIBE_VER_NUMBER.
 * Reads [int32 tlen][tlen bytes][reserve], decodes them with tDecodeSubscribeObj(),
 * and then calls tmsgUpdateDnodeEpSet() on every vgroup ep set. That covers both the
 * unassigned vgroups and those held by each consumer.
 *
 * Returns the row on success, or NULL (row freed, terrno non-success) on failure.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  int8_t           sver = 0;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;
  void            *buf = NULL;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;  // default error for every early exit below

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;
  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) goto SUB_DECODE_OVER;

  // Refresh the ep sets stored in the decoded object.
  if (pSub->unassignedVgs != NULL) {
    int32_t numOfVgs = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t k = 0; k < numOfVgs; k++) {
      SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGet(pSub->unassignedVgs, k);
      tmsgUpdateDnodeEpSet(&pVgEp->epSet);
    }
  }
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
      int32_t        numOfVgs = (int32_t)taosArrayGetSize(pEp->vgs);
      for (int32_t k = 0; k < numOfVgs; k++) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGet(pEp->vgs, k);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mDebug("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}
1572

1573
/* SDB insert callback for subscription rows: emits a debug log and always reports success. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform insert action", subKey);
  return 0;
}
1577

1578
/* SDB delete callback for subscription rows: logs, then hands the object to tDeleteSubscribeObj(). */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform delete action", subKey);
  tDeleteSubscribeObj(pSub);
  return 0;
}
1583

1584
/*
 * SDB update callback for subscription rows.
 *
 * Exchanges consumerHash, unassignedVgs and offsetRows between the stored row
 * (pOldSub) and the incoming one (pNewSub) while holding pOldSub's write latch.
 * After the call pOldSub carries the new values and pNewSub the previous ones.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (pOldSub == NULL || pNewSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  mDebug("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);
  // The three swaps are independent of each other; only the latch around them matters.
  TSWAP(pOldSub->offsetRows, pNewSub->offsetRows);
  TSWAP(pOldSub->unassignedVgs, pNewSub->unassignedVgs);
  TSWAP(pOldSub->consumerHash, pNewSub->consumerHash);
  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}
1596

1597
/*
 * Acquires the subscription stored under key and writes it to *pSub.
 *
 * Returns 0 on success, TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when no row matches
 * (*pSub is then NULL), or TSDB_CODE_INVALID_PARA on NULL arguments.
 * A successful acquire is expected to be paired with mndReleaseSubscribe().
 */
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj **pSub) {
  if (pMnode == NULL || key == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqSubscribeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  *pSub = pObj;
  return (pObj != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1608

1609
/*
 * Counts the subscriptions (one per consumer group) whose key's topic part equals topicName.
 * Returns 0 when pMnode or topicName is NULL.
 */
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (pMnode == NULL || topicName == NULL) {
    return 0;
  }
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          groupCnt = 0;

  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    char subTopic[TSDB_TOPIC_FNAME_LEN] = {0};
    char subCgroup[TSDB_CGROUP_LEN] = {0};

    // The key is read under the subscription's read latch.
    taosRLockLatch(&pSub->lock);
    mndSplitSubscribeKey(pSub->key, subTopic, subCgroup, true);
    taosRUnLockLatch(&pSub->lock);

    if (strcmp(subTopic, topicName) == 0) {
      groupCnt++;
    }
    sdbRelease(pSdb, pSub);
  }

  return groupCnt;
}
1636

1637
/* Releases a subscription previously acquired from the SDB; NULL arguments are ignored. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1642

1643
/*
 * Appends a commit log to pTrans that marks pSub as SDB_STATUS_DROPPED.
 *
 * The raw record is encoded from pSub and appended first. Its status is then set on
 * the same record. If the append fails, the raw is freed here.
 * Returns 0 on success or the first failing error code.
 */
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t  code = 0;
  int32_t  lino = 0;
  SSdbRaw *pCommitRaw = NULL;

  PRINT_LOG_START
  pCommitRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pCommitRaw);

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  } else {
    sdbFreeRaw(pCommitRaw);  // not attached to the transaction, so still ours to free
  }

END:
  PRINT_LOG_END
  return code;
}
1660

1661
/*
 * Adds the drop actions for pSub to pTrans when pSub belongs to topicName.
 *
 * Subscriptions of other topics are left alone (returns 0).
 * If the subscription still has consumers and force is false, returns
 * TSDB_CODE_MND_IN_REBALANCE. Otherwise it sends the delete request to the vnodes
 * and appends the drop commit log.
 * pSub's read latch is held for the whole call.
 */
static int32_t dropOneSub(SMqSubscribeObj *pSub, SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  int32_t code = 0;
  int32_t lino = 0;
  char    subTopic[TSDB_TOPIC_FNAME_LEN] = {0};
  char    subCgroup[TSDB_CGROUP_LEN] = {0};

  taosRLockLatch(&pSub->lock);
  mndSplitSubscribeKey(pSub->key, subTopic, subCgroup, true);

  if (strcmp(subTopic, topicName) == 0) {
    if (force || taosHashGetSize(pSub->consumerHash) == 0) {
      // iterate all vnodes to delete their handles, then record the drop
      MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
      MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
    } else {
      code = TSDB_CODE_MND_IN_REBALANCE;
    }
  }

END:
  taosRUnLockLatch(&pSub->lock);
  return code;
}
1686

1687
/*
 * Scans every subscription and, through dropOneSub(), adds drop actions to pTrans
 * for those belonging to topicName.
 *
 * Stops at the first failure. On that path the current subscription reference and
 * the SDB iteration are released before returning.
 * Returns 0 on success or the failing error code.
 */
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          code = 0;
  int32_t          lino = 0;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    MND_TMQ_RETURN_CHECK(dropOneSub(pSub, pMnode, pTrans, topicName, force));
    sdbRelease(pSdb, pSub);
    // Already released: clear it so the cleanup below cannot release it again, whatever
    // sdbFetch leaves in *ppObj when it reports the end of the scan.
    pSub = NULL;
  }

END:
  PRINT_LOG_END
  sdbRelease(pSdb, pSub);       // non-NULL only when dropOneSub failed mid-scan
  sdbCancelFetch(pSdb, pIter);  // NULL after a complete scan

  TAOS_RETURN(code);
}
1710

1711
/*
 * Writes one "show subscriptions" row per vgroup in vgs into pBlock, starting at row
 * *numOfRows and incrementing *numOfRows for every row written.
 *
 * Column order:
 *   1. topic
 *   2. consumer group
 *   3. vgroup id
 *   4. consumer id (hex string; NULL when consumerId == -1)
 *   5. user (NULL when user is NULL)
 *   6. fqdn (NULL when fqdn is NULL)
 *   7. offset ("<formatted offset>/<ever>")
 *   8. rows
 *
 * Columns 7 and 8 come from the offsetRows entry whose vgId matches; if several
 * match, the last one wins. Both are NULL when no entry matches.
 * topic and cgroup are expected to be var-strings (header + payload), as built by retrieveSub().
 */
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *user,
                           const char *fqdn, const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t numOfVgs = taosArrayGetSize(vgs);
  for (int32_t vgIdx = 0; vgIdx < numOfVgs; ++vgIdx) {
    SMqVgEp *pVgEp = taosArrayGet(vgs, vgIdx);
    MND_TMQ_NULL_CHECK(pVgEp);

    SColumnInfoData *pCol = NULL;
    int32_t          col = 0;

    // Pre-render the var-string cells of this row.
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user != NULL) {
      STR_TO_VARSTR(userStr, user);
    }

    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn != NULL) {
      STR_TO_VARSTR(fqdnStr, fqdn);
    }

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, cgroup, false));

    // vgroup id
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pVgEp->vgId, false));

    // consumer id
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, consumerIdHex, consumerId == -1));

    // user
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, userStr, user == NULL));

    // fqdn
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // Locate this vgroup's offset entry; the scan continues so the last match wins.
    OffsetRows *pOffset = NULL;
    int32_t     numOfOffsets = (int32_t)taosArrayGetSize(offsetRows);
    for (int32_t k = 0; k < numOfOffsets; ++k) {
      OffsetRows *pCand = taosArrayGet(offsetRows, k);
      MND_TMQ_NULL_CHECK(pCand);
      if (pCand->vgId == pVgEp->vgId) {
        pOffset = pCand;
      }
    }

    if (pOffset == NULL) {
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    } else {
      // offset text is "<formatted offset>/<ever>"
      char offsetStr[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(offsetStr), TSDB_OFFSET_LEN, &pOffset->offset);
      size_t used = strlen(varDataVal(offsetStr));
      (void)snprintf(varDataVal(offsetStr) + used, sizeof(offsetStr) - VARSTR_HEADER_SIZE - used, "/%" PRId64,
                     pOffset->ever);
      varDataSetLen(offsetStr, strlen(varDataVal(offsetStr)));

      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, offsetStr, false));
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pOffset->rows, false));
    }

    (*numOfRows)++;
  }

END:
  PRINT_LOG_END
  return code;
}
1804

1805
/*
 * Appends the "show subscriptions" rows of one subscription to pBlock.
 *
 * Rows are written first for the vgroups owned by each consumer in pSub->consumerHash,
 * then for pSub->unassignedVgs (reported with consumer id -1).
 * The block is grown first when *numOfRows + pSub->vgNum would exceed rowsCapacity.
 * *numOfRows is advanced by buildResult() and copied into pBlock->info.rows on success.
 * pSub's read latch is held for the whole call.
 * Returns 0 on success or the first failing error code.
 */
static int32_t retrieveSub(SRpcMsg *pReq, SMqSubscribeObj *pSub, SSDataBlock *pBlock, int32_t *numOfRows,
                           int32_t rowsCapacity) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMqConsumerEp *pConsumerEp = NULL;
  void          *pIter = NULL;
  PRINT_LOG_START

  taosRLockLatch(&pSub->lock);
  if (*numOfRows + pSub->vgNum > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + pSub->vgNum));
  }

  // topic and cgroup as var-strings
  char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
  varDataSetLen(topic, strlen(varDataVal(topic)));
  varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

  while ((pIter = taosHashIterate(pSub->consumerHash, pIter)) != NULL) {
    pConsumerEp = (SMqConsumerEp *)pIter;

    // Hold the consumer until buildResult() has copied user/fqdn out of it. Releasing
    // it first leaves both pointers dangling if the consumer is dropped concurrently.
    SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
    const char     *user = (pConsumer != NULL) ? pConsumer->user : NULL;
    const char     *fqdn = (pConsumer != NULL) ? pConsumer->fqdn : NULL;

    code = buildResult(pBlock, numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
                       pConsumerEp->offsetRows);
    sdbRelease(pSdb, pConsumer);  // NULL-tolerant, like the other sdbRelease calls in this file
    if (code != 0) {
      lino = __LINE__;
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(
      buildResult(pBlock, numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

  pBlock->info.rows = *numOfRows;

END:
  // Cancel a partially consumed iteration while the read latch is still held.
  // mndSubActionUpdate swaps pSub->consumerHash under the write latch, so after unlocking,
  // pSub->consumerHash may no longer be the hash that pIter came from.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  taosRUnLockLatch(&pSub->lock);
  PRINT_LOG_END
  return code;
}
1855

1856
/*
 * Show-retrieve callback for "show subscriptions".
 *
 * Continues the SDB scan stored in pShow->pIter, appending rows via retrieveSub()
 * until the scan ends or at least rowsCapacity rows have been produced.
 * Returns the number of rows written on success, or the error code on failure.
 *
 * On success a non-NULL pShow->pIter means the page filled up before the scan ended.
 * It is deliberately left live so the next call resumes from it.
 * NOTE(review): release of a leftover iterator when the show is closed is presumably
 * done by the show framework via mndCancelGetNextSubscribe; confirm registration.
 */
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;

  mInfo("mnd show subscriptions begin");

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    MND_TMQ_RETURN_CHECK(retrieveSub(pReq, pSub, pBlock, &numOfRows, rowsCapacity));

    sdbRelease(pSdb, pSub);
    pSub = NULL;
  }
  mInfo("mnd end show subscriptions");
  pShow->numOfRows += numOfRows;

END:
  if (code != 0) {
    // Abandon the scan only on failure, and forget the iterator so nobody cancels or
    // resumes it a second time. On success a pending iterator is needed by the next page.
    sdbCancelFetch(pSdb, pShow->pIter);
    pShow->pIter = NULL;
  }
  sdbRelease(pSdb, pSub);

  if (code != 0) {
    mError("mnd show subscriptions failed, msg:%s", tstrerror(code));
    TAOS_RETURN(code);
  } else {
    mDebug("mnd show subscriptions success, rows:%d", numOfRows);
    return numOfRows;
  }
}
1895

1896
/* Cancels an in-progress SDB_SUBSCRIBE scan identified by pIter; a NULL pMnode is ignored. */
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc