• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4925

12 Jan 2026 09:34AM UTC coverage: 66.107% (+0.8%) from 65.354%
#4925

push

travis-ci

web-flow
merge: from main to 3.0 branch #34248

103 of 129 new or added lines in 9 files covered. (79.84%)

891 existing lines in 139 files now uncovered.

200488 of 303278 relevant lines covered (66.11%)

129810096.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.89
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndShow.h"
20
#include "mndTopic.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "tcompare.h"
24
#include "tname.h"
25

26
#define MND_SUBSCRIBE_VER_NUMBER   3
27
#define MND_SUBSCRIBE_RESERVE_SIZE 64
28

29
//#define MND_CONSUMER_LOST_HB_CNT          6
30

31
static int32_t mqRebInExecCnt = 0;
32

33
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
34
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
35
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
36
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
38
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
39
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
40
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
41
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
42
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
43

44
// Encode pSub into an SDB raw record and append it to pTrans's commit log, marking the record
// SDB_STATUS_READY.
//   pTrans - transaction that receives the commit log
//   pSub   - subscription to persist
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the failing call's code.
// If the append fails the encoded raw is freed here; after a successful append it is not freed
// here (presumably owned by the transaction — confirm in mndTransAppendCommitlog).
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (!(pTrans != NULL && pSub != NULL)) return TSDB_CODE_INVALID_PARA;

  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    sdbFreeRaw(pRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
64

65
// Register the SDB_SUBSCRIBE table plus the message and show handlers owned by this module.
//   pMnode - mnode being initialised
// Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise the result of sdbSetTable.
int32_t mndInitSubscribe(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // The free-iterator handler must belong to the same show type as the retrieve handler. It was
  // registered for TSDB_MGMT_TABLE_TOPICS, which would clobber any handler registered for topics
  // and left subscription show iterators without a cancel hook.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
90

91
// Fill pSub->unassignedVgs with one SMqVgEp (vgId + current endpoint set) for every vgroup of the
// topic's database, incrementing pSub->vgNum for each entry added.
//   pMnode - mnode handle (vgroups are read from its SDB)
//   pTopic - topic whose dbUid selects the vgroups
//   pSub   - subscription being initialised
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or terrno if an append fails.
int32_t mndSchedInitSubEp(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTopic == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    // Same filter as processRemoveAddVgs: mounted vgroups are never counted or assigned during
    // rebalance, so the initial layout (and vgNum) must exclude them too.
    if (pVgroup->mountVgId || !mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp vgEp = {0};
    vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    vgEp.vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &vgEp) == NULL) {
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }
    // Count only entries actually stored so vgNum matches unassignedVgs.
    pSub->vgNum++;
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, vgEp.vgId);
    sdbRelease(pSdb, pVgroup);
  }

END:
  return code;
}
126

127
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey,
118,651✔
128
                                     SMqSubscribeObj **pSub) {
129
  if (pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
118,651✔
130
    return TSDB_CODE_INVALID_PARA;
×
131
  }
132
  int32_t lino = 0;
118,651✔
133
  int32_t code = 0;
118,651✔
134
  PRINT_LOG_START
118,651✔
135
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
118,651✔
136
  (*pSub)->dbUid = pTopic->dbUid;
118,651✔
137
  (*pSub)->stbUid = pTopic->stbUid;
118,651✔
138
  (*pSub)->subType = pTopic->subType;
118,651✔
139
  (*pSub)->withMeta = pTopic->withMeta;
118,651✔
140

141
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
118,651✔
142

143
END:
118,651✔
144
  PRINT_LOG_END
118,651✔
145
  return code;
118,651✔
146
}
147

148
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
1,667,241✔
149
  if (key == NULL || topic == NULL || cgroup == NULL) {
1,667,241✔
150
    return;
×
151
  }
152
  int32_t i = 0;
1,667,241✔
153
  while (key[i] != TMQ_SEPARATOR_CHAR) {
12,085,196✔
154
    i++;
10,417,955✔
155
  }
156
  (void)memcpy(cgroup, key, i);
1,667,241✔
157
  cgroup[i] = 0;
1,667,241✔
158
  if (fullName) {
1,667,241✔
159
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
1,533,173✔
160
  } else {
161
    while (key[i] != '.') {
402,204✔
162
      i++;
268,136✔
163
    }
164
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
134,068✔
165
  }
166
}
167

168
static int32_t mndBuildSubChangeReq(SMnode *pMnode, void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
764,554✔
169
                                    const SMqRebOutputVg *pRebVg) {
170
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
764,554✔
171
    return TSDB_CODE_INVALID_PARA;
×
172
  }
173
  SMqRebVgReq  req = {0};
764,554✔
174
  int32_t      code = 0;
764,554✔
175
  int32_t      lino = 0;
764,554✔
176
  SEncoder     encoder = {0};
764,554✔
177
  SMqTopicObj *pTopic = NULL;
764,554✔
178
  void *       buf = NULL;
764,554✔
179

180
  PRINT_LOG_START
764,554✔
181
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
764,554✔
182
  char cgroup[TSDB_CGROUP_LEN] = {0};
764,554✔
183
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
764,554✔
184
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
764,554✔
185
  taosRLockLatch(&pTopic->lock);
764,554✔
186
  req.oldConsumerId = pRebVg->oldConsumerId;
764,554✔
187
  req.newConsumerId = pRebVg->newConsumerId;
764,554✔
188
  req.vgId = pRebVg->pVgEp.vgId;
764,554✔
189
  req.qmsg = pTopic->physicalPlan;
764,554✔
190
  req.schema = pTopic->schema;
764,554✔
191
  req.subType = pSub->subType;
764,554✔
192
  req.withMeta = pSub->withMeta;
764,554✔
193
  req.suid = pSub->stbUid;
764,554✔
194
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
764,554✔
195

196
  int32_t tlen = 0;
764,554✔
197
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
764,554✔
198
  if (code < 0) {
764,554✔
199
    goto END;
×
200
  }
201

202
  tlen += sizeof(SMsgHead);
764,554✔
203
  buf = taosMemoryMalloc(tlen);
764,554✔
204
  MND_TMQ_NULL_CHECK(buf);
764,554✔
205
  SMsgHead *pMsgHead = (SMsgHead *)buf;
764,554✔
206
  pMsgHead->contLen = htonl(tlen);
764,554✔
207
  pMsgHead->vgId = htonl(pRebVg->pVgEp.vgId);
764,554✔
208

209
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
764,554✔
210
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
764,554✔
211
  *pBuf = buf;
764,554✔
212
  buf = NULL;
764,554✔
213
  *pLen = tlen;
764,554✔
214

215
END:
764,554✔
216
  PRINT_LOG_END
764,554✔
217
  taosMemoryFree(buf);
764,554✔
218
  if (pTopic != NULL) {
764,554✔
219
    taosRUnLockLatch(&pTopic->lock);
764,554✔
220
  }
221
  mndReleaseTopic(pMnode, pTopic);
764,554✔
222
  tEncoderClear(&encoder);
764,554✔
223
  return code;
764,554✔
224
}
225

226
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
764,554✔
227
                                        const SMqRebOutputVg *pRebVg) {
228
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
764,554✔
229
    return TSDB_CODE_INVALID_PARA;
×
230
  }
231
  int32_t code = 0;
764,554✔
232
  int32_t lino = 0;
764,554✔
233
  void *  buf = NULL;
764,554✔
234
  PRINT_LOG_START
764,554✔
235
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
764,554✔
236
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
237
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
238
    goto END;
×
239
  }
240

241
  int32_t tlen = 0;
764,554✔
242
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(pMnode, &buf, &tlen, pSub, pRebVg));
764,554✔
243
  int32_t vgId = pRebVg->pVgEp.vgId;
764,554✔
244
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
764,554✔
245
  if (pVgObj == NULL) {
764,554✔
246
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
247
    goto END;
×
248
  }
249

250
  STransAction action = {0};
764,554✔
251
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
764,554✔
252
  action.pCont = buf;
764,554✔
253
  buf = NULL;
764,554✔
254
  action.contLen = tlen;
764,554✔
255
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
764,554✔
256

257
  mndReleaseVgroup(pMnode, pVgObj);
764,554✔
258
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
764,554✔
259

260
END:
764,554✔
261
  PRINT_LOG_END
764,554✔
262
  taosMemoryFree(buf);
764,554✔
263
  return code;
764,554✔
264
}
265

266
static void freeRebalanceItem(void *param) {
317,225✔
267
  if (param == NULL) return;
317,225✔
268
  SMqRebInfo *pInfo = param;
317,225✔
269
  taosArrayDestroy(pInfo->newConsumers);
317,225✔
270
  taosArrayDestroy(pInfo->removedConsumers);
317,225✔
271
}
272

273
// Find the SMqRebInfo for subscription key in pHash, inserting a fresh one if absent.
// The hash stores SMqRebInfo by value: after insertion the temporary shell from
// tNewSMqRebSubscribe is freed and the pointer is re-fetched from the hash.
//   pReb - optional out: pointer to the entry stored in pHash
// Returns 0, TSDB_CODE_INVALID_PARA, or the failing call's code.
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pEntry = taosHashGet(pHash, key, keyLen);
  if (pEntry == NULL) {
    SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
    if (pFresh == NULL) {
      code = terrno;
      goto END;
    }
    code = taosHashPut(pHash, key, keyLen, pFresh, sizeof(SMqRebInfo));
    if (code != 0) {
      // insert failed: the arrays were not taken over by the hash
      freeRebalanceItem(pFresh);
    }
    taosMemoryFreeClear(pFresh);
    if (code != 0) {
      goto END;
    }

    pEntry = taosHashGet(pHash, key, keyLen);
    MND_TMQ_NULL_CHECK(pEntry);
  }
  if (pReb != NULL) {
    *pReb = pEntry;
  }

END:
  PRINT_LOG_END
  return code;
}
306

307
// Pop the last vgroup from vgs and record it in pHash (keyed by vgId) as a move away from
// consumerId with no new owner yet (newConsumerId = -1). key is used only for logging.
// Returns 0, TSDB_CODE_INVALID_PARA, or the failing call's code.
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (vgs == NULL || pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqVgEp *pPopped = (SMqVgEp *)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(pPopped);

  SMqRebOutputVg move = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = *pPopped};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &pPopped->vgId, sizeof(int32_t), &move, sizeof(SMqRebOutputVg)));
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, pPopped->vgId, consumerId);

END:
  PRINT_LOG_END
  return code;
}
324

325
// For each consumer listed in pInput->pRebInfo->removedConsumers that exists in the subscription's
// consumerHash: move all of its vgroups into pHash (via pushVgDataToHash), remove it from
// consumerHash and append its id to pOutput->removedConsumers. Ids not found in consumerHash are
// skipped; a count mismatch is reported with an error log.
// Returns 0, TSDB_CODE_INVALID_PARA, or the failing call's code.
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SArray   *removed = pInput->pRebInfo->removedConsumers;
  SHashObj *consumers = pOutput->pSub->consumerHash;
  int32_t   numOfRemoved = taosArrayGetSize(removed);
  int32_t   actualRemoved = 0;

  for (int32_t idx = 0; idx < numOfRemoved; ++idx) {
    int64_t *consumerId = (int64_t *)taosArrayGet(removed, idx);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp *pConsumerEp = taosHashGet(consumers, consumerId, sizeof(int64_t));
    if (pConsumerEp != NULL) {
      int32_t vgCnt = taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t k = 0; k < vgCnt; ++k) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *consumerId, pOutput->pSub->key));
      }
      MND_TMQ_RETURN_CHECK(taosHashRemove(consumers, consumerId, sizeof(int64_t)));
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, consumerId));
      actualRemoved++;
    }
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
  } else {
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key,
           numOfRemoved, actualRemoved);
  }
END:
  PRINT_LOG_END
  return code;
}
362

363
// Register each consumer from pInput->pRebInfo->newConsumers in the subscription's consumerHash
// with an empty vgroup list, and append its id to pOutput->newConsumers.
// Returns 0, TSDB_CODE_INVALID_PARA, or the failing call's code.
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);
    // Use taosHashPut's own result: relying on terrno could report a failed insert as success
    // if terrno happened to be 0.
    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp,
                       sizeof(SMqConsumerEp));
    if (code != 0) {
      freeSMqConsumerEp(&newConsumerEp);
      lino = __LINE__;
      goto END;
    }
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }
END:
  PRINT_LOG_END
  return code;
}
392

393
// Move every vgroup of pOutput->pSub->unassignedVgs into pHash as an ownerless move
// (old consumer -1). Each pushVgDataToHash call pops one entry, so the list is drained.
// Returns 0, TSDB_CODE_INVALID_PARA, or the failing call's code.
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqSubscribeObj *pSub = pOutput->pSub;
  for (int32_t remaining = taosArrayGetSize(pSub->unassignedVgs); remaining > 0; --remaining) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pSub->unassignedVgs, pHash, -1, pSub->key));
  }
END:
  PRINT_LOG_END
  return code;
}
408

409
// Trim existing consumers to their target share. Every consumer in the subscription's consumerHash
// is appended to pOutput->modifyConsumers. A consumer holding more than minVgCnt vgroups keeps
// minVgCnt + 1 while fewer than remainderVgCnt consumers have been granted the extra one,
// otherwise minVgCnt; surplus vgroups are popped into pHash for reassignment.
// Returns 0, TSDB_CODE_INVALID_PARA, or the failing call's code.
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t remainderVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  int32_t granted = 0;
  void   *pIter = NULL;
  PRINT_LOG_START

  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        held = taosArrayGetSize(pConsumerEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
    if (held <= minVgCnt) {
      continue;
    }

    int32_t keep = minVgCnt;
    if (granted < remainderVgCnt) {
      keep = minVgCnt + 1;  // this consumer takes one of the remainder vgroups
      granted++;
    }
    while (taosArrayGetSize(pConsumerEp->vgs) > keep) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
    }
  }
END:
  PRINT_LOG_END
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  return code;
}
448

449
// Reconcile the subscription's vgroup layout with the vgroups that currently exist.
//   1. Collect every non-mounted vgroup of pSub->dbUid with a freshly resolved endpoint set.
//   2. Remove from each consumer any vgroup that no longer exists; vgroups still held by a
//      consumer are dropped from the collected set.
//   3. Reconcile pSub->unassignedVgs with what is left. Stale entries are removed, and live
//      vgroups that nobody holds are appended. Previously new vgroups were added only when the
//      list was empty, so a vgroup split/drop while vgroups were unassigned left stale vgIds
//      behind (later failing with VGROUP_NOT_EXIST) and never added the new ones.
// Returns the number of live vgroups (>= 0) on success, or a negative error code on failure
// (the caller tells them apart by sign).
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  int32_t totalVgNum = 0;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  void   *pIterHash = NULL;
  SArray *newVgs = taosArrayInit(0, sizeof(SMqVgEp));
  MND_TMQ_NULL_CHECK(newVgs);

  // 1. live vgroups of the subscribed database
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }
    if (pVgroup->mountVgId || !mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      pVgroup = NULL;
      continue;
    }

    totalVgNum++;
    SMqVgEp vgEp = {0};
    vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    vgEp.vgId = pVgroup->vgId;
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &vgEp));  // on failure pVgroup is released at END
    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;  // never release the same object twice at END
  }

  // 2. drop vanished vgroups from consumers; keep only unheld vgroups in newVgs
  while (1) {
    pIterHash = taosHashIterate(pOutput->pSub->consumerHash, pIterHash);
    if (pIterHash == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIterHash;
    int32_t        j = 0;
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
      SMqVgEp *pVgEpTmp = taosArrayGet(pConsumerEp->vgs, j);
      MND_TMQ_NULL_CHECK(pVgEpTmp);
      bool find = false;
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
        SMqVgEp *pnewVgEp = taosArrayGet(newVgs, k);
        MND_TMQ_NULL_CHECK(pnewVgEp);
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
          taosArrayRemove(newVgs, k);
          find = true;
          break;
        }
      }
      if (!find) {
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
        taosArrayRemove(pConsumerEp->vgs, j);
        continue;
      }
      j++;
    }
  }

  // 3. reconcile the unassigned list with the remaining live vgroups
  SArray *unassigned = pOutput->pSub->unassignedVgs;
  int32_t u = 0;
  while (u < taosArrayGetSize(unassigned)) {
    SMqVgEp *pUnassigned = taosArrayGet(unassigned, u);
    MND_TMQ_NULL_CHECK(pUnassigned);
    bool live = false;
    for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
      SMqVgEp *pLive = taosArrayGet(newVgs, k);
      MND_TMQ_NULL_CHECK(pLive);
      if (pLive->vgId == pUnassigned->vgId) {
        taosArrayRemove(newVgs, k);  // already tracked as unassigned; do not add it twice
        live = true;
        break;
      }
    }
    if (!live) {
      mInfo("tmq rebalance processRemoveAddVgs drop stale unassigned vgId:%d", pUnassigned->vgId);
      taosArrayRemove(unassigned, u);
      continue;
    }
    u++;
  }

  if (taosArrayGetSize(newVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(unassigned, newVgs));
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
  }

END:
  sdbRelease(pMnode->pSdb, pVgroup);
  sdbCancelFetch(pMnode->pSdb, pIter);
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIterHash);
  taosArrayDestroy(newVgs);
  if (code != 0) {
    mError("tmq rebalance processRemoveAddVgs failed, code:%d", code);
    return code;
  }
  mInfo("tmq rebalance processRemoveAddVgs completed, total vg num:%d", totalVgNum);
  return totalVgNum;
}
529

530
// Merge per-vgroup consumption statistics (offsetRows) from the currently persisted subscription
// into pOutput->pSub->offsetRows.
//   - If no persisted subscription can be acquired for the key (presumably one still being
//     created), nothing is merged and 0 is returned.
//   - Rows for a vgroup that stays with the same consumer in the new layout are skipped.
//   - Otherwise rows are accumulated per vgId, and offset/ever are taken from the source entry.
// The persisted subscription is read under its read latch and released before returning.
// Returns 0, TSDB_CODE_INVALID_PARA, or the failing call's code.
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if (code != 0) {
    return 0;
  }
  taosRLockLatch(&pSub->lock);
  PRINT_LOG_START
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    MND_TMQ_NULL_CHECK(pOutput->pSub->offsetRows);
  }
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(d1);
      bool jump = false;
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
        SMqVgEp *pVgEp = taosArrayGet(pConsumerEpNew->vgs, i);
        MND_TMQ_NULL_CHECK(pVgEp);
        if (pVgEp->vgId == d1->vgId) {
          jump = true;
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                pConsumerEp->consumerId, pVgEp->vgId);
          break;
        }
      }
      if (jump) continue;
      bool find = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(d2);
        if (d1->vgId == d2->vgId) {
          d2->rows += d1->rows;
          d2->offset = d1->offset;
          d2->ever = d1->ever;
          find = true;
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
          break;
        }
      }
      if (!find) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
      }
    }
  }

END:
  // Finish with the iterator over pSub->consumerHash while pSub->lock is still held;
  // cancelling it after the unlock touched the hash without the latch.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  taosRUnLockLatch(&pSub->lock);
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  return code;
}
594

595
// Log the rebalance result: first every vgroup move in pOutput->rebVgs, then each consumer's
// final vgroup list. Does nothing when pOutput is NULL.
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (pOutput == NULL) return;
  const char *subKey = pOutput->pSub->key;
  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", subKey);

  int32_t moved = taosArrayGetSize(pOutput->rebVgs);
  for (int32_t m = 0; m < moved; ++m) {
    const SMqRebOutputVg *pMove = taosArrayGet(pOutput->rebVgs, m);
    if (pMove != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, subKey,
            pMove->pVgEp.vgId, pMove->oldConsumerId, pMove->newConsumerId);
    }
  }

  for (void *pIter = taosHashIterate(pOutput->pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    const SMqConsumerEp *pConsumerEp = (const SMqConsumerEp *)pIter;
    int32_t              held = taosArrayGetSize(pConsumerEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", subKey, pConsumerEp->consumerId, held);
    for (int32_t v = 0; v < held; ++v) {
      const SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, v);
      if (pVgEp != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, subKey, pVgEp->vgId,
              pConsumerEp->consumerId);
      }
    }
  }
}
621

622
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
317,225✔
623
                           int32_t *remainderVgCnt) {
624
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
317,225✔
625
    return;
×
626
  }
627
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
317,225✔
628
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
317,225✔
629
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
317,225✔
630

631
  // calc num
632
  if (numOfFinal != 0) {
317,225✔
633
    *minVgCnt = totalVgNum / numOfFinal;
209,389✔
634
    *remainderVgCnt = totalVgNum % numOfFinal;
209,389✔
635
  } else {
636
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
107,836✔
637
  }
638
  mInfo(
317,225✔
639
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d "
640
      "remainderVg:%d",
641
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
642
}
643

644
// Hand the vgroups collected in pHash (vgId -> SMqRebOutputVg) to the subscription's consumers.
//   Pass 1: bring every consumer up to minVgCnt vgroups.
//   Pass 2: give one extra vgroup to consumers sitting at exactly minVgCnt until pHash is used up.
//   Pass 3: copy every move into pOutput->rebVgs; if no consumer remains, every vgroup also goes
//           back into pSub->unassignedVgs.
// If pHash runs dry during pass 1 (inconsistent counts), assignment stops there. Before, the next
// taosHashIterate(pHash, NULL) restarted from the first entry and gave the same vgroups to a
// second consumer. Sizes are compared as signed 64-bit so a negative minVgCnt cannot be
// promoted to a huge size_t.
// Returns 0, TSDB_CODE_INVALID_PARA, TSDB_CODE_PAR_INTERNAL_ERROR if vgroups are left over after
// pass 2, or the failing call's code.
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqRebOutputVg *pRebVg = NULL;
  void           *pAssignIter = NULL;
  void           *pIter = NULL;
  bool            exhausted = false;
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  // Pass 1: fill each consumer up to minVgCnt.
  while (!exhausted) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    while ((int64_t)taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        exhausted = true;  // do not restart pHash iteration and hand out duplicates
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp.vgId,
            pConsumerEp->consumerId);
    }
  }

  // Pass 2: distribute the remainder, one per consumer at exactly minVgCnt.
  // Skipped when pass 1 exhausted pHash (pIter may still be mid-iteration; cancelled at END).
  while (!exhausted) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    if ((int64_t)taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp.vgId,
            pConsumerEp->consumerId);
    }
  }

  if (pAssignIter != NULL) {
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    goto END;
  }

  // Pass 3: publish all moves.
  while (1) {
    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      break;
    }

    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed
      MND_TMQ_NULL_CHECK(
          taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
    }
  }

END:
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  taosHashCancelIterate(pHash, pAssignIter);
  PRINT_LOG_END
  return code;
}
722

723
/*
 * Compute a new vgroup-to-consumer assignment for the subscription in pOutput->pSub.
 *
 * processRemoveAddVgs runs first and yields the total vgroup count; a negative value is
 * an error code and is returned unchanged. A scratch hash (int keys, presumably vgIds)
 * is then shared by processRemovedConsumers, processUnassignedVgroups,
 * processModifiedConsumers and assignVgroups, with calcVgroupsCnt supplying
 * minVgCnt / remainderVgCnt in between. processNewConsumers, processSubOffsetRows and
 * printRebalanceLog complete the round.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the first error.
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t vgTotal = processRemoveAddVgs(pMnode, pOutput);
  if (vgTotal < 0) {
    return vgTotal;  // error code, not a count
  }

  int32_t     code = 0;
  int32_t     lino = 0;
  int32_t     baseCnt = 0;
  int32_t     extraCnt = 0;
  const char *subKey = pOutput->pSub->key;
  SHashObj   *pVgPool = NULL;

  PRINT_LOG_START
  pVgPool = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pVgPool);

  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pVgPool, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pVgPool));

  calcVgroupsCnt(pInput, vgTotal, subKey, &baseCnt, &extraCnt);

  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pVgPool, baseCnt, extraCnt));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pVgPool, baseCnt));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));

  printRebalanceLog(pOutput);

END:
  taosHashCleanup(pVgPool);
  PRINT_LOG_END
  return code;
}
753

754
/*
 * Append one consumer commit log to pTrans for every consumer id in `consumers`.
 *
 * Each id is wrapped in a temporary SMqConsumerObj built by tNewSMqConsumerObj with the
 * given update `type`, `cgroup` and `topic` (topic may be NULL; the CONSUMER_UPDATE_REB
 * caller passes NULL). The object is handed to mndSetConsumerCommitLogs and then freed.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA if pTrans, consumers or cgroup is NULL,
 * or the first failure code.
 */
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Reset immediately. Nothing here guarantees that tNewSMqConsumerObj overwrites its
    // out-parameter when it fails, and END must never free this object a second time.
    pConsumerNew = NULL;
  }

END:
  PRINT_LOG_END
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
777

778
/*
 * Add consumer commit logs for every consumer touched by this rebalance round.
 *
 * Logs are added in this order:
 * - modifyConsumers as CONSUMER_UPDATE_REB, with no topic;
 * - newConsumers as CONSUMER_ADD_REB, with `topic`;
 * - removedConsumers as CONSUMER_REMOVE_REB, with `topic`.
 *
 * Stops at the first failure and returns its code. Returns TSDB_CODE_INVALID_PARA if
 * any argument is NULL.
 */
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  bool badArg = (pTrans == NULL) || (pOutput == NULL) || (cgroup == NULL) || (topic == NULL);
  if (badArg) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SArray *modified = pOutput->modifyConsumers;
  SArray *added = pOutput->newConsumers;
  SArray *removed = pOutput->removedConsumers;

  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, modified, CONSUMER_UPDATE_REB, cgroup, NULL));
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, added, CONSUMER_ADD_REB, cgroup, topic));
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, removed, CONSUMER_REMOVE_REB, cgroup, topic));

END:
  PRINT_LOG_END
  return code;
}
792

793
/*
 * Build and prepare the "tmq-reb" transaction that persists one rebalance result.
 *
 * The transaction (TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE) receives:
 *  1. one redo action per entry of pOutput->rebVgs (mndPersistSubChangeVgReq);
 *  2. the subscription commit log (mndSetSubCommitLogs);
 *  3. consumer commit logs (mndPresistConsumer), unless pOutput->isReload is set;
 *  4. the TRANS_START_FUNC_MQ_REB / TRANS_STOP_FUNC_MQ_REB callbacks.
 *
 * The transaction is then prepared. The local handle is dropped on every path.
 * Returns 0 on success or the first error code.
 */
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pMsg == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = 0;
  int32_t          lino = 0;
  STrans          *pTrans = NULL;
  SMqSubscribeObj *pSub = pOutput->pSub;
  const SArray    *rebVgs = pOutput->rebVgs;
  int32_t          vgCount = 0;
  char             topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char             cgroup[TSDB_CGROUP_LEN] = {0};

  PRINT_LOG_START
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pSub->dbName, pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // 1. redo actions: one change request per vgroup listed in rebVgs
  vgCount = taosArrayGetSize(rebVgs);
  for (int32_t idx = 0; idx < vgCount; ++idx) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, idx);
    MND_TMQ_NULL_CHECK(pRebVg);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pSub, pRebVg));
  }

  // 2. commit log for the subscription and its vgroup assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pSub));

  // 3. consumer commit logs, skipped for reload rounds
  if (!pOutput->isReload) {
    MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
  }

  // 4. callbacks, then 5. hand the transaction over for execution
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  mndTransDrop(pTrans);
  PRINT_LOG_END
  TAOS_RETURN(code);
}
844

845
// type: 0 = record the consumer as removed from each topic, 1 = record it as newly added
846
/*
 * Record pConsumer in the rebalance entry of every topic in topicList.
 *
 * For each topic the key "<cgroup><TMQ_SEPARATOR><topic>" is built, and its SMqRebInfo
 * is fetched or created in rebSubHash via mndGetOrCreateRebSub. What happens next
 * depends on `type`:
 * - type == 0 appends the consumer id to removedConsumers;
 * - type == 1 appends it to newConsumers;
 * - any other value only ensures the entry exists.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA if any pointer argument is NULL,
 * or the first failure code.
 */
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
  // pConsumer is dereferenced for every topic, so validate it with the other pointers.
  if (rebSubHash == NULL || topicList == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t topicNum = taosArrayGetSize(topicList);
  for (int32_t i = 0; i < topicNum; i++) {
    char *topicName = taosArrayGetP(topicList, i);
    MND_TMQ_NULL_CHECK(topicName);
    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topicName);
    SMqRebInfo *pRebSub = NULL;
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
    if (type == 0) {
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
    } else if (type == 1) {
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
    }
  }

END:
  PRINT_LOG_END
  return code;
}
871

872
/*
 * Check whether any vgroup assigned to pConsumer under `topic` no longer exists.
 *
 * Acquires the subscription "<cgroup><TMQ_SEPARATOR><topic>" and walks the consumer's
 * vgroup list while holding the subscription's read latch. For every vgId that
 * mndAcquireVgroup cannot find, it makes sure a rebalance entry for the key exists in
 * rebSubHash.
 *
 * A missing subscription or consumer entry ends the check early. Such failures are not
 * reported to the caller.
 */
static void checkOneTopic(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash, const char *topic) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqSubscribeObj *pSub = NULL;
  char             subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  (void)snprintf(subKey, sizeof(subKey), "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

  MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, subKey, &pSub));
  taosRLockLatch(&pSub->lock);

  SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pConsumerEp);

  int32_t assigned = taosArrayGetSize(pConsumerEp->vgs);
  for (int32_t j = 0; j < assigned; j++) {
    SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, j);
    if (pVgEp == NULL) {
      continue;
    }

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
    bool    vgGone = (pVgroup == NULL);
    mndReleaseVgroup(pMnode, pVgroup);
    if (!vgGone) {
      continue;
    }

    // The vgroup disappeared (the log below attributes this to a split): request a rebalance.
    code = mndGetOrCreateRebSub(rebSubHash, subKey, NULL);
    if (code != 0) {
      mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code));
    } else {
      mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
    }
  }

END:
  if (pSub != NULL) {
    taosRUnLockLatch(&pSub->lock);
  }
  mndReleaseSubscribe(pMnode, pSub);
}
1,108,298✔
908

909
/*
 * Run checkOneTopic for every non-NULL topic in pConsumer->currentTopics.
 * Does nothing if any argument is NULL.
 */
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (pMnode != NULL && pConsumer != NULL && rebSubHash != NULL) {
    int32_t nTopics = taosArrayGetSize(pConsumer->currentTopics);
    for (int32_t t = 0; t < nTopics; ++t) {
      const char *topicName = taosArrayGetP(pConsumer->currentTopics, t);
      if (topicName != NULL) {
        checkOneTopic(pMnode, pConsumer, rebSubHash, topicName);
      }
    }
  }
}
922

923
/*
 * Decide whether a consumer should be treated as offline.
 *
 * hbStatus and pollStatus are counters that checkOneConsumer increments on every
 * rebalance check; presumably they are reset elsewhere on heartbeat / poll (TODO confirm).
 * Multiplying by tsMqRebalanceInterval * 1000 (the interval is assumed to be in seconds)
 * approximates elapsed milliseconds. That value is compared with the consumer's
 * sessionTimeoutMs and maxPollIntervalMs.
 */
static bool isOffLine(int32_t hbStatus, int32_t pollStatus, SMqConsumerObj *pConsumer) {
  // Compute in 64 bits. The counters keep growing between resets, and a 32-bit signed
  // product with the interval in ms could overflow, which is undefined behavior.
  int64_t intervalMs = (int64_t)tsMqRebalanceInterval * 1000;
  int64_t hbElapsedMs = (int64_t)hbStatus * intervalMs;
  int64_t pollElapsedMs = (int64_t)pollStatus * intervalMs;
  return hbElapsedMs >= pConsumer->sessionTimeoutMs || pollElapsedMs >= pConsumer->maxPollIntervalMs;
}
927

928
/*
 * Inspect one consumer during the periodic rebalance check.
 *
 * While holding the consumer's read latch, it atomically bumps hbStatus and pollStatus
 * and reads status. Then:
 *  - READY with no current topics (unsubscribe or close): send
 *    TDMT_MND_TMQ_LOST_CONSUMER_CLEAR;
 *  - READY and isOffLine: queue it as removed from all current topics;
 *  - READY otherwise: look for vanished vgroups via checkForVgroupSplit;
 *  - REBALANCE and not isOffLine: queue rebNewTopics as additions and
 *    rebRemovedTopics as removals;
 *  - any other case: send TDMT_MND_TMQ_LOST_CONSUMER_CLEAR.
 */
static int32_t checkOneConsumer(SMqConsumerObj *pConsumer, SMnode *pMnode, SRpcMsg *pMsg, SHashObj *rebSubHash) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    sendClear = false;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);

  int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
  int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
  int32_t status = atomic_load_32(&pConsumer->status);

  mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
         ", hbstatus:%d, pollStatus:%d",
         pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
         hbStatus, pollStatus);

  if (status == MQ_CONSUMER_STATUS_READY) {
    if (taosArrayGetSize(pConsumer->currentTopics) == 0) {
      sendClear = true;  // unsubscribe or close
    } else if (isOffLine(hbStatus, pollStatus, pConsumer)) {
      mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
            ", hb lost cnt:%d, or long time no poll cnt:%d",
            pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
            pConsumer->createTime, hbStatus, pollStatus);
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
    } else {
      checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
    }
  } else if (status == MQ_CONSUMER_STATUS_REBALANCE && !isOffLine(hbStatus, pollStatus, pConsumer)) {
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
  } else {
    sendClear = true;
  }

  if (sendClear) {
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
969

970
/*
 * Walk every SDB_CONSUMER row and let checkOneConsumer record the rebalance work it
 * needs in rebSubHash.
 *
 * Stops at the first error. The SDB iterator and any consumer still held are released
 * on every path. Returns TSDB_CODE_INVALID_PARA if either argument is NULL.
 */
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (pMsg == NULL || rebSubHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         code = 0;
  int32_t         lino = 0;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  PRINT_LOG_START

  // iterate all consumers, find all modification
  for (pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer)) {
    MND_TMQ_RETURN_CHECK(checkOneConsumer(pConsumer, pMnode, pMsg, rebSubHash));
    mndReleaseConsumer(pMnode, pConsumer);
  }

END:
  PRINT_LOG_END
  sdbCancelFetch(pSdb, pIter);
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}
997

998
bool mndRebTryStart() {
12,982,311✔
999
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
12,982,311✔
1000
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
12,982,311✔
1001
}
1002

1003
void mndRebCntInc() {
254,993✔
1004
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
254,993✔
1005
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
254,993✔
1006
}
254,993✔
1007

1008
void mndRebCntDec() {
13,237,304✔
1009
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
13,237,304✔
1010
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
13,237,304✔
1011
}
13,237,304✔
1012

1013
/*
 * Release everything owned by rebOutput:
 * - the four id/vgroup arrays;
 * - the subscription object in pSub, which is deep-freed with tDeleteSubscribeObj and
 *   then has its own allocation freed.
 *
 * Every pointer is reset to NULL. A NULL argument is a no-op.
 */
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput == NULL) return;

  taosArrayDestroy(rebOutput->newConsumers);
  taosArrayDestroy(rebOutput->modifyConsumers);
  taosArrayDestroy(rebOutput->removedConsumers);
  taosArrayDestroy(rebOutput->rebVgs);
  rebOutput->newConsumers = NULL;
  rebOutput->modifyConsumers = NULL;
  rebOutput->removedConsumers = NULL;
  rebOutput->rebVgs = NULL;

  SMqSubscribeObj *pOwnedSub = rebOutput->pSub;
  rebOutput->pSub = NULL;
  tDeleteSubscribeObj(pOwnedSub);
  taosMemoryFree(pOwnedSub);
}
1029

1030
/*
 * Allocate the four empty arrays of a rebalance output:
 * - newConsumers, removedConsumers and modifyConsumers hold int64 ids;
 * - rebVgs holds SMqRebOutputVg entries.
 *
 * If any allocation fails, everything allocated so far is released through
 * clearRebOutput. Returns TSDB_CODE_INVALID_PARA on a NULL argument.
 */
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  bool    allReady = false;
  PRINT_LOG_START

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  allReady = true;

END:
  PRINT_LOG_END
  if (!allReady) {
    clearRebOutput(rebOutput);  // undo a partial initialization
  }
  return code;
}
1052

1053
/*
 * Fill rebOutput->pSub with the subscription named by rebInput->pRebInfo->key.
 *
 * - If the subscription can be acquired, it is copied with tCloneSubscribeObj, and
 *   oldConsumerNum is set to the copy's consumer count.
 * - Otherwise the topic is looked up (held under its read latch) and a new subscription
 *   is created with mndCreateSubscription. That subscription takes the topic's db name,
 *   and oldConsumerNum is set to 0.
 *
 * The topic and subscription references are released on every path.
 */
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  const char      *subKey = rebInput->pRebInfo->key;
  SMqSubscribeObj *pSub = NULL;
  SMqTopicObj     *pTopic = NULL;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, subKey, &pSub);
  PRINT_LOG_START

  if (code == 0) {
    // Existing subscription: work on a copy.
    MND_TMQ_RETURN_CHECK(tCloneSubscribeObj(pSub, &rebOutput->pSub));
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", subKey,
          taosHashGetSize(rebOutput->pSub->consumerHash));
  } else {
    // No subscription yet: derive the topic from the key and create one.
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(subKey, topic, cgroup, true);

    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);

    rebInput->oldConsumerNum = 0;
    MND_TMQ_RETURN_CHECK(mndCreateSubscription(pMnode, pTopic, subKey, &rebOutput->pSub));
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", subKey);
  }

END:
  PRINT_LOG_END
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
1092

1093
/*
 * Snapshot the current assignment of pSub into a freshly allocated rebOutput->rebVgs.
 *
 * Every vgroup of every consumer in pSub->consumerHash becomes one SMqRebOutputVg. Each
 * entry has oldConsumerId = -1 and newConsumerId set to the consumer that currently
 * holds the vgroup. The hash iterator is cancelled on every path.
 */
static int32_t collectVgs(SMqRebOutputObj *rebOutput, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *pConsumerIter = NULL;

  PRINT_LOG_START
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  while ((pConsumerIter = taosHashIterate(pSub->consumerHash, pConsumerIter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pConsumerIter;
    int32_t        nVgs = taosArrayGetSize(pEp->vgs);

    for (int32_t k = 0; k < nVgs; ++k) {
      SMqVgEp *pVgEp = taosArrayGet(pEp->vgs, k);
      MND_TMQ_NULL_CHECK(pVgEp);

      SMqRebOutputVg *pSlot = taosArrayReserve(rebOutput->rebVgs, 1);
      MND_TMQ_NULL_CHECK(pSlot);
      pSlot->pVgEp = *pVgEp;
      pSlot->oldConsumerId = -1;
      pSlot->newConsumerId = pEp->consumerId;
      mDebug("sub:%s reload rebalance vgId:%d remains on consumer:0x%" PRIx64, pSub->key, pSlot->pVgEp.vgId,
             pSlot->newConsumerId);
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pSub->consumerHash, pConsumerIter);
  return code;
}
1127

1128
/*
 * Re-persist the current assignment of one subscription if its topic is in topicsToReload.
 *
 * The subscription's read latch is held throughout. pSub is placed into a reload-mode
 * rebalance output (isReload = true) only temporarily and is detached before
 * clearRebOutput, so the caller's object is never freed here. A failure in
 * mndPersistRebResult is logged and returned.
 */
static int32_t rebalanceOneSub(SRpcMsg *pMsg, SMqSubscribeObj *pSub) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMnode         *pMnode = pMsg->info.node;
  SMqRebOutputObj rebOutput = {0};
  char            topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char            cgroup[TSDB_CGROUP_LEN] = {0};

  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  // topic and cgroup
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
  bool needReload = (taosHashGet(topicsToReload, topic, strlen(topic)) != NULL);
  if (!needReload) {
    mDebug("%s topic:%s no need reload rebalance", __func__, topic);
    goto END;
  }

  rebOutput.pSub = pSub;  // borrowed, detached again below
  rebOutput.isReload = true;
  MND_TMQ_RETURN_CHECK(collectVgs(&rebOutput, pSub));

  code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
  if (code != 0) {
    mError("%s error,msg:%s", __func__, tstrerror(code));
  }

END:
  taosRUnLockLatch(&pSub->lock);
  rebOutput.pSub = NULL;  // avoid double free
  clearRebOutput(&rebOutput);
  PRINT_LOG_END
  return code;
}
1162

1163
/*
 * Run rebalanceOneSub for every SDB_SUBSCRIBE row, then empty topicsToReload.
 *
 * The first error aborts the walk and leaves topicsToReload untouched. The SDB iterator
 * and any subscription still held are released on every path.
 */
static int32_t reloadRebalance(SRpcMsg *pMsg) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMnode          *pMnode = pMsg->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;

  PRINT_LOG_START
  mInfo("reloadRebalance start, total topics to reload:%d", taosHashGetSize(topicsToReload));

  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    MND_TMQ_RETURN_CHECK(rebalanceOneSub(pMsg, pSub));
    sdbRelease(pSdb, pSub);
  }
  taosHashClear(topicsToReload);

END:
  sdbCancelFetch(pSdb, pIter);
  sdbRelease(pSdb, pSub);
  PRINT_LOG_END
  return code;
}
1191

1192
/*
 * Regular rebalance round.
 *
 * mndCheckConsumer first collects the subscriptions that need work into a hash
 * (entries freed by freeRebalanceItem). Each entry then goes through buildRebOutput,
 * mndDoRebalance and mndPersistRebResult. A failure in one entry is logged and the loop
 * continues with the next one; the returned code is whatever the last processed entry
 * produced.
 */
static int32_t normalRebalance(SRpcMsg *pMsg) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SMnode   *pMnode = pMsg->info.node;
  void     *pIter = NULL;
  SHashObj *rebSubHash = NULL;

  PRINT_LOG_START
  rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);
  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
  }

  while ((pIter = taosHashIterate(rebSubHash, pIter)) != NULL) {
    SMqRebInputObj  rebInput = {.pRebInfo = (SMqRebInfo *)pIter};
    SMqRebOutputObj rebOutput = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));

    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else {
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
      if (code != 0) {
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
      } else {
        code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
        if (code != 0) {
          mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
        }
      }
    }

    clearRebOutput(&rebOutput);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish");
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(rebSubHash, pIter);
  taosHashCleanup(rebSubHash);
  return code;
}
1252

1253
/*
 * Entry point of the periodic rebalance request.
 *
 * Claims the rebalance slot with mndRebTryStart; if another round holds it, nothing is
 * done. Otherwise it runs reloadRebalance when topicsToReload is non-empty, and
 * normalRebalance when it is empty. The slot is released with mndRebCntDec afterwards.
 */
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START;
  if (!mndRebTryStart()) {
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
    goto END;
  }

  // Pending reload requests are served instead of a regular round.
  if (taosHashGetSize(topicsToReload) > 0) {
    code = reloadRebalance(pMsg);
  } else {
    code = normalRebalance(pMsg);
  }

  mndRebCntDec();  // release the slot claimed by mndRebTryStart

END:
  PRINT_LOG_END
  TAOS_RETURN(code);
}
1280

1281
/*
 * Add one TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every vgroup of the
 * subscription's database.
 *
 * Mounted vgroups (mountVgId != 0) are skipped. Each request carries the subscription
 * key and consumerId = -1, and TSDB_CODE_MND_VGROUP_NOT_EXIST is accepted as the
 * action's result.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the first error.
 * On error, no vgroup reference is leaked or released twice, and a request that was not
 * handed to the transaction is freed.
 */
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void          *pIter = NULL;
  SVgObj        *pVgObj = NULL;
  SMqVDeleteReq *pReq = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }
    if (pVgObj->mountVgId || !mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }

    pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // Already released: clear it so END does not release this vgroup a second time.
    pVgObj = NULL;

    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
    // Appended: the action now references the buffer, so it must not be freed here.
    pReq = NULL;
  }

END:
  PRINT_LOG_END
  taosMemoryFree(pReq);  // non-NULL only if the request never reached the transaction
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1328

1329
/*
 * Handle one consumer while a consumer group is being dropped.
 *
 * Only consumers of `cgroup` are considered. A consumer counts as involved if
 * topicName appears in its assignedTopics, rebRemovedTopics or rebNewTopics.
 * - With deleteConsumer, a CONSUMER_CLEAR drop log is added to pTrans.
 * - Without it, TSDB_CODE_MND_CGROUP_USED is returned.
 *
 * The consumer's read latch is held for the duration.
 */
static int32_t checkoutOneConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName,
                                   char *cgroup) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMqConsumerObj *pConsumerNew = NULL;

  taosRLockLatch(&pConsumer->lock);

  if (strcmp(cgroup, pConsumer->cgroup) != 0) {
    goto END;  // other consumer group, not affected
  }

  bool inAssigned = checkTopic(pConsumer->assignedTopics, topicName);
  bool inRemoved = checkTopic(pConsumer->rebRemovedTopics, topicName);
  bool inNew = checkTopic(pConsumer->rebNewTopics, topicName);
  if (!(inAssigned || inRemoved || inNew)) {
    goto END;
  }

  if (!deleteConsumer) {
    mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
           pConsumer->consumerId, pConsumer->cgroup);
    code = TSDB_CODE_MND_CGROUP_USED;
    goto END;
  }

  MND_TMQ_RETURN_CHECK(
      tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
  tDeleteSMqConsumerObj(pConsumerNew);
  pConsumerNew = NULL;

END:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
1363

1364
/*
 * Run checkoutOneConsumer for every SDB_CONSUMER row against (cgroup, topic).
 *
 * Stops at the first error. The SDB iterator and any consumer still held are released
 * on every path. Returns TSDB_CODE_INVALID_PARA if a pointer argument is NULL.
 */
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         code = 0;
  int32_t         lino = 0;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  void           *pIter = NULL;
  PRINT_LOG_START

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;
    MND_TMQ_RETURN_CHECK(checkoutOneConsumer(pTrans, pConsumer, deleteConsumer, topic, cgroup));
    sdbRelease(pSdb, pConsumer);
  }

END:
  sdbRelease(pSdb, pConsumer);
  sdbCancelFetch(pSdb, pIter);
  return code;
}
1387

1388
/*
 * Build and prepare the "drop-cgroup" transaction for one subscription.
 *
 * Without dropReq->force, the drop is refused (TSDB_CODE_MND_CGROUP_USED) while the
 * subscription still has consumers. The transaction collects, in order:
 * - vnode delete-sub requests;
 * - consumer handling (mndCheckConsumerByGroup);
 * - subscription drop logs.
 *
 * The subscription's read latch is held throughout, and the local transaction handle is
 * dropped on every path.
 */
static int32_t dropCgroup(SMqSubscribeObj *pSub, SRpcMsg *pMsg, SMDropCgroupReq *dropReq) {
  int32_t code = 0;
  int32_t lino = 0;
  SMnode *pMnode = pMsg->info.node;
  STrans *pTrans = NULL;
  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  bool stillUsed = !dropReq->force && taosHashGetSize(pSub->consumerHash) != 0;
  if (stillUsed) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq->cgroup, dropReq->topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq->cgroup, dropReq->topic);
  mndTransSetDbName(pTrans, pSub->dbName, NULL);

  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq->cgroup, dropReq->topic, dropReq->force));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  taosRUnLockLatch(&pSub->lock);
  mndTransDrop(pTrans);
  PRINT_LOG_END
  return code;
}
1417

1418
/*
 * Handle a DROP CONSUMER GROUP request.
 *
 * Deserializes the request and acquires the subscription "<cgroup><TMQ_SEPARATOR><topic>".
 * - If the subscription is missing: returns 0 when igNotExists is set, and
 *   TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST otherwise.
 * - If it exists: runs dropCgroup.
 *
 * Success is reported as TSDB_CODE_ACTION_IN_PROGRESS, because the drop was handed to a
 * prepared transaction.
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = 0;
  int32_t          lino = 0;
  SMnode          *pMnode = pMsg->info.node;
  SMDropCgroupReq  dropReq = {0};
  SMqSubscribeObj *pSub = NULL;
  char             subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
  (void)snprintf(subKey, sizeof(subKey), "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);

  code = mndAcquireSubscribeByKey(pMnode, subKey, &pSub);
  if (code != 0 && dropReq.igNotExists) {
    mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
    mndReleaseSubscribe(pMnode, pSub);
    return 0;
  }
  if (code != 0) {
    code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
    goto END;
  }

  MND_TMQ_RETURN_CHECK(dropCgroup(pSub, pMsg, &dropReq));

END:
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  TAOS_RETURN((code != 0) ? code : TSDB_CODE_ACTION_IN_PROGRESS);
}
1455

1456
/* Teardown hook for the subscribe module; there is currently nothing to release. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}
398,860✔
1457

1458
/*
 * Serialize a subscription object into an SDB raw record.
 *
 * Raw layout (soft version MND_SUBSCRIBE_VER_NUMBER):
 *   int32  tlen                         length of the encoded payload
 *   bytes  payload[tlen]                output of tEncodeSubscribeObj()
 *   bytes  reserve[MND_SUBSCRIBE_RESERVE_SIZE]
 *
 * Returns the new raw record on success. On failure returns NULL with terrno left
 * non-success (TSDB_CODE_OUT_OF_MEMORY unless a callee overwrote it); any partially
 * built raw record is freed here.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  // Both pointers are inspected/freed on the SUB_ENCODE_OVER path, so they must be
  // initialized before the first goto; jumping past an initializer leaves the
  // variable indeterminate and freeing it would be undefined behavior.
  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // First pass with a NULL buffer only measures the encoded size.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // tEncodeSubscribeObj advances the cursor it is given, so pass a copy of buf.
  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mDebug("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1500

1501
/*
 * Deserialize an SDB raw record into a newly allocated subscription row.
 *
 * Accepts soft versions 1..MND_SUBSCRIBE_VER_NUMBER and reads the layout written by
 * mndSubActionEncode (int32 length, payload, reserved bytes). After decoding, every
 * vgroup endpoint, both unassigned and per-consumer, is passed through
 * tmsgUpdateDnodeEpSet() so the in-memory epsets are refreshed.
 *
 * Returns the row on success. On failure returns NULL with terrno set (by default
 * TSDB_CODE_OUT_OF_MEMORY) and frees the row allocation.
 * NOTE(review): fields already populated by a partially successful
 * tDecodeSubscribeObj() are not released on the failure path; confirm whether that
 * can leak.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  int32_t dataPos = 0;
  int32_t tlen = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  // mndSubActionEncode never writes a non-positive payload length, so such a value
  // means a corrupt record. Reject it before it reaches the allocator or the decoder
  // (a zero-byte buffer would otherwise be handed to tDecodeSubscribeObj).
  if (tlen <= 0) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
    goto SUB_DECODE_OVER;
  }
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
    goto SUB_DECODE_OVER;
  }

  // update epset saved in mnode
  if (pSub->unassignedVgs != NULL) {
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < size; ++i) {
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGet(pSub->unassignedVgs, i);
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
    }
  }
  if (pSub->consumerHash != NULL) {
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
    while (pIter) {
      SMqConsumerEp *pConsumerEp = pIter;
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t i = 0; i < size; ++i) {
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGet(pConsumerEp->vgs, i);
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
      }
      pIter = taosHashIterate(pSub->consumerHash, pIter);
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mDebug("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}
1572

1573
/*
 * SDB insert callback for subscription rows.
 *
 * Only logs the event; no extra work is needed on insert. Always returns 0.
 */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform insert action", subKey);
  return 0;
}
1577

1578
/*
 * SDB delete callback for subscription rows.
 *
 * Logs the event and hands the object to tDeleteSubscribeObj() to release its
 * contents. Always returns 0.
 */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform delete action", subKey);
  tDeleteSubscribeObj(pSub);
  return 0;
}
1583

1584
/*
 * SDB update callback: move the new row's consumer assignment state into the
 * existing row.
 *
 * Under the old row's write latch, swaps consumerHash, unassignedVgs and offsetRows
 * between pOldSub and pNewSub. After the swap, pNewSub holds the previous
 * containers. NOTE(review): presumably they are released when the incoming row is
 * destroyed; confirm against the SDB update flow.
 *
 * Returns TSDB_CODE_INVALID_PARA if either object is NULL, otherwise 0.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (!(pOldSub != NULL && pNewSub != NULL)) {
    return TSDB_CODE_INVALID_PARA;
  }
  mDebug("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);
  {
    TSWAP(pOldSub->consumerHash, pNewSub->consumerHash);
    TSWAP(pOldSub->unassignedVgs, pNewSub->unassignedVgs);
    TSWAP(pOldSub->offsetRows, pNewSub->offsetRows);
  }
  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}
1596

1597
/*
 * Look up a subscription by its key and take a reference on it.
 *
 * On success *pSub holds the acquired object (release with mndReleaseSubscribe)
 * and 0 is returned. When no row matches, *pSub is set to NULL and
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST is returned. NULL arguments yield
 * TSDB_CODE_INVALID_PARA without touching *pSub.
 */
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj **pSub) {
  if (pMnode == NULL || key == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  *pSub = pFound;
  return (pFound != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1608

1609
/*
 * Count the subscriptions (consumer groups) whose key names topicName.
 *
 * Walks every SDB_SUBSCRIBE row, splits its key into topic/cgroup under the row's
 * read latch and compares the topic exactly. Returns 0 for NULL arguments.
 */
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (pMnode == NULL || topicName == NULL) return 0;

  SSdb            *pSdb = pMnode->pSdb;
  int32_t          groupCnt = 0;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};

    taosRLockLatch(&pSub->lock);
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    taosRUnLockLatch(&pSub->lock);

    if (strcmp(topic, topicName) == 0) {
      groupCnt++;
    }
    sdbRelease(pSdb, pSub);
  }

  return groupCnt;
}
1636

1637
/*
 * Drop a reference previously taken with mndAcquireSubscribeByKey.
 *
 * A NULL mnode or subscription is ignored, so this is safe on failed lookups.
 */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1642

1643
/*
 * Record the removal of pSub in pTrans.
 *
 * Encodes the subscription, appends the raw record to pTrans as a commit log and
 * marks it SDB_STATUS_DROPPED. If appending fails, the raw record is freed here.
 * NOTE(review): once appended, ownership presumably belongs to pTrans; confirm.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the first error
 * (encode failure, append failure, or status update failure) or 0.
 */
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pCommitRaw);

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  } else {
    // not handed to the transaction, so it is still ours to free
    sdbFreeRaw(pCommitRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
1660

1661
/*
 * Add the drop of a single subscription to pTrans if it belongs to topicName.
 *
 * Subscriptions of other topics are left alone and 0 is returned. Unless force is
 * set, a subscription that still has entries in consumerHash is refused with
 * TSDB_CODE_MND_IN_REBALANCE. Otherwise the vnode-side delete actions and the
 * DROPPED commit log are added to pTrans. pSub's read latch is held throughout.
 */
static int32_t dropOneSub(SMqSubscribeObj *pSub, SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  int32_t code = 0;
  int32_t lino = 0;
  char    subTopic[TSDB_TOPIC_FNAME_LEN] = {0};
  char    subCgroup[TSDB_CGROUP_LEN] = {0};

  taosRLockLatch(&pSub->lock);
  mndSplitSubscribeKey(pSub->key, subTopic, subCgroup, true);

  if (strcmp(subTopic, topicName) == 0) {
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
      // consumers are still attached and the caller did not force the drop
      code = TSDB_CODE_MND_IN_REBALANCE;
    } else {
      // delete the subscription handles on the vnodes, then log the drop
      MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
      MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
    }
  }

END:
  taosRUnLockLatch(&pSub->lock);
  return code;
}
1686

1687
/*
 * Add the drop of every subscription of topicName to pTrans.
 *
 * Iterates all SDB_SUBSCRIBE rows and applies dropOneSub() to each. Rows of other
 * topics are skipped by dropOneSub(). Stops at the first failure. On that path the
 * current row is released and the SDB iteration is cancelled.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the first error.
 */
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          code = 0;
  int32_t          lino = 0;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    MND_TMQ_RETURN_CHECK(dropOneSub(pSub, pMnode, pTrans, topicName, force));
    sdbRelease(pSdb, pSub);
    // Forget the released row explicitly so the END path can never release it a
    // second time (instead of relying on sdbFetch resetting the out-parameter).
    pSub = NULL;
  }

END:
  PRINT_LOG_END
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1710

1711
/*
 * Append one "show subscriptions" row per vgroup in vgs to pBlock.
 *
 * Columns, in order: topic, consumer group (both VARSTR, passed in ready-made),
 * vgroup id, consumer id as "0x<hex>" (NULL when consumerId == -1), user and fqdn
 * (NULL when the pointer is NULL), then offset ("<offset>/<ever>") and rows, taken
 * from the entry of offsetRows with a matching vgId. The last match wins; both
 * columns are NULL if there is none.
 *
 * *numOfRows is the next row index to write and is advanced once per appended row.
 * Returns TSDB_CODE_INVALID_PARA for NULL block/row counter/topic/cgroup, otherwise
 * 0 or the first column-write error.
 */
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *user,
                           const char *fqdn, const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t numOfVgs = taosArrayGetSize(vgs);
  int32_t numOfOffsets = taosArrayGetSize(offsetRows);  // offsetRows is not modified below

  for (int32_t v = 0; v < numOfVgs; ++v) {
    SMqVgEp *pVgEp = taosArrayGet(vgs, v);
    MND_TMQ_NULL_CHECK(pVgEp);

    int32_t          row = *numOfRows;
    int32_t          colIdx = 0;
    SColumnInfoData *pCol = NULL;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, row, (const char *)topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, row, (const char *)cgroup, false));

    // vgroup id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, row, (const char *)&pVgEp->vgId, false));

    // consumer id rendered as hex VARSTR; -1 marks "no consumer" and is stored as NULL
    char idStr[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(varDataVal(idStr), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(idStr, strlen(varDataVal(idStr)));
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, row, (const char *)idStr, consumerId == -1));

    // user
    char userVar[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user != NULL) {
      STR_TO_VARSTR(userVar, user);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, row, userVar, user == NULL));

    // fqdn
    char fqdnVar[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn != NULL) {
      STR_TO_VARSTR(fqdnVar, fqdn);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, row, fqdnVar, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // find the committed offset for this vgroup; keep scanning so the last match wins
    OffsetRows *pMatched = NULL;
    for (int32_t k = 0; k < numOfOffsets; ++k) {
      OffsetRows *pCand = taosArrayGet(offsetRows, k);
      MND_TMQ_NULL_CHECK(pCand);
      if (pCand->vgId == pVgEp->vgId) {
        pMatched = pCand;
      }
    }

    if (pMatched == NULL) {
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, row);
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, row);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    } else {
      // offset text "<formatted offset>/<ever>" as VARSTR
      char offsetVar[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(offsetVar), TSDB_OFFSET_LEN, &pMatched->offset);
      size_t used = strlen(varDataVal(offsetVar));
      (void)snprintf(varDataVal(offsetVar) + used, sizeof(offsetVar) - VARSTR_HEADER_SIZE - used, "/%" PRId64,
                     pMatched->ever);
      varDataSetLen(offsetVar, strlen(varDataVal(offsetVar)));
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, row, (const char *)offsetVar, false));

      // consumed rows
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, row, (const char *)&pMatched->rows, false));
    }

    (*numOfRows)++;
  }

END:
  PRINT_LOG_END
  return code;
}
1804

1805
/*
 * Append the "show subscriptions" rows of a single subscription to pBlock.
 *
 * Rows are emitted first for each consumer in pSub->consumerHash, then for
 * pSub->unassignedVgs with consumer id -1. The block capacity is grown when the rows
 * already written plus pSub->vgNum would exceed rowsCapacity. On success
 * pBlock->info.rows is set to *numOfRows. pSub's read latch is held for the whole walk.
 *
 * Returns 0 or the first error from capacity growth or row building.
 */
static int32_t retrieveSub(SRpcMsg *pReq, SMqSubscribeObj *pSub, SSDataBlock *pBlock, int32_t *numOfRows,
                           int32_t rowsCapacity) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMqConsumerEp *pConsumerEp = NULL;
  void          *pIter = NULL;
  PRINT_LOG_START

  taosRLockLatch(&pSub->lock);
  if (*numOfRows + pSub->vgNum > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + pSub->vgNum));
  }

  // topic and cgroup, converted to VARSTR (length header + text) for the result columns
  char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
  varDataSetLen(topic, strlen(varDataVal(topic)));
  varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;

    // user/fqdn point into the consumer object, so keep the reference taken by
    // sdbAcquire until buildResult() has copied them; releasing first (as before)
    // let them dangle if the consumer row was freed in the meantime.
    SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
    const char     *user = (pConsumer != NULL) ? pConsumer->user : NULL;
    const char     *fqdn = (pConsumer != NULL) ? pConsumer->fqdn : NULL;
    int32_t         buildCode = buildResult(pBlock, numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup,
                                            pConsumerEp->vgs, pConsumerEp->offsetRows);
    sdbRelease(pSdb, pConsumer);
    MND_TMQ_RETURN_CHECK(buildCode);
  }

  // vgroups not currently assigned to any consumer
  MND_TMQ_RETURN_CHECK(
      buildResult(pBlock, numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

  pBlock->info.rows = *numOfRows;

END:
  taosRUnLockLatch(&pSub->lock);
  taosHashCancelIterate(pSub->consumerHash, pIter);
  PRINT_LOG_END
  return code;
}
1855

1856
/*
 * Retrieve callback for "show subscriptions".
 *
 * Continues the SDB_SUBSCRIBE iteration stored in pShow->pIter and appends rows
 * via retrieveSub() until at least rowsCapacity rows were produced or the
 * iteration ends. pShow->numOfRows is advanced by the rows produced.
 *
 * On success pShow->pIter is left as sdbFetch returned it: NULL when the scan is
 * finished, or a live iterator that the next call resumes from. On error the
 * iterator is cancelled and cleared so it is neither resumed nor cancelled twice.
 *
 * Returns the number of rows produced, or an error code.
 */
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;

  mInfo("mnd show subscriptions begin");

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    MND_TMQ_RETURN_CHECK(retrieveSub(pReq, pSub, pBlock, &numOfRows, rowsCapacity));

    sdbRelease(pSdb, pSub);
    pSub = NULL;
  }
  mInfo("mnd end show subscriptions");
  pShow->numOfRows += numOfRows;

END:
  if (code != 0) {
    // Abandon the scan: cancel the iterator and forget it, then drop the row we
    // were processing.
    sdbCancelFetch(pSdb, pShow->pIter);
    pShow->pIter = NULL;
    sdbRelease(pSdb, pSub);
    mError("mnd show subscriptions failed, msg:%s", tstrerror(code));
    TAOS_RETURN(code);
  }

  // The loop may stop on capacity with more rows pending; pShow->pIter must stay
  // valid then, so it is deliberately not cancelled here.
  mDebug("mnd show subscriptions success, rows:%d", numOfRows);
  return numOfRows;
}
1895

1896
/*
 * Abort an in-progress SDB_SUBSCRIBE iteration (e.g. a "show subscriptions" scan)
 * by cancelling pIter. A NULL mnode is ignored.
 */
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc