• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.53
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
// Version number and reserve size for subscription records.
// NOTE(review): presumably consumed by the encode/decode callbacks declared below — confirm.
#define MND_SUBSCRIBE_VER_NUMBER   3
29
#define MND_SUBSCRIBE_RESERVE_SIZE 64
30

31
//#define MND_CONSUMER_LOST_HB_CNT          6
32

33
// File-scope counter, zero at start-up.
// NOTE(review): the name suggests it counts rebalances currently executing — confirm at its use sites.
static int32_t mqRebInExecCnt = 0;
34

35
// SDB callbacks for subscription objects; installed into the SSdbTable in mndInitSubscribe().
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
36
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
37
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
39
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
40
// Message handlers registered in mndInitSubscribe() for TDMT_MND_TMQ_TIMER and TDMT_MND_TMQ_DROP_CGROUP.
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
41
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
42
// Show-table retrieve / iterator-cancel handlers registered in mndInitSubscribe().
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
44
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
45

46
/*
 * Encode pSub and append it to pTrans as a commit log marked SDB_STATUS_READY.
 *
 * Returns TSDB_CODE_INVALID_PARA when either argument is NULL, otherwise the
 * result of the last step that ran. The raw record is freed here only when
 * appending it to the transaction fails.
 *
 * The locals `code`/`lino` and the label `END` must keep their names: the
 * PRINT_LOG_* and MND_TMQ_* macros (defined outside this view) appear to use them.
 */
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (!pTrans || !pSub) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    // the append failed, so the raw record is released here
    sdbFreeRaw(pRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
66

67
/*
 * Register everything the subscribe module needs with the mnode:
 *   - message handlers for the TMQ subscribe / delete-sub / drop-cgroup
 *     responses, the TMQ timer and the drop-cgroup request,
 *   - the show-table retrieve and iterator-cancel handlers,
 *   - the SDB table description for SDB_SUBSCRIBE.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise the result of
 * sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // Both show handlers belong to the subscriptions table. The cancel handler
  // was previously registered under TSDB_MGMT_TABLE_TOPICS, which left the
  // subscriptions table without one and attached the subscribe cancel routine
  // to the topics table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
92

93
/*
 * Seed a new subscription with every vgroup of the topic's database.
 *
 * Walks all SDB_VGROUP objects; for each one that mndVgroupInDb() reports as
 * belonging to pTopic->dbUid, increments pSub->vgNum and appends a SMqVgEp
 * (epset + vgId) to pSub->unassignedVgs.
 *
 * Returns 0 on success, or terrno when appending to the array fails (the
 * vgroup is released and the SDB fetch cancelled before returning).
 * Arguments are not NULL-checked here.
 */
int32_t mndSchedInitSubEp(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVg = NULL;
  void   *pCursor = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVg);
    if (pCursor == NULL) {
      return 0;
    }

    if (mndVgroupInDb(pVg, pTopic->dbUid)) {
      // counted before the push, exactly as the entry is built
      pSub->vgNum++;

      SMqVgEp ep = {0};
      ep.epSet = mndGetVgroupEpset(pMnode, pVg);
      ep.vgId = pVg->vgId;
      if (taosArrayPush(pSub->unassignedVgs, &ep) == NULL) {
        int32_t code = terrno;
        sdbRelease(pSdb, pVg);
        sdbCancelFetch(pSdb, pCursor);
        return code;
      }
      mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, ep.vgId);
    }

    sdbRelease(pSdb, pVg);
  }
}
128

129
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey,
116,707✔
130
                                     SMqSubscribeObj **pSub) {
131
  if (pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
116,707✔
UNCOV
132
    return TSDB_CODE_INVALID_PARA;
×
133
  }
134
  int32_t lino = 0;
116,707✔
135
  int32_t code = 0;
116,707✔
136
  PRINT_LOG_START
116,707✔
137
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
116,707✔
138
  (*pSub)->dbUid = pTopic->dbUid;
116,707✔
139
  (*pSub)->stbUid = pTopic->stbUid;
116,707✔
140
  (*pSub)->subType = pTopic->subType;
116,707✔
141
  (*pSub)->withMeta = pTopic->withMeta;
116,707✔
142

143
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
116,707✔
144

145
END:
116,707✔
146
  PRINT_LOG_END
116,707✔
147
  return code;
116,707✔
148
}
149

150
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
1,640,479✔
151
  if (key == NULL || topic == NULL || cgroup == NULL) {
1,640,479✔
UNCOV
152
    return;
×
153
  }
154
  int32_t i = 0;
1,640,479✔
155
  while (key[i] != TMQ_SEPARATOR_CHAR) {
11,300,092✔
156
    i++;
9,659,613✔
157
  }
158
  (void)memcpy(cgroup, key, i);
1,640,479✔
159
  cgroup[i] = 0;
1,640,479✔
160
  if (fullName) {
1,640,479✔
161
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
1,563,521✔
162
  } else {
163
    while (key[i] != '.') {
230,874✔
164
      i++;
153,916✔
165
    }
166
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
76,958✔
167
  }
168
}
169

170
static int32_t mndBuildSubChangeReq(SMnode *pMnode, void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
783,579✔
171
                                    const SMqRebOutputVg *pRebVg) {
172
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
783,579✔
UNCOV
173
    return TSDB_CODE_INVALID_PARA;
×
174
  }
175
  SMqRebVgReq  req = {0};
783,579✔
176
  int32_t      code = 0;
783,579✔
177
  int32_t      lino = 0;
783,579✔
178
  SEncoder     encoder = {0};
783,579✔
179
  SMqTopicObj *pTopic = NULL;
783,579✔
180
  void *       buf = NULL;
783,579✔
181

182
  PRINT_LOG_START
783,579✔
183
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
783,579✔
184
  char cgroup[TSDB_CGROUP_LEN] = {0};
783,579✔
185
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
783,579✔
186
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
783,579✔
187
  taosRLockLatch(&pTopic->lock);
783,579✔
188
  req.oldConsumerId = pRebVg->oldConsumerId;
783,579✔
189
  req.newConsumerId = pRebVg->newConsumerId;
783,579✔
190
  req.vgId = pRebVg->pVgEp.vgId;
783,579✔
191
  req.qmsg = pTopic->physicalPlan;
783,579✔
192
  req.schema = pTopic->schema;
783,579✔
193
  req.subType = pSub->subType;
783,579✔
194
  req.withMeta = pSub->withMeta;
783,579✔
195
  req.suid = pSub->stbUid;
783,579✔
196
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
783,579✔
197

198
  int32_t tlen = 0;
783,579✔
199
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
783,579✔
200
  if (code < 0) {
783,579✔
UNCOV
201
    goto END;
×
202
  }
203

204
  tlen += sizeof(SMsgHead);
783,579✔
205
  buf = taosMemoryMalloc(tlen);
783,579✔
206
  MND_TMQ_NULL_CHECK(buf);
783,579✔
207
  SMsgHead *pMsgHead = (SMsgHead *)buf;
783,579✔
208
  pMsgHead->contLen = htonl(tlen);
783,579✔
209
  pMsgHead->vgId = htonl(pRebVg->pVgEp.vgId);
783,579✔
210

211
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
783,579✔
212
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
783,579✔
213
  *pBuf = buf;
783,579✔
214
  buf = NULL;
783,579✔
215
  *pLen = tlen;
783,579✔
216

217
END:
783,579✔
218
  PRINT_LOG_END
783,579✔
219
  taosMemoryFree(buf);
783,579✔
220
  if (pTopic != NULL) {
783,579✔
221
    taosRUnLockLatch(&pTopic->lock);
783,579✔
222
  }
223
  mndReleaseTopic(pMnode, pTopic);
783,579✔
224
  tEncoderClear(&encoder);
783,579✔
225
  return code;
783,579✔
226
}
227

228
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
783,579✔
229
                                        const SMqRebOutputVg *pRebVg) {
230
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
783,579✔
UNCOV
231
    return TSDB_CODE_INVALID_PARA;
×
232
  }
233
  int32_t code = 0;
783,579✔
234
  int32_t lino = 0;
783,579✔
235
  void *  buf = NULL;
783,579✔
236
  PRINT_LOG_START
783,579✔
237
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
783,579✔
UNCOV
238
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
UNCOV
239
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
240
    goto END;
×
241
  }
242

243
  int32_t tlen = 0;
783,579✔
244
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(pMnode, &buf, &tlen, pSub, pRebVg));
783,579✔
245
  int32_t vgId = pRebVg->pVgEp.vgId;
783,579✔
246
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
783,579✔
247
  if (pVgObj == NULL) {
783,579✔
UNCOV
248
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
UNCOV
249
    goto END;
×
250
  }
251

252
  STransAction action = {0};
783,579✔
253
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
783,579✔
254
  action.pCont = buf;
783,579✔
255
  buf = NULL;
783,579✔
256
  action.contLen = tlen;
783,579✔
257
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
783,579✔
258

259
  mndReleaseVgroup(pMnode, pVgObj);
783,579✔
260
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
783,579✔
261

262
END:
783,579✔
263
  PRINT_LOG_END
783,579✔
264
  taosMemoryFree(buf);
783,579✔
265
  return code;
783,579✔
266
}
267

268
static void freeRebalanceItem(void *param) {
323,678✔
269
  if (param == NULL) return;
323,678✔
270
  SMqRebInfo *pInfo = param;
323,678✔
271
  taosArrayDestroy(pInfo->newConsumers);
323,678✔
272
  taosArrayDestroy(pInfo->removedConsumers);
323,678✔
273
}
274

275
/*
 * Look up the rebalance record stored in pHash under key (the key length
 * includes the trailing NUL), creating and inserting one when it is absent.
 *
 * pReb  optional out: receives the pointer returned by taosHashGet() for
 *       the record. May be NULL when the caller only wants the record to exist.
 *
 * Returns TSDB_CODE_INVALID_PARA when pHash or key is NULL, terrno when the
 * record cannot be allocated, the taosHashPut() code when insertion fails,
 * otherwise 0.
 *
 * `code`, `lino` and `END` keep their names for the PRINT_LOG_* / MND_TMQ_*
 * macros.
 */
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (!pHash || !key) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  const size_t keyLen = strlen(key) + 1;
  SMqRebInfo  *pEntry = taosHashGet(pHash, key, keyLen);
  if (pEntry == NULL) {
    SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
    if (pFresh == NULL) {
      code = terrno;
      goto END;
    }

    code = taosHashPut(pHash, key, keyLen, pFresh, sizeof(SMqRebInfo));
    if (code != 0) {
      // not stored: its arrays must be destroyed here
      freeRebalanceItem(pFresh);
    }
    // The temporary struct is freed in both cases. After a successful put,
    // the record is read back from the hash below.
    taosMemoryFreeClear(pFresh);
    if (code != 0) {
      goto END;
    }

    pEntry = taosHashGet(pHash, key, keyLen);
    MND_TMQ_NULL_CHECK(pEntry);
  }

  if (pReb != NULL) {
    *pReb = pEntry;
  }

END:
  PRINT_LOG_END
  return code;
}
308

309
/*
 * Pop the last vgroup endpoint from vgs and record it in pHash, keyed by its
 * vgId, as a SMqRebOutputVg initialised with {consumerId, -1, endpoint}.
 *
 * key is only used in the log line (the subscription key).
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL vgs / pHash / key, the
 * MND_TMQ_NULL_CHECK error when the pop yields nothing, or the
 * taosHashPut() code.
 *
 * `code`, `lino` and `END` keep their names for the PRINT_LOG_* / MND_TMQ_*
 * macros.
 */
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (!vgs || !pHash || !key) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SMqVgEp *pPopped = (SMqVgEp *)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(pPopped);

  SMqRebOutputVg entry = {consumerId, -1, *pPopped};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &pPopped->vgId, sizeof(int32_t), &entry, sizeof(entry)));
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, pPopped->vgId, consumerId);

END:
  PRINT_LOG_END
  return code;
}
326

327
/*
 * For every consumer id listed in pInput->pRebInfo->removedConsumers that is
 * still present in pOutput->pSub->consumerHash:
 *   - move all of its vgroups into pHash (via pushVgDataToHash),
 *   - remove it from the consumer hash,
 *   - append its id to pOutput->removedConsumers.
 * Ids not found in the consumer hash are skipped. A mismatch between the
 * requested and actual number of removals is logged as an error.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the code
 * of the first failing step.
 *
 * `code`, `lino` and `END` keep their names for the PRINT_LOG_* / MND_TMQ_*
 * macros.
 */
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (!pHash || !pOutput || !pInput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  const int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t       actualRemoved = 0;

  for (int32_t idx = 0; idx < numOfRemoved; ++idx) {
    int64_t *pId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    MND_TMQ_NULL_CHECK(pId);

    SMqConsumerEp *pEp = taosHashGet(pOutput->pSub->consumerHash, pId, sizeof(int64_t));
    if (pEp != NULL) {
      // the count is taken once up front; each call pops one vgroup
      for (int32_t left = taosArrayGetSize(pEp->vgs); left > 0; --left) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, *pId, pOutput->pSub->key));
      }

      MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, pId, sizeof(int64_t)));
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, pId));
      actualRemoved++;
    }
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
  } else {
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key,
           numOfRemoved, actualRemoved);
  }

END:
  PRINT_LOG_END
  return code;
}
364

365
/*
 * Add every consumer id in pInput->pRebInfo->newConsumers to
 * pOutput->pSub->consumerHash with an empty vgroup array, and append the id
 * to pOutput->newConsumers.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, terrno when the hash
 * insert fails (the freshly built endpoint is freed first), otherwise 0 or
 * the code set by a failing MND_TMQ_NULL_CHECK.
 *
 * `code`, `lino` and `END` keep their names for the PRINT_LOG_* / MND_TMQ_*
 * macros.
 */
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (!pOutput || !pInput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  const int32_t total = taosArrayGetSize(pInput->pRebInfo->newConsumers);
  for (int32_t idx = 0; idx < total; ++idx) {
    int64_t *pId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, idx);
    MND_TMQ_NULL_CHECK(pId);

    SMqConsumerEp ep = {0};
    ep.consumerId = *pId;
    ep.vgs = taosArrayInit(0, sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(ep.vgs);

    int32_t putRc = taosHashPut(pOutput->pSub->consumerHash, pId, sizeof(int64_t), &ep, sizeof(SMqConsumerEp));
    if (putRc != 0) {
      freeSMqConsumerEp(&ep);
      code = terrno;
      goto END;
    }

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, pId));
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *pId);
  }

END:
  PRINT_LOG_END
  return code;
}
394

395
/*
 * Move every entry of pOutput->pSub->unassignedVgs into pHash, recorded with
 * old consumer id -1.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the
 * first pushVgDataToHash() failure.
 *
 * `code`, `lino` and `END` keep their names for the PRINT_LOG_* / MND_TMQ_*
 * macros.
 */
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (!pOutput || !pHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // the count is taken once; each call pops one entry from the array
  for (int32_t left = taosArrayGetSize(pOutput->pSub->unassignedVgs); left > 0; --left) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
  }

END:
  PRINT_LOG_END
  return code;
}
410

411
/*
 * Trim consumers that hold more vgroups than their share.
 *
 * Every consumer in pOutput->pSub->consumerHash is appended to
 * pOutput->modifyConsumers. A consumer holding more than minVgCnt vgroups is
 * cut down to minVgCnt + 1 while fewer than remainderVgCnt consumers have
 * kept an extra one, and to minVgCnt afterwards. Surplus vgroups go to pHash.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the code
 * of the first failing step. The hash iterator is cancelled before returning.
 *
 * `code`, `lino` and `END` keep their names for the PRINT_LOG_* / MND_TMQ_*
 * macros.
 */
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t remainderVgCnt) {
  if (!pOutput || !pHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  int32_t extraGranted = 0;  // consumers that were allowed minVgCnt + 1
  void   *pIter = NULL;
  PRINT_LOG_START

  for (pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        held = taosArrayGetSize(pEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pEp->consumerId));
    if (held <= minVgCnt) {
      continue;
    }

    if (extraGranted < remainderVgCnt) {
      // pop until this consumer holds minVgCnt + 1
      while (taosArrayGetSize(pEp->vgs) > minVgCnt + 1) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, pEp->consumerId, pOutput->pSub->key));
      }
      extraGranted++;
    } else {
      while (taosArrayGetSize(pEp->vgs) > minVgCnt) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, pEp->consumerId, pOutput->pSub->key));
      }
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  return code;
}
450

451
/*
 * Reconcile the subscription's vgroups with the vgroups currently in SDB.
 *
 *  1. Collect every vgroup of pOutput->pSub->dbUid (skipping those with a
 *     non-zero mountVgId) into a scratch array.
 *  2. For each vgroup held by a consumer: if it is in the scratch array,
 *     remove it from the array (already accounted for); otherwise drop it
 *     from the consumer (it no longer exists).
 *  3. If the subscription has no unassigned vgroups and the scratch array
 *     still has entries, append them all to unassignedVgs.
 *
 * Returns the number of vgroups counted in step 1 on success, or the error
 * code of the failing step. The caller treats a value < 0 as failure.
 *
 * `code`, `lino` and `END` keep their names for the MND_TMQ_* macros.
 */
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  int32_t totalVgNum = 0;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;      // SDB vgroup cursor
  void   *pIterHash = NULL;  // consumer hash cursor
  SArray *newVgs = taosArrayInit(0, sizeof(SMqVgEp));
  MND_TMQ_NULL_CHECK(newVgs);

  for (;;) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (pVgroup->mountVgId || !mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }

    totalVgNum++;
    SMqVgEp ep = {0};
    ep.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    ep.vgId = pVgroup->vgId;
    // on failure the vgroup is still held; END releases it
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &ep));
    sdbRelease(pMnode->pSdb, pVgroup);
  }

  for (pIterHash = taosHashIterate(pOutput->pSub->consumerHash, pIterHash); pIterHash != NULL;
       pIterHash = taosHashIterate(pOutput->pSub->consumerHash, pIterHash)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIterHash;
    int32_t        pos = 0;

    while (pos < taosArrayGetSize(pEp->vgs)) {
      SMqVgEp *pHeld = taosArrayGet(pEp->vgs, pos);
      MND_TMQ_NULL_CHECK(pHeld);

      bool stillExists = false;
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
        SMqVgEp *pCandidate = taosArrayGet(newVgs, k);
        MND_TMQ_NULL_CHECK(pCandidate);
        if (pHeld->vgId == pCandidate->vgId) {
          taosArrayRemove(newVgs, k);
          stillExists = true;
          break;
        }
      }

      if (stillExists) {
        pos++;
      } else {
        // removing shifts the next element into `pos`, so do not advance
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pHeld->vgId);
        taosArrayRemove(pEp->vgs, pos);
      }
    }
  }

  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
  }

END:
  sdbRelease(pMnode->pSdb, pVgroup);
  sdbCancelFetch(pMnode->pSdb, pIter);
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIterHash);
  taosArrayDestroy(newVgs);
  if (code != 0) {
    mError("tmq rebalance processRemoveAddVgs failed, code:%d", code);
    return code;
  }
  mInfo("tmq rebalance processRemoveAddVgs completed, total vg num:%d", totalVgNum);
  return totalVgNum;
}
531

532
/*
 * Fold the per-consumer offset rows of the stored subscription into
 * pOutput->pSub->offsetRows.
 *
 * The stored subscription is looked up by pInput->pRebInfo->key and
 * read-locked. For each offset row of each stored consumer:
 *   - if that consumer in the output still holds the row's vgId, the row is
 *     skipped;
 *   - else if the output already has a row for that vgId, its rows counter
 *     is increased and offset/ever are overwritten;
 *   - else the row is appended.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, 0 when the stored
 * subscription cannot be acquired, otherwise 0 or the code set by a failing
 * MND_TMQ_NULL_CHECK.
 *
 * `code`, `lino` and `END` keep their names for the PRINT_LOG_* / MND_TMQ_*
 * macros.
 */
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pInput || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);
  if (code != 0) {
    // no stored subscription: nothing to merge, and not reported as an error
    return 0;
  }

  taosRLockLatch(&pSub->lock);
  PRINT_LOG_START
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    MND_TMQ_NULL_CHECK(pOutput->pSub->offsetRows);
  }

  for (pIter = taosHashIterate(pSub->consumerHash, pIter); pIter != NULL;
       pIter = taosHashIterate(pSub->consumerHash, pIter)) {
    SMqConsumerEp *pOldEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pNewEp = taosHashGet(pOutput->pSub->consumerHash, &pOldEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pOldEp->offsetRows); j++) {
      OffsetRows *pSrc = taosArrayGet(pOldEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(pSrc);

      // skip rows whose vgroup stays with the same consumer
      bool unchanged = false;
      if (pNewEp != NULL) {
        for (int i = 0; i < taosArrayGetSize(pNewEp->vgs); i++) {
          SMqVgEp *pVgEp = taosArrayGet(pNewEp->vgs, i);
          MND_TMQ_NULL_CHECK(pVgEp);
          if (pVgEp->vgId == pSrc->vgId) {
            unchanged = true;
            mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                  pOldEp->consumerId, pVgEp->vgId);
            break;
          }
        }
      }
      if (unchanged) {
        continue;
      }

      // merge into an existing row for the same vgId, or append a new one
      bool merged = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *pDst = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(pDst);
        if (pSrc->vgId == pDst->vgId) {
          pDst->rows += pSrc->rows;
          pDst->offset = pSrc->offset;
          pDst->ever = pSrc->ever;
          merged = true;
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, pDst->vgId, pDst->rows, pSrc->rows);
          break;
        }
      }
      if (!merged) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, pSrc));
      }
    }
  }

END:
  taosRUnLockLatch(&pSub->lock);
  taosHashCancelIterate(pSub->consumerHash, pIter);
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  return code;
}
596

597
/*
 * Log the outcome of a rebalance: one line per moved vgroup in
 * pOutput->rebVgs, then for every consumer in pOutput->pSub->consumerHash
 * its vgroup count and each vgroup it holds. NULL pOutput is ignored.
 */
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (!pOutput) {
    return;
  }

  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);

  for (int32_t idx = 0; idx < taosArrayGetSize(pOutput->rebVgs); ++idx) {
    SMqRebOutputVg *pMoved = taosArrayGet(pOutput->rebVgs, idx);
    if (pMoved != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
            pMoved->pVgEp.vgId, pMoved->oldConsumerId, pMoved->newConsumerId);
    }
  }

  for (void *pIter = taosHashIterate(pOutput->pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        sz = taosArrayGetSize(pEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
          pEp->consumerId, sz);

    for (int32_t idx = 0; idx < sz; ++idx) {
      SMqVgEp *pVgEp = taosArrayGet(pEp->vgs, idx);
      if (pVgEp != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
              pEp->consumerId);
      }
    }
  }
}
623

624
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
323,678✔
625
                           int32_t *remainderVgCnt) {
626
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
323,678✔
UNCOV
627
    return;
×
628
  }
629
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
323,678✔
630
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
323,678✔
631
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
323,678✔
632

633
  // calc num
634
  if (numOfFinal != 0) {
323,678✔
635
    *minVgCnt = totalVgNum / numOfFinal;
213,078✔
636
    *remainderVgCnt = totalVgNum % numOfFinal;
213,078✔
637
  } else {
638
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
110,600✔
639
  }
640
  mInfo(
323,678✔
641
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d "
642
      "remainderVg:%d",
643
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
644
}
645

646
/*
 * Hand the vgroups collected in pHash out to the consumers in
 * pOutput->pSub->consumerHash.
 *
 *  Pass 1: top every consumer up to minVgCnt vgroups.
 *  Pass 2: give one more vgroup to each consumer holding exactly minVgCnt,
 *          until pHash is exhausted.
 *  Check : the pHash cursor must be NULL at this point, otherwise
 *          TSDB_CODE_MND_INTERNAL_ERROR is returned.
 *  Pass 3: copy every pHash entry into pOutput->rebVgs; while the consumer
 *          hash is empty, also push its endpoint to pSub->unassignedVgs.
 *
 * Each assignment writes the receiving consumer id into the pHash entry's
 * newConsumerId.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the code
 * of the failing step. Both hash cursors are cancelled before returning.
 *
 * `code`, `lino` and `END` keep their names for the PRINT_LOG_* / MND_TMQ_*
 * macros.
 */
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (!pOutput || !pHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  void   *pIter = NULL;        // cursor over the consumer hash
  void   *pAssignIter = NULL;  // cursor over pHash (vgroups awaiting an owner)
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // pass 1: bring every consumer up to the minimum
  for (pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    while (taosArrayGetSize(pEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;
      }

      SMqRebOutputVg *pMove = (SMqRebOutputVg *)pAssignIter;
      pMove->newConsumerId = pEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pMove->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pMove->pVgEp.vgId,
            pEp->consumerId);
    }
  }

  // pass 2: spread the remainder, one extra vgroup per consumer
  for (pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    if (taosArrayGetSize(pEp->vgs) != minVgCnt) {
      continue;
    }

    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
      break;  // leaves pIter live; it is cancelled at END
    }

    SMqRebOutputVg *pMove = (SMqRebOutputVg *)pAssignIter;
    pMove->newConsumerId = pEp->consumerId;
    MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pMove->pVgEp));
    mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pMove->pVgEp.vgId,
          pEp->consumerId);
  }

  if (pAssignIter != NULL) {
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    goto END;
  }

  // pass 3: publish every move
  for (pAssignIter = taosHashIterate(pHash, pAssignIter); pAssignIter != NULL;
       pAssignIter = taosHashIterate(pHash, pAssignIter)) {
    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed
      MND_TMQ_NULL_CHECK(
          taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
    }
  }

END:
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  taosHashCancelIterate(pHash, pAssignIter);
  PRINT_LOG_END
  return code;
}
724

725
/*
 * Runs one rebalance pass for a single subscription.
 *
 * The work is delegated, in this order, to processRemoveAddVgs, processRemovedConsumers,
 * processUnassignedVgroups, calcVgroupsCnt, processModifiedConsumers, processNewConsumers,
 * assignVgroups and processSubOffsetRows. The first step that reports an error ends the pass.
 * A scratch hash is handed to several of the steps and is destroyed before returning.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA if any argument is NULL, the negative value
 * produced by processRemoveAddVgs, or the error code of the failing step.
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pInput || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  // A negative result is returned to the caller unchanged.
  const int32_t vgTotal = processRemoveAddVgs(pMnode, pOutput);
  if (vgTotal < 0) {
    return vgTotal;
  }

  const char *subKey = pOutput->pSub->key;
  int32_t     code = 0;
  int32_t     lino = 0;
  int32_t     vgsPerConsumer = 0;
  int32_t     vgsLeftOver = 0;
  PRINT_LOG_START

  SHashObj *pPending = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pPending);

  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pPending, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pPending));

  calcVgroupsCnt(pInput, vgTotal, subKey, &vgsPerConsumer, &vgsLeftOver);

  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pPending, vgsPerConsumer, vgsLeftOver));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pPending, vgsPerConsumer));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));

  printRebalanceLog(pOutput);

END:
  taosHashCleanup(pPending);
  PRINT_LOG_END
  return code;
}
755

756
/*
 * Appends one consumer commit log to pTrans for every consumer id stored in `consumers`.
 *
 * For each id a temporary consumer object is built with tNewSMqConsumerObj (using `cgroup`,
 * `type` and `topic`), handed to mndSetConsumerCommitLogs, and then destroyed.
 *
 * pTrans    transaction that receives the commit logs; must not be NULL
 * consumers array of int64_t consumer ids; must not be NULL
 * type      update type forwarded to tNewSMqConsumerObj
 * cgroup    consumer group name; must not be NULL
 * topic     topic name forwarded to tNewSMqConsumerObj; not checked here, callers in this
 *           file pass NULL for the "modify" case
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL required argument, or the first
 * error reported while processing an entry.
 */
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Reset right after the delete: a failure on a later iteration jumps to END, which
    // deletes pConsumerNew again and must not see the already-destroyed object.
    pConsumerNew = NULL;
  }

END:
  PRINT_LOG_END
  // Only non-NULL when a step failed after the object was created.
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
779

780
/*
 * Appends consumer commit logs for the three consumer id lists of a rebalance output:
 * modifyConsumers (CONSUMER_UPDATE_REB, no topic), newConsumers (CONSUMER_ADD_REB) and
 * removedConsumers (CONSUMER_REMOVE_REB), in that order.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA if any argument is NULL, or the first error
 * returned by presistConsumerByType.
 */
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (!pTrans || !pOutput || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // One row per list: which ids, which update type, and which topic to attach.
  const struct {
    SArray *ids;
    int8_t  type;
    char   *topic;
  } batches[] = {
      {pOutput->modifyConsumers, CONSUMER_UPDATE_REB, NULL},
      {pOutput->newConsumers, CONSUMER_ADD_REB, topic},
      {pOutput->removedConsumers, CONSUMER_REMOVE_REB, topic},
  };

  for (size_t b = 0; b < sizeof(batches) / sizeof(batches[0]); ++b) {
    MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, batches[b].ids, batches[b].type, cgroup, batches[b].topic));
  }

END:
  PRINT_LOG_END
  return code;
}
794

795
/*
 * Wraps a rebalance result into a "tmq-reb" transaction and prepares it.
 *
 * Steps: split the subscription key into topic and cgroup, create the transaction, check for
 * conflicts, add one redo action per entry of pOutput->rebVgs, add the subscription commit
 * log, add the consumer commit logs (skipped when pOutput->isReload is set), register the
 * MQ_REB start/stop callbacks, and call mndTransPrepare. The local transaction handle is
 * dropped on every exit path.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA if any argument is NULL, or the error of the
 * failing step.
 */
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (!pMnode || !pMsg || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  STrans *pTrans = NULL;
  PRINT_LOG_START

  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char cgroup[TSDB_CGROUP_LEN] = {0};
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    // Prefer the error left in terrno; fall back to a generic "returned NULL" code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // 1. redo action: action to all vg
  const SArray *pVgChanges = pOutput->rebVgs;
  const int32_t changeCnt = taosArrayGetSize(pVgChanges);
  for (int32_t idx = 0; idx < changeCnt; ++idx) {
    SMqRebOutputVg *pChange = taosArrayGet(pVgChanges, idx);
    MND_TMQ_NULL_CHECK(pChange);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pChange));
  }

  // 2. commit log: subscribe and vg assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // 3. commit log: consumer to update status and epoch
  if (!pOutput->isReload) {
    MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
  }

  // 4. set cb
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);

  // 5. execution
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  mndTransDrop(pTrans);
  PRINT_LOG_END
  TAOS_RETURN(code);
}
846

847
// type = 0 remove  type = 1 add
848
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
512,670✔
849
  if (rebSubHash == NULL || topicList == NULL) {
512,670✔
UNCOV
850
    return TSDB_CODE_INVALID_PARA;
×
851
  }
852
  int32_t code = 0;
512,670✔
853
  int32_t lino = 0;
512,670✔
854
  PRINT_LOG_START
512,670✔
855
  int32_t topicNum = taosArrayGetSize(topicList);
512,670✔
856
  for (int32_t i = 0; i < topicNum; i++) {
775,907✔
857
    char *removedTopic = taosArrayGetP(topicList, i);
263,237✔
858
    MND_TMQ_NULL_CHECK(removedTopic);
263,237✔
859
    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
263,237✔
860
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
263,237✔
861
    SMqRebInfo *pRebSub = NULL;
263,237✔
862
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
263,237✔
863
    if (type == 0)
263,237✔
864
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
233,492✔
865
    else if (type == 1)
146,491✔
866
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
292,982✔
867
  }
868

869
END:
512,670✔
870
  PRINT_LOG_END
512,670✔
871
  return code;
512,670✔
872
}
873

874
/*
 * Looks at every vgroup assigned to `pConsumer` for `topic` and, when a vgroup can no longer
 * be acquired, makes sure a pending-rebalance entry exists for the subscription.
 *
 * The subscription is held under its read latch for the whole scan. Failures are not
 * reported to the caller; a failed entry creation is only logged.
 */
static void checkOneTopic(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash, const char *topic) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqSubscribeObj *pSub = NULL;
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

  MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, key, &pSub));
  taosRLockLatch(&pSub->lock);

  // Entry of this consumer inside the subscription; it owns the assigned vgroup list.
  SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pEp);

  const int32_t assigned = taosArrayGetSize(pEp->vgs);
  for (int32_t idx = 0; idx < assigned; ++idx) {
    SMqVgEp *pVgEp = taosArrayGet(pEp->vgs, idx);
    if (pVgEp == NULL) {
      continue;
    }

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
    if (pVgroup == NULL) {
      // The vgroup is gone: register the subscription for rebalance.
      code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
      if (code == 0) {
        mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
      } else {
        mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code));
      }
    }
    mndReleaseVgroup(pMnode, pVgroup);
  }

END:
  if (pSub != NULL) {
    taosRUnLockLatch(&pSub->lock);
  }
  mndReleaseSubscribe(pMnode, pSub);
}
1,030,507✔
910

911
/*
 * Runs checkOneTopic for every entry of pConsumer->currentTopics.
 * Does nothing when any argument is NULL; NULL topic entries are skipped.
 */
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (!pMnode || !pConsumer || !rebSubHash) {
    return;
  }

  const int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t idx = 0; idx < topicCnt; ++idx) {
    char *topicName = taosArrayGetP(pConsumer->currentTopics, idx);
    if (topicName != NULL) {
      checkOneTopic(pMnode, pConsumer, rebSubHash, topicName);
    }
  }
}
924

925
/*
 * Decides whether a consumer should be treated as offline.
 *
 * hbStatus and pollStatus are tick counters supplied by the caller. Each is scaled by
 * tsMqRebalanceInterval * 1000 and compared against the consumer's sessionTimeoutMs and
 * maxPollIntervalMs respectively (presumably the interval is in seconds, which would make
 * the product milliseconds - confirm against the config definition).
 *
 * The product is computed in 64-bit arithmetic so that a large counter cannot overflow a
 * 32-bit signed multiplication.
 *
 * Returns true when either scaled counter has reached its limit.
 */
static bool isOffLine(int32_t hbStatus, int32_t pollStatus, SMqConsumerObj *pConsumer) {
  const int64_t tickMs = (int64_t)tsMqRebalanceInterval * 1000;
  return (int64_t)hbStatus * tickMs >= pConsumer->sessionTimeoutMs ||
         (int64_t)pollStatus * tickMs >= pConsumer->maxPollIntervalMs;
}
929

930
/*
 * Examines one consumer during a rebalance tick and records what has to happen to it.
 *
 * Under the consumer's read latch, hbStatus and pollStatus are atomically incremented and
 * the status is read. Then:
 *   - READY with no current topics            -> send TDMT_MND_TMQ_LOST_CONSUMER_CLEAR
 *   - READY and isOffLine()                   -> register removal from all current topics
 *   - READY otherwise                         -> checkForVgroupSplit
 *   - REBALANCE and not isOffLine()           -> register rebNewTopics (add) and
 *                                                rebRemovedTopics (remove)
 *   - anything else                           -> send TDMT_MND_TMQ_LOST_CONSUMER_CLEAR
 *
 * Returns 0 on success or the error of the failing step.
 */
static int32_t checkOneConsumer(SMqConsumerObj *pConsumer, SMnode *pMnode, SRpcMsg *pMsg, SHashObj *rebSubHash) {
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);

  const int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
  const int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
  const int32_t status = atomic_load_32(&pConsumer->status);

  mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
         ", hbstatus:%d, pollStatus:%d",
         pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
         hbStatus, pollStatus);

  const bool ready = (status == MQ_CONSUMER_STATUS_READY);

  if (ready && taosArrayGetSize(pConsumer->currentTopics) == 0) {
    // unsubscribe or close
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  } else if (ready && isOffLine(hbStatus, pollStatus, pConsumer)) {
    mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
          ", hb lost cnt:%d, or long time no poll cnt:%d",
          pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
          pConsumer->createTime, hbStatus, pollStatus);
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
  } else if (ready) {
    checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
  } else if (status == MQ_CONSUMER_STATUS_REBALANCE && !isOffLine(hbStatus, pollStatus, pConsumer)) {
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
  } else {
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
971

972
/*
 * Walks every SDB_CONSUMER row and feeds it to checkOneConsumer, which fills `rebSubHash`.
 *
 * The walk stops at the first consumer whose check fails. In that case the consumer still
 * held and the open fetch iterator are released in the common exit path.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, or the first error from
 * checkOneConsumer.
 */
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (!pMsg || !rebSubHash) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         code = 0;
  int32_t         lino = 0;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  PRINT_LOG_START

  // iterate all consumers, find all modification
  while ((pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer)) != NULL) {
    MND_TMQ_RETURN_CHECK(checkOneConsumer(pConsumer, pMnode, pMsg, rebSubHash));
    mndReleaseConsumer(pMnode, pConsumer);
  }

END:
  PRINT_LOG_END
  sdbCancelFetch(pSdb, pIter);
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}
999

1000
bool mndRebTryStart() {
12,849,331✔
1001
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
12,849,331✔
1002
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
12,849,331✔
1003
}
1004

1005
void mndRebCntInc() {
259,260✔
1006
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
259,260✔
1007
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
259,260✔
1008
}
259,260✔
1009

1010
void mndRebCntDec() {
13,108,591✔
1011
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
13,108,591✔
1012
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
13,108,591✔
1013
}
13,108,591✔
1014

1015
/*
 * Releases everything a rebalance output owns and resets the fields to NULL: the four
 * arrays (newConsumers, modifyConsumers, removedConsumers, rebVgs) and the subscription
 * object, which is torn down with tDeleteSubscribeObj and then freed.
 * A NULL argument is ignored.
 */
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput != NULL) {
    taosArrayDestroy(rebOutput->newConsumers);
    rebOutput->newConsumers = NULL;

    taosArrayDestroy(rebOutput->modifyConsumers);
    rebOutput->modifyConsumers = NULL;

    taosArrayDestroy(rebOutput->removedConsumers);
    rebOutput->removedConsumers = NULL;

    taosArrayDestroy(rebOutput->rebVgs);
    rebOutput->rebVgs = NULL;

    tDeleteSubscribeObj(rebOutput->pSub);
    taosMemoryFree(rebOutput->pSub);
    rebOutput->pSub = NULL;
  }
}
1031

1032
/*
 * Allocates the four arrays of a rebalance output (newConsumers, removedConsumers,
 * modifyConsumers as int64_t arrays, rebVgs as an SMqRebOutputVg array).
 *
 * If any allocation fails, the output is passed to clearRebOutput before returning, so the
 * caller never sees a half-initialised object.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when rebOutput is NULL, or the allocation
 * error.
 */
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  // What to roll back on exit; cleared once every allocation has succeeded.
  SMqRebOutputObj *pRollback = rebOutput;
  PRINT_LOG_START

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  pRollback = NULL;

END:
  PRINT_LOG_END
  clearRebOutput(pRollback);
  return code;
}
1054

1055
/*
 * Fills rebOutput->pSub for the subscription named by rebInput->pRebInfo->key and records
 * the previous consumer count in rebInput->oldConsumerNum.
 *
 * If the subscription can be acquired, it is cloned and the count is the size of the
 * clone's consumer hash. Otherwise the topic is acquired (and read-latched), a new
 * subscription is created from it, the topic's db name is copied in, and the count is 0.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, or the error of the
 * failing step.
 */
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (!pMnode || !rebInput || !rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }
  const char      *subKey = rebInput->pRebInfo->key;
  SMqSubscribeObj *pSub = NULL;
  SMqTopicObj     *pTopic = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, subKey, &pSub);
  int32_t          lino = 0;
  PRINT_LOG_START

  if (code == 0) {
    // Existing subscription: work on a private copy.
    MND_TMQ_RETURN_CHECK(tCloneSubscribeObj(pSub, &rebOutput->pSub));
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);

    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", subKey,
          taosHashGetSize(rebOutput->pSub->consumerHash));
  } else {
    // split sub key and extract topic
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(subKey, topic, cgroup, true);

    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);

    rebInput->oldConsumerNum = 0;
    MND_TMQ_RETURN_CHECK(mndCreateSubscription(pMnode, pTopic, subKey, &rebOutput->pSub));
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", subKey);
  }

END:
  PRINT_LOG_END
  // pTopic is only non-NULL on the "create" path, where its read latch was taken.
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
1094

UNCOV
1095
/*
 * Builds rebOutput->rebVgs from the current assignment of `pSub`: one entry per vgroup of
 * every consumer in pSub->consumerHash, with oldConsumerId = -1 and newConsumerId set to the
 * consumer that currently holds the vgroup.
 *
 * Returns 0 on success or the error of a failed allocation/lookup. The hash iterator is
 * cancelled on every exit path.
 */
static int32_t collectVgs(SMqRebOutputObj *rebOutput, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *pCursor = NULL;

  PRINT_LOG_START
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  while ((pCursor = taosHashIterate(pSub->consumerHash, pCursor)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pCursor;

    for (int32_t i = 0; i < taosArrayGetSize(pEp->vgs); i++) {
      SMqVgEp *pSrc = taosArrayGet(pEp->vgs, i);
      MND_TMQ_NULL_CHECK(pSrc);

      SMqRebOutputVg *pDst = taosArrayReserve(rebOutput->rebVgs, 1);
      MND_TMQ_NULL_CHECK(pDst);

      pDst->pVgEp = *pSrc;
      pDst->oldConsumerId = -1;
      pDst->newConsumerId = pEp->consumerId;
      mDebug("sub:%s reload rebalance vgId:%d remains on consumer:0x%" PRIx64, pSub->key, pDst->pVgEp.vgId,
             pDst->newConsumerId);
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pSub->consumerHash, pCursor);
  return code;
}
1129

UNCOV
1130
/*
 * Reload-rebalances one subscription if its topic is listed in topicsToReload.
 *
 * Under the subscription's read latch, the topic is extracted from the key. When it is in
 * topicsToReload, the current vgroup assignment is collected and persisted with
 * isReload = true; otherwise nothing is done.
 *
 * The output object borrows `pSub`; the pointer is detached before clearRebOutput so the
 * subscription itself is not freed here.
 *
 * Returns 0 on success or the error from collectVgs / mndPersistRebResult.
 */
static int32_t rebalanceOneSub(SRpcMsg *pMsg, SMqSubscribeObj *pSub) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMqRebOutputObj rebOutput = {0};
  SMnode         *pMnode = pMsg->info.node;

  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  // topic and cgroup
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char cgroup[TSDB_CGROUP_LEN] = {0};
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

  if (taosHashGet(topicsToReload, topic, strlen(topic)) != NULL) {
    rebOutput.pSub = pSub;
    rebOutput.isReload = true;
    MND_TMQ_RETURN_CHECK(collectVgs(&rebOutput, pSub));

    code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
    if (code != 0) {
      mError("%s error,msg:%s", __func__, tstrerror(code));
    }
  } else {
    mDebug("%s topic:%s no need reload rebalance", __func__, topic);
  }

END:
  taosRUnLockLatch(&pSub->lock);
  rebOutput.pSub = NULL;  // avoid double free
  clearRebOutput(&rebOutput);
  PRINT_LOG_END
  return code;
}
1164

UNCOV
1165
/*
 * Runs rebalanceOneSub over every SDB_SUBSCRIBE row and, if all of them succeed, empties
 * topicsToReload.
 *
 * The walk stops at the first failure; in that case topicsToReload is left untouched. The
 * fetch iterator and the subscription still held are released on every exit path.
 *
 * Returns 0 on success or the first error from rebalanceOneSub.
 */
static int32_t reloadRebalance(SRpcMsg *pMsg) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqSubscribeObj *pSub = NULL;
  SMnode          *pMnode = pMsg->info.node;
  SSdb            *pSdb = pMnode->pSdb;

  PRINT_LOG_START
  mInfo("reloadRebalance start, total topics to reload:%d", taosHashGetSize(topicsToReload));

  void *pIter = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    MND_TMQ_RETURN_CHECK(rebalanceOneSub(pMsg, pSub));
    sdbRelease(pSdb, pSub);
  }
  taosHashClear(topicsToReload);

END:
  sdbCancelFetch(pSdb, pIter);
  sdbRelease(pSdb, pSub);
  PRINT_LOG_END

  return code;
}
1193

1194
/*
 * Regular rebalance tick.
 *
 * mndCheckConsumer collects the subscriptions that need work into a temporary hash. For
 * each entry an output object is initialised and then buildRebOutput, mndDoRebalance and
 * mndPersistRebResult are run in sequence, stopping at the first of the three that fails.
 * Such a failure is logged and the loop moves on to the next entry; only a failure of
 * initRebOutput, mndCheckConsumer or the hash allocation leaves the function early.
 *
 * Returns the value `code` holds on exit: the early-exit error, otherwise the result of the
 * last processed entry (0 when there were none).
 */
static int32_t normalRebalance(SRpcMsg *pMsg) {
  int     code = 0;
  int32_t lino = 0;

  void   *pIter = NULL;
  SMnode *pMnode = pMsg->info.node;

  PRINT_LOG_START
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);

  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
  }

  while ((pIter = taosHashIterate(rebSubHash, pIter)) != NULL) {
    SMqRebInputObj  rebInput = {0};
    SMqRebOutputObj rebOutput = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
    rebInput.pRebInfo = (SMqRebInfo *)pIter;

    // build -> rebalance -> persist; each stage runs only if the previous one succeeded.
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else if ((code = mndDoRebalance(pMnode, &rebInput, &rebOutput)) != 0) {
      mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
    } else if ((code = mndPersistRebResult(pMnode, pMsg, &rebOutput)) != 0) {
      mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
    }

    clearRebOutput(&rebOutput);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish");
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(rebSubHash, pIter);
  taosHashCleanup(rebSubHash);
  return code;
}
1254

1255
/*
 * Handler for a rebalance request.
 *
 * If mndRebTryStart reports that a rebalance is already running, nothing is done and 0 is
 * returned. Otherwise reloadRebalance is run when topicsToReload is non-empty, else
 * normalRebalance, and the rebalance counter is decremented afterwards.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMsg is NULL, otherwise the result of the rebalance
 * routine that ran (via TAOS_RETURN).
 */
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  if (!mndRebTryStart()) {
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
    goto END;
  }

  if (taosHashGetSize(topicsToReload) > 0) {
    code = reloadRebalance(pMsg);
  } else {
    code = normalRebalance(pMsg);
  }

  // Undo the increment done by mndRebTryStart.
  mndRebCntDec();

END:
  PRINT_LOG_END
  TAOS_RETURN(code);
}
1282

1283
/*
 * Adds one TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every vgroup that belongs to
 * the subscription's database (pSub->dbUid). Vgroups with a non-zero mountVgId are skipped.
 *
 * Each request carries the vgroup id, consumerId = -1 and the subscription key;
 * TSDB_CODE_MND_VGROUP_NOT_EXIST is accepted as a non-fatal reply code.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, or the first allocation /
 * append error.
 */
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }

    // Skip mounted vgroups and vgroups of other databases.
    if (pVgObj->mountVgId || !mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }

    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    // On failure pVgObj is still held here; the END path releases it.
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // Forget the pointer once released so the END path cannot release it a second time.
    pVgObj = NULL;

    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      lino = __LINE__;
      // NOTE(review): assumes the transaction does not take ownership of pCont when the
      // append fails, so the request is freed here - confirm against mndTransAppendRedoAction.
      taosMemoryFree(pReq);
      goto END;
    }
  }

END:
  PRINT_LOG_END
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1330

1331
/*
 * Handles one consumer while a consumer group is being dropped from a topic.
 *
 * Consumers of a different cgroup are ignored. If the consumer references `topicName` in
 * assignedTopics, rebRemovedTopics or rebNewTopics, then:
 *   - with deleteConsumer set, a CONSUMER_CLEAR object is built and its drop logs are added
 *     to pTrans;
 *   - otherwise the drop is refused with TSDB_CODE_MND_CGROUP_USED.
 * The consumer's read latch is held for the duration of the call.
 *
 * Returns 0 on success (including "nothing to do") or the error of the failing step.
 */
static int32_t checkoutOneConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName,
                                   char *cgroup) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMqConsumerObj *pClearObj = NULL;

  taosRLockLatch(&pConsumer->lock);

  if (strcmp(cgroup, pConsumer->cgroup) != 0) {
    goto END;
  }

  bool inAssigned = checkTopic(pConsumer->assignedTopics, topicName);
  bool inRemoved = checkTopic(pConsumer->rebRemovedTopics, topicName);
  bool inNew = checkTopic(pConsumer->rebNewTopics, topicName);
  if (!(inAssigned || inRemoved || inNew)) {
    goto END;
  }

  if (!deleteConsumer) {
    mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
           pConsumer->consumerId, pConsumer->cgroup);
    code = TSDB_CODE_MND_CGROUP_USED;
    goto END;
  }

  MND_TMQ_RETURN_CHECK(
      tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pClearObj));
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pClearObj));
  tDeleteSMqConsumerObj(pClearObj);
  pClearObj = NULL;

END:
  tDeleteSMqConsumerObj(pClearObj);
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
1365

1366
/*
 * Applies checkoutOneConsumer to every SDB_CONSUMER row for the given cgroup/topic pair.
 *
 * The walk stops at the first failure. The fetch iterator and the consumer still held are
 * released on every exit path.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL pointer argument, or the first
 * error from checkoutOneConsumer.
 */
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
  if (!pMnode || !pTrans || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }
  int             code = 0;
  int32_t         lino = 0;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  PRINT_LOG_START

  while ((pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer)) != NULL) {
    MND_TMQ_RETURN_CHECK(checkoutOneConsumer(pTrans, pConsumer, deleteConsumer, topic, cgroup));
    sdbRelease(pMnode->pSdb, pConsumer);
  }

END:
  sdbRelease(pMnode->pSdb, pConsumer);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1389

1390
/*
 * Builds and prepares the "drop-cgroup" transaction for one subscription.
 *
 * Unless dropReq->force is set, a subscription that still has consumers is refused with
 * TSDB_CODE_MND_CGROUP_USED. Otherwise the transaction gets, in order: the vnode delete
 * actions, the per-consumer handling from mndCheckConsumerByGroup, and the subscription drop
 * commit logs, and is then prepared. The subscription's read latch is held throughout and
 * the local transaction handle is dropped on exit.
 *
 * Returns 0 on success or the error of the failing step.
 */
static int32_t dropCgroup(SMqSubscribeObj *pSub, SRpcMsg *pMsg, SMDropCgroupReq *dropReq) {
  int32_t code = 0;
  int32_t lino = 0;
  STrans *pTrans = NULL;
  SMnode *pMnode = pMsg->info.node;
  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  const bool hasConsumers = (taosHashGetSize(pSub->consumerHash) != 0);
  if (hasConsumers && !dropReq->force) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq->cgroup, dropReq->topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq->cgroup, dropReq->topic);

  mndTransSetDbName(pTrans, pSub->dbName, NULL);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq->cgroup, dropReq->topic, dropReq->force));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));

  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  taosRUnLockLatch(&pSub->lock);
  mndTransDrop(pTrans);
  PRINT_LOG_END
  return code;
}
1419

1420
/*
 * Handler for a "drop consumer group" request.
 *
 * The request is deserialised, the subscription key "<cgroup><TMQ_SEPARATOR><topic>" is
 * built and the subscription is looked up. A missing subscription yields 0 when
 * igNotExists is set and TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST otherwise. For an existing
 * subscription the work is done by dropCgroup.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMsg is NULL, an error code on failure, and
 * TSDB_CODE_ACTION_IN_PROGRESS once the drop has been handed to dropCgroup successfully.
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = 0;
  int32_t          lino = 0;
  SMnode          *pMnode = pMsg->info.node;
  SMqSubscribeObj *pSub = NULL;
  SMDropCgroupReq  dropReq = {0};

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));

  char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);

  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code != 0) {
    if (!dropReq.igNotExists) {
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      goto END;
    }
    mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
    mndReleaseSubscribe(pMnode, pSub);
    return 0;
  }

  MND_TMQ_RETURN_CHECK(dropCgroup(pSub, pMsg, &dropReq));

END:
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END

  if (code == 0) {
    TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
  }
  TAOS_RETURN(code);
}
1457

1458
/* Cleanup hook for the subscribe module; the body is currently empty. */
void mndCleanupSubscribe(SMnode *pMnode) {
  // nothing to release
}
400,307✔
1459

1460
// Serialize a subscribe object into a newly allocated SDB raw record.
//
// Record layout, in write order:
//   int32   payload length (tlen)
//   bytes   payload produced by tEncodeSubscribeObj()
//   bytes   MND_SUBSCRIBE_RESERVE_SIZE reserved bytes
//
// Returns the raw record on success. Returns NULL when pSub is NULL, or when
// any step fails; on failure terrno is left at a non-success value
// (TSDB_CODE_OUT_OF_MEMORY unless something called here overwrites it) and
// any partially built record is freed.
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  // code/lino are not used directly below; they are kept because the SDB_SET_*
  // macros presumably reference them -- TODO confirm against their definition.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  void *buf = NULL;
  // Must be initialised before the first `goto SUB_ENCODE_OVER`: the cleanup
  // path logs pRaw and passes it to sdbFreeRaw(), so jumping over its
  // initialisation would hand an indeterminate pointer to both.
  SSdbRaw *pRaw = NULL;
  int32_t  dataPos = 0;

  // First pass with a NULL buffer only measures the encoded size.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // The encoder advances the cursor it is given, so hand it a copy and keep
  // `buf` pointing at the start of the payload.
  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mDebug("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1502

1503
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
342,787✔
1504
  if (pRaw == NULL) {
342,787✔
UNCOV
1505
    return NULL;
×
1506
  }
1507
  int32_t code = 0;
342,787✔
1508
  int32_t lino = 0;
342,787✔
1509
  terrno = TSDB_CODE_OUT_OF_MEMORY;
342,787✔
1510
  SSdbRow *        pRow = NULL;
342,787✔
1511
  SMqSubscribeObj *pSub = NULL;
342,787✔
1512
  void *           buf = NULL;
342,787✔
1513

1514
  int8_t sver = 0;
342,787✔
1515
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
342,787✔
1516

1517
  if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
342,787✔
UNCOV
1518
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
UNCOV
1519
    goto SUB_DECODE_OVER;
×
1520
  }
1521

1522
  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
342,787✔
1523
  if (pRow == NULL) goto SUB_DECODE_OVER;
342,787✔
1524

1525
  pSub = sdbGetRowObj(pRow);
342,787✔
1526
  if (pSub == NULL) goto SUB_DECODE_OVER;
342,787✔
1527

1528
  int32_t dataPos = 0;
342,787✔
1529
  int32_t tlen;
342,475✔
1530
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
342,787✔
1531
  buf = taosMemoryMalloc(tlen);
342,787✔
1532
  if (buf == NULL) goto SUB_DECODE_OVER;
342,787✔
1533
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
342,787✔
1534
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
342,787✔
1535

1536
  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
342,787✔
UNCOV
1537
    goto SUB_DECODE_OVER;
×
1538
  }
1539

1540
  // update epset saved in mnode
1541
  if (pSub->unassignedVgs != NULL) {
342,787✔
1542
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
342,787✔
1543
    for (int32_t i = 0; i < size; ++i) {
951,072✔
1544
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGet(pSub->unassignedVgs, i);
608,285✔
1545
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
608,285✔
1546
    }
1547
  }
1548
  if (pSub->consumerHash != NULL) {
342,787✔
1549
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
342,787✔
1550
    while (pIter) {
507,754✔
1551
      SMqConsumerEp *pConsumerEp = pIter;
164,967✔
1552
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
164,967✔
1553
      for (int32_t i = 0; i < size; ++i) {
615,848✔
1554
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGet(pConsumerEp->vgs, i);
450,881✔
1555
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
450,881✔
1556
      }
1557
      pIter = taosHashIterate(pSub->consumerHash, pIter);
164,967✔
1558
    }
1559
  }
1560

1561
  terrno = TSDB_CODE_SUCCESS;
342,787✔
1562

1563
SUB_DECODE_OVER:
342,787✔
1564
  taosMemoryFreeClear(buf);
342,787✔
1565
  if (terrno != TSDB_CODE_SUCCESS) {
342,787✔
UNCOV
1566
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
×
UNCOV
1567
    taosMemoryFreeClear(pRow);
×
1568
    return NULL;
×
1569
  }
1570

1571
  mDebug("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
342,787✔
1572
  return pRow;
342,787✔
1573
}
1574

1575
// SDB insert callback for subscribe rows: only logs the key, always returns 0.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform insert action", subKey);
  return 0;
}
1579

1580
// SDB delete callback for subscribe rows: logs the key, then hands the object
// to tDeleteSubscribeObj(). Always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform delete action", subKey);
  tDeleteSubscribeObj(pSub);
  return 0;
}
1585

1586
// SDB update callback for subscribe rows.
//
// Under the old row's write latch, exchanges consumerHash, unassignedVgs and
// offsetRows between the old and the new object, so the old row ends up with
// the new contents and the new object carries the previous ones.
//
// Returns TSDB_CODE_INVALID_PARA when either object is NULL, 0 otherwise.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (!(pOldSub != NULL && pNewSub != NULL)) {
    return TSDB_CODE_INVALID_PARA;
  }
  mDebug("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);
  TSWAP(pOldSub->offsetRows, pNewSub->offsetRows);
  TSWAP(pOldSub->unassignedVgs, pNewSub->unassignedVgs);
  TSWAP(pOldSub->consumerHash, pNewSub->consumerHash);
  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}
1598

1599
// Look up a subscription by key and store the acquired row in *pSub.
//
// Returns TSDB_CODE_INVALID_PARA when any argument is NULL (*pSub untouched),
// TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when no row matches (*pSub set to NULL),
// and 0 on success. A row obtained here is handed back with
// mndReleaseSubscribe().
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj **pSub) {
  if (pMnode == NULL || key == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  *pSub = pFound;
  return (pFound != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1610

1611
// Count the subscriptions whose key belongs to the given topic.
//
// Walks every SDB_SUBSCRIBE row, splits its key into topic and consumer group,
// and counts the rows whose topic part equals topicName.
// Returns 0 when pMnode or topicName is NULL.
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (pMnode == NULL || topicName == NULL) return 0;

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          groupNum = 0;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};

    // The key is read under the row's read latch.
    taosRLockLatch(&pSub->lock);
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    taosRUnLockLatch(&pSub->lock);

    if (strcmp(topic, topicName) == 0) {
      groupNum++;
    }
    sdbRelease(pSdb, pSub);
  }

  return groupNum;
}
1638

1639
// Release a subscription row via sdbRelease(). A NULL pMnode or pSub is a no-op.
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1644

1645
// Append a commit log to pTrans that marks the subscription as dropped.
//
// The subscription is encoded into a raw record, appended to the transaction's
// commit log, and the record's status is then set to SDB_STATUS_DROPPED.
// If appending fails the record is freed here.
//
// Returns TSDB_CODE_INVALID_PARA when any argument is NULL, otherwise 0 or the
// first non-zero code produced by the steps above.
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pCommitRaw);

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  } else {
    sdbFreeRaw(pCommitRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
1662

1663
// Add the work needed to drop one subscription to pTrans, if the subscription
// belongs to topicName.
//
// Subscriptions of other topics are left alone and 0 is returned. For a
// matching subscription: unless `force` is set, a non-empty consumerHash makes
// the call fail with TSDB_CODE_MND_IN_REBALANCE; otherwise the vnode delete
// requests and the drop commit log are added to the transaction.
// The subscription's read latch is held for the whole call.
static int32_t dropOneSub(SMqSubscribeObj *pSub, SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  int32_t code = 0;
  int32_t lino = 0;
  char    subTopic[TSDB_TOPIC_FNAME_LEN] = {0};
  char    subGroup[TSDB_CGROUP_LEN] = {0};

  taosRLockLatch(&pSub->lock);
  mndSplitSubscribeKey(pSub->key, subTopic, subGroup, true);

  if (strcmp(subTopic, topicName) == 0) {
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
      code = TSDB_CODE_MND_IN_REBALANCE;
    } else {
      // iter all vnode to delete handle
      MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
      MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
    }
  }

END:
  taosRUnLockLatch(&pSub->lock);

  return code;
}
1688

1689
// Add to pTrans the work needed to drop every subscription of topicName.
//
// Walks all SDB_SUBSCRIBE rows and calls dropOneSub() on each; the first
// failure stops the walk.
//
// Returns TSDB_CODE_INVALID_PARA when pMnode, pTrans or topicName is NULL,
// otherwise 0 or the first non-zero code returned by dropOneSub().
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          code = 0;
  int32_t          lino = 0;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    MND_TMQ_RETURN_CHECK(dropOneSub(pSub, pMnode, pTrans, topicName, force));
    sdbRelease(pSdb, pSub);
    // Forget the row once released: END releases pSub again, and that must
    // only ever apply to a row still held after a failed dropOneSub().
    pSub = NULL;
  }

END:
  PRINT_LOG_END
  // Only non-NULL when the loop was left through a failure.
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1712

1713
// Append one result row per vgroup in `vgs` to the "show subscriptions" block.
//
// Columns are written in this order: topic, consumer group, vgroup id,
// consumer id (hex string, NULL when consumerId is -1), user (NULL when `user`
// is NULL), fqdn (NULL when `fqdn` is NULL), offset, rows. The last two come
// from the entry of `offsetRows` whose vgId matches the vgroup (the last
// matching entry wins) and are NULL when there is no such entry.
//
// `topic` and `cgroup` are written to the block as-is and are read through
// varDataVal() for logging, i.e. they are expected to be VARSTR buffers.
// *numOfRows is the row index to write next and is advanced per row.
//
// Returns TSDB_CODE_INVALID_PARA when pBlock, numOfRows, topic or cgroup is
// NULL, otherwise 0 or the first failure code raised while filling columns.
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *user,
                           const char *fqdn, const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t vgCnt = taosArrayGetSize(vgs);
  for (int32_t vgIdx = 0; vgIdx < vgCnt; vgIdx++) {
    SMqVgEp *pVgEp = taosArrayGet(vgs, vgIdx);
    MND_TMQ_NULL_CHECK(pVgEp);

    int32_t          colIdx = 0;
    SColumnInfoData *pCol = NULL;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)cgroup, false));

    // vg id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pVgEp->vgId, false));

    // consumer id
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)consumerIdHex, consumerId == -1));

    // user
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user != NULL) {
      STR_TO_VARSTR(userStr, user);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, userStr, user == NULL));

    // fqdn
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn != NULL) {
      STR_TO_VARSTR(fqdnStr, fqdn);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // offset: scan the whole array, the last entry for this vgroup is used
    OffsetRows *pOffset = NULL;
    int32_t     offsetCnt = (int32_t)taosArrayGetSize(offsetRows);
    for (int32_t k = 0; k < offsetCnt; k++) {
      OffsetRows *pCand = taosArrayGet(offsetRows, k);
      MND_TMQ_NULL_CHECK(pCand);
      if (pCand->vgId == pVgEp->vgId) {
        pOffset = pCand;
      }
    }

    if (pOffset != NULL) {
      // offset column, rendered as "<formatted offset>/<ever>"
      char offsetStr[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(offsetStr), TSDB_OFFSET_LEN, &pOffset->offset);
      (void)snprintf(varDataVal(offsetStr) + strlen(varDataVal(offsetStr)),
                     sizeof(offsetStr) - VARSTR_HEADER_SIZE - strlen(varDataVal(offsetStr)), "/%" PRId64,
                     pOffset->ever);
      varDataSetLen(offsetStr, strlen(varDataVal(offsetStr)));
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)offsetStr, false));

      // rows column
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pOffset->rows, false));
    } else {
      // no offset information for this vgroup: both columns are NULL
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    }
    (*numOfRows)++;
  }

END:
  PRINT_LOG_END
  return code;
}
1806

1807
// Emit the "show subscriptions" rows of one subscription into pBlock.
//
// One row is produced per vgroup: first the vgroups held by each consumer,
// then the unassigned vgroups (with consumer id -1 and NULL user/fqdn).
//
// Visibility of consumer rows when `showAll` is false: a consumer's rows are
// emitted only if the operating user passes the PRIV_SUBSCRIPTION_SHOW check
// on the topic, or the consumer was created by the operating user.
// NOTE(review): the unassigned-vgroup rows are emitted regardless of these
// checks -- confirm that this is intended.
//
// The subscription's read latch is held for the whole call. The block is
// grown first when the subscription's vgNum would exceed rowsCapacity.
//
// Returns 0 or the first failure code from growing the block or building rows.
static int32_t retrieveSub(SRpcMsg *pReq, SMqSubscribeObj *pSub, SUserObj *pOperUser, bool showAll, SSDataBlock *pBlock,
                           int32_t *numOfRows, int32_t rowsCapacity) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMqConsumerEp *pConsumerEp = NULL;
  SMqTopicObj   *pTopic = NULL;
  void          *pIter = NULL;
  bool           showTopic = false;
  PRINT_LOG_START

  taosRLockLatch(&pSub->lock);
  if (*numOfRows + pSub->vgNum > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + pSub->vgNum));
  }

  // topic and cgroup, as VARSTR values for the result columns
  char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
  varDataSetLen(topic, strlen(varDataVal(topic)));
  varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

  if (!showAll) {
    // The topic lookup needs a plain C string. `topic` above is a VARSTR: its
    // first bytes are the length header, so passing it directly can never
    // match a topic. Split the key again into a plain buffer, using the same
    // `true` form that the other callers in this file compare against topic
    // names (assumed to be the name topics are registered under).
    char topicFName[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroupName[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topicFName, cgroupName, true);

    // Best effort: a failed lookup simply leaves showTopic false.
    (void)mndAcquireTopic(pMnode, topicFName, &pTopic);
    if (pTopic) {
      SName name = {0};  // 1.topic1
      if (0 == tNameFromString(&name, pTopic->name, T_NAME_ACCT | T_NAME_DB)) {
        if (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_SUBSCRIPTION_SHOW, PRIV_OBJ_TOPIC, pTopic->ownerId,
                                          pTopic->db, name.dbname)) {
          showTopic = true;
        }
      }
    }
  }

  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;

    char           *user = NULL;
    char           *fqdn = NULL;
    bool            subscribeOwner = false;
    SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
    if (pConsumer != NULL) {
      user = pConsumer->user;
      fqdn = pConsumer->fqdn;
      if (strncmp(pConsumer->user, pOperUser->name, TSDB_USER_LEN) == 0) {
        subscribeOwner = true;
      }
      sdbRelease(pSdb, pConsumer);
    }
    if (!showAll && !showTopic && !subscribeOwner) {
      continue;
    }
    MND_TMQ_RETURN_CHECK(buildResult(pBlock, numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup,
                                     pConsumerEp->vgs, pConsumerEp->offsetRows));
  }

  MND_TMQ_RETURN_CHECK(
      buildResult(pBlock, numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

  pBlock->info.rows = *numOfRows;

END:
  mndReleaseTopic(pMnode, pTopic);
  taosRUnLockLatch(&pSub->lock);
  // pIter is non-NULL only when the consumer loop was left through a failure.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  PRINT_LOG_END
  return code;
}
1880

1881
// Retrieve callback for "show subscriptions".
//
// Continues the SDB_SUBSCRIBE iteration stored in pShow->pIter and fills
// pBlock through retrieveSub() until the iteration ends or at least
// rowsCapacity rows were produced. Whether every subscription is visible
// (`showAll`) is decided once per call from the operating user's
// PRIV_SUBSCRIPTION_SHOW privilege on "<acctId>.*".
//
// Returns TSDB_CODE_INVALID_PARA when pReq, pShow or pBlock is NULL, the
// number of rows produced by this call on success, or the failure code.
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;
  SUserObj        *pOperUser = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;
  bool             showAll = false;
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};

  mInfo("mnd show subscriptions begin");
  MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pOperUser->acctId);
  showAll = (0 == mndCheckSysObjPrivilege(pMnode, pOperUser, RPC_MSG_TOKEN(pReq), PRIV_SUBSCRIPTION_SHOW,
                                          PRIV_OBJ_TOPIC, 0, objFName, "*"));

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    MND_TMQ_RETURN_CHECK(retrieveSub(pReq, pSub, pOperUser, showAll, pBlock, &numOfRows, rowsCapacity));

    sdbRelease(pSdb, pSub);
    pSub = NULL;
  }
  mInfo("mnd end show subscriptions");
  pShow->numOfRows += numOfRows;

END:
  if (code != 0) {
    // Abandon the iteration only on failure, and forget the iterator so it is
    // not cancelled a second time later. On success the loop may have stopped
    // on capacity, in which case pShow->pIter is still live and is what the
    // next call continues from.
    sdbCancelFetch(pSdb, pShow->pIter);
    pShow->pIter = NULL;
  }
  // pSub is non-NULL only when retrieveSub() failed.
  sdbRelease(pSdb, pSub);
  mndReleaseUser(pMnode, pOperUser);

  if (code != 0) {
    mError("mnd show subscriptions failed, msg:%s", tstrerror(code));
    TAOS_RETURN(code);
  } else {
    mDebug("mnd show subscriptions success, rows:%d", numOfRows);
    return numOfRows;
  }
}
1928

UNCOV
1929
// Cancel an in-progress SDB_SUBSCRIBE iteration. A NULL pMnode is a no-op.
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc