• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4943

30 Jan 2026 06:19AM UTC coverage: 66.718% (-0.07%) from 66.788%
#4943

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1122 of 2018 new or added lines in 72 files covered. (55.6%)

823 existing lines in 156 files now uncovered.

204811 of 306978 relevant lines covered (66.72%)

123993567.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.68
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_SUBSCRIBE_VER_NUMBER   3
29
#define MND_SUBSCRIBE_RESERVE_SIZE 64
30

31
//#define MND_CONSUMER_LOST_HB_CNT          6
32

33
static int32_t mqRebInExecCnt = 0;
34

35
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
36
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
37
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
39
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
40
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
41
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
42
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
44
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
45

46
// Encode pSub and append the encoded raw to pTrans as a commit log, then mark the raw SDB_STATUS_READY.
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the code of the first failing step.
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pTrans == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t  code = 0;
  int32_t  lino = 0;
  SSdbRaw *pRaw = NULL;
  PRINT_LOG_START
  pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    // the append failed, so the raw is released here
    sdbFreeRaw(pRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
66

67
// Register the SDB_SUBSCRIBE table callbacks, the tmq message handlers and the
// "show subscriptions" handlers on pMnode.
// Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise the result of sdbSetTable().
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  // The iterator-cancel handler belongs to the same show table as the retrieve handler above.
  // It was previously registered under TSDB_MGMT_TABLE_TOPICS, which is not the table this
  // module retrieves.
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
92

93
// Seed a subscription with every vgroup of the topic's database: each matching vgroup is
// appended to pSub->unassignedVgs and counted in pSub->vgNum.
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, terrno when appending to the
// array fails, otherwise 0.
// NOTE(review): processRemoveAddVgs() in this file skips vgroups with a non-zero mountVgId;
// this function does not - confirm whether that difference is intended.
int32_t mndSchedInitSubEp(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTopic == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp vgEp = {0};
    vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    vgEp.vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &vgEp) == NULL) {
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      // leaving the scan early, so the iterator has to be cancelled explicitly
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    // count the vgroup only once it is really stored, so vgNum matches the array on failure
    pSub->vgNum++;
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, vgEp.vgId);
    sdbRelease(pSdb, pVgroup);
  }

  return code;
}
128

129
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey,
118,381✔
130
                                     SMqSubscribeObj **pSub) {
131
  if (pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
118,381✔
132
    return TSDB_CODE_INVALID_PARA;
×
133
  }
134
  int32_t lino = 0;
118,381✔
135
  int32_t code = 0;
118,381✔
136
  PRINT_LOG_START
118,381✔
137
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
118,381✔
138
  (*pSub)->dbUid = pTopic->dbUid;
118,381✔
139
  (*pSub)->stbUid = pTopic->stbUid;
118,381✔
140
  (*pSub)->subType = pTopic->subType;
118,381✔
141
  (*pSub)->withMeta = pTopic->withMeta;
118,381✔
142

143
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
118,381✔
144

145
END:
118,381✔
146
  PRINT_LOG_END
118,381✔
147
  return code;
118,381✔
148
}
149

150
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
1,590,392✔
151
  if (key == NULL || topic == NULL || cgroup == NULL) {
1,590,392✔
152
    return;
×
153
  }
154
  int32_t i = 0;
1,590,392✔
155
  while (key[i] != TMQ_SEPARATOR_CHAR) {
11,546,371✔
156
    i++;
9,955,979✔
157
  }
158
  (void)memcpy(cgroup, key, i);
1,590,392✔
159
  cgroup[i] = 0;
1,590,392✔
160
  if (fullName) {
1,590,392✔
161
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
1,475,540✔
162
  } else {
163
    while (key[i] != '.') {
344,556✔
164
      i++;
229,704✔
165
    }
166
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
114,852✔
167
  }
168
}
169

170
static int32_t mndBuildSubChangeReq(SMnode *pMnode, void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
704,970✔
171
                                    const SMqRebOutputVg *pRebVg) {
172
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
704,970✔
173
    return TSDB_CODE_INVALID_PARA;
×
174
  }
175
  SMqRebVgReq  req = {0};
704,970✔
176
  int32_t      code = 0;
704,970✔
177
  int32_t      lino = 0;
704,970✔
178
  SEncoder     encoder = {0};
704,970✔
179
  SMqTopicObj *pTopic = NULL;
704,970✔
180
  void *       buf = NULL;
704,970✔
181

182
  PRINT_LOG_START
704,970✔
183
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
704,970✔
184
  char cgroup[TSDB_CGROUP_LEN] = {0};
704,970✔
185
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
704,970✔
186
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
704,970✔
187
  taosRLockLatch(&pTopic->lock);
704,970✔
188
  req.oldConsumerId = pRebVg->oldConsumerId;
704,970✔
189
  req.newConsumerId = pRebVg->newConsumerId;
704,970✔
190
  req.vgId = pRebVg->pVgEp.vgId;
704,970✔
191
  req.qmsg = pTopic->physicalPlan;
704,970✔
192
  req.schema = pTopic->schema;
704,970✔
193
  req.subType = pSub->subType;
704,970✔
194
  req.withMeta = pSub->withMeta;
704,970✔
195
  req.suid = pSub->stbUid;
704,970✔
196
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
704,970✔
197

198
  int32_t tlen = 0;
704,970✔
199
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
704,970✔
200
  if (code < 0) {
704,970✔
201
    goto END;
×
202
  }
203

204
  tlen += sizeof(SMsgHead);
704,970✔
205
  buf = taosMemoryMalloc(tlen);
704,970✔
206
  MND_TMQ_NULL_CHECK(buf);
704,970✔
207
  SMsgHead *pMsgHead = (SMsgHead *)buf;
704,970✔
208
  pMsgHead->contLen = htonl(tlen);
704,970✔
209
  pMsgHead->vgId = htonl(pRebVg->pVgEp.vgId);
704,970✔
210

211
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
704,970✔
212
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
704,970✔
213
  *pBuf = buf;
704,970✔
214
  buf = NULL;
704,970✔
215
  *pLen = tlen;
704,970✔
216

217
END:
704,970✔
218
  PRINT_LOG_END
704,970✔
219
  taosMemoryFree(buf);
704,970✔
220
  if (pTopic != NULL) {
704,970✔
221
    taosRUnLockLatch(&pTopic->lock);
704,970✔
222
  }
223
  mndReleaseTopic(pMnode, pTopic);
704,970✔
224
  tEncoderClear(&encoder);
704,970✔
225
  return code;
704,970✔
226
}
227

228
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
704,970✔
229
                                        const SMqRebOutputVg *pRebVg) {
230
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
704,970✔
231
    return TSDB_CODE_INVALID_PARA;
×
232
  }
233
  int32_t code = 0;
704,970✔
234
  int32_t lino = 0;
704,970✔
235
  void *  buf = NULL;
704,970✔
236
  PRINT_LOG_START
704,970✔
237
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
704,970✔
238
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
239
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
240
    goto END;
×
241
  }
242

243
  int32_t tlen = 0;
704,970✔
244
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(pMnode, &buf, &tlen, pSub, pRebVg));
704,970✔
245
  int32_t vgId = pRebVg->pVgEp.vgId;
704,970✔
246
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
704,970✔
247
  if (pVgObj == NULL) {
704,970✔
248
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
249
    goto END;
×
250
  }
251

252
  STransAction action = {0};
704,970✔
253
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
704,970✔
254
  action.pCont = buf;
704,970✔
255
  buf = NULL;
704,970✔
256
  action.contLen = tlen;
704,970✔
257
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
704,970✔
258

259
  mndReleaseVgroup(pMnode, pVgObj);
704,970✔
260
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
704,970✔
261

262
END:
704,970✔
263
  PRINT_LOG_END
704,970✔
264
  taosMemoryFree(buf);
704,970✔
265
  return code;
704,970✔
266
}
267

268
static void freeRebalanceItem(void *param) {
314,707✔
269
  if (param == NULL) return;
314,707✔
270
  SMqRebInfo *pInfo = param;
314,707✔
271
  taosArrayDestroy(pInfo->newConsumers);
314,707✔
272
  taosArrayDestroy(pInfo->removedConsumers);
314,707✔
273
}
274

275
// Look up the rebalance info stored under key (including its NUL terminator) in pHash,
// creating and inserting a new one when it is missing.
// When pReb is not NULL it receives the pointer returned by taosHashGet() for that entry.
// Returns TSDB_CODE_INVALID_PARA when pHash or key is NULL, otherwise 0 or the failing step's code.
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pEntry = taosHashGet(pHash, key, keyLen);
  if (pEntry == NULL) {
    SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
    if (pFresh == NULL) {
      code = terrno;
      goto END;
    }
    code = taosHashPut(pHash, key, keyLen, pFresh, sizeof(SMqRebInfo));
    if (code != 0) {
      // the insert failed, so the arrays of the new item are destroyed as well
      freeRebalanceItem(pFresh);
    }
    // the temporary struct is released on both paths; only its bytes were given to the hash
    taosMemoryFreeClear(pFresh);
    if (code != 0) {
      goto END;
    }

    pEntry = taosHashGet(pHash, key, keyLen);
    MND_TMQ_NULL_CHECK(pEntry);
  }
  if (pReb) {
    *pReb = pEntry;
  }

END:
  PRINT_LOG_END
  return code;
}
308

309
// Pop the last vgroup from vgs and record it in pHash (keyed by vgId) as an SMqRebOutputVg
// whose first two members are consumerId and -1.
// key is only used for logging.
// Returns TSDB_CODE_INVALID_PARA when vgs, pHash or key is NULL, otherwise 0 or the failing step's code.
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (vgs == NULL || pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqVgEp *pPopped = (SMqVgEp *)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(pPopped);

  // copy the popped element by value before it is stored in the hash
  SMqRebOutputVg rebVg = {consumerId, -1, *pPopped};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &rebVg.pVgEp.vgId, sizeof(int32_t), &rebVg, sizeof(SMqRebOutputVg)));
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, rebVg.pVgEp.vgId,
        consumerId);

END:
  PRINT_LOG_END
  return code;
}
326

327
// For every consumer id listed in pInput->pRebInfo->removedConsumers that is present in the
// subscription's consumerHash: move all of its vgroups into pHash, remove it from
// consumerHash and append its id to pOutput->removedConsumers.
// Ids that are not in consumerHash are skipped; a mismatch between listed and actually
// removed consumers is only logged.
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the failing step's code.
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t listedCnt = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t removedCnt = 0;

  for (int32_t idx = 0; idx < listedCnt; idx++) {
    int64_t *pConsumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    MND_TMQ_NULL_CHECK(pConsumerId);

    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, pConsumerId, sizeof(int64_t));
    if (pConsumerEp == NULL) {
      continue;
    }

    // each call pops one vgroup, so the count is taken once up front
    int32_t remaining = taosArrayGetSize(pConsumerEp->vgs);
    while (remaining > 0) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *pConsumerId, pOutput->pSub->key));
      remaining--;
    }

    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, pConsumerId, sizeof(int64_t)));
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, pConsumerId));
    removedCnt++;
  }

  if (listedCnt == removedCnt) {
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, listedCnt);
  } else {
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key,
           listedCnt, removedCnt);
  }
END:
  PRINT_LOG_END
  return code;
}
364

365
// Add every consumer id from pInput->pRebInfo->newConsumers to the subscription's
// consumerHash with an empty vgroup list, and append the id to pOutput->newConsumers.
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the failing step's code.
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);

    // Use the code returned by taosHashPut directly, as the other callers in this file do.
    // Reading terrno after freeSMqConsumerEp() could report a value changed by the cleanup.
    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp,
                       sizeof(SMqConsumerEp));
    if (code != 0) {
      lino = __LINE__;
      freeSMqConsumerEp(&newConsumerEp);
      goto END;
    }
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }
END:
  PRINT_LOG_END
  return code;
}
394

395
// Move every vgroup currently in pOutput->pSub->unassignedVgs into pHash, recorded with
// consumer id -1.
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the failing step's code.
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  // each call pops one element, so the initial size is the number of calls needed
  int32_t pending = taosArrayGetSize(pOutput->pSub->unassignedVgs);
  while (pending > 0) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
    pending--;
  }
END:
  PRINT_LOG_END
  return code;
}
410

411
// Walk all consumers in the subscription's consumerHash: append each id to
// pOutput->modifyConsumers and, for consumers holding more than minVgCnt vgroups, move the
// surplus into pHash. The first remainderVgCnt such consumers keep minVgCnt + 1 vgroups,
// the rest keep minVgCnt.
// Returns TSDB_CODE_INVALID_PARA when pOutput or pHash is NULL, otherwise 0 or the failing step's code.
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t remainderVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  int32_t extraGranted = 0;  // consumers that were allowed to keep minVgCnt + 1
  void *  pIter = NULL;
  PRINT_LOG_START

  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        vgNumBefore = taosArrayGetSize(pConsumerEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
    if (vgNumBefore <= minVgCnt) {
      continue;
    }

    bool    keepExtra = (extraGranted < remainderVgCnt);
    int32_t quota = keepExtra ? minVgCnt + 1 : minVgCnt;
    while (taosArrayGetSize(pConsumerEp->vgs) > quota) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
    }
    if (keepExtra) {
      extraGranted++;
    }
  }
END:
  PRINT_LOG_END
  // pIter is still set when the loop was left through a failed check
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  return code;
}
450

451
// Reconcile the subscription with the vgroups that currently exist for its database.
//   1. Collect every vgroup of pOutput->pSub->dbUid (skipping those with a non-zero mountVgId).
//   2. For each consumer, drop assigned vgroups that are no longer in that collection; the
//      ones that still exist are removed from the collection instead.
//   3. If unassignedVgs is empty and the collection still has entries, add them to unassignedVgs.
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, the error code when a step fails,
// otherwise the number of vgroups counted in step 1.
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  int32_t totalVgNum = 0;
  SVgObj *pVgroup = NULL;
  void *  pIter = NULL;
  void *  pConsumerIter = NULL;
  SArray *pDbVgs = taosArrayInit(0, sizeof(SMqVgEp));
  MND_TMQ_NULL_CHECK(pDbVgs);

  while ((pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (pVgroup->mountVgId || !mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      continue;
    }

    totalVgNum++;
    SMqVgEp vgEp = {0};
    vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    vgEp.vgId = pVgroup->vgId;
    MND_TMQ_NULL_CHECK(taosArrayPush(pDbVgs, &vgEp));
    sdbRelease(pMnode->pSdb, pVgroup);
  }

  while ((pConsumerIter = taosHashIterate(pOutput->pSub->consumerHash, pConsumerIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pConsumerIter;
    int32_t        idx = 0;
    while (idx < taosArrayGetSize(pConsumerEp->vgs)) {
      SMqVgEp *pOwned = taosArrayGet(pConsumerEp->vgs, idx);
      MND_TMQ_NULL_CHECK(pOwned);

      bool stillExists = false;
      for (int32_t k = 0; !stillExists && k < taosArrayGetSize(pDbVgs); k++) {
        SMqVgEp *pCandidate = taosArrayGet(pDbVgs, k);
        MND_TMQ_NULL_CHECK(pCandidate);
        if (pOwned->vgId == pCandidate->vgId) {
          // already owned by a consumer, so it is not a new vgroup
          taosArrayRemove(pDbVgs, k);
          stillExists = true;
        }
      }

      if (stillExists) {
        idx++;
      } else {
        // the element at idx is removed, so idx already refers to the next one
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pOwned->vgId);
        taosArrayRemove(pConsumerEp->vgs, idx);
      }
    }
  }

  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(pDbVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, pDbVgs));
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(pDbVgs));
  }

END:
  sdbRelease(pMnode->pSdb, pVgroup);
  sdbCancelFetch(pMnode->pSdb, pIter);
  taosHashCancelIterate(pOutput->pSub->consumerHash, pConsumerIter);
  taosArrayDestroy(pDbVgs);
  if (code != 0) {
    mError("tmq rebalance processRemoveAddVgs failed, code:%d", code);
    return code;
  }
  mInfo("tmq rebalance processRemoveAddVgs completed, total vg num:%d", totalVgNum);
  return totalVgNum;
}
531

532
// Fold the per-consumer offset rows of the currently stored subscription into
// pOutput->pSub->offsetRows.
// For each offset row of each stored consumer:
//   - it is skipped when the same consumer still owns that vgId in pOutput->pSub;
//   - otherwise it is merged into the output row with the same vgId (rows added, offset and
//     ever overwritten), or appended when no such row exists.
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, 0 when the stored subscription
// cannot be acquired, otherwise 0 or the failing step's code.
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void *           pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if (code != 0) {
    return 0;
  }
  taosRLockLatch(&pSub->lock);
  PRINT_LOG_START
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    MND_TMQ_NULL_CHECK(pOutput->pSub->offsetRows);
  }

  while ((pIter = taosHashIterate(pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pOldEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pNewEp = taosHashGet(pOutput->pSub->consumerHash, &pOldEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pOldEp->offsetRows); j++) {
      OffsetRows *pOldRow = taosArrayGet(pOldEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(pOldRow);

      // does the same consumer keep this vgroup after the rebalance?
      bool unchanged = false;
      for (int i = 0; pNewEp && i < taosArrayGetSize(pNewEp->vgs); i++) {
        SMqVgEp *pVgEp = taosArrayGet(pNewEp->vgs, i);
        MND_TMQ_NULL_CHECK(pVgEp);
        if (pVgEp->vgId != pOldRow->vgId) {
          continue;
        }
        unchanged = true;
        mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
              pOldEp->consumerId, pVgEp->vgId);
        break;
      }
      if (unchanged) {
        continue;
      }

      // find the output row for this vgId, if there is one already
      OffsetRows *pMerged = NULL;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *pOutRow = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(pOutRow);
        if (pOldRow->vgId == pOutRow->vgId) {
          pMerged = pOutRow;
          break;
        }
      }

      if (pMerged == NULL) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, pOldRow));
      } else {
        pMerged->rows += pOldRow->rows;
        pMerged->offset = pOldRow->offset;
        pMerged->ever = pOldRow->ever;
        mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, pMerged->vgId, pMerged->rows,
              pOldRow->rows);
      }
    }
  }

END:
  taosRUnLockLatch(&pSub->lock);
  taosHashCancelIterate(pSub->consumerHash, pIter);
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  return code;
}
596

597
// Log the outcome of a rebalance: every moved vgroup in pOutput->rebVgs, then the final
// vgroup list of each consumer in the subscription. Does nothing when pOutput is NULL.
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (pOutput == NULL) return;

  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
  for (int32_t idx = 0; idx < taosArrayGetSize(pOutput->rebVgs); idx++) {
    SMqRebOutputVg *pMoved = taosArrayGet(pOutput->rebVgs, idx);
    if (pMoved != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
            pMoved->pVgEp.vgId, pMoved->oldConsumerId, pMoved->newConsumerId);
    }
  }

  void *pIter = NULL;
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        vgCnt = taosArrayGetSize(pConsumerEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
          pConsumerEp->consumerId, vgCnt);
    for (int32_t idx = 0; idx < vgCnt; idx++) {
      SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, idx);
      if (pVgEp != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
              pConsumerEp->consumerId);
      }
    }
  }
}
623

624
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
314,707✔
625
                           int32_t *remainderVgCnt) {
626
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
314,707✔
627
    return;
×
628
  }
629
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
314,707✔
630
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
314,707✔
631
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
314,707✔
632

633
  // calc num
634
  if (numOfFinal != 0) {
314,707✔
635
    *minVgCnt = totalVgNum / numOfFinal;
208,419✔
636
    *remainderVgCnt = totalVgNum % numOfFinal;
208,419✔
637
  } else {
638
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
106,288✔
639
  }
640
  mInfo(
314,707✔
641
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d "
642
      "remainderVg:%d",
643
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
644
}
645

646
// Hand the vgroups collected in pHash out to the consumers of the subscription.
//   pass 1: every consumer is filled up to minVgCnt vgroups;
//   pass 2: consumers holding exactly minVgCnt get one more each until pHash is used up;
//   final : every entry of pHash is copied to pOutput->rebVgs and, when no consumer is left,
//           also to pOutput->pSub->unassignedVgs.
// Returns TSDB_CODE_INVALID_PARA when pOutput or pHash is NULL, TSDB_CODE_MND_INTERNAL_ERROR
// when the pHash iterator is still set after pass 2, otherwise 0 or the failing step's code.
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqRebOutputVg *pRebVg = NULL;
  void *          pAssignIter = NULL;
  void *          pIter = NULL;
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  // pass 1: bring every consumer up to the minimum
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp.vgId,
            pConsumerEp->consumerId);
    }
  }

  // pass 2: pIter is NULL again here, so the consumers are walked from the start
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    if (taosArrayGetSize(pConsumerEp->vgs) != minVgCnt) {
      continue;
    }

    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
      break;
    }

    pRebVg = (SMqRebOutputVg *)pAssignIter;
    pRebVg->newConsumerId = pConsumerEp->consumerId;
    MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
    mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp.vgId,
          pConsumerEp->consumerId);
  }

  if (pAssignIter != NULL) {
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    goto END;
  }

  // final: publish every collected vgroup, with whatever new consumer it was given above
  while ((pAssignIter = taosHashIterate(pHash, pAssignIter)) != NULL) {
    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed
      MND_TMQ_NULL_CHECK(
          taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
    }
  }

END:
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  taosHashCancelIterate(pHash, pAssignIter);
  PRINT_LOG_END
  return code;
}
724

725
// Runs one rebalance pass for the subscription carried in pOutput.
//
// The pass is a fixed sequence of helper calls that share a scratch hash
// created here and destroyed before returning. The first helper that reports
// a non-zero code aborts the sequence.
//
// Returns TSDB_CODE_INVALID_PARA when any argument is NULL, the negative value
// handed back by processRemoveAddVgs() when that call fails, otherwise 0 or
// the code of the first failing step.
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pInput || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  // A negative result is passed straight back to the caller as the error.
  int32_t totalVgNum = processRemoveAddVgs(pMnode, pOutput);
  if (totalVgNum < 0) {
    return totalVgNum;
  }

  int32_t     code = 0;
  int32_t     lino = 0;
  int32_t     vgFloor = 0;  // filled in by calcVgroupsCnt()
  int32_t     vgExtra = 0;  // filled in by calcVgroupsCnt()
  const char *subKey = pOutput->pSub->key;
  PRINT_LOG_START

  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pHash);

  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
  calcVgroupsCnt(pInput, totalVgNum, subKey, &vgFloor, &vgExtra);
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, vgFloor, vgExtra));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, vgFloor));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
  printRebalanceLog(pOutput);

END:
  taosHashCleanup(pHash);
  PRINT_LOG_END
  return code;
}
755

756
// Appends one consumer commit log to pTrans for every consumer id in
// `consumers`.
//
// For each id a temporary SMqConsumerObj is built with the given update
// `type`, `cgroup` and `topic`, handed to mndSetConsumerCommitLogs(), and
// then destroyed.
//
// Returns TSDB_CODE_INVALID_PARA when pTrans, consumers or cgroup is NULL
// (topic may be NULL), otherwise 0 or the code of the first failing step.
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Reset immediately: if a later iteration jumps to END before a new
    // object is stored here, END must not destroy this one a second time.
    pConsumerNew = NULL;
  }

END:
  PRINT_LOG_END
  // Non-NULL only when a step failed after the object was created.
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
779

780
// Records consumer commit logs in pTrans for the three consumer id lists of a
// rebalance result: modified, new and removed, in that order.
//
// The modified list is written with a NULL topic; the other two carry
// `topic`. Returns TSDB_CODE_INVALID_PARA when any argument is NULL,
// otherwise 0 or the code of the first list that fails.
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (!pTrans || !pOutput || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // One row per list, processed top to bottom.
  const struct {
    SArray *ids;
    int8_t  type;
    char   *topic;
  } batches[] = {
      {pOutput->modifyConsumers, CONSUMER_UPDATE_REB, NULL},
      {pOutput->newConsumers, CONSUMER_ADD_REB, topic},
      {pOutput->removedConsumers, CONSUMER_REMOVE_REB, topic},
  };
  const int32_t batchCnt = (int32_t)(sizeof(batches) / sizeof(batches[0]));

  for (int32_t b = 0; b < batchCnt; b++) {
    MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, batches[b].ids, batches[b].type, cgroup, batches[b].topic));
  }

END:
  PRINT_LOG_END
  return code;
}
794

795
// Persists one rebalance result through a "tmq-reb" transaction.
//
// Steps, in order: create the transaction and check it for conflicts, add one
// redo action per entry of pOutput->rebVgs, add the subscription commit log,
// add the consumer commit logs (skipped when pOutput->isReload is set),
// register the MQ_REB start/stop callbacks, and prepare the transaction.
// The local transaction handle is dropped on every path.
//
// Returns TSDB_CODE_INVALID_PARA when any argument is NULL, otherwise 0 or the
// code of the first failing step.
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (!pMnode || !pMsg || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }
  STrans *pTrans = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // The subscribe key is split into its topic and consumer-group parts.
  char cgroup[TSDB_CGROUP_LEN] = {0};
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    // Prefer terrno when it is set; otherwise use the generic NULL-return code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // 1. redo actions: one per vgroup change
  int32_t rebVgCnt = taosArrayGetSize(pOutput->rebVgs);
  for (int32_t idx = 0; idx < rebVgCnt; idx++) {
    SMqRebOutputVg *pVgChange = taosArrayGet(pOutput->rebVgs, idx);
    MND_TMQ_NULL_CHECK(pVgChange);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pVgChange));
  }

  // 2. commit log: subscription and its vgroup assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // 3. commit log: consumers (not written for a reload)
  if (!pOutput->isReload) {
    MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
  }

  // 4. start/stop callbacks
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);

  // 5. hand the transaction over for execution
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  mndTransDrop(pTrans);
  PRINT_LOG_END
  TAOS_RETURN(code);
}
846

847
// type = 0 remove  type = 1 add
848
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
491,022✔
849
  if (rebSubHash == NULL || topicList == NULL) {
491,022✔
850
    return TSDB_CODE_INVALID_PARA;
×
851
  }
852
  int32_t code = 0;
491,022✔
853
  int32_t lino = 0;
491,022✔
854
  PRINT_LOG_START
491,022✔
855
  int32_t topicNum = taosArrayGetSize(topicList);
491,022✔
856
  for (int32_t i = 0; i < topicNum; i++) {
743,358✔
857
    char *removedTopic = taosArrayGetP(topicList, i);
252,336✔
858
    MND_TMQ_NULL_CHECK(removedTopic);
252,336✔
859
    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
252,336✔
860
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
252,336✔
861
    SMqRebInfo *pRebSub = NULL;
252,336✔
862
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
252,336✔
863
    if (type == 0)
252,336✔
864
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
222,664✔
865
    else if (type == 1)
141,004✔
866
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
282,008✔
867
  }
868

869
END:
491,022✔
870
  PRINT_LOG_END
491,022✔
871
  return code;
491,022✔
872
}
873

874
// Checks, for one topic of a consumer, whether every vgroup assigned to that
// consumer can still be acquired.
//
// The subscription for "<cgroup><TMQ_SEPARATOR><topic>" is acquired and
// read-locked. For each assigned vgroup that mndAcquireVgroup() cannot return,
// an entry for the subscribe key is requested in rebSubHash through
// mndGetOrCreateRebSub(). Failures are only logged; nothing is returned.
static void checkOneTopic(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash, const char *topic) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqSubscribeObj *pSub = NULL;
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

  MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, key, &pSub));
  taosRLockLatch(&pSub->lock);

  // Look up this consumer's endpoint entry to get its vgroup list.
  SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pConsumerEp);

  int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
  for (int32_t idx = 0; idx < vgNum; idx++) {
    SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, idx);
    if (pVgEp == NULL) {
      continue;
    }

    SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
    if (pVgroup == NULL) {
      // The vgroup cannot be acquired: ask for a rebalance of this key.
      code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
      if (code == 0) {
        mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
      } else {
        mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code));
      }
    }
    mndReleaseVgroup(pMnode, pVgroup);
  }

END:
  // The latch is taken only after a successful acquire, i.e. when pSub is set.
  if (pSub != NULL) {
    taosRUnLockLatch(&pSub->lock);
  }
  mndReleaseSubscribe(pMnode, pSub);
}
1,536,017✔
910

911
// Runs checkOneTopic() for every non-NULL entry of pConsumer->currentTopics.
// Does nothing when any argument is NULL.
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (!pMnode || !pConsumer || !rebSubHash) {
    return;
  }

  int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t idx = 0; idx < topicCnt; idx++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, idx);
    if (topic != NULL) {
      checkOneTopic(pMnode, pConsumer, rebSubHash, topic);
    }
  }
}
924

925
// Decides whether a consumer counts as offline.
//
// hbStatus and pollStatus are tick counters supplied by the caller. Each is
// scaled by tsMqRebalanceInterval * 1000 and compared with the consumer's
// sessionTimeoutMs and maxPollIntervalMs respectively. Returns true when
// either scaled counter has reached its limit.
//
// The products are computed in 64 bits so that a large interval or counter
// cannot overflow signed 32-bit arithmetic.
static bool isOffLine(int32_t hbStatus, int32_t pollStatus, SMqConsumerObj *pConsumer) {
  const int64_t tickMs = (int64_t)tsMqRebalanceInterval * 1000;
  return (int64_t)hbStatus * tickMs >= pConsumer->sessionTimeoutMs ||
         (int64_t)pollStatus * tickMs >= pConsumer->maxPollIntervalMs;
}
929

930
// Evaluates one consumer during a rebalance scan, holding the consumer's read
// latch for the whole check.
//
// Both hbStatus and pollStatus are incremented by one on every call. Then:
//   - READY with no current topics           -> send LOST_CONSUMER_CLEAR
//   - READY and offline                      -> queue removal for each current topic
//   - READY otherwise                        -> check its vgroups per topic
//   - REBALANCE and not offline              -> queue its new and removed topics
//   - anything else                          -> send LOST_CONSUMER_CLEAR
//
// Returns 0 or the code of the first failing step.
static int32_t checkOneConsumer(SMqConsumerObj *pConsumer, SMnode *pMnode, SRpcMsg *pMsg, SHashObj *rebSubHash) {
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);

  int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
  int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
  int32_t status = atomic_load_32(&pConsumer->status);
  bool    needClear = false;  // both "clear" branches share one send below

  mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
         ", hbstatus:%d, pollStatus:%d",
         pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
         hbStatus, pollStatus);

  if (status == MQ_CONSUMER_STATUS_READY) {
    if (taosArrayGetSize(pConsumer->currentTopics) == 0) {
      // unsubscribe or close
      needClear = true;
    } else if (isOffLine(hbStatus, pollStatus, pConsumer)) {
      mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
            ", hb lost cnt:%d, or long time no poll cnt:%d",
            pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
            pConsumer->createTime, hbStatus, pollStatus);
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
    } else {
      checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
    }
  } else if (status == MQ_CONSUMER_STATUS_REBALANCE && !isOffLine(hbStatus, pollStatus, pConsumer)) {
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
  } else {
    needClear = true;
  }

  if (needClear) {
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
971

972
// Walks every SDB_CONSUMER row and runs checkOneConsumer() on it, collecting
// required rebalances in rebSubHash.
//
// The scan stops at the first consumer whose check fails. The fetch cursor is
// cancelled and the consumer still held at that point is released before
// returning.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the
// code of the failing check.
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (!pMsg || !rebSubHash) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         code = 0;
  int32_t         lino = 0;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  PRINT_LOG_START

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }
    // On failure the release is left to the END path.
    MND_TMQ_RETURN_CHECK(checkOneConsumer(pConsumer, pMnode, pMsg, rebSubHash));
    mndReleaseConsumer(pMnode, pConsumer);
  }

END:
  PRINT_LOG_END
  sdbCancelFetch(pSdb, pIter);
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}
999

1000
bool mndRebTryStart() {
16,638,402✔
1001
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
16,638,402✔
1002
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
16,638,402✔
1003
}
1004

1005
void mndRebCntInc() {
247,366✔
1006
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
247,366✔
1007
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
247,366✔
1008
}
247,366✔
1009

1010
void mndRebCntDec() {
16,881,183✔
1011
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
16,881,183✔
1012
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
16,881,183✔
1013
}
16,881,183✔
1014

1015
// Releases everything a SMqRebOutputObj refers to and resets the fields to
// NULL: the four arrays (newConsumers, modifyConsumers, removedConsumers,
// rebVgs) and the subscription object pSub, which is both torn down and freed.
// The struct itself is not freed. A NULL argument is ignored.
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (!rebOutput) {
    return;
  }

  SArray **lists[] = {
      &rebOutput->newConsumers,
      &rebOutput->modifyConsumers,
      &rebOutput->removedConsumers,
      &rebOutput->rebVgs,
  };
  for (int32_t k = 0; k < (int32_t)(sizeof(lists) / sizeof(lists[0])); k++) {
    taosArrayDestroy(*lists[k]);
    *lists[k] = NULL;
  }

  tDeleteSubscribeObj(rebOutput->pSub);
  taosMemoryFree(rebOutput->pSub);
  rebOutput->pSub = NULL;
}
1031

1032
// Allocates the four arrays of a SMqRebOutputObj: three int64_t id lists
// (newConsumers, removedConsumers, modifyConsumers) and the SMqRebOutputVg
// list rebVgs.
//
// If any allocation fails, the whole object is cleared with clearRebOutput()
// before returning. Returns TSDB_CODE_INVALID_PARA for a NULL argument,
// otherwise 0 or the allocation error code.
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (!rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqRebOutputObj *pUndo = rebOutput;  // what END must clear; disarmed on success
  PRINT_LOG_START

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  pUndo = NULL;

END:
  PRINT_LOG_END
  clearRebOutput(pUndo);
  return code;
}
1054

1055
// Fills rebOutput->pSub with the subscription that a rebalance will work on,
// and sets rebInput->oldConsumerNum.
//
// When the subscription for rebInput->pRebInfo->key can be acquired, it is
// cloned and oldConsumerNum becomes the size of the clone's consumer hash.
// Otherwise the topic named in the key is acquired and read-locked, a new
// subscription is created from it, the topic's db name is copied into the new
// subscription, and oldConsumerNum is 0.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the
// code of the first failing step.
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (!pMnode || !rebInput || !rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqTopicObj     *pTopic = NULL;
  SMqSubscribeObj *pSub = NULL;
  const char      *key = rebInput->pRebInfo->key;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  PRINT_LOG_START

  if (code == 0) {
    // Existing subscription: work on a private copy.
    MND_TMQ_RETURN_CHECK(tCloneSubscribeObj(pSub, &rebOutput->pSub));
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);

    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key,
          taosHashGetSize(rebOutput->pSub->consumerHash));
  } else {
    // No subscription under this key: build one from the topic in the key.
    char cgroup[TSDB_CGROUP_LEN] = {0};
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    mndSplitSubscribeKey(key, topic, cgroup, true);

    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);
    rebInput->oldConsumerNum = 0;
    MND_TMQ_RETURN_CHECK(mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub));
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
  }

END:
  PRINT_LOG_END
  // The topic latch is held exactly when pTopic was acquired.
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
1094

1095
// Builds rebOutput->rebVgs from the current assignment of pSub.
//
// One SMqRebOutputVg is appended for every vgroup of every consumer in
// pSub->consumerHash, with oldConsumerId = -1 and newConsumerId set to the
// consumer that currently holds the vgroup.
//
// Returns 0 or the code of the first failing allocation or lookup.
static int32_t collectVgs(SMqRebOutputObj *rebOutput, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *pIterConsumer = NULL;

  PRINT_LOG_START
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  while ((pIterConsumer = taosHashIterate(pSub->consumerHash, pIterConsumer)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIterConsumer;

    for (int32_t n = 0; n < taosArrayGetSize(pConsumerEp->vgs); n++) {
      SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, n);
      MND_TMQ_NULL_CHECK(pVgEp);

      // Reserve one slot at the tail of rebVgs and fill it in place.
      SMqRebOutputVg *pSlot = taosArrayReserve(rebOutput->rebVgs, 1);
      MND_TMQ_NULL_CHECK(pSlot);
      pSlot->pVgEp = *pVgEp;
      pSlot->oldConsumerId = -1;
      pSlot->newConsumerId = pConsumerEp->consumerId;
      mDebug("sub:%s reload rebalance vgId:%d remains on consumer:0x%" PRIx64, pSub->key, pSlot->pVgEp.vgId,
             pSlot->newConsumerId);
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pSub->consumerHash, pIterConsumer);
  return code;
}
1129

1130
// Performs a "reload" rebalance for one subscription, under its read latch.
//
// Nothing is done unless the topic part of pSub->key is present in
// topicsToReload. Otherwise the current vgroup assignment is collected with
// collectVgs() and persisted through mndPersistRebResult() with isReload set.
// pSub is only borrowed: it is detached from the local output object before
// that object is cleared.
//
// Returns 0 or the code of the failing step.
static int32_t rebalanceOneSub(SRpcMsg *pMsg, SMqSubscribeObj *pSub) {
  SMqRebOutputObj rebOutput = {0};
  SMnode         *pMnode = pMsg->info.node;
  int32_t         code = 0;
  int32_t         lino = 0;

  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  // Split the subscribe key into topic and consumer group.
  char cgroup[TSDB_CGROUP_LEN] = {0};
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

  bool needReload = taosHashGet(topicsToReload, topic, strlen(topic)) != NULL;
  if (!needReload) {
    mDebug("%s topic:%s no need reload rebalance", __func__, topic);
    goto END;
  }

  rebOutput.isReload = true;
  rebOutput.pSub = pSub;
  MND_TMQ_RETURN_CHECK(collectVgs(&rebOutput, pSub));

  code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
  if (code != 0) {
    mError("%s error,msg:%s", __func__, tstrerror(code));
  }

END:
  taosRUnLockLatch(&pSub->lock);
  rebOutput.pSub = NULL;  // avoid double free: pSub belongs to the caller
  clearRebOutput(&rebOutput);
  PRINT_LOG_END
  return code;
}
1164

1165
// Runs rebalanceOneSub() over every SDB_SUBSCRIBE row.
//
// topicsToReload is cleared only after the whole scan succeeds. The first
// failing subscription stops the scan and leaves the set untouched.
//
// Returns 0 or the code of the failing subscription.
static int32_t reloadRebalance(SRpcMsg *pMsg) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqSubscribeObj *pSub = NULL;
  SMnode          *pMnode = pMsg->info.node;
  SSdb            *pSdb = pMnode->pSdb;

  PRINT_LOG_START
  mInfo("reloadRebalance start, total topics to reload:%d", taosHashGetSize(topicsToReload));

  void *pIter = NULL;
  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) {
      break;
    }

    // On failure the row is released by the END path.
    MND_TMQ_RETURN_CHECK(rebalanceOneSub(pMsg, pSub));
    sdbRelease(pSdb, pSub);
  }
  taosHashClear(topicsToReload);

END:
  sdbCancelFetch(pSdb, pIter);
  sdbRelease(pSdb, pSub);
  PRINT_LOG_END

  return code;
}
1193

1194
// Regular rebalance: scans all consumers to find the subscriptions that need
// rebalancing, then processes each of them.
//
// For every entry collected in the temporary rebSubHash the steps are
// buildRebOutput() -> mndDoRebalance() -> mndPersistRebResult(). A failing
// step is logged and the remaining steps for that entry are skipped, but the
// loop moves on to the next entry. The returned code is therefore the result
// of the last entry processed, or the code of a failure in the setup steps
// before the loop.
static int32_t normalRebalance(SRpcMsg *pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void   *pIter = NULL;
  SMnode *pMnode = pMsg->info.node;

  PRINT_LOG_START
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);
  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
  }

  while ((pIter = taosHashIterate(rebSubHash, pIter)) != NULL) {
    SMqRebOutputObj rebOutput = {0};
    SMqRebInputObj  rebInput = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
    rebInput.pRebInfo = (SMqRebInfo *)pIter;

    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else {
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
      if (code != 0) {
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
      } else {
        code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
        if (code != 0) {
          mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
        }
      }
    }

    clearRebOutput(&rebOutput);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish");
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(rebSubHash, pIter);
  taosHashCleanup(rebSubHash);
  return code;
}
1254

1255
// Handler for a rebalance request.
//
// Only one rebalance runs at a time: when mndRebTryStart() reports that one is
// already in progress, the request is a no-op that returns 0. Otherwise a
// reload rebalance is run if topicsToReload is non-empty, else a normal
// rebalance, and the in-progress counter is decremented afterwards.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL message, otherwise the result of
// the rebalance that was run (0 when it was skipped).
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;

  PRINT_LOG_START;
  if (!mndRebTryStart()) {
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
    goto END;
  }

  if (taosHashGetSize(topicsToReload) > 0) {
    code = reloadRebalance(pMsg);
  } else {
    code = normalRebalance(pMsg);
  }

  // Pairs with the 0 -> 1 transition made by mndRebTryStart() above.
  mndRebCntDec();

END:
  PRINT_LOG_END
  TAOS_RETURN(code);
}
1282

1283
// Adds one TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every vgroup that
// belongs to the database of pSub.
//
// Vgroups with a non-zero mountVgId and vgroups outside pSub->dbUid are
// skipped. TSDB_CODE_MND_VGROUP_NOT_EXIST is registered as an acceptable
// result for each action.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the
// code of the first failing step.
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void          *pIter = NULL;
  SVgObj        *pVgObj = NULL;
  SMqVDeleteReq *pReq = NULL;  // request not yet stored in the transaction
  int32_t        code = 0;
  int32_t        lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }
    if (pVgObj->mountVgId || !mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }

    pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // Already released: clear it so a failure below does not make END release
    // the same row a second time.
    pVgObj = NULL;

    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
    // The action now references the request; stop tracking it here.
    pReq = NULL;
  }

END:
  PRINT_LOG_END
  // Non-NULL only when the append failed, in which case the request is
  // presumably still owned by this function (NOTE(review): confirm that
  // mndTransAppendRedoAction keeps no reference to pCont on failure).
  taosMemoryFree(pReq);
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1330

1331
// Handles one consumer while a consumer group is being dropped from a topic.
//
// Consumers of another group are ignored. If the consumer references
// topicName in its assigned, to-be-removed or to-be-added topic lists:
//   - with deleteConsumer set, a CONSUMER_CLEAR drop log for it is added to
//     pTrans;
//   - otherwise the drop is refused with TSDB_CODE_MND_CGROUP_USED.
// The consumer's read latch is held for the duration of the call.
//
// Returns 0 or the code of the failing step.
static int32_t checkoutOneConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName,
                                   char *cgroup) {
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         code = 0;
  int32_t         lino = 0;

  taosRLockLatch(&pConsumer->lock);

  if (strcmp(cgroup, pConsumer->cgroup) != 0) {
    goto END;
  }

  bool inAssigned = checkTopic(pConsumer->assignedTopics, topicName);
  bool inRemoved = checkTopic(pConsumer->rebRemovedTopics, topicName);
  bool inNew = checkTopic(pConsumer->rebNewTopics, topicName);
  if (!(inAssigned || inRemoved || inNew)) {
    goto END;
  }

  if (!deleteConsumer) {
    mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
           pConsumer->consumerId, pConsumer->cgroup);
    code = TSDB_CODE_MND_CGROUP_USED;
    goto END;
  }

  MND_TMQ_RETURN_CHECK(
      tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
  tDeleteSMqConsumerObj(pConsumerNew);
  pConsumerNew = NULL;

END:
  tDeleteSMqConsumerObj(pConsumerNew);
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
1365

1366
// Runs checkoutOneConsumer() over every SDB_CONSUMER row for the given
// consumer group and topic, stopping at the first failure.
//
// Returns TSDB_CODE_INVALID_PARA when a pointer argument is NULL, otherwise 0
// or the code of the failing consumer.
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
  if (!pMnode || !pTrans || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         code = 0;
  int32_t         lino = 0;
  SMqConsumerObj *pConsumer = NULL;
  void           *pIter = NULL;
  PRINT_LOG_START

  for (;;) {
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }
    // On failure the row is released by the END path.
    MND_TMQ_RETURN_CHECK(checkoutOneConsumer(pTrans, pConsumer, deleteConsumer, topic, cgroup));
    sdbRelease(pMnode->pSdb, pConsumer);
  }

END:
  sdbRelease(pMnode->pSdb, pConsumer);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1389

1390
// Drops the consumer group described by pSub through a "drop-cgroup"
// transaction, holding the subscription's read latch throughout.
//
// Without dropReq->force the drop is refused with TSDB_CODE_MND_CGROUP_USED
// while the subscription still has consumers. Otherwise the transaction gets
// the delete-sub redo actions, the per-consumer handling, and the
// subscription drop log, and is then prepared.
//
// Returns 0 or the code of the first failing step.
static int32_t dropCgroup(SMqSubscribeObj *pSub, SRpcMsg *pMsg, SMDropCgroupReq *dropReq) {
  SMnode *pMnode = pMsg->info.node;
  STrans *pTrans = NULL;
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  bool stillUsed = !dropReq->force && taosHashGetSize(pSub->consumerHash) != 0;
  if (stillUsed) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq->cgroup, dropReq->topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq->cgroup, dropReq->topic);

  mndTransSetDbName(pTrans, pSub->dbName, NULL);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq->cgroup, dropReq->topic, dropReq->force));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  taosRUnLockLatch(&pSub->lock);
  mndTransDrop(pTrans);
  PRINT_LOG_END
  return code;
}
1419

1420
// Handler for a drop-consumer-group request.
//
// The request is deserialized and the subscription for
// "<cgroup><TMQ_SEPARATOR><topic>" is looked up. A missing subscription
// yields 0 when igNotExists is set, else TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST.
// When found, the drop is delegated to dropCgroup().
//
// Returns TSDB_CODE_INVALID_PARA for a NULL message, an error code on failure,
// and TSDB_CODE_ACTION_IN_PROGRESS once the drop has been handed off.
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (!pMsg) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqSubscribeObj *pSub = NULL;
  SMDropCgroupReq  dropReq = {0};
  SMnode          *pMnode = pMsg->info.node;
  int32_t          code = 0;
  int32_t          lino = 0;
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));

  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code != 0) {
    if (!dropReq.igNotExists) {
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      goto END;
    }
    mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
    mndReleaseSubscribe(pMnode, pSub);
    return 0;
  }

  MND_TMQ_RETURN_CHECK(dropCgroup(pSub, pMsg, &dropReq));

END:
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END

  if (code == 0) {
    TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
  }
  TAOS_RETURN(code);
}
1457

1458
// Cleanup hook of the subscribe module. Intentionally empty: the function
// performs no work and ignores its argument.
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}
401,448✔
1459

1460
// Serialize a subscribe object into a freshly allocated SDB raw record.
// Raw layout: int32 payload length, the encoded payload, then
// MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
// Returns the raw record on success, or NULL on failure (terrno holds the
// reason; it is pre-set to TSDB_CODE_OUT_OF_MEMORY and only cleared once every
// step has succeeded). Returns NULL without touching terrno when pSub is NULL.
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  void *buf = NULL;
  // Must be initialized before the first goto: the cleanup label logs and
  // frees pRaw, and jumping past an initializing declaration would leave it
  // with an indeterminate value.
  SSdbRaw *pRaw = NULL;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // The encoder advances the pointer it is given, so encode through a copy
  // and keep buf pointing at the start of the payload.
  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mDebug("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1502

1503
// Rebuild a subscribe object (wrapped in an SDB row) from a raw record.
// Accepts soft versions 1..MND_SUBSCRIBE_VER_NUMBER. After decoding, every
// vgroup endpoint set stored in the object (unassigned vgroups and the vgroups
// of each consumer) is passed through tmsgUpdateDnodeEpSet().
// Returns the row on success, or NULL on failure with terrno set (terrno is
// pre-set to TSDB_CODE_OUT_OF_MEMORY and cleared only on full success).
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  int8_t           sver = 0;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;
  void            *buf = NULL;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  // Raw layout mirrors the encoder: length, payload, reserved bytes.
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
    goto SUB_DECODE_OVER;
  }

  // update epset saved in mnode: first the vgroups nobody consumes yet ...
  if (pSub->unassignedVgs != NULL) {
    int32_t unassignedCnt = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t idx = 0; idx < unassignedCnt; ++idx) {
      SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGet(pSub->unassignedVgs, idx);
      tmsgUpdateDnodeEpSet(&pVgEp->epSet);
    }
  }

  // ... then the vgroups owned by each consumer.
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = pIter;
      int32_t        vgCnt = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t idx = 0; idx < vgCnt; ++idx) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGet(pConsumerEp->vgs, idx);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mDebug("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}
1574

1575
// SDB insert hook for subscribe objects: only logs the key, always returns 0.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform insert action", subKey);
  return 0;
}
1579

1580
// SDB delete hook for subscribe objects: logs the key, then releases the
// object's contents through tDeleteSubscribeObj(). Always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform delete action", subKey);

  tDeleteSubscribeObj(pSub);
  return 0;
}
1585

1586
// SDB update hook: exchange the mutable parts of the stored object with those
// of the new one (consumer hash, unassigned vgroups, offset rows) while holding
// the stored object's write latch. After the swap the new object carries the
// previous contents.
// Returns TSDB_CODE_INVALID_PARA when either object is NULL, otherwise 0.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (pOldSub == NULL) return TSDB_CODE_INVALID_PARA;
  if (pNewSub == NULL) return TSDB_CODE_INVALID_PARA;

  mDebug("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);
  {
    TSWAP(pOldSub->consumerHash, pNewSub->consumerHash);
    TSWAP(pOldSub->unassignedVgs, pNewSub->unassignedVgs);
    TSWAP(pOldSub->offsetRows, pNewSub->offsetRows);
  }
  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}
1598

1599
// Look up a subscribe object by key and store the acquired reference in *pSub.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments,
// TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when nothing is found (*pSub is NULL then),
// and 0 on success. A successful acquire is paired with mndReleaseSubscribe().
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj **pSub) {
  if (pMnode == NULL || key == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  return (*pSub != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1610

1611
// Count the subscribe objects whose key refers to the given topic name.
// Each key is split into topic and consumer group under the object's read
// latch; the topic part is compared with topicName.
// Returns 0 when pMnode or topicName is NULL.
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (pMnode == NULL || topicName == NULL) return 0;

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  int32_t          groupCnt = 0;

  for (void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) {
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};

    taosRLockLatch(&pSub->lock);
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    taosRUnLockLatch(&pSub->lock);

    if (strcmp(topic, topicName) == 0) {
      groupCnt++;
    }
    sdbRelease(pSdb, pSub);
  }

  return groupCnt;
}
1638

1639
// Release a subscribe object reference; a NULL pMnode or pSub makes this a no-op.
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1644

1645
// Append a commit log to the transaction that marks the subscription as dropped.
// The object is encoded to a raw record, appended to pTrans and then flagged
// SDB_STATUS_DROPPED. If appending fails the raw record is freed here.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the result of
// the failing step or 0.
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pCommitRaw);

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  } else {
    // The raw record was not appended, so it is still ours to free.
    sdbFreeRaw(pCommitRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
1662

1663
// Drop one subscription if it belongs to topicName; other topics are skipped
// and reported as success. Runs under the subscription's read latch.
// Without `force`, a subscription that still has consumers is refused with
// TSDB_CODE_MND_IN_REBALANCE. Otherwise the vnode delete actions and the
// "dropped" commit log are added to pTrans.
static int32_t dropOneSub(SMqSubscribeObj *pSub, SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  int32_t code = 0;
  int32_t lino = 0;
  char    topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char    cgroup[TSDB_CGROUP_LEN] = {0};

  taosRLockLatch(&pSub->lock);
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

  if (strcmp(topic, topicName) == 0) {
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
      code = TSDB_CODE_MND_IN_REBALANCE;
      goto END;
    }

    // iter all vnode to delete handle
    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  }

END:
  taosRUnLockLatch(&pSub->lock);

  return code;
}
1688

1689
// Drop every subscription that belongs to topicName by adding the required
// actions to pTrans (see dropOneSub for the per-subscription rules).
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, the first failing
// dropOneSub() code, or 0. On failure the object being processed and the
// pending iterator are released at END.
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          code = 0;
  int32_t          lino = 0;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    MND_TMQ_RETURN_CHECK(dropOneSub(pSub, pMnode, pTrans, topicName, force));
    sdbRelease(pSdb, pSub);
    // Clear the released pointer so the unconditional release at END can
    // never act on it a second time, independent of what sdbFetch() does to
    // the out-parameter when iteration ends.
    pSub = NULL;
  }

END:
  PRINT_LOG_END
  // Only non-NULL when dropOneSub() failed and the loop was left early.
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1712

1713
// Append one result row per vgroup in `vgs` to pBlock, starting at row
// *numOfRows and incrementing *numOfRows for each row written.
// Columns, in order: topic, consumer group, vgroup id, consumer id (hex text,
// NULL when consumerId is -1), user (NULL when `user` is NULL), fqdn (NULL when
// `fqdn` is NULL), offset text and row count (both NULL when `offsetRows` has
// no entry for the vgroup).
// `topic` and `cgroup` are written as-is into the first two columns and are
// read through varDataVal() for logging.
// Returns TSDB_CODE_INVALID_PARA when pBlock, numOfRows, topic or cgroup is
// NULL; otherwise 0 or the code of the first failing step.
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *user,
                           const char *fqdn, const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t vgCnt = taosArrayGetSize(vgs);
  for (int32_t vgIdx = 0; vgIdx < vgCnt; vgIdx++) {
    SMqVgEp *pVgEp = taosArrayGet(vgs, vgIdx);
    MND_TMQ_NULL_CHECK(pVgEp);

    int32_t          colIdx = 0;
    SColumnInfoData *pCol = NULL;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)cgroup, false));

    // vg id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pVgEp->vgId, false));

    // consumer id, rendered as "0x<hex>"
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)consumerIdHex, consumerId == -1));

    // user
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user) STR_TO_VARSTR(userStr, user);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, userStr, user == NULL));

    // fqdn
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // offset: the last entry whose vgId matches wins
    OffsetRows *pMatch = NULL;
    for (int i = 0; i < taosArrayGetSize(offsetRows); i++) {
      OffsetRows *pCandidate = taosArrayGet(offsetRows, i);
      MND_TMQ_NULL_CHECK(pCandidate);
      if (pCandidate->vgId == pVgEp->vgId) {
        pMatch = pCandidate;
      }
    }

    if (pMatch == NULL) {
      // No offset recorded for this vgroup: both offset columns are NULL.
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    } else {
      // "<formatted offset>/<ever>" followed by the row count column.
      char buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &pMatch->offset);
      (void)snprintf(varDataVal(buf) + strlen(varDataVal(buf)),
                     sizeof(buf) - VARSTR_HEADER_SIZE - strlen(varDataVal(buf)), "/%" PRId64, pMatch->ever);
      varDataSetLen(buf, strlen(varDataVal(buf)));
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)buf, false));
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pMatch->rows, false));
    }

    (*numOfRows)++;
  }

END:
  PRINT_LOG_END
  return code;
}
1806

1807
// Emit the SHOW SUBSCRIPTIONS rows of one subscribe object into pBlock.
// For every consumer of the subscription, rows are produced when the caller may
// see everything (showAll), holds the show privilege on the topic, or is the
// user that owns the consumer. The unassigned vgroups are always emitted, with
// a NULL consumer. The whole function runs under the subscription's read latch.
// On success pBlock->info.rows is set to *numOfRows.
// Returns 0 or the code of the first failing step.
static int32_t retrieveSub(SRpcMsg *pReq, SMqSubscribeObj *pSub, SUserObj *pOperUser, bool showAll, SSDataBlock *pBlock,
                           int32_t *numOfRows, int32_t rowsCapacity) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMqConsumerEp *pConsumerEp = NULL;
  SMqTopicObj   *pTopic = NULL;
  void          *pIter = NULL;
  bool           showTopic = false;
  // Display names, stored with a varstr header for the result columns.
  char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  PRINT_LOG_START

  taosRLockLatch(&pSub->lock);
  if (*numOfRows + pSub->vgNum > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + pSub->vgNum));
  }

  // topic and cgroup
  mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
  varDataSetLen(topic, strlen(varDataVal(topic)));
  varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

  if (!showAll) {
    // The topic lookup needs a plain C string. `topic` above starts with a
    // varstr length header, so it cannot be passed directly. Split the key
    // again in full-name mode, the same form the drop paths compare against
    // a topic's name.
    // NOTE(review): assumes the topic object is keyed by that full name
    // (e.g. "1.topic1") - confirm against mndAcquireTopic().
    char topicFName[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroupName[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topicFName, cgroupName, true);

    (void)mndAcquireTopic(pMnode, topicFName, &pTopic);
    if (pTopic) {
      SName name = {0};  // 1.topic1
      if (0 == tNameFromString(&name, pTopic->name, T_NAME_ACCT | T_NAME_DB)) {
        if (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_SUBSCRIPTION_SHOW, PRIV_OBJ_TOPIC, pTopic->ownerId,
                                          pTopic->db, name.dbname)) {
          showTopic = true;
        }
      }
    }
  }

  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;

    char           *user = NULL;
    char           *fqdn = NULL;
    bool            subscribeOwner = false;
    SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
    if (pConsumer != NULL) {
      user = pConsumer->user;
      fqdn = pConsumer->fqdn;
      if (strncmp(pConsumer->user, pOperUser->name, TSDB_USER_LEN) == 0) {
        subscribeOwner = true;
      }
    }

    if (showAll || showTopic || subscribeOwner) {
      code = buildResult(pBlock, numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
                         pConsumerEp->offsetRows);
    }

    // `user` and `fqdn` point into the consumer object, so the reference is
    // dropped only after buildResult() has copied them into the block.
    sdbRelease(pSdb, pConsumer);

    if (code != 0) {
      lino = __LINE__;
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(
      buildResult(pBlock, numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

  pBlock->info.rows = *numOfRows;

END:
  mndReleaseTopic(pMnode, pTopic);
  // Cancel the iterator while the read latch is still held: the update hook
  // swaps pSub->consumerHash under the write latch, so touching the hash
  // after unlocking could hit a different table than the one iterated.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  taosRUnLockLatch(&pSub->lock);
  PRINT_LOG_END
  return code;
}
1880

1881
// SHOW SUBSCRIPTIONS retrieve callback.
// Resolves the requesting user, checks whether that user may see every
// subscription in its account, then walks the subscribe objects (continuing
// from pShow->pIter) and fills pBlock until rowsCapacity rows were produced or
// the objects are exhausted.
// Returns the number of rows written on success, TSDB_CODE_INVALID_PARA for
// NULL arguments, or the failing step's error code.
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  int32_t          numOfRows = 0;
  bool             showAll = false;
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  SUserObj        *pOperUser = NULL;
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};

  mInfo("mnd show subscriptions begin");
  MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));

  // Account-wide wildcard "<acctId>.*": if the privilege check passes for it,
  // every subscription is shown.
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pOperUser->acctId);
  showAll = (0 == mndCheckSysObjPrivilege(pMnode, pOperUser, RPC_MSG_TOKEN(pReq), PRIV_SUBSCRIPTION_SHOW,
                                          PRIV_OBJ_TOPIC, 0, objFName, "*"));

  while (numOfRows < rowsCapacity &&
         (pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub)) != NULL) {
    MND_TMQ_RETURN_CHECK(retrieveSub(pReq, pSub, pOperUser, showAll, pBlock, &numOfRows, rowsCapacity));

    // Released here; cleared so the release at END does not repeat it.
    sdbRelease(pSdb, pSub);
    pSub = NULL;
  }
  mInfo("mnd end show subscriptions");
  pShow->numOfRows += numOfRows;

END:
  sdbCancelFetch(pSdb, pShow->pIter);
  sdbRelease(pSdb, pSub);
  mndReleaseUser(pMnode, pOperUser);

  if (code == 0) {
    mDebug("mnd show subscriptions success, rows:%d", numOfRows);
    return numOfRows;
  }

  mError("mnd show subscriptions failed, msg:%s", tstrerror(code));
  TAOS_RETURN(code);
}
1928

1929
// Cancel a pending SHOW iteration over subscribe objects; no-op when pMnode is NULL.
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc