• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4980

10 Mar 2026 08:57AM UTC coverage: 68.492% (-0.02%) from 68.512%
#4980

push

travis-ci

web-flow
fix: add retry while exec ci case test_stable_keep_compact.py. (#34729)

211901 of 309380 relevant lines covered (68.49%)

135171938.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.68
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_SUBSCRIBE_VER_NUMBER   3
29
#define MND_SUBSCRIBE_RESERVE_SIZE 64
30

31
//#define MND_CONSUMER_LOST_HB_CNT          6
32

33
static int32_t mqRebInExecCnt = 0;
34

35
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
36
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
37
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
39
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
40
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
41
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
42
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
44
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
45

46
/*
 * Encode pSub into an sdb raw row, append it to pTrans as a commit log and mark it SDB_STATUS_READY.
 * The raw row is freed here only when the transaction rejects it; on success it is left to the transaction.
 * Returns 0 or the first error encountered.
 */
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    sdbFreeRaw(pRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
66

67
/*
 * Register the SDB_SUBSCRIBE sdb table together with the subscription-related message handlers and the
 * "show subscriptions" retrieve / iterator-cleanup callbacks.
 * Returns TSDB_CODE_INVALID_PARA for a NULL mnode, otherwise the result of sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // The retrieve and iterator-cleanup callbacks must be registered for the same show type. Registering the
  // cleanup under TSDB_MGMT_TABLE_TOPICS left "show subscriptions" without one and could replace the topics
  // show's cleanup callback with mndCancelGetNextSubscribe.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
92

93
/*
 * Seed a new subscription with the vgroups of the topic's database: every matching vgroup is appended to
 * pSub->unassignedVgs (vgId + current epset) and counted in pSub->vgNum.
 * Returns 0, or terrno when appending to unassignedVgs fails (the sdb iteration is cancelled in that case).
 */
int32_t mndSchedInitSubEp(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    // Skip mounted vgroups exactly like processRemoveAddVgs() does. That function treats only non-mounted
    // vgroups as live and strips any other vgroup from consumers on the next rebalance, so seeding mounted ones
    // here would only get them silently dropped later.
    if (pVgroup->mountVgId || !mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;

    SMqVgEp vgEp = {0};
    vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    vgEp.vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &vgEp) == NULL) {
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, vgEp.vgId);
    sdbRelease(pSdb, pVgroup);
  }

END:
  return code;
}
128

129
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey,
128,512✔
130
                                     SMqSubscribeObj **pSub) {
131
  if (pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
128,512✔
132
    return TSDB_CODE_INVALID_PARA;
×
133
  }
134
  int32_t lino = 0;
128,512✔
135
  int32_t code = 0;
128,512✔
136
  PRINT_LOG_START
128,512✔
137
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
128,512✔
138
  (*pSub)->dbUid = pTopic->dbUid;
128,512✔
139
  (*pSub)->stbUid = pTopic->stbUid;
128,512✔
140
  (*pSub)->subType = pTopic->subType;
128,512✔
141
  (*pSub)->withMeta = pTopic->withMeta;
128,512✔
142

143
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
128,512✔
144

145
END:
128,512✔
146
  PRINT_LOG_END
128,512✔
147
  return code;
128,512✔
148
}
149

150
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
1,871,924✔
151
  if (key == NULL || topic == NULL || cgroup == NULL) {
1,871,924✔
152
    return;
×
153
  }
154
  int32_t i = 0;
1,871,924✔
155
  while (key[i] != TMQ_SEPARATOR_CHAR) {
13,443,995✔
156
    i++;
11,572,071✔
157
  }
158
  (void)memcpy(cgroup, key, i);
1,871,924✔
159
  cgroup[i] = 0;
1,871,924✔
160
  if (fullName) {
1,871,924✔
161
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
1,715,819✔
162
  } else {
163
    while (key[i] != '.') {
468,315✔
164
      i++;
312,210✔
165
    }
166
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
156,105✔
167
  }
168
}
169

170
static int32_t mndBuildSubChangeReq(SMnode *pMnode, void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
868,664✔
171
                                    const SMqRebOutputVg *pRebVg) {
172
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
868,664✔
173
    return TSDB_CODE_INVALID_PARA;
×
174
  }
175
  SMqRebVgReq  req = {0};
868,664✔
176
  int32_t      code = 0;
868,664✔
177
  int32_t      lino = 0;
868,664✔
178
  SEncoder     encoder = {0};
868,664✔
179
  SMqTopicObj *pTopic = NULL;
868,664✔
180
  void *       buf = NULL;
868,664✔
181

182
  PRINT_LOG_START
868,664✔
183
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
868,664✔
184
  char cgroup[TSDB_CGROUP_LEN] = {0};
868,664✔
185
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
868,664✔
186
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
868,664✔
187
  taosRLockLatch(&pTopic->lock);
868,664✔
188
  req.oldConsumerId = pRebVg->oldConsumerId;
868,664✔
189
  req.newConsumerId = pRebVg->newConsumerId;
868,664✔
190
  req.vgId = pRebVg->pVgEp.vgId;
868,664✔
191
  req.qmsg = pTopic->physicalPlan;
868,664✔
192
  req.schema = pTopic->schema;
868,664✔
193
  req.subType = pSub->subType;
868,664✔
194
  req.withMeta = pSub->withMeta;
868,664✔
195
  req.suid = pSub->stbUid;
868,664✔
196
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
868,664✔
197

198
  int32_t tlen = 0;
868,664✔
199
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
868,664✔
200
  if (code < 0) {
868,664✔
201
    goto END;
×
202
  }
203

204
  tlen += sizeof(SMsgHead);
868,664✔
205
  buf = taosMemoryMalloc(tlen);
868,664✔
206
  MND_TMQ_NULL_CHECK(buf);
868,664✔
207
  SMsgHead *pMsgHead = (SMsgHead *)buf;
868,664✔
208
  pMsgHead->contLen = htonl(tlen);
868,664✔
209
  pMsgHead->vgId = htonl(pRebVg->pVgEp.vgId);
868,664✔
210

211
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
868,664✔
212
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
868,664✔
213
  *pBuf = buf;
868,664✔
214
  buf = NULL;
868,664✔
215
  *pLen = tlen;
868,664✔
216

217
END:
868,664✔
218
  PRINT_LOG_END
868,664✔
219
  taosMemoryFree(buf);
868,664✔
220
  if (pTopic != NULL) {
868,664✔
221
    taosRUnLockLatch(&pTopic->lock);
868,664✔
222
  }
223
  mndReleaseTopic(pMnode, pTopic);
868,664✔
224
  tEncoderClear(&encoder);
868,664✔
225
  return code;
868,664✔
226
}
227

228
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
868,664✔
229
                                        const SMqRebOutputVg *pRebVg) {
230
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
868,664✔
231
    return TSDB_CODE_INVALID_PARA;
×
232
  }
233
  int32_t code = 0;
868,664✔
234
  int32_t lino = 0;
868,664✔
235
  void *  buf = NULL;
868,664✔
236
  PRINT_LOG_START
868,664✔
237
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
868,664✔
238
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
239
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
240
    goto END;
×
241
  }
242

243
  int32_t tlen = 0;
868,664✔
244
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(pMnode, &buf, &tlen, pSub, pRebVg));
868,664✔
245
  int32_t vgId = pRebVg->pVgEp.vgId;
868,664✔
246
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
868,664✔
247
  if (pVgObj == NULL) {
868,664✔
248
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
249
    goto END;
×
250
  }
251

252
  STransAction action = {0};
868,664✔
253
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
868,664✔
254
  action.pCont = buf;
868,664✔
255
  buf = NULL;
868,664✔
256
  action.contLen = tlen;
868,664✔
257
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
868,664✔
258

259
  mndReleaseVgroup(pMnode, pVgObj);
868,664✔
260
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
868,664✔
261

262
END:
868,664✔
263
  PRINT_LOG_END
868,664✔
264
  taosMemoryFree(buf);
868,664✔
265
  return code;
868,664✔
266
}
267

268
static void freeRebalanceItem(void *param) {
351,228✔
269
  if (param == NULL) return;
351,228✔
270
  SMqRebInfo *pInfo = param;
351,228✔
271
  taosArrayDestroy(pInfo->newConsumers);
351,228✔
272
  taosArrayDestroy(pInfo->removedConsumers);
351,228✔
273
}
274

275
/*
 * Find the SMqRebInfo stored in pHash under subscription key `key` (the NUL terminator is part of the hash key),
 * creating and inserting a fresh one when it is missing.
 *
 * The hash stores the struct by value, so the temporary from tNewSMqRebSubscribe() is always freed after the
 * put. Its arrays belong to the hash entry on success and are destroyed on failure. When pReb is non-NULL it
 * receives a pointer to the entry inside the hash.
 */
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo == NULL) {
    SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
    if (pFresh == NULL) {
      code = terrno;
      goto END;
    }

    code = taosHashPut(pHash, key, keyLen, pFresh, sizeof(SMqRebInfo));
    if (code != 0) {
      freeRebalanceItem(pFresh);  // the hash did not take the arrays
    }
    taosMemoryFreeClear(pFresh);
    if (code != 0) {
      goto END;
    }

    pRebInfo = taosHashGet(pHash, key, keyLen);
    MND_TMQ_NULL_CHECK(pRebInfo);
  }

  if (pReb != NULL) {
    *pReb = pRebInfo;
  }

END:
  PRINT_LOG_END
  return code;
}
308

309
/*
 * Pop the last vgroup endpoint off vgs and record it in pHash, keyed by its vgId, as a vgroup leaving
 * consumerId that has no new consumer yet (-1). `key` is only used for logging.
 * Returns 0, or an error if vgs is empty or the hash insert fails.
 */
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (key == NULL || pHash == NULL || vgs == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SMqVgEp *pPopped = (SMqVgEp *)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(pPopped);

  // leaving consumerId, no new consumer assigned yet, endpoint copied by value
  SMqRebOutputVg rebVg = {consumerId, -1, *pPopped};
  MND_TMQ_RETURN_CHECK(
      taosHashPut(pHash, &rebVg.pVgEp.vgId, sizeof(int32_t), &rebVg, sizeof(SMqRebOutputVg)));
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, rebVg.pVgEp.vgId,
        consumerId);

END:
  PRINT_LOG_END
  return code;
}
326

327
/*
 * For every consumer listed in pInput->pRebInfo->removedConsumers that is still in the subscription:
 * - move all of its vgroups into pHash (as leaving that consumer),
 * - drop it from pOutput->pSub->consumerHash,
 * - record its id in pOutput->removedConsumers.
 * Ids that are not in the subscription are skipped and reported by the count-mismatch log.
 */
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SMqSubscribeObj *pSub = pOutput->pSub;
  SArray          *pRemovedIds = pInput->pRebInfo->removedConsumers;
  int32_t          numOfRemoved = taosArrayGetSize(pRemovedIds);
  int32_t          actualRemoved = 0;

  for (int32_t i = 0; i < numOfRemoved; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(pRemovedIds, i);
    MND_TMQ_NULL_CHECK(consumerId);

    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, consumerId, sizeof(int64_t));
    if (pConsumerEp == NULL) {
      continue;  // not a member of this subscription
    }

    // pushVgDataToHash pops exactly one vgroup per call
    for (int32_t left = taosArrayGetSize(pConsumerEp->vgs); left > 0; left--) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *consumerId, pSub->key));
    }

    MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, consumerId, sizeof(int64_t)));
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, consumerId));
    actualRemoved++;
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("tmq rebalance sub:%s removed %d consumers", pSub->key, numOfRemoved);
  } else {
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pSub->key, numOfRemoved,
           actualRemoved);
  }

END:
  PRINT_LOG_END
  return code;
}
364

365
/*
 * Add every consumer in pInput->pRebInfo->newConsumers to pOutput->pSub->consumerHash with an empty vgroup list,
 * and record its id in pOutput->newConsumers.
 * Returns 0 or the first error; a consumer endpoint that could not be inserted is freed here.
 */
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);

    // Propagate taosHashPut's own result (as pushVgDataToHash does) instead of terrno. terrno is not guaranteed
    // to be set, and a stale 0 there would turn this failure into a silent success.
    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp,
                       sizeof(SMqConsumerEp));
    if (code != 0) {
      freeSMqConsumerEp(&newConsumerEp);  // the hash did not take ownership of vgs
      goto END;
    }
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }

END:
  PRINT_LOG_END
  return code;
}
394

395
/*
 * Drain pOutput->pSub->unassignedVgs into pHash. Every vgroup is recorded with old and new consumer -1,
 * ready to be handed out by assignVgroups().
 */
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SMqSubscribeObj *pSub = pOutput->pSub;
  // each pushVgDataToHash call pops one entry off unassignedVgs
  for (int32_t left = taosArrayGetSize(pSub->unassignedVgs); left > 0; left--) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pSub->unassignedVgs, pHash, -1, pSub->key));
  }

END:
  PRINT_LOG_END
  return code;
}
410

411
/*
 * Trim every remaining consumer down to its fair share and collect the surplus vgroups in pHash.
 *
 * The first remainderVgCnt consumers that hold more than minVgCnt vgroups may keep minVgCnt + 1; every other
 * consumer keeps at most minVgCnt. Every visited consumer id is appended to pOutput->modifyConsumers.
 */
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t remainderVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  int32_t cnt = 0;  // consumers already granted the "+1" share
  void   *pIter = NULL;
  PRINT_LOG_START

  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
    if (consumerVgNum <= minVgCnt) {
      continue;
    }

    int32_t keepCnt = minVgCnt;
    if (cnt < remainderVgCnt) {
      keepCnt = minVgCnt + 1;  // pop until equal minVg + 1
      cnt++;
    }
    while (taosArrayGetSize(pConsumerEp->vgs) > keepCnt) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  return code;
}
450

451
/*
 * Reconcile the subscription with the vgroups that currently exist in its database.
 *
 * Pass 1 snapshots every non-mounted vgroup of pOutput->pSub->dbUid.
 * Pass 2 removes from each consumer any vgroup missing from that snapshot, and strikes the vgroups that
 * consumers still own off the snapshot.
 * The snapshot leftovers (vgroups nobody owns) are then appended to unassignedVgs, but only if that list is
 * empty.
 *
 * Returns the number of vgroups found in pass 1 (>= 0), or a negative error code.
 */
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = 0;
  int32_t   lino = 0;
  int32_t   totalVgNum = 0;
  SSdb     *pSdb = pMnode->pSdb;
  SHashObj *pConsumerHash = pOutput->pSub->consumerHash;
  SVgObj   *pVgroup = NULL;
  void     *pSdbIter = NULL;
  void     *pConsumerIter = NULL;
  SArray   *pCurVgs = taosArrayInit(0, sizeof(SMqVgEp));
  MND_TMQ_NULL_CHECK(pCurVgs);

  // Pass 1: snapshot the live vgroups of the subscribed database.
  while ((pSdbIter = sdbFetch(pSdb, SDB_VGROUP, pSdbIter, (void **)&pVgroup)) != NULL) {
    bool skip = (pVgroup->mountVgId != 0) || !mndVgroupInDb(pVgroup, pOutput->pSub->dbUid);
    if (!skip) {
      totalVgNum++;
      SMqVgEp vgEp = {0};
      vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      vgEp.vgId = pVgroup->vgId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pCurVgs, &vgEp));
    }
    sdbRelease(pSdb, pVgroup);
  }

  // Pass 2: drop vanished vgroups from consumers; whatever stays in pCurVgs is owned by nobody.
  while ((pConsumerIter = taosHashIterate(pConsumerHash, pConsumerIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pConsumerIter;
    int32_t        idx = 0;
    while (idx < taosArrayGetSize(pConsumerEp->vgs)) {
      SMqVgEp *pOwned = taosArrayGet(pConsumerEp->vgs, idx);
      MND_TMQ_NULL_CHECK(pOwned);

      int32_t matched = -1;
      for (int32_t k = 0; k < taosArrayGetSize(pCurVgs); k++) {
        SMqVgEp *pCur = taosArrayGet(pCurVgs, k);
        MND_TMQ_NULL_CHECK(pCur);
        if (pCur->vgId == pOwned->vgId) {
          matched = k;
          break;
        }
      }

      if (matched >= 0) {
        taosArrayRemove(pCurVgs, matched);
        idx++;
      } else {
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pOwned->vgId);
        taosArrayRemove(pConsumerEp->vgs, idx);  // do not advance: the next element shifted into idx
      }
    }
  }

  // NOTE(review): leftovers are merged only when unassignedVgs is empty. If it is non-empty, it is never
  // reconciled against pass 1, so confirm it cannot hold vgIds that no longer exist (e.g. after a vnode split
  // while no consumer is subscribed).
  if (taosArrayGetSize(pCurVgs) != 0 && taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, pCurVgs));
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(pCurVgs));
  }

END:
  sdbRelease(pSdb, pVgroup);
  sdbCancelFetch(pSdb, pSdbIter);
  taosHashCancelIterate(pConsumerHash, pConsumerIter);
  taosArrayDestroy(pCurVgs);
  if (code != 0) {
    mError("tmq rebalance processRemoveAddVgs failed, code:%d", code);
    return code;
  }
  mInfo("tmq rebalance processRemoveAddVgs completed, total vg num:%d", totalVgNum);
  return totalVgNum;
}
531

532
/*
 * Carry per-vgroup offset rows from the stored subscription into pOutput->pSub->offsetRows.
 *
 * For each consumer of the stored subscription, each of its offset rows is handled as follows:
 * - skipped if that consumer still owns the vgroup in the new assignment;
 * - otherwise merged into an existing row for the same vgId (rows summed, offset/ever overwritten);
 * - otherwise appended.
 * If the stored subscription cannot be acquired, 0 is returned and nothing changes.
 */
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqSubscribeObj *pOldSub = NULL;
  void            *pIter = NULL;
  int32_t          lino = 0;
  // put all offset rows
  int32_t code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pOldSub);
  if (code != 0) {
    return 0;
  }
  taosRLockLatch(&pOldSub->lock);
  PRINT_LOG_START

  SMqSubscribeObj *pNewSub = pOutput->pSub;
  if (pNewSub->offsetRows == NULL) {
    pNewSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    MND_TMQ_NULL_CHECK(pNewSub->offsetRows);
  }

  while ((pIter = taosHashIterate(pOldSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pOldEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pNewEp = taosHashGet(pNewSub->consumerHash, &pOldEp->consumerId, sizeof(int64_t));

    for (int32_t j = 0; j < taosArrayGetSize(pOldEp->offsetRows); j++) {
      OffsetRows *pRow = taosArrayGet(pOldEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(pRow);

      // Same consumer keeps the same vgroup: its offset stays with it, nothing to carry over.
      bool sameOwner = false;
      if (pNewEp != NULL) {
        for (int32_t i = 0; i < taosArrayGetSize(pNewEp->vgs); i++) {
          SMqVgEp *pVgEp = taosArrayGet(pNewEp->vgs, i);
          MND_TMQ_NULL_CHECK(pVgEp);
          if (pVgEp->vgId == pRow->vgId) {
            sameOwner = true;
            mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                  pOldEp->consumerId, pVgEp->vgId);
            break;
          }
        }
      }
      if (sameOwner) {
        continue;
      }

      OffsetRows *pMerged = NULL;
      for (int32_t i = 0; i < taosArrayGetSize(pNewSub->offsetRows); i++) {
        OffsetRows *pCand = taosArrayGet(pNewSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(pCand);
        if (pCand->vgId == pRow->vgId) {
          pMerged = pCand;
          break;
        }
      }

      if (pMerged == NULL) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pNewSub->offsetRows, pRow));
      } else {
        pMerged->rows += pRow->rows;
        pMerged->offset = pRow->offset;
        pMerged->ever = pRow->ever;
        mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, pMerged->vgId, pMerged->rows,
              pRow->rows);
      }
    }
  }

END:
  taosRUnLockLatch(&pOldSub->lock);
  taosHashCancelIterate(pOldSub->consumerHash, pIter);
  mndReleaseSubscribe(pMnode, pOldSub);
  PRINT_LOG_END
  return code;
}
596

597
/*
 * Log the outcome of a rebalance round:
 * - every moved vgroup (from old to new consumer);
 * - then each consumer's final vgroup list.
 */
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (pOutput == NULL) {
    return;
  }
  const char *subKey = pOutput->pSub->key;

  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", subKey);
  size_t numOfRebVgs = taosArrayGetSize(pOutput->rebVgs);
  for (size_t idx = 0; idx < numOfRebVgs; ++idx) {
    SMqRebOutputVg *pMoved = taosArrayGet(pOutput->rebVgs, idx);
    if (pMoved != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, subKey,
            pMoved->pVgEp.vgId, pMoved->oldConsumerId, pMoved->newConsumerId);
    }
  }

  void *pIter = NULL;
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", subKey, pConsumerEp->consumerId, sz);
    for (int32_t idx = 0; idx < sz; ++idx) {
      SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, idx);
      if (pVgEp != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, subKey, pVgEp->vgId,
              pConsumerEp->consumerId);
      }
    }
  }
}
623

624
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
351,228✔
625
                           int32_t *remainderVgCnt) {
626
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
351,228✔
627
    return;
×
628
  }
629
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
351,228✔
630
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
351,228✔
631
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
351,228✔
632

633
  // calc num
634
  if (numOfFinal != 0) {
351,228✔
635
    *minVgCnt = totalVgNum / numOfFinal;
227,867✔
636
    *remainderVgCnt = totalVgNum % numOfFinal;
227,867✔
637
  } else {
638
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
123,361✔
639
  }
640
  mInfo(
351,228✔
641
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d "
642
      "remainderVg:%d",
643
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
644
}
645

646
/*
 * Hand the pending vgroups in pHash (vgId -> SMqRebOutputVg) to the consumers of pOutput->pSub.
 *
 * Pass 1 tops every consumer up to minVgCnt vgroups.
 * Pass 2 gives at most one extra vgroup to consumers sitting exactly at minVgCnt until pHash is used up.
 * Pass 3 appends every entry of pHash to pOutput->rebVgs; if no consumer is left, the vgroups are also put back
 * into pSub->unassignedVgs.
 *
 * pAssignIter is shared across consumers so each pending vgroup is assigned at most once. Returns 0, or
 * TSDB_CODE_MND_INTERNAL_ERROR when the vgroup counts are inconsistent.
 */
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqRebOutputVg *pRebVg = NULL;
  void           *pAssignIter = NULL;
  void           *pIter = NULL;
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  // Pass 1: bring every consumer up to minVgCnt (signed comparison, so a non-positive minVgCnt assigns nothing).
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    while ((int64_t)taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        // pHash ran dry before every consumer reached minVgCnt. With a NULL cursor the next taosHashIterate()
        // starts over from the first entry and would hand already-assigned vgroups to another consumer, so fail
        // instead of continuing.
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        code = TSDB_CODE_MND_INTERNAL_ERROR;
        goto END;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp.vgId,
            pConsumerEp->consumerId);
    }
  }

  // Pass 2: distribute the remainder, one extra vgroup per consumer at exactly minVgCnt.
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    if ((int64_t)taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp.vgId,
            pConsumerEp->consumerId);
    }
  }

  if (pAssignIter != NULL) {
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    goto END;
  }

  // Pass 3: publish every moved vgroup; with no consumer left they all go back to the unassigned list.
  while (1) {
    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      break;
    }

    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed
      MND_TMQ_NULL_CHECK(
          taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
    }
  }

END:
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  taosHashCancelIterate(pHash, pAssignIter);
  PRINT_LOG_END
  return code;
}
724

725
/*
 * Compute a new vgroup -> consumer assignment for one subscription.
 *
 * Works on the request in `pInput` and the subscription copy held in
 * `pOutput->pSub`, and fills `pOutput` with the result through the
 * process*/assignVgroups helpers. This function does not create a
 * transaction; normalRebalance() persists the result afterwards with
 * mndPersistRebResult().
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, the
 * (negative) error returned by processRemoveAddVgs(), or the error code of the
 * first failing step.
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // A negative result is an error code; otherwise it is the total vgroup count used below.
  int32_t totalVgNum = processRemoveAddVgs(pMnode, pOutput);
  if (totalVgNum < 0) {
    return totalVgNum;
  }
  const char *pSubKey = pOutput->pSub->key;
  int32_t     minVgCnt = 0;
  int32_t     remainderVgCnt = 0;
  int32_t     code = 0;
  int32_t     lino = 0;
  PRINT_LOG_START
  // Scratch hash (int keys, presumably vgIds) of vgroups waiting to be assigned,
  // shared by the steps below and freed at END.
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pHash);
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
  // Per-consumer target count (minVgCnt) plus a remainder; presumably the number of
  // consumers that receive one extra vgroup — NOTE(review): confirm in calcVgroupsCnt.
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
  printRebalanceLog(pOutput);

END:
  taosHashCleanup(pHash);
  PRINT_LOG_END
  return code;
}
755

756
/*
 * Append one consumer commit log to `pTrans` for every consumer id (int64_t)
 * stored in `consumers`. Each log is built from a temporary consumer object of
 * the given `type` (CONSUMER_UPDATE_REB / CONSUMER_ADD_REB / CONSUMER_REMOVE_REB
 * in mndPresistConsumer()).
 *
 * `topic` may be NULL: mndPresistConsumer() passes NULL for the update case.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL pTrans/consumers/cgroup,
 * or the first error encountered.
 */
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Reset right away: if the next iteration fails before pConsumerNew is
    // reassigned, END must not free this object a second time.
    pConsumerNew = NULL;
  }

END:
  PRINT_LOG_END
  // Non-NULL here only when an error occurred after the object was created.
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
779

780
/*
 * Append the consumer commit logs produced by a rebalance to `pTrans`.
 *
 * The three consumer groups of `pOutput` are processed in this order:
 * modified consumers (CONSUMER_UPDATE_REB, no topic), new consumers
 * (CONSUMER_ADD_REB) and removed consumers (CONSUMER_REMOVE_REB).
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
 * first error returned by presistConsumerByType().
 */
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (pTrans == NULL || pOutput == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  struct {
    SArray *ids;
    int8_t  consumerType;
    char   *topicName;
  } steps[] = {
      {pOutput->modifyConsumers, CONSUMER_UPDATE_REB, NULL},
      {pOutput->newConsumers, CONSUMER_ADD_REB, topic},
      {pOutput->removedConsumers, CONSUMER_REMOVE_REB, topic},
  };
  size_t stepCnt = sizeof(steps) / sizeof(steps[0]);

  for (size_t s = 0; s < stepCnt; ++s) {
    MND_TMQ_RETURN_CHECK(
        presistConsumerByType(pTrans, steps[s].ids, steps[s].consumerType, cgroup, steps[s].topicName));
  }

END:
  PRINT_LOG_END
  return code;
}
794

795
/*
 * Persist a computed rebalance result in a "tmq-reb" transaction.
 *
 * Steps:
 *   1. one change-vgroup redo action per entry of pOutput->rebVgs;
 *   2. a commit log for the updated subscription (pOutput->pSub);
 *   3. unless pOutput->isReload is set, commit logs for the affected consumers;
 *   4. the TRANS_START_FUNC_MQ_REB / TRANS_STOP_FUNC_MQ_REB callbacks;
 *   5. mndTransPrepare().
 * Nothing is appended if mndTransCheckConflict() fails. The local transaction
 * handle is always dropped before returning.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the first
 * error encountered.
 */
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pMsg == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  STrans *pTrans = NULL;
  PRINT_LOG_START

  // The subscription key encodes both the topic and the consumer group.
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char cgroup[TSDB_CGROUP_LEN] = {0};
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    // Prefer the more specific terrno when the creator set one.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // 1. redo action: action to all vg
  const SArray *rebVgs = pOutput->rebVgs;
  int32_t       vgNum = taosArrayGetSize(rebVgs);
  for (int32_t i = 0; i < vgNum; i++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
    MND_TMQ_NULL_CHECK(pRebVg);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg));
  }

  // 2. commit log: subscribe and vg assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // 3. commit log: consumer to update status and epoch
  //    (skipped on the reload path, which only re-sends the existing assignment)
  if (!pOutput->isReload) {
    MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
  }

  // 4. set cb
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);

  // 5. execution
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  mndTransDrop(pTrans);
  PRINT_LOG_END
  TAOS_RETURN(code);
}
846

847
// type = 0 remove  type = 1 add
848
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
569,074✔
849
  if (rebSubHash == NULL || topicList == NULL) {
569,074✔
850
    return TSDB_CODE_INVALID_PARA;
×
851
  }
852
  int32_t code = 0;
569,074✔
853
  int32_t lino = 0;
569,074✔
854
  PRINT_LOG_START
569,074✔
855
  int32_t topicNum = taosArrayGetSize(topicList);
569,074✔
856
  for (int32_t i = 0; i < topicNum; i++) {
860,990✔
857
    char *removedTopic = taosArrayGetP(topicList, i);
291,916✔
858
    MND_TMQ_NULL_CHECK(removedTopic);
291,916✔
859
    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
291,916✔
860
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
291,916✔
861
    SMqRebInfo *pRebSub = NULL;
291,916✔
862
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
291,916✔
863
    if (type == 0)
291,916✔
864
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
258,598✔
865
    else if (type == 1)
162,617✔
866
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
325,234✔
867
  }
868

869
END:
569,074✔
870
  PRINT_LOG_END
569,074✔
871
  return code;
569,074✔
872
}
873

874
/*
 * Check whether any vgroup currently assigned to `pConsumer` for `topic` can no
 * longer be acquired from the mnode (for example after a vnode split). If so,
 * make sure `rebSubHash` has an entry for the subscription key
 * "<cgroup><TMQ_SEPARATOR><topic>", so the next rebalance round picks that
 * subscription up.
 *
 * Best effort: early failures are reported through PRINT_LOG_END, like the
 * other helpers in this file, and are otherwise ignored. Such failures include
 * a missing subscription or a consumer absent from the subscription's
 * consumer hash.
 */
static void checkOneTopic(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash, const char *topic) {
  SMqSubscribeObj *pSub = NULL;
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, key, &pSub));
  taosRLockLatch(&pSub->lock);
  // iterate all vg assigned to the consumer of that topic
  SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pConsumerEp);
  int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
  for (int32_t j = 0; j < vgNum; j++) {
    SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, j);
    if (pVgEp == NULL) {
      continue;
    }
    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
    if (pVgroup == NULL) {
      // NULL out-pointer: presumably only ensures the rebalance entry exists.
      // Kept separate from `code` so a per-vgroup failure (already logged here)
      // is not reported again as a failure of the whole check.
      int32_t ret = mndGetOrCreateRebSub(rebSubHash, key, NULL);
      if (ret != 0) {
        mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(ret));
      } else {
        mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
      }
    }
    mndReleaseVgroup(pMnode, pVgroup);
  }

END:
  PRINT_LOG_END
  if (pSub != NULL) {
    taosRUnLockLatch(&pSub->lock);
  }
  mndReleaseSubscribe(pMnode, pSub);
}
1,067,741✔
910

911
/*
 * Run checkOneTopic() for every non-NULL topic name in pConsumer->currentTopics.
 * Does nothing if any argument is NULL.
 */
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (pMnode != NULL && pConsumer != NULL && rebSubHash != NULL) {
    int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);
    for (int32_t idx = 0; idx < topicCnt; ++idx) {
      const char *topicName = taosArrayGetP(pConsumer->currentTopics, idx);
      if (topicName != NULL) {
        checkOneTopic(pMnode, pConsumer, rebSubHash, topicName);
      }
    }
  }
}
924

925
/*
 * Decide whether a consumer is considered offline.
 *
 * `hbStatus` / `pollStatus` count rebalance checks since the last heartbeat or
 * poll; checkOneConsumer() increments both on every check. Each count is
 * converted to elapsed milliseconds (count * tsMqRebalanceInterval * 1000) and
 * compared with the consumer's sessionTimeoutMs and maxPollIntervalMs.
 *
 * The products are computed in 64 bits. The counters keep growing for a
 * consumer that stays silent, and 32-bit signed multiplication by the interval
 * in ms would overflow (undefined behaviour, possibly wrapping to "online").
 */
static bool isOffLine(int32_t hbStatus, int32_t pollStatus, SMqConsumerObj *pConsumer) {
  int64_t intervalMs = (int64_t)tsMqRebalanceInterval * 1000;
  return (int64_t)hbStatus * intervalMs >= pConsumer->sessionTimeoutMs ||
         (int64_t)pollStatus * intervalMs >= pConsumer->maxPollIntervalMs;
}
929

930
/*
 * Inspect one consumer during a rebalance round and record what needs doing.
 *
 * hbStatus and pollStatus are both incremented on every call; isOffLine()
 * compares them with the consumer's timeouts. Outcomes by status:
 *   - READY with no current topics (unsubscribed or closed): send
 *     TDMT_MND_TMQ_LOST_CONSUMER_CLEAR;
 *   - READY but offline: queue the consumer for removal from all its current topics;
 *   - READY and online: look for vanished vgroups (checkForVgroupSplit);
 *   - REBALANCE and online: queue rebNewTopics as additions and
 *     rebRemovedTopics as removals;
 *   - anything else (REBALANCE but offline, or any other status): send
 *     TDMT_MND_TMQ_LOST_CONSUMER_CLEAR.
 *
 * The consumer's read latch is held for the whole check.
 * Returns 0 on success or the first error encountered.
 */
static int32_t checkOneConsumer(SMqConsumerObj *pConsumer, SMnode *pMnode, SRpcMsg *pMsg, SHashObj *rebSubHash) {
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);

  // Both counters advance by one per check; they are compared against timeouts in isOffLine().
  int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
  int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
  int32_t status = atomic_load_32(&pConsumer->status);

  mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
         ", hbstatus:%d, pollStatus:%d",
         pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
         hbStatus, pollStatus);

  if (status == MQ_CONSUMER_STATUS_READY) {
    if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
      MND_TMQ_RETURN_CHECK(
          mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
    } else if (isOffLine(hbStatus, pollStatus, pConsumer)) {
      mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
            ", hb lost cnt:%d, or long time no poll cnt:%d",
            pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
            pConsumer->createTime, hbStatus, pollStatus);
      // type 0: remove this consumer from every topic it currently holds.
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
    } else {
      checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
    }
  } else if (status == MQ_CONSUMER_STATUS_REBALANCE && !isOffLine(hbStatus, pollStatus, pConsumer)) {
    // type 1 = add, type 0 = remove.
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
  } else {
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
971

972
/*
 * Walk every consumer in the SDB and run checkOneConsumer() on it. The
 * subscriptions that need rebalancing are collected into `rebSubHash`.
 *
 * Stops at the first error. The SDB fetch is cancelled at END on every exit
 * path.
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the first
 * error encountered.
 */
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (pMsg == NULL || rebSubHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode *        pMnode = pMsg->info.node;
  SSdb *          pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  void *          pIter = NULL;
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }
    MND_TMQ_RETURN_CHECK(checkOneConsumer(pConsumer, pMnode, pMsg, rebSubHash));
    mndReleaseConsumer(pMnode, pConsumer);
  }
END:
  PRINT_LOG_END
  sdbCancelFetch(pSdb, pIter);
  // Releases the consumer held when checkOneConsumer() failed. On normal loop
  // exit this relies on sdbFetch() clearing pConsumer when iteration ends —
  // NOTE(review): confirm, otherwise the last consumer is released twice.
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}
999

1000
bool mndRebTryStart() {
14,876,411✔
1001
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
14,876,411✔
1002
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
14,876,411✔
1003
}
1004

1005
void mndRebCntInc() {
288,539✔
1006
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
288,539✔
1007
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
288,539✔
1008
}
288,539✔
1009

1010
void mndRebCntDec() {
15,163,910✔
1011
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
15,163,910✔
1012
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
15,163,910✔
1013
}
15,163,910✔
1014

1015
/*
 * Release everything owned by a rebalance output object: the four id/vgroup
 * arrays and the subscription object (destroyed, then freed). Every released
 * pointer is set back to NULL. A NULL `rebOutput` is ignored.
 */
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput != NULL) {
    taosArrayDestroy(rebOutput->newConsumers);
    taosArrayDestroy(rebOutput->modifyConsumers);
    taosArrayDestroy(rebOutput->removedConsumers);
    taosArrayDestroy(rebOutput->rebVgs);
    rebOutput->newConsumers = NULL;
    rebOutput->modifyConsumers = NULL;
    rebOutput->removedConsumers = NULL;
    rebOutput->rebVgs = NULL;

    SMqSubscribeObj *pOwnedSub = rebOutput->pSub;
    tDeleteSubscribeObj(pOwnedSub);
    taosMemoryFree(pOwnedSub);
    rebOutput->pSub = NULL;
  }
}
1031

1032
/*
 * Allocate the empty arrays of a rebalance output object: newConsumers,
 * removedConsumers and modifyConsumers (int64_t consumer ids) and rebVgs
 * (SMqRebOutputVg).
 *
 * On failure everything allocated so far is released via clearRebOutput().
 * That call also frees rebOutput->pSub; callers pass a zero-initialised object
 * (normalRebalance() uses `= {0}`), so pSub is NULL at that point.
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL, or an allocation error.
 */
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);
  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);
  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);
  // Success: drop the local alias so the clearRebOutput() call below is a
  // no-op and the caller keeps the arrays.
  rebOutput = NULL;

END:
  PRINT_LOG_END
  clearRebOutput(rebOutput);
  return code;
}
1054

1055
/*
 * Populate rebOutput->pSub for the subscription key rebInput->pRebInfo->key.
 *
 * If the subscription can be acquired, rebOutput->pSub becomes a clone of it
 * and rebInput->oldConsumerNum is its current consumer count. Otherwise the
 * topic name is split out of the key and a new subscription object is created
 * from that topic, with dbName copied from it and oldConsumerNum = 0.
 *
 * The acquired subscription/topic references and the topic read latch are
 * released before returning.
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the first
 * error encountered.
 */
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  const char *     key = rebInput->pRebInfo->key;
  SMqSubscribeObj *pSub = NULL;
  SMqTopicObj *    pTopic = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  int32_t          lino = 0;
  PRINT_LOG_START

  // Any failure to acquire the subscription is treated as "no subscription yet".
  if (code != 0) {
    // split sub key and extract topic
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(key, topic, cgroup, true);
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);
    rebInput->oldConsumerNum = 0;
    MND_TMQ_RETURN_CHECK(mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub));
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
  } else {
    // Work on a private copy so the SDB object is untouched until the result is persisted.
    MND_TMQ_RETURN_CHECK(tCloneSubscribeObj(pSub, &rebOutput->pSub));
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);

    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key,
          taosHashGetSize(rebOutput->pSub->consumerHash));
  }

END:
  PRINT_LOG_END
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
1094

1095
/*
 * Reload path helper: allocate rebOutput->rebVgs and fill it with one
 * SMqRebOutputVg per vgroup currently assigned in pSub->consumerHash. Each
 * entry keeps its consumer as newConsumerId and has oldConsumerId = -1.
 *
 * Returns 0 on success or an allocation / lookup error. The hash iterator is
 * cancelled on every exit path.
 */
static int32_t collectVgs(SMqRebOutputObj *rebOutput, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *pCursor = NULL;

  PRINT_LOG_START
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  for (pCursor = taosHashIterate(pSub->consumerHash, NULL); pCursor != NULL;
       pCursor = taosHashIterate(pSub->consumerHash, pCursor)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pCursor;
    int32_t        vgCnt = taosArrayGetSize(pEp->vgs);
    for (int32_t k = 0; k < vgCnt; ++k) {
      SMqVgEp *pVgEp = taosArrayGet(pEp->vgs, k);
      MND_TMQ_NULL_CHECK(pVgEp);
      SMqRebOutputVg *pOut = taosArrayReserve(rebOutput->rebVgs, 1);
      MND_TMQ_NULL_CHECK(pOut);
      pOut->pVgEp = *pVgEp;
      pOut->oldConsumerId = -1;
      pOut->newConsumerId = pEp->consumerId;
      mDebug("sub:%s reload rebalance vgId:%d remains on consumer:0x%" PRIx64, pSub->key, pOut->pVgEp.vgId,
             pOut->newConsumerId);
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pSub->consumerHash, pCursor);
  return code;
}
1129

1130
/*
 * Reload path for one subscription: if its topic is listed in topicsToReload,
 * persist its current vgroup assignment again (rebOutput.isReload = true).
 * That means change-vgroup actions and the subscription commit log, but no
 * consumer logs.
 *
 * rebOutput.pSub borrows `pSub` (it is not a copy), so it is detached before
 * clearRebOutput() to avoid freeing the caller's object.
 * The subscription read latch is held for the whole function.
 * Returns 0, or the error from collectVgs()/mndPersistRebResult().
 */
static int32_t rebalanceOneSub(SRpcMsg *pMsg, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  int32_t lino = 0;

  SMnode *        pMnode = pMsg->info.node;
  SMqRebOutputObj rebOutput = {0};

  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  // topic and cgroup
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char cgroup[TSDB_CGROUP_LEN] = {0};
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
  if (taosHashGet(topicsToReload, topic, strlen(topic)) == NULL) {
    mDebug("%s topic:%s no need reload rebalance", __func__, topic);
    goto END;
  }

  rebOutput.pSub = pSub;
  rebOutput.isReload = true;
  MND_TMQ_RETURN_CHECK(collectVgs(&rebOutput, pSub));
  code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
  if (code != 0) {
    mError("%s error,msg:%s", __func__, tstrerror(code))
  }

END:
  taosRUnLockLatch(&pSub->lock);
  rebOutput.pSub = NULL;  // avoid double free
  clearRebOutput(&rebOutput);
  PRINT_LOG_END
  return code;
}
1164

1165
/*
 * Run rebalanceOneSub() for every subscription in the SDB.
 *
 * topicsToReload is cleared only when every subscription was processed without
 * error. After a failure it stays non-empty, so mndProcessRebalanceReq() takes
 * the reload path again on a later request.
 */
static int32_t reloadRebalance(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;

  SSdb *           pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;

  PRINT_LOG_START
  mInfo("reloadRebalance start, total topics to reload:%d", taosHashGetSize(topicsToReload));
  void *pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) {
      break;
    }

    MND_TMQ_RETURN_CHECK(rebalanceOneSub(pMsg, pSub));
    sdbRelease(pSdb, pSub);
  }
  taosHashClear(topicsToReload);
END:
  sdbCancelFetch(pSdb, pIter);
  // Releases the subscription still held after an error in the loop.
  sdbRelease(pSdb, pSub);
  PRINT_LOG_END

  return code;
}
1193

1194
static int32_t normalRebalance(SRpcMsg *pMsg) {
14,875,371✔
1195
  int     code = 0;
14,875,371✔
1196
  int32_t lino = 0;
14,875,371✔
1197

1198
  void *  pIter = NULL;
14,875,371✔
1199
  SMnode *pMnode = pMsg->info.node;
14,875,371✔
1200

1201
  PRINT_LOG_START
14,875,371✔
1202
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
14,875,371✔
1203
  MND_TMQ_NULL_CHECK(rebSubHash);
14,875,371✔
1204

1205
  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
14,875,371✔
1206

1207
  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
14,875,371✔
1208
  if (taosHashGetSize(rebSubHash) > 0) {
14,875,371✔
1209
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
307,782✔
1210
  }
1211

1212
  while (1) {
351,228✔
1213
    pIter = taosHashIterate(rebSubHash, pIter);
15,226,599✔
1214
    if (pIter == NULL) {
15,226,599✔
1215
      break;
14,875,371✔
1216
    }
1217

1218
    SMqRebInputObj  rebInput = {0};
351,228✔
1219
    SMqRebOutputObj rebOutput = {0};
351,228✔
1220
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
351,228✔
1221
    rebInput.pRebInfo = (SMqRebInfo *)pIter;
351,228✔
1222
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
351,228✔
1223
    if (code != 0) {
351,228✔
1224
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code))
×
1225
    }
1226

1227
    if (code == 0) {
351,228✔
1228
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
351,228✔
1229
      if (code != 0) {
351,228✔
1230
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code))
×
1231
      }
1232
    }
1233

1234
    if (code == 0) {
351,228✔
1235
      code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
351,228✔
1236
      if (code != 0) {
351,228✔
1237
        mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code))
65,875✔
1238
      }
1239
    }
1240

1241
    clearRebOutput(&rebOutput);
351,228✔
1242
  }
1243

1244
  if (taosHashGetSize(rebSubHash) > 0) {
14,875,371✔
1245
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish")
307,782✔
1246
  }
1247

1248
END:
14,567,589✔
1249
  PRINT_LOG_END
14,875,371✔
1250
  taosHashCancelIterate(rebSubHash, pIter);
14,875,371✔
1251
  taosHashCleanup(rebSubHash);
14,875,371✔
1252
  return code;
14,875,371✔
1253
}
1254

1255
/*
 * Handler for the TMQ rebalance request.
 *
 * Only one rebalance runs at a time. mndRebTryStart() claims the shared
 * counter, and mndRebCntDec() drops this handler's claim once the round is
 * done. When topicsToReload is non-empty the reload path (reloadRebalance)
 * runs instead of a normal round (normalRebalance).
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL message, 0 when another rebalance
 * is already in progress, otherwise the result of the selected path.
 */
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;

  SMnode *pMnode = pMsg->info.node;
  PRINT_LOG_START;
  if (!mndRebTryStart()) {
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
    goto END;
  }

  if (taosHashGetSize(topicsToReload) > 0) {
    code = reloadRebalance(pMsg);
  } else {
    code = normalRebalance(pMsg);
  }

  mndRebCntDec();

END:
  PRINT_LOG_END
  TAOS_RETURN(code);
}
1282

1283
/*
 * Append one TDMT_VND_TMQ_DELETE_SUB redo action to `pTrans` for every vgroup
 * of the subscription's database (matched with mndVgroupInDb on pSub->dbUid).
 * Vgroups with a non-zero mountVgId are skipped.
 * TSDB_CODE_MND_VGROUP_NOT_EXIST is an acceptable reply for each action.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the first
 * error encountered.
 */
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }
    if (pVgObj->mountVgId) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }

    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // Forget the released reference: if appending fails we jump to END, which
    // must not release this vgroup a second time.
    pVgObj = NULL;
    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      // The action was not added to the transaction, so pReq is presumably still
      // owned here (other mnode callers free it on this path) — avoid leaking it.
      taosMemoryFree(pReq);
      lino = __LINE__;
      goto END;
    }
  }

END:
  PRINT_LOG_END
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1330

1331
/*
 * Check whether `pConsumer` still references `topicName` in its assigned,
 * pending-removal (rebRemovedTopics) or pending-addition (rebNewTopics) lists.
 * Consumers whose group differs from `cgroup` are ignored.
 *
 * If the topic is referenced:
 *   - deleteConsumer == true: append a drop log for a CONSUMER_CLEAR copy of
 *     the consumer to `pTrans`;
 *   - otherwise: fail with TSDB_CODE_MND_CGROUP_USED.
 *
 * The consumer read latch is held throughout.
 * Returns 0 on success or the first error encountered.
 */
static int32_t checkoutOneConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName,
                                   char *cgroup) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMqConsumerObj *pConsumerNew = NULL;

  taosRLockLatch(&pConsumer->lock);

  if (strcmp(cgroup, pConsumer->cgroup) != 0) {
    goto END;
  }

  bool found1 = checkTopic(pConsumer->assignedTopics, topicName);
  bool found2 = checkTopic(pConsumer->rebRemovedTopics, topicName);
  bool found3 = checkTopic(pConsumer->rebNewTopics, topicName);
  if (found1 || found2 || found3) {
    if (deleteConsumer) {
      MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
      MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
      tDeleteSMqConsumerObj(pConsumerNew);
      // Reset so the cleanup at END does not free it again.
      pConsumerNew = NULL;
    } else {
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
             pConsumer->consumerId, pConsumer->cgroup);
      code = TSDB_CODE_MND_CGROUP_USED;
      goto END;
    }
  }

END:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
1365

1366
/*
 * Run checkoutOneConsumer() for every consumer in the SDB, for consumer group
 * `cgroup` and `topic`. Stops at the first error; the SDB fetch is cancelled
 * and any consumer still held is released at END.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the first
 * error encountered (e.g. TSDB_CODE_MND_CGROUP_USED when !deleteConsumer).
 */
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void *          pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  int             code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }
    MND_TMQ_RETURN_CHECK(checkoutOneConsumer(pTrans, pConsumer, deleteConsumer, topic, cgroup));
    sdbRelease(pMnode->pSdb, pConsumer);
  }

END:
  sdbRelease(pMnode->pSdb, pConsumer);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1389

1390
/*
 * Build and prepare the "drop-cgroup" transaction for subscription `pSub`.
 *
 * Without dropReq->force the group must have no consumers left; otherwise the
 * call fails with TSDB_CODE_MND_CGROUP_USED. The transaction:
 *   - deletes the subscription on every vnode of the database;
 *   - drops (force) or rejects (no force) consumers of the group that still
 *     reference the topic;
 *   - appends the subscription drop commit logs.
 *
 * The subscription read latch is held throughout; the local transaction
 * handle is dropped before returning.
 */
static int32_t dropCgroup(SMqSubscribeObj *pSub, SRpcMsg *pMsg, SMDropCgroupReq *dropReq) {
  int32_t code = 0;
  int32_t lino = 0;
  STrans *pTrans = NULL;
  SMnode *pMnode = pMsg->info.node;
  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);
  if (!dropReq->force && taosHashGetSize(pSub->consumerHash) != 0) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq->cgroup, dropReq->topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq->cgroup, dropReq->topic);
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  // `force` doubles as deleteConsumer: forced drops clear the group's consumers.
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq->cgroup, dropReq->topic, dropReq->force));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  taosRUnLockLatch(&pSub->lock);
  mndTransDrop(pTrans);
  PRINT_LOG_END
  return code;
}
1419

1420
/*
 * Handler for a drop-consumer-group request (SMDropCgroupReq).
 *
 * Looks up the subscription "<cgroup><TMQ_SEPARATOR><topic>" and starts the
 * drop transaction via dropCgroup().
 * Returns:
 *   - TSDB_CODE_ACTION_IN_PROGRESS once the transaction was prepared
 *     (presumably the reply is sent when it finishes);
 *   - 0 if the subscription does not exist and igNotExists is set;
 *   - TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST if it does not exist otherwise;
 *   - any other error encountered.
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode *        pMnode = pMsg->info.node;
  SMDropCgroupReq dropReq = {0};
  int32_t         code = 0;
  int32_t         lino = 0;

  SMqSubscribeObj *pSub = NULL;

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
  char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code != 0) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      mndReleaseSubscribe(pMnode, pSub);
      return 0;
    } else {
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      goto END;
    }
  }
  MND_TMQ_RETURN_CHECK(dropCgroup(pSub, pMsg, &dropReq));

END:
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END

  if (code != 0) {
    TAOS_RETURN(code);
  }
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
}
1457

1458
/* Teardown hook for the subscribe module; it currently has nothing to release. */
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;  // unused
}
437,329✔
1459

1460
/*
 * Serialize a subscription object into an SDB raw record.
 *
 * Record layout: [int32 tlen][tlen bytes produced by tEncodeSubscribeObj]
 *                [MND_SUBSCRIBE_RESERVE_SIZE reserved bytes].
 *
 * Returns the new raw record. On failure returns NULL with terrno left
 * non-success, after freeing any partially built record.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  void   *buf = NULL;
  // Must be initialized here: the early "goto SUB_ENCODE_OVER" on a bad length jumps
  // past sdbAllocRaw(), and the cleanup path both logs and frees pRaw.
  SSdbRaw *pRaw = NULL;

  // Pessimistic default; cleared only once every step below has succeeded.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Calling the encoder with a NULL buffer only measures the encoded length.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // The encoder takes the cursor by address (it may advance it); keep buf intact for freeing.
  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mDebug("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1502

1503
/*
 * Deserialize an SDB raw record into a newly allocated subscription row.
 *
 * Soft versions 1..MND_SUBSCRIBE_VER_NUMBER are accepted. After decoding, the epset
 * of every vgroup the subscription references (unassigned vgroups and those held by
 * consumers) is passed through tmsgUpdateDnodeEpSet to refresh the copy kept in mnode.
 *
 * Returns the row, or NULL on failure with terrno set.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) return NULL;

  int32_t          code = 0;
  int32_t          lino = 0;
  int8_t           sver = 0;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;

  // Pessimistic default; cleared only once decoding has fully succeeded.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;
  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  // Payload: [int32 length][encoded object][reserved bytes]
  int32_t dataPos = 0;
  int32_t tlen = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) goto SUB_DECODE_OVER;

  // Refresh the epsets saved in mnode: first the unassigned vgroups ...
  if (pSub->unassignedVgs != NULL) {
    int32_t vgCnt = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t idx = 0; idx < vgCnt; idx++) {
      SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGet(pSub->unassignedVgs, idx);
      tmsgUpdateDnodeEpSet(&pVgEp->epSet);
    }
  }
  // ... then every vgroup currently assigned to a consumer.
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
      int32_t        vgCnt = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t idx = 0; idx < vgCnt; idx++) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGet(pConsumerEp->vgs, idx);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno == TSDB_CODE_SUCCESS) {
    mDebug("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
    return pRow;
  }

  mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
  taosMemoryFreeClear(pRow);
  return NULL;
}
1574

1575
/* SDB insert callback for subscriptions: only logs the event and always succeeds. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform insert action", subKey);
  return 0;
}
1579

1580
/*
 * SDB delete callback for subscriptions: releases the object's owned resources
 * through tDeleteSubscribeObj. A NULL object is logged and ignored instead of being
 * handed to tDeleteSubscribeObj, whose NULL handling is not visible here.
 * Always returns 0.
 */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    mDebug("subscribe:%s, perform delete action", "null");
    return 0;
  }
  mDebug("subscribe:%s, perform delete action", pSub->key);
  tDeleteSubscribeObj(pSub);
  return 0;
}
1585

1586
/*
 * SDB update callback for subscriptions.
 *
 * Exchanges (does not copy) offsetRows, unassignedVgs and consumerHash between the
 * stored object and the incoming one while holding the stored object's write latch.
 * Afterwards pNewSub holds the previous containers (presumably released together
 * with pNewSub by the SDB layer — confirm).
 *
 * Returns TSDB_CODE_INVALID_PARA if either object is NULL, otherwise 0.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (pOldSub == NULL || pNewSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  mDebug("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);
  TSWAP(pOldSub->offsetRows, pNewSub->offsetRows);
  TSWAP(pOldSub->unassignedVgs, pNewSub->unassignedVgs);
  TSWAP(pOldSub->consumerHash, pNewSub->consumerHash);
  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}
1598

1599
/*
 * Acquire a reference to the subscription stored under `key`.
 *
 * On success *pSub holds the object, which the caller must hand back through
 * mndReleaseSubscribe. When the key is unknown, *pSub is set to NULL and
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST is returned.
 */
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj **pSub) {
  if (pMnode == NULL || key == NULL || pSub == NULL) return TSDB_CODE_INVALID_PARA;

  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  *pSub = pFound;
  return (pFound != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1610

1611
/*
 * Count the consumer groups subscribed to `topicName`.
 *
 * Every SDB_SUBSCRIBE entry is scanned. Each key is split with fullName=true and
 * the topic part is compared with topicName. Returns 0 for NULL arguments.
 */
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (pMnode == NULL || topicName == NULL) return 0;

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  int32_t          groupNum = 0;

  for (void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) {
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};

    taosRLockLatch(&pSub->lock);
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    taosRUnLockLatch(&pSub->lock);

    if (strcmp(topic, topicName) == 0) {
      groupNum++;
    }
    sdbRelease(pSdb, pSub);
  }

  return groupNum;
}
1638

1639
/* Return a reference obtained from mndAcquireSubscribeByKey; NULL arguments are ignored. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1644

1645
/*
 * Add a commit log to pTrans that marks pSub as dropped.
 *
 * The subscription is encoded and appended as a commit log, and the raw record's
 * status is then set to SDB_STATUS_DROPPED. If the append fails, the raw record is
 * freed here. On success it is not freed here (the transaction keeps it).
 */
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t  code = 0;
  int32_t  lino = 0;
  SSdbRaw *pCommitRaw = NULL;

  PRINT_LOG_START
  pCommitRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pCommitRaw);

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  } else {
    // Not attached to the transaction, so it is still ours to free.
    sdbFreeRaw(pCommitRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
1662

1663
/*
 * Schedule removal of pSub inside pTrans if it belongs to `topicName`.
 *
 * Subscriptions on other topics are left untouched (returns 0). A subscription that
 * still has consumers is refused with TSDB_CODE_MND_IN_REBALANCE unless `force` is
 * set. Otherwise the vnodes are told to delete it and a drop commit log is added.
 * pSub's read latch is held for the whole check.
 */
static int32_t dropOneSub(SMqSubscribeObj *pSub, SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  int32_t code = 0;
  int32_t lino = 0;
  char    subTopic[TSDB_TOPIC_FNAME_LEN] = {0};
  char    subCgroup[TSDB_CGROUP_LEN] = {0};

  taosRLockLatch(&pSub->lock);
  mndSplitSubscribeKey(pSub->key, subTopic, subCgroup, true);

  if (strcmp(subTopic, topicName) == 0) {
    // Refuse to drop a subscription that still has consumers unless forced.
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
      code = TSDB_CODE_MND_IN_REBALANCE;
      goto END;
    }
    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  }

END:
  taosRUnLockLatch(&pSub->lock);
  return code;
}
1688

1689
/*
 * Add to pTrans the removal of every subscription on `topicName`.
 *
 * All SDB_SUBSCRIBE entries are visited and passed to dropOneSub(). The first
 * failure aborts the scan; the entry being processed and the iterator are then
 * released in the cleanup path.
 */
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          code = 0;
  int32_t          lino = 0;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    MND_TMQ_RETURN_CHECK(dropOneSub(pSub, pMnode, pTrans, topicName, force));
    sdbRelease(pSdb, pSub);
    // Forget the released object so the cleanup below can never release it a second time.
    pSub = NULL;
  }

END:
  PRINT_LOG_END
  // Non-NULL only when dropOneSub failed mid-scan; sdbRelease/sdbCancelFetch accept NULL.
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1712

1713
/*
 * Append one "show subscriptions" row per vgroup in `vgs` to pBlock.
 *
 * Columns, in order: topic, cgroup, vgroup id, consumer id (hex string, NULL when
 * consumerId == -1), user (NULL when user is NULL), fqdn (NULL when fqdn is NULL),
 * offset ("<offset>/<ever>") and rows. The last two are NULL when offsetRows has no
 * entry for the vgroup; if several entries match, the last one is used.
 *
 * topic and cgroup must already be var-strings. *numOfRows is the index of the next
 * row to write and is incremented once per row written.
 */
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *user,
                           const char *fqdn, const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // Consumer id, user and fqdn are identical for every vgroup row: build their var-strings once.
  char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
  (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
  varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

  char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
  if (user) STR_TO_VARSTR(userStr, user);

  char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
  if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);

  int32_t sz = taosArrayGetSize(vgs);
  int32_t offsetNum = (int32_t)taosArrayGetSize(offsetRows);  // not modified below
  for (int32_t j = 0; j < sz; j++) {
    SMqVgEp *pVgEp = taosArrayGet(vgs, j);
    MND_TMQ_NULL_CHECK(pVgEp);

    SColumnInfoData *pColInfo = NULL;
    int32_t          cols = 0;

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false));

    // vg id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));

    // consumer id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));

    // user
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));

    // fqdn
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    MND_TMQ_NULL_CHECK(pColInfo);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // offset: use the last offsetRows entry recorded for this vgroup, if any
    OffsetRows *data = NULL;
    for (int32_t i = 0; i < offsetNum; i++) {
      OffsetRows *tmp = taosArrayGet(offsetRows, i);
      MND_TMQ_NULL_CHECK(tmp);
      if (tmp->vgId == pVgEp->vgId) {
        data = tmp;
      }
    }
    if (data) {
      // "<formatted offset>/<ever>"
      char buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
      (void)snprintf(varDataVal(buf) + strlen(varDataVal(buf)),
                     sizeof(buf) - VARSTR_HEADER_SIZE - strlen(varDataVal(buf)), "/%" PRId64, data->ever);
      varDataSetLen(buf, strlen(varDataVal(buf)));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false));
    } else {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      colDataSetNULL(pColInfo, *numOfRows);
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      colDataSetNULL(pColInfo, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    }
    (*numOfRows)++;
  }

END:
  PRINT_LOG_END
  return code;
}
1806

1807
/*
 * Append the "show subscriptions" rows of one subscription to pBlock.
 *
 * Each consumer in pSub->consumerHash contributes one row per vgroup, but only if
 * one of these holds:
 *   - showAll is set;
 *   - the operator passes the PRIV_SUBSCRIPTION_SHOW check on the subscription's topic;
 *   - the consumer's user equals the operator's name.
 * Afterwards one row per unassigned vgroup is appended with a NULL consumer id.
 *
 * pBlock's capacity is grown when pSub->vgNum more rows would not fit, and
 * pBlock->info.rows is set to *numOfRows on success. pSub's read latch is held for
 * the whole call.
 */
static int32_t retrieveSub(SRpcMsg *pReq, SMqSubscribeObj *pSub, SUserObj *pOperUser, bool showAll, SSDataBlock *pBlock,
                           int32_t *numOfRows, int32_t rowsCapacity) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMqConsumerEp *pConsumerEp = NULL;
  SMqTopicObj   *pTopic = NULL;
  void          *pIter = NULL;
  bool           showTopic = false;
  PRINT_LOG_START

  taosRLockLatch(&pSub->lock);
  if (*numOfRows + pSub->vgNum > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + pSub->vgNum));
  }

  // topic and cgroup as var-strings for the result columns
  char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
  varDataSetLen(topic, strlen(varDataVal(topic)));
  varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

  if (!showAll) {
    // `topic` above starts with a var-string length header, so it cannot be used as a
    // lookup key. Split the key again with fullName=true, the form dropOneSub() and
    // mndGetGroupNumByTopic() compare against topic names, and look up by that.
    char topicFName[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroupName[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topicFName, cgroupName, true);
    (void)mndAcquireTopic(pMnode, topicFName, &pTopic);
    if (pTopic) {
      SName name = {0};  // e.g. "1.topic1"
      if (0 == tNameFromString(&name, pTopic->name, T_NAME_ACCT | T_NAME_DB)) {
        if (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_SUBSCRIPTION_SHOW, PRIV_OBJ_TOPIC, pTopic->ownerId,
                                          pTopic->db, name.dbname)) {
          showTopic = true;
        }
      }
    }
  }

  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;

    char           *user = NULL;
    char           *fqdn = NULL;
    bool            subscribeOwner = false;
    SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
    if (pConsumer != NULL) {
      user = pConsumer->user;
      fqdn = pConsumer->fqdn;
      if (strncmp(pConsumer->user, pOperUser->name, TSDB_USER_LEN) == 0) {
        subscribeOwner = true;
      }
      sdbRelease(pSdb, pConsumer);
    }
    if (!showAll && !showTopic && !subscribeOwner) {
      continue;
    }
    MND_TMQ_RETURN_CHECK(buildResult(pBlock, numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup,
                                     pConsumerEp->vgs, pConsumerEp->offsetRows));
  }

  // NOTE(review): unassigned vgroups are emitted regardless of showAll/showTopic, so
  // operators without the privilege still see this topic/cgroup — confirm this is intended.
  MND_TMQ_RETURN_CHECK(
      buildResult(pBlock, numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

  pBlock->info.rows = *numOfRows;

END:
  mndReleaseTopic(pMnode, pTopic);
  taosRUnLockLatch(&pSub->lock);
  taosHashCancelIterate(pSub->consumerHash, pIter);
  PRINT_LOG_END
  return code;
}
1880

1881
/*
 * Show-framework retrieve callback for "show subscriptions".
 *
 * Continues the SDB_SUBSCRIBE scan stored in pShow->pIter and appends rows to pBlock
 * until at least rowsCapacity rows were produced or the scan ends. Whether all
 * subscriptions are visible is decided once per call from the operator's
 * PRIV_SUBSCRIPTION_SHOW system privilege.
 *
 * Returns the number of rows appended by this call, or an error code.
 */
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;
  SUserObj        *pOperUser = NULL;
  int32_t          code = 0;
  int32_t          lino = 0;
  bool             showAll = false;
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};

  mInfo("mnd show subscriptions begin");
  MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pOperUser->acctId);
  showAll = (0 == mndCheckSysObjPrivilege(pMnode, pOperUser, RPC_MSG_TOKEN(pReq), PRIV_SUBSCRIPTION_SHOW,
                                          PRIV_OBJ_TOPIC, 0, objFName, "*"));

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    MND_TMQ_RETURN_CHECK(retrieveSub(pReq, pSub, pOperUser, showAll, pBlock, &numOfRows, rowsCapacity));

    sdbRelease(pSdb, pSub);
    pSub = NULL;
  }
  mInfo("mnd end show subscriptions");
  pShow->numOfRows += numOfRows;

END:
  if (code != 0) {
    // The scan is abandoned: release the iterator and clear it so it cannot be cancelled
    // a second time later (presumably via mndCancelGetNextSubscribe when pShow is freed).
    sdbCancelFetch(pSdb, pShow->pIter);
    pShow->pIter = NULL;
  }
  // On success the iterator is kept on purpose: if the loop stopped because the block was
  // full, pShow->pIter still points into the scan and the next call resumes from it.
  sdbRelease(pSdb, pSub);
  mndReleaseUser(pMnode, pOperUser);

  if (code != 0) {
    mError("mnd show subscriptions failed, msg:%s", tstrerror(code));
    TAOS_RETURN(code);
  } else {
    mDebug("mnd show subscriptions success, rows:%d", numOfRows);
    return numOfRows;
  }
}
1928

1929
/* Abandon an unfinished SDB_SUBSCRIBE scan; a NULL mnode is ignored. */
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc