• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4380

25 Jun 2025 06:58AM UTC coverage: 62.307% (-0.09%) from 62.393%
#4380

push

travis-ci

web-flow
feat(mqtt): mqtt subscription (#30127)

* feat(mqtt): Initial commit for mqtt

* chore(xnode/mnd): xnode message handlers for mnode

* chore(mnd/xnode): mnode part for xnode

* chore(xnode/translater): fix show commands

* fix(ast/creater): fix xnode create option

* fix(xnode/ci): fix ci & doc's error codes

* chore(xnode/sql): make create/drop/show work properly

* fix(xnode/sql): commit new files

* fix(xnode/sql): commit cmake files

* fix: fix testing cases

* fix(xnode/tsc): fix tokens

* fix(ast/anode): fix anode update decl.

* fix(xnode/error): fix xnode error codes

* fix: xnode make/destroy

* chore: xnode with option & dnode id

* chore: use taosmqtt for xnode

* chore: new error code for xnode launching

* chore(xnode): new error code

* chore: header for _xnode_mgmt_mqtt

* chore: source for _xnode_mgmt_mqtt

* chore: remove test directory from cmake

* chore: remove taosmqtt for ci to compile

* chore: remove taosudf header from xnode

* chore: new window macro

* chore: remove xnode mgmt mqtt for windows compilation

* Revert "chore: remove xnode mgmt mqtt for windows compilation"

This reverts commit 197e1640c.

* chore: cleanup code

* chore: xnode mgmt comment windows part out

* chore: mgmt/mqtt, move uv head toppest

* xnode/mnode: create xnode once per dnode

* fix(xnode/systable/test): fix column count

* xnode/sdb: renumber sdb type for xnode to make start/stop order correct

* xnode/mqtt: new param mqttPort

* fix SXnode's struct type

* transfer dnode id to mqtt subscription

* tmqtt: remove uv_a linking

* tmqtt/tools: sources for tools

* tools: fix windows compilation

* tools/producer: fix windows sleep param

* tools/producer: fix uninited var rc

* make tools only for linux

* test/mnodes: wail 1 or 2 seconds for offline to be leader

* update topic producer tool for geometry data type testing

* format tool sql statements

* show xnodes' ep

* make shell auto complete xnodes

* use usleep... (continued)

156642 of 320746 branches covered (48.84%)

Branch coverage included in aggregate %.

61 of 1020 new or added lines in 21 files covered. (5.98%)

1736 existing lines in 172 files now uncovered.

242538 of 319922 relevant lines covered (75.81%)

6277604.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.38
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tcompare.h"
25
#include "tname.h"
26

27
#define MND_SUBSCRIBE_VER_NUMBER   3
28
#define MND_SUBSCRIBE_RESERVE_SIZE 64
29

30
//#define MND_CONSUMER_LOST_HB_CNT          6
31

32
static int32_t mqRebInExecCnt = 0;
33

34
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
35
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
36
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
39
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
40
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
41
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
42
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
43
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
44

45
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
1,016✔
46
  if (pTrans == NULL || pSub == NULL) {
1,016!
47
    return TSDB_CODE_INVALID_PARA;
×
48
  }
49
  int32_t  code = 0;
1,016✔
50
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
1,016✔
51
  MND_TMQ_NULL_CHECK(pCommitRaw);
1,016!
52
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
1,016✔
53
  if (code != 0) {
1,016!
54
    sdbFreeRaw(pCommitRaw);
×
55
    goto END;
×
56
  }
57
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
1,016✔
58

59
END:
1,016✔
60
  return code;
1,016✔
61
}
62

63
int32_t mndInitSubscribe(SMnode *pMnode) {
2,032✔
64
  SSdbTable table = {
2,032✔
65
      .sdbType = SDB_SUBSCRIBE,
66
      .keyType = SDB_KEY_BINARY,
67
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
68
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
69
      .insertFp = (SdbInsertFp)mndSubActionInsert,
70
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
71
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
72
  };
73

74
  if (pMnode == NULL) {
2,032!
75
    return TSDB_CODE_INVALID_PARA;
×
76
  }
77
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
2,032✔
78
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
2,032✔
79
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
2,032✔
80
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
2,032✔
81
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);
2,032✔
82

83
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
2,032✔
84
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
2,032✔
85

86
  return sdbSetTable(pMnode->pSdb, table);
2,032✔
87
}
88

89
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey, SMqSubscribeObj** pSub) {
500✔
90
  if(pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
500!
91
    return TSDB_CODE_INVALID_PARA;
×
92
  }
93
  int32_t code = 0;
500✔
94
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
500!
95
  (*pSub)->dbUid = pTopic->dbUid;
500✔
96
  (*pSub)->stbUid = pTopic->stbUid;
500✔
97
  (*pSub)->subType = pTopic->subType;
500✔
98
  (*pSub)->withMeta = pTopic->withMeta;
500✔
99

100
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
500!
101
  return code;
500✔
102

103
END:
×
104
  tDeleteSubscribeObj(*pSub);
×
105
  taosMemoryFree(*pSub);
×
106
  return code;
×
107
}
108

109
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
2,612✔
110
                                    SSubplan *pPlan) {
111
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
2,612!
112
    return TSDB_CODE_INVALID_PARA;
×
113
  }
114
  SMqRebVgReq req = {0};
2,612✔
115
  int32_t     code = 0;
2,612✔
116
  SEncoder encoder = {0};
2,612✔
117

118
  req.oldConsumerId = pRebVg->oldConsumerId;
2,612✔
119
  req.newConsumerId = pRebVg->newConsumerId;
2,612✔
120
  req.vgId = pRebVg->pVgEp->vgId;
2,612✔
121
  if (pPlan) {
2,612✔
122
    pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
2,133✔
123
    pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
2,133✔
124
    int32_t msgLen = 0;
2,133✔
125
    MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
2,133!
126
  } else {
127
    req.qmsg = taosStrdup("");
479!
128
    MND_TMQ_NULL_CHECK(req.qmsg);
479!
129
  }
130
  req.subType = pSub->subType;
2,612✔
131
  req.withMeta = pSub->withMeta;
2,612✔
132
  req.suid = pSub->stbUid;
2,612✔
133
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
2,612✔
134

135
  int32_t tlen = 0;
2,612✔
136
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
2,612!
137
  if (code < 0) {
2,612!
138
    goto END;
×
139
  }
140

141
  tlen += sizeof(SMsgHead);
2,612✔
142
  void *buf = taosMemoryMalloc(tlen);
2,612!
143
  MND_TMQ_NULL_CHECK(buf);
2,612!
144
  SMsgHead *pMsgHead = (SMsgHead *)buf;
2,612✔
145
  pMsgHead->contLen = htonl(tlen);
2,612✔
146
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
2,612✔
147

148
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
2,612✔
149
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
2,612!
150
  *pBuf = buf;
2,612✔
151
  *pLen = tlen;
2,612✔
152

153
END:
2,612✔
154
  tEncoderClear(&encoder);
2,612✔
155
  taosMemoryFree(req.qmsg);
2,612!
156
  return code;
2,612✔
157
}
158

159
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
2,614✔
160
                                        const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
161
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
2,614!
162
    return TSDB_CODE_INVALID_PARA;
×
163
  }
164
  int32_t code = 0;
2,614✔
165
  void   *buf  = NULL;
2,614✔
166

167
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
2,614✔
168
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
2!
169
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
170
    goto END;
×
171
  }
172

173
  int32_t tlen = 0;
2,612✔
174
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan));
2,612!
175
  int32_t vgId = pRebVg->pVgEp->vgId;
2,612✔
176
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
2,612✔
177
  if (pVgObj == NULL) {
2,612!
178
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
179
    goto END;
×
180
  }
181

182
  STransAction action = {0};
2,612✔
183
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
2,612✔
184
  action.pCont = buf;
2,612✔
185
  action.contLen = tlen;
2,612✔
186
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
2,612✔
187

188
  mndReleaseVgroup(pMnode, pVgObj);
2,612✔
189
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
2,612!
190
  return code;
2,612✔
191

192
END:
×
193
  taosMemoryFree(buf);
×
194
  return code;
×
195
}
196

197
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
3,570✔
198
  if (key == NULL || topic == NULL || cgroup == NULL) {
3,570!
199
    return;
×
200
  }
201
  int32_t i = 0;
3,570✔
202
  while (key[i] != TMQ_SEPARATOR_CHAR) {
25,819✔
203
    i++;
22,249✔
204
  }
205
  (void)memcpy(cgroup, key, i);
3,570✔
206
  cgroup[i] = 0;
3,570✔
207
  if (fullName) {
3,570✔
208
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
3,393✔
209
  } else {
210
    while (key[i] != '.') {
531✔
211
      i++;
354✔
212
    }
213
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
177✔
214
  }
215
}
216

217
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
1,395✔
218
  if (pHash == NULL || key == NULL) {
1,395!
219
    return TSDB_CODE_INVALID_PARA;
×
220
  }
221
  int32_t code = 0;
1,395✔
222
  SMqRebInfo* pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
1,395✔
223
  if (pRebInfo == NULL) {
1,395✔
224
    pRebInfo = tNewSMqRebSubscribe(key);
1,325✔
225
    if (pRebInfo == NULL) {
1,325!
226
      code = terrno;
×
227
      goto END;
×
228
    }
229
    code = taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
1,325✔
230
    taosMemoryFreeClear(pRebInfo);
1,325!
231
    if (code != 0) {
1,325!
232
      goto END;
×
233
    }
234
    pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
1,325✔
235
    MND_TMQ_NULL_CHECK(pRebInfo);
1,325!
236
  }
237
  if (pReb){
1,395✔
238
    *pReb = pRebInfo;
1,142✔
239
  }
240

241
END:
253✔
242
  return code;
1,395✔
243
}
244

245
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
3,195✔
246
  if (vgs == NULL || pHash == NULL || key == NULL) {
3,195!
247
    return TSDB_CODE_INVALID_PARA;
×
248
  }
249
  int32_t         code = 0;
3,195✔
250
  SMqVgEp       **pVgEp = (SMqVgEp **)taosArrayPop(vgs);
3,195✔
251
  MND_TMQ_NULL_CHECK(pVgEp);
3,195!
252
  SMqRebOutputVg outputVg = {consumerId, -1, *pVgEp};
3,195✔
253
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &(*pVgEp)->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)));
3,195!
254
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, (*pVgEp)->vgId, consumerId);
3,195!
255
END:
×
256
  return code;
3,195✔
257
}
258

259
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
1,325✔
260
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
1,325!
261
    return TSDB_CODE_INVALID_PARA;
×
262
  }
263
  int32_t code = 0;
1,325✔
264
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
1,325✔
265
  int32_t actualRemoved = 0;
1,325✔
266
  for (int32_t i = 0; i < numOfRemoved; i++) {
1,863✔
267
    int64_t*      consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
538✔
268
    MND_TMQ_NULL_CHECK(consumerId);
538!
269
    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t));
538✔
270
    if (pConsumerEp == NULL) {
538!
271
      continue;
×
272
    }
273

274
    int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
538✔
275
    for (int32_t j = 0; j < consumerVgNum; j++) {
1,828✔
276
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *consumerId, pOutput->pSub->key));
1,290!
277
    }
278

279
    taosArrayDestroy(pConsumerEp->vgs);
538✔
280
    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t)));
538!
281
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, consumerId));
1,076!
282
    actualRemoved++;
538✔
283
  }
284

285
  if (numOfRemoved != actualRemoved) {
1,325!
286
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved,
×
287
           actualRemoved);
288
  } else {
289
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
1,325!
290
  }
291
END:
×
292
  return code;
1,325✔
293
}
294

295
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
1,325✔
296
  if (pOutput == NULL || pInput == NULL) {
1,325!
297
    return TSDB_CODE_INVALID_PARA;
×
298
  }
299
  int32_t code = 0;
1,325✔
300
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);
1,325✔
301

302
  for (int32_t i = 0; i < numOfNewConsumers; i++) {
1,929✔
303
    int64_t* consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
604✔
304
    MND_TMQ_NULL_CHECK(consumerId);
604!
305
    SMqConsumerEp newConsumerEp = {0};
604✔
306
    newConsumerEp.consumerId = *consumerId;
604✔
307
    newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
604✔
308
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);
604!
309
    MND_TMQ_RETURN_CHECK(taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)));
604!
310
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
1,208!
311
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
604!
312
  }
313
END:
1,325✔
314
  return code;
1,325✔
315
}
316

317
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
1,325✔
318
  if (pOutput == NULL || pHash == NULL) {
1,325!
319
    return TSDB_CODE_INVALID_PARA;
×
320
  }
321
  int32_t code = 0;
1,325✔
322
  int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
1,325✔
323
  for (int32_t i = 0; i < numOfVgroups; i++) {
3,216✔
324
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
1,891!
325
  }
326
END:
1,325✔
327
  return code;
1,325✔
328
}
329

330
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
1,325✔
331
                                     int32_t remainderVgCnt) {
332
  if (pOutput == NULL || pHash == NULL) {
1,325!
333
    return TSDB_CODE_INVALID_PARA;
×
334
  }
335
  int32_t code = 0;
1,325✔
336
  int32_t cnt = 0;
1,325✔
337
  void   *pIter = NULL;
1,325✔
338

339
  while (1) {
312✔
340
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,637✔
341
    if (pIter == NULL) {
1,637✔
342
      break;
1,325✔
343
    }
344

345
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
312✔
346
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
312✔
347

348
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
624!
349
    if (consumerVgNum > minVgCnt) {
312✔
350
      if (cnt < remainderVgCnt) {
10✔
351
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {  // pop until equal minVg + 1
3!
352
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
×
353
        }
354
        cnt++;
3✔
355
      } else {
356
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
21✔
357
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
14!
358
        }
359
      }
360
    }
361
  }
362
END:
1,325✔
363
  return code;
1,325✔
364
}
365

366
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
1,325✔
367
  if (pMnode == NULL || pOutput == NULL) {
1,325!
368
    return TSDB_CODE_INVALID_PARA;
×
369
  }
370
  int32_t code = 0;
1,325✔
371
  int32_t totalVgNum = 0;
1,325✔
372
  SVgObj *pVgroup = NULL;
1,325✔
373
  SMqVgEp *pVgEp = NULL;
1,325✔
374
  void   *pIter = NULL;
1,325✔
375
  SArray *newVgs = taosArrayInit(0, POINTER_BYTES);
1,325✔
376
  MND_TMQ_NULL_CHECK(newVgs);
1,325!
377
  while (1) {
378
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
7,993✔
379
    if (pIter == NULL) {
7,993✔
380
      break;
1,325✔
381
    }
382

383
    if (!mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
6,668✔
384
      sdbRelease(pMnode->pSdb, pVgroup);
3,413✔
385
      continue;
3,413✔
386
    }
387

388
    totalVgNum++;
3,255✔
389
    pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
3,255!
390
    MND_TMQ_NULL_CHECK(pVgEp);
3,255!
391
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
3,255✔
392
    pVgEp->vgId = pVgroup->vgId;
3,255✔
393
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
3,255!
394
    pVgEp = NULL;
3,255✔
395
    sdbRelease(pMnode->pSdb, pVgroup);
3,255✔
396
  }
397

398
  pIter = NULL;
1,325✔
399
  while (1) {
850✔
400
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
2,175✔
401
    if (pIter == NULL) break;
2,175✔
402
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
850✔
403
    int32_t j = 0;
850✔
404
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
2,498✔
405
      SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
1,648✔
406
      MND_TMQ_NULL_CHECK(pVgEpTmp);
1,648!
407
      bool     find = false;
1,648✔
408
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
2,272✔
409
        SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
1,988✔
410
        MND_TMQ_NULL_CHECK(pnewVgEp);
1,988!
411
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
1,988✔
412
          tDeleteSMqVgEp(pnewVgEp);
1,364✔
413
          taosArrayRemove(newVgs, k);
1,364✔
414
          find = true;
1,364✔
415
          break;
1,364✔
416
        }
417
      }
418
      if (!find) {
1,648✔
419
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
284!
420
        tDeleteSMqVgEp(pVgEpTmp);
284✔
421
        taosArrayRemove(pConsumerEp->vgs, j);
284✔
422
        continue;
284✔
423
      }
424
      j++;
1,364✔
425
    }
426
  }
427

428
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
1,325✔
429
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
284!
430
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
284!
431
    taosArrayDestroy(newVgs);
284✔
432
  } else {
433
    taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
1,041✔
434
  }
435
  return totalVgNum;
1,325✔
436

437
END:
×
438
  sdbRelease(pMnode->pSdb, pVgroup);
×
439
  taosMemoryFree(pVgEp);
×
440
  taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
×
441
  return code;
×
442
}
443

444
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
1,325✔
445
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
1,325!
446
    return TSDB_CODE_INVALID_PARA;
×
447
  }
448
  SMqSubscribeObj *pSub = NULL;
1,325✔
449
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
1,325✔
450
  if( code != 0){
1,325✔
451
    return 0;
500✔
452
  }
453
  taosRLockLatch(&pSub->lock);
825✔
454
  if (pOutput->pSub->offsetRows == NULL) {
825✔
455
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
689✔
456
    if(pOutput->pSub->offsetRows == NULL) {
689!
457
      taosRUnLockLatch(&pSub->lock);
×
458
      code = terrno;
×
459
      goto END;
×
460
    }
461
  }
462
  void *pIter = NULL;
825✔
463
  while (1) {
850✔
464
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,675✔
465
    if (pIter == NULL) break;
1,675✔
466
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
850✔
467
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
850✔
468

469
    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
2,435✔
470
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
1,585✔
471
      MND_TMQ_NULL_CHECK(d1);
1,585!
472
      bool        jump = false;
1,585✔
473
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
2,121✔
474
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
566✔
475
        MND_TMQ_NULL_CHECK(pVgEp);
566!
476
        if (pVgEp->vgId == d1->vgId) {
566✔
477
          jump = true;
30✔
478
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
30!
479
                pConsumerEp->consumerId, pVgEp->vgId);
480
          break;
30✔
481
        }
482
      }
483
      if (jump) continue;
1,585✔
484
      bool find = false;
1,555✔
485
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
3,110✔
486
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
1,731✔
487
        MND_TMQ_NULL_CHECK(d2);
1,731!
488
        if (d1->vgId == d2->vgId) {
1,731✔
489
          d2->rows += d1->rows;
176✔
490
          d2->offset = d1->offset;
176✔
491
          d2->ever = d1->ever;
176✔
492
          find = true;
176✔
493
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
176!
494
          break;
176✔
495
        }
496
      }
497
      if (!find) {
1,555✔
498
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
2,758!
499
      }
500
    }
501
  }
502
  taosRUnLockLatch(&pSub->lock);
825✔
503
  mndReleaseSubscribe(pMnode, pSub);
825✔
504

505
END:
825✔
506
  return code;
825✔
507
}
508

509
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
1,325✔
510
  if (pOutput == NULL) return;
1,325!
511
  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
1,325!
512
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
4,520✔
513
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
3,195✔
514
    if (pOutputRebVg == NULL) continue;
3,195!
515
    mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
3,195!
516
          pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
517
  }
518

519
  void *pIter = NULL;
1,325✔
520
  while (1) {
916✔
521
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
2,241✔
522
    if (pIter == NULL) break;
2,241✔
523
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
916✔
524
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
916✔
525
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
916!
526
          pConsumerEp->consumerId, sz);
527
    for (int32_t i = 0; i < sz; i++) {
2,893✔
528
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
1,977✔
529
      if (pVgEp == NULL) continue;
1,977!
530
      mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
1,977!
531
            pConsumerEp->consumerId);
532
    }
533
  }
534
}
535

536
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
1,325✔
537
                           int32_t *remainderVgCnt) {
538
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
1,325!
539
    return;
×
540
  }
541
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
1,325✔
542
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
1,325✔
543
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
1,325✔
544

545
  // calc num
546
  if (numOfFinal != 0) {
1,325✔
547
    *minVgCnt = totalVgNum / numOfFinal;
852✔
548
    *remainderVgCnt = totalVgNum % numOfFinal;
852✔
549
  } else {
550
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
473!
551
  }
552
  mInfo(
1,325!
553
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
554
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
555
}
556

557
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
1,325✔
558
  if (pOutput == NULL || pHash == NULL) {
1,325!
559
    return TSDB_CODE_INVALID_PARA;
×
560
  }
561
  SMqRebOutputVg *pRebVg = NULL;
1,325✔
562
  void           *pAssignIter = NULL;
1,325✔
563
  void           *pIter = NULL;
1,325✔
564
  int32_t         code = 0;
1,325✔
565

566
  while (1) {
916✔
567
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
2,241✔
568
    if (pIter == NULL) {
2,241✔
569
      break;
1,325✔
570
    }
571
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
916✔
572
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
2,806✔
573
      pAssignIter = taosHashIterate(pHash, pAssignIter);
1,890✔
574
      if (pAssignIter == NULL) {
1,890!
575
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
×
576
        break;
×
577
      }
578

579
      pRebVg = (SMqRebOutputVg *)pAssignIter;
1,890✔
580
      pRebVg->newConsumerId = pConsumerEp->consumerId;
1,890✔
581
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
3,780!
582
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId,
1,890!
583
            pConsumerEp->consumerId);
584
    }
585
  }
586

587
  while (1) {
27✔
588
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
1,352✔
589
    if (pIter == NULL) {
1,352✔
590
      break;
473✔
591
    }
592
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
879✔
593
    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
879!
594
      pAssignIter = taosHashIterate(pHash, pAssignIter);
879✔
595
      if (pAssignIter == NULL) {
879✔
596
        mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
852!
597
        break;
852✔
598
      }
599

600
      pRebVg = (SMqRebOutputVg *)pAssignIter;
27✔
601
      pRebVg->newConsumerId = pConsumerEp->consumerId;
27✔
602
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
54!
603
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId,
27!
604
            pConsumerEp->consumerId);
605
    }
606
  }
607

608
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
1,325✔
609
  if (pAssignIter != NULL) {
1,325!
610
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
×
611
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
612
    goto END;
×
613
  }
614
  while (1) {
3,195✔
615
    pAssignIter = taosHashIterate(pHash, pAssignIter);
4,520✔
616
    if (pAssignIter == NULL) {
4,520✔
617
      break;
1,325✔
618
    }
619

620
    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
3,195✔
621
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
6,390!
622
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {            // if all consumer is removed
3,195✔
623
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
2,556!
624
    }
625
  }
626

627
END:
1,325✔
628
  return code;
1,325✔
629
}
630

631
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
1,325✔
632
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
1,325!
633
    return TSDB_CODE_INVALID_PARA;
×
634
  }
635
  int32_t     totalVgNum = processRemoveAddVgs(pMnode, pOutput);
1,325✔
636
  if (totalVgNum < 0){
1,325!
637
    return totalVgNum;
×
638
  }
639
  const char *pSubKey = pOutput->pSub->key;
1,325✔
640
  int32_t     minVgCnt = 0;
1,325✔
641
  int32_t     remainderVgCnt = 0;
1,325✔
642
  int32_t     code = 0;
1,325✔
643
  SHashObj   *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
1,325✔
644
  MND_TMQ_NULL_CHECK(pHash);
1,325!
645
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
1,325!
646
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
1,325!
647
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
1,325✔
648
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
1,325!
649
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
1,325!
650
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
1,325!
651
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
1,325!
652
  printRebalanceLog(pOutput);
1,325✔
653
  taosHashCleanup(pHash);
1,325✔
654

655
END:
1,325✔
656
  return code;
1,325✔
657
}
658

659
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
3,048✔
660
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
3,048!
661
    return TSDB_CODE_INVALID_PARA;
×
662
  }
663
  int32_t         code = 0;
3,048✔
664
  SMqConsumerObj *pConsumerNew = NULL;
3,048✔
665
  int32_t         consumerNum = taosArrayGetSize(consumers);
3,048✔
666
  for (int32_t i = 0; i < consumerNum; i++) {
4,193✔
667
    int64_t* consumerId = (int64_t *)taosArrayGet(consumers, i);
1,145✔
668
    MND_TMQ_NULL_CHECK(consumerId);
1,145!
669
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
1,145!
670
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
1,145!
671
    tDeleteSMqConsumerObj(pConsumerNew);
1,145✔
672
  }
673
  pConsumerNew = NULL;
3,048✔
674

675
END:
3,048✔
676
  tDeleteSMqConsumerObj(pConsumerNew);
3,048✔
677
  return code;
3,048✔
678
}
679

680
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
1,016✔
681
  if (pTrans == NULL || pOutput == NULL || cgroup == NULL || topic == NULL) {
1,016!
682
    return TSDB_CODE_INVALID_PARA;
×
683
  }
684
  int32_t code = 0;
1,016✔
685
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL));
1,016!
686
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic));
1,016!
687
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic));
1,016!
688
END:
1,016✔
689
  return code;
1,016✔
690
}
691

692
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
1,325✔
693
  if (pMnode == NULL || pMsg == NULL || pOutput == NULL) {
1,325!
694
    return TSDB_CODE_INVALID_PARA;
×
695
  }
696
  struct SSubplan *pPlan = NULL;
1,325✔
697
  int32_t          code = 0;
1,325✔
698
  STrans          *pTrans = NULL;
1,325✔
699

700
  if (strcmp(pOutput->pSub->qmsg, "") != 0) {
1,325✔
701
    MND_TMQ_RETURN_CHECK(qStringToSubplan(pOutput->pSub->qmsg, &pPlan));
1,040!
702
  }
703

704
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
1,325✔
705
  char cgroup[TSDB_CGROUP_LEN] = {0};
1,325✔
706
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
1,325✔
707

708
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
1,325✔
709
  if (pTrans == NULL) {
1,325!
710
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
711
    if (terrno != 0) code = terrno;
×
712
    goto END;
×
713
  }
714

715
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
1,325✔
716
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
1,325✔
717

718
  // 1. redo action: action to all vg
719
  const SArray *rebVgs = pOutput->rebVgs;
1,016✔
720
  int32_t       vgNum = taosArrayGetSize(rebVgs);
1,016✔
721
  for (int32_t i = 0; i < vgNum; i++) {
3,630✔
722
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
2,614✔
723
    MND_TMQ_NULL_CHECK(pRebVg);
2,614!
724
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan));
2,614!
725
  }
726

727
  // 2. commit log: subscribe and vg assignment
728
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));
1,016!
729

730
  // 3. commit log: consumer to update status and epoch
731
  MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
1,016!
732

733
  // 4. set cb
734
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
1,016✔
735

736
  // 5. execution
737
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
1,016!
738

739
END:
1,016✔
740
  nodesDestroyNode((SNode *)pPlan);
1,325✔
741
  mndTransDrop(pTrans);
1,325✔
742
  TAOS_RETURN(code);
1,325✔
743
}
744

745
static void freeRebalanceItem(void *param) {
1,325✔
746
  if (param == NULL) return;
1,325!
747
  SMqRebInfo *pInfo = param;
1,325✔
748
  taosArrayDestroy(pInfo->newConsumers);
1,325✔
749
  taosArrayDestroy(pInfo->removedConsumers);
1,325✔
750
}
751

752
// type = 0 remove  type = 1 add
753
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
2,026✔
754
  if (rebSubHash == NULL || topicList == NULL) {
2,026!
755
    return TSDB_CODE_INVALID_PARA;
×
756
  }
757
  taosRLockLatch(&pConsumer->lock);
2,026✔
758
  int32_t code = 0;
2,026✔
759
  int32_t topicNum = taosArrayGetSize(topicList);
2,026✔
760
  for (int32_t i = 0; i < topicNum; i++) {
3,168✔
761
    char *removedTopic = taosArrayGetP(topicList, i);
1,142✔
762
    MND_TMQ_NULL_CHECK(removedTopic);
1,142!
763
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
1,142✔
764
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
1,142✔
765
    SMqRebInfo *pRebSub = NULL;
1,142✔
766
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
1,142!
767
    if (type == 0)
1,142✔
768
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
1,076!
769
    else if (type == 1)
604!
770
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
1,208!
771
  }
772

773
END:
2,026✔
774
  taosRUnLockLatch(&pConsumer->lock);
2,026✔
775
  return code;
2,026✔
776
}
777

778
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
2,201✔
779
  if (pMnode == NULL || pConsumer == NULL || rebSubHash == NULL) {
2,201!
780
    return;
×
781
  }
782
  int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics);
2,201✔
783
  for (int32_t i = 0; i < newTopicNum; i++) {
4,585✔
784
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
2,384✔
785
    if (topic == NULL){
2,384!
786
      continue;
×
787
    }
788
    SMqSubscribeObj *pSub = NULL;
2,384✔
789
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
2,384✔
790
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
2,384✔
791
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
2,384✔
792
    if (code != 0) {
2,384!
793
      continue;
×
794
    }
795
    taosRLockLatch(&pSub->lock);
2,384✔
796

797
    // iterate all vg assigned to the consumer of that topic
798
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
2,384✔
799
    if (pConsumerEp == NULL){
2,384!
800
      taosRUnLockLatch(&pSub->lock);
×
801
      mndReleaseSubscribe(pMnode, pSub);
×
802
      continue;
×
803
    }
804
    int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
2,384✔
805
    for (int32_t j = 0; j < vgNum; j++) {
7,419✔
806
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
5,035✔
807
      if (pVgEp == NULL) {
5,035!
808
        continue;
×
809
      }
810
      SVgObj  *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
5,035✔
811
      if (!pVgroup) {
5,035✔
812
        code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
253✔
813
        if (code != 0){
253!
814
          mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code))
×
815
        }else{
816
          mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
253!
817
        }
818
      }
819
      mndReleaseVgroup(pMnode, pVgroup);
5,035✔
820
    }
821
    taosRUnLockLatch(&pSub->lock);
2,384✔
822
    mndReleaseSubscribe(pMnode, pSub);
2,384✔
823
  }
824
}
825

826
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
26,107✔
827
  if (pMsg == NULL || rebSubHash == NULL) {
26,107!
828
    return TSDB_CODE_INVALID_PARA;
×
829
  }
830
  SMnode         *pMnode = pMsg->info.node;
26,107✔
831
  SSdb           *pSdb = pMnode->pSdb;
26,107✔
832
  SMqConsumerObj *pConsumer = NULL;
26,107✔
833
  void           *pIter = NULL;
26,107✔
834
  int32_t         code = 0;
26,107✔
835

836
  // iterate all consumers, find all modification
837
  while (1) {
3,600✔
838
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
29,707✔
839
    if (pIter == NULL) {
29,707✔
840
      break;
26,107✔
841
    }
842

843
    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
3,600✔
844
    int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
3,600✔
845
    int32_t status = atomic_load_32(&pConsumer->status);
3,600✔
846

847
    mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
3,600!
848
           ", hbstatus:%d, pollStatus:%d",
849
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
850
           pConsumer->createTime, hbStatus, pollStatus);
851

852
    if (status == MQ_CONSUMER_STATUS_READY) {
3,600✔
853
      if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
2,592✔
854
        MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
381!
855
      } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
2,211✔
856
                 pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
2,202✔
857
        mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
10!
858
           ", hb lost cnt:%d, or long time no poll cnt:%d",
859
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
860
           pConsumer->createTime, hbStatus, pollStatus);
861
        MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
10!
862
      } else {
863
        checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
2,201✔
864
      }
865
    } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
1,008!
866
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
1,008!
867
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
1,008!
868
    } else {
869
      MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
×
870
    }
871

872
    mndReleaseConsumer(pMnode, pConsumer);
3,600✔
873
  }
874
END:
26,107✔
875
  return code;
26,107✔
876
}
877

878
bool mndRebTryStart() {
26,107✔
879
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
26,107✔
880
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
26,107!
881
}
882

883
void mndRebCntInc() {
1,028✔
884
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
1,028✔
885
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
1,028!
886
}
1,028✔
887

888
void mndRebCntDec() {
27,135✔
889
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
27,135✔
890
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
27,135!
891
}
27,135✔
892

893
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
1,325✔
894
  if (rebOutput == NULL) {
1,325!
895
    return;
×
896
  }
897
  taosArrayDestroy(rebOutput->newConsumers);
1,325✔
898
  taosArrayDestroy(rebOutput->modifyConsumers);
1,325✔
899
  taosArrayDestroy(rebOutput->removedConsumers);
1,325✔
900
  taosArrayDestroy(rebOutput->rebVgs);
1,325✔
901
  tDeleteSubscribeObj(rebOutput->pSub);
1,325✔
902
  taosMemoryFree(rebOutput->pSub);
1,325!
903
}
904

905
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
1,325✔
906
  if (rebOutput == NULL) {
1,325!
907
    return TSDB_CODE_INVALID_PARA;
×
908
  }
909
  int32_t code = 0;
1,325✔
910
  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
1,325✔
911
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);
1,325!
912
  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
1,325✔
913
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);
1,325!
914
  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
1,325✔
915
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);
1,325!
916
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
1,325✔
917
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);
1,325!
918
  return code;
1,325✔
919

920
END:
×
921
  clearRebOutput(rebOutput);
×
922
  return code;
×
923
}
924

925
// This function only works when there are dirty consumers
926
static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
825✔
927
  if (pMnode == NULL || pSub == NULL) {
825!
928
    return TSDB_CODE_INVALID_PARA;
×
929
  }
930
  int32_t code = 0;
825✔
931
  void   *pIter = NULL;
825✔
932
  while (1) {
850✔
933
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,675✔
934
    if (pIter == NULL) {
1,675✔
935
      break;
825✔
936
    }
937

938
    SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
850✔
939
    SMqConsumerObj *pConsumer = NULL;
850✔
940
    code = mndAcquireConsumer(pMnode, pConsumerEp->consumerId, &pConsumer);
850✔
941
    if (code == 0) {
850!
942
      mndReleaseConsumer(pMnode, pConsumer);
850✔
943
      continue;
850✔
944
    }
945
    mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
×
946
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));
×
947

948
    taosArrayDestroy(pConsumerEp->vgs);
×
949
    MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
×
950
  }
951
END:
825✔
952
  return code;
825✔
953
}
954

955
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
1,325✔
956
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
1,325!
957
    return TSDB_CODE_INVALID_PARA;
×
958
  }
959
  const char      *key = rebInput->pRebInfo->key;
1,325✔
960
  SMqSubscribeObj *pSub = NULL;
1,325✔
961
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
1,325✔
962

963
  if (code != 0) {
1,325✔
964
    // split sub key and extract topic
965
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
500✔
966
    char cgroup[TSDB_CGROUP_LEN] = {0};
500✔
967
    mndSplitSubscribeKey(key, topic, cgroup, true);
500✔
968
    SMqTopicObj *pTopic = NULL;
500✔
969
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
500!
970
    taosRLockLatch(&pTopic->lock);
500✔
971

972
    rebInput->oldConsumerNum = 0;
500✔
973
    code = mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub);
500✔
974
    if (code != 0) {
500!
975
      mError("tmq rebalance mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
×
976
      taosRUnLockLatch(&pTopic->lock);
×
977
      mndReleaseTopic(pMnode, pTopic);
×
978
      return code;
×
979
    }
980

981
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
500✔
982
    taosRUnLockLatch(&pTopic->lock);
500✔
983
    mndReleaseTopic(pMnode, pTopic);
500✔
984

985
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
500!
986
  } else {
987
    taosRLockLatch(&pSub->lock);
825✔
988
    code = tCloneSubscribeObj(pSub, &rebOutput->pSub);
825✔
989
    if(code != 0){
825!
990
      taosRUnLockLatch(&pSub->lock);
×
991
      goto END;
×
992
    }
993
    code = checkConsumer(pMnode, rebOutput->pSub);
825✔
994
    if(code != 0){
825!
995
      taosRUnLockLatch(&pSub->lock);
×
996
      goto END;
×
997
    }
998
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
825✔
999
    taosRUnLockLatch(&pSub->lock);
825✔
1000

1001
    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
825!
1002
    mndReleaseSubscribe(pMnode, pSub);
825✔
1003
  }
1004

1005
END:
1,325✔
1006
  return code;
1,325✔
1007
}
1008

1009
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
26,107✔
1010
  if (pMsg == NULL) {
26,107!
1011
    return TSDB_CODE_INVALID_PARA;
×
1012
  }
1013
  int     code = 0;
26,107✔
1014
  void   *pIter = NULL;
26,107✔
1015
  SMnode *pMnode = pMsg->info.node;
26,107✔
1016
  PRINT_LOG_START;
26,107✔
1017
  if (!mndRebTryStart()) {
26,107!
UNCOV
1018
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
×
UNCOV
1019
    return code;
×
1020
  }
1021

1022
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
26,107✔
1023
  MND_TMQ_NULL_CHECK(rebSubHash);
26,107!
1024

1025
  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
26,107✔
1026

1027
  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
26,107!
1028
  if (taosHashGetSize(rebSubHash) > 0) {
26,107✔
1029
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
1,111!
1030
  }
1031

1032
  while (1) {
1,325✔
1033
    pIter = taosHashIterate(rebSubHash, pIter);
27,432✔
1034
    if (pIter == NULL) {
27,432✔
1035
      break;
26,107✔
1036
    }
1037

1038
    SMqRebInputObj  rebInput = {0};
1,325✔
1039
    SMqRebOutputObj rebOutput = {0};
1,325✔
1040
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
1,325!
1041
    rebInput.pRebInfo = (SMqRebInfo *)pIter;
1,325✔
1042
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
1,325✔
1043
    if (code != 0) {
1,325!
1044
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code))
×
1045
    }
1046

1047
    if (code == 0){
1,325!
1048
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
1,325✔
1049
      if (code != 0) {
1,325!
1050
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code))
×
1051
      }
1052
    }
1053

1054
    if (code == 0){
1,325!
1055
      code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
1,325✔
1056
      if (code != 0) {
1,325✔
1057
        mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code))
309!
1058
      }
1059
    }
1060

1061
    clearRebOutput(&rebOutput);
1,325✔
1062
  }
1063

1064
  if (taosHashGetSize(rebSubHash) > 0) {
26,107✔
1065
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish")
1,111!
1066
  }
1067

1068
END:
24,996✔
1069
  taosHashCancelIterate(rebSubHash, pIter);
26,107✔
1070
  taosHashCleanup(rebSubHash);
26,107✔
1071
  mndRebCntDec();
26,107✔
1072

1073
  PRINT_LOG_END(code);
26,107!
1074
  TAOS_RETURN(code);
26,107✔
1075
}
1076

1077
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
244✔
1078
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
244!
1079
    return TSDB_CODE_INVALID_PARA;
×
1080
  }
1081
  void   *pIter = NULL;
244✔
1082
  SVgObj *pVgObj = NULL;
244✔
1083
  int32_t code = 0;
244✔
1084
  while (1) {
1,650✔
1085
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
1,894✔
1086
    if (pIter == NULL) {
1,894✔
1087
      break;
244✔
1088
    }
1089

1090
    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
1,650✔
1091
      sdbRelease(pMnode->pSdb, pVgObj);
848✔
1092
      continue;
848✔
1093
    }
1094
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
802!
1095
    MND_TMQ_NULL_CHECK(pReq);
802!
1096
    pReq->head.vgId = htonl(pVgObj->vgId);
802✔
1097
    pReq->vgId = pVgObj->vgId;
802✔
1098
    pReq->consumerId = -1;
802✔
1099
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
802✔
1100

1101
    STransAction action = {0};
802✔
1102
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
802✔
1103
    action.pCont = pReq;
802✔
1104
    action.contLen = sizeof(SMqVDeleteReq);
802✔
1105
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
802✔
1106
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;
802✔
1107

1108
    sdbRelease(pMnode->pSdb, pVgObj);
802✔
1109
    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
802!
1110
  }
1111

1112
END:
244✔
1113
  sdbRelease(pMnode->pSdb, pVgObj);
244✔
1114
  sdbCancelFetch(pMnode->pSdb, pIter);
244✔
1115
  return code;
244✔
1116
}
1117

1118
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
6✔
1119
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
6!
1120
    return TSDB_CODE_INVALID_PARA;
×
1121
  }
1122
  void           *pIter = NULL;
6✔
1123
  SMqConsumerObj *pConsumer = NULL;
6✔
1124
  SMqConsumerObj *pConsumerNew = NULL;
6✔
1125
  int             code = 0;
6✔
1126
  while (1) {
1127
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
8✔
1128
    if (pIter == NULL) {
8✔
1129
      break;
6✔
1130
    }
1131

1132
    if (strcmp(cgroup, pConsumer->cgroup) != 0) {
2!
1133
      sdbRelease(pMnode->pSdb, pConsumer);
×
1134
      continue;
×
1135
    }
1136

1137
    if (deleteConsumer) {
2✔
1138
      MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
1!
1139
      MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
1!
1140
      tDeleteSMqConsumerObj(pConsumerNew);
1✔
1141
      pConsumerNew = NULL;
1✔
1142
    } else {
1143
      bool found = checkTopic(pConsumer->assignedTopics, topic);
1✔
1144
      if (found){
1!
1145
        mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
×
1146
               topic, pConsumer->consumerId, pConsumer->cgroup);
1147
        code = TSDB_CODE_MND_CGROUP_USED;
×
1148
        goto END;
×
1149
      }
1150
    }
1151

1152

1153
    sdbRelease(pMnode->pSdb, pConsumer);
2✔
1154
  }
1155

1156
END:
6✔
1157
  tDeleteSMqConsumerObj(pConsumerNew);
6✔
1158
  sdbRelease(pMnode->pSdb, pConsumer);
6✔
1159
  sdbCancelFetch(pMnode->pSdb, pIter);
6✔
1160
  return code;
6✔
1161
}
1162

1163
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
6✔
1164
  if (pMsg == NULL) {
6!
1165
    return TSDB_CODE_INVALID_PARA;
×
1166
  }
1167
  SMnode         *pMnode = pMsg->info.node;
6✔
1168
  SMDropCgroupReq dropReq = {0};
6✔
1169
  STrans         *pTrans = NULL;
6✔
1170
  int32_t         code = TSDB_CODE_ACTION_IN_PROGRESS;
6✔
1171
  SMqSubscribeObj *pSub = NULL;
6✔
1172

1173
  PRINT_LOG_START
6!
1174
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
6!
1175
  char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
6✔
1176
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
6✔
1177
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
6✔
1178
  if (code != 0) {
6!
1179
    if (dropReq.igNotExists) {
×
1180
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
×
1181
      return 0;
×
1182
    } else {
1183
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
×
1184
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
×
1185
      return code;
×
1186
    }
1187
  }
1188

1189
  taosWLockLatch(&pSub->lock);
6✔
1190
  if (!dropReq.force && taosHashGetSize(pSub->consumerHash) != 0) {
6!
1191
    code = TSDB_CODE_MND_CGROUP_USED;
×
1192
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
×
1193
    goto END;
×
1194
  }
1195

1196
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
6✔
1197
  MND_TMQ_NULL_CHECK(pTrans);
6!
1198
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
6!
1199
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
6✔
1200
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
6!
1201
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
6!
1202
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic, dropReq.force));
6!
1203
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
6!
1204
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
6!
1205

1206
END:
6✔
1207
  taosWUnLockLatch(&pSub->lock);
6✔
1208
  mndReleaseSubscribe(pMnode, pSub);
6✔
1209
  mndTransDrop(pTrans);
6✔
1210
  PRINT_LOG_END(code);
6!
1211

1212
  if (code != 0) {
6!
1213
    TAOS_RETURN(code);
×
1214
  }
1215
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
6✔
1216
}
1217

1218
void mndCleanupSubscribe(SMnode *pMnode) {}
2,031✔
1219

1220
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
1,663✔
1221
  if (pSub == NULL) {
1,663!
1222
    return NULL;
×
1223
  }
1224
  int32_t code = 0;
1,663✔
1225
  int32_t lino = 0;
1,663✔
1226
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,663✔
1227
  void   *buf = NULL;
1,663✔
1228
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
1,663✔
1229
  if (tlen <= 0) goto SUB_ENCODE_OVER;
1,663!
1230
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
1,663✔
1231

1232
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
1,663✔
1233
  if (pRaw == NULL) goto SUB_ENCODE_OVER;
1,663!
1234

1235
  buf = taosMemoryMalloc(tlen);
1,663!
1236
  if (buf == NULL) goto SUB_ENCODE_OVER;
1,663!
1237

1238
  void *abuf = buf;
1,663✔
1239
  if (tEncodeSubscribeObj(&abuf, pSub) < 0){
1,663!
1240
    goto SUB_ENCODE_OVER;
×
1241
  }
1242

1243
  int32_t dataPos = 0;
1,663✔
1244
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
1,663!
1245
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
1,663!
1246
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
1,663!
1247
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
1,663!
1248

1249
  terrno = TSDB_CODE_SUCCESS;
1,663✔
1250

1251
SUB_ENCODE_OVER:
1,663✔
1252
  taosMemoryFreeClear(buf);
1,663!
1253
  if (terrno != TSDB_CODE_SUCCESS) {
1,663!
1254
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
×
1255
    sdbFreeRaw(pRaw);
×
1256
    return NULL;
×
1257
  }
1258

1259
  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
1,663✔
1260
  return pRaw;
1,663✔
1261
}
1262

1263
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
1,295✔
1264
  if (pRaw == NULL) {
1,295!
1265
    return NULL;
×
1266
  }
1267
  int32_t code = 0;
1,295✔
1268
  int32_t lino = 0;
1,295✔
1269
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,295✔
1270
  SSdbRow         *pRow = NULL;
1,295✔
1271
  SMqSubscribeObj *pSub = NULL;
1,295✔
1272
  void            *buf = NULL;
1,295✔
1273

1274
  int8_t sver = 0;
1,295✔
1275
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
1,295!
1276

1277
  if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
1,295!
1278
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
1279
    goto SUB_DECODE_OVER;
×
1280
  }
1281

1282
  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
1,295✔
1283
  if (pRow == NULL) goto SUB_DECODE_OVER;
1,295!
1284

1285
  pSub = sdbGetRowObj(pRow);
1,295✔
1286
  if (pSub == NULL) goto SUB_DECODE_OVER;
1,295!
1287

1288
  int32_t dataPos = 0;
1,295✔
1289
  int32_t tlen;
1290
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
1,295!
1291
  buf = taosMemoryMalloc(tlen);
1,295!
1292
  if (buf == NULL) goto SUB_DECODE_OVER;
1,295!
1293
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
1,295!
1294
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
1,295!
1295

1296
  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
1,295!
1297
    goto SUB_DECODE_OVER;
×
1298
  }
1299

1300
  // update epset saved in mnode
1301
  if (pSub->unassignedVgs != NULL) {
1,295!
1302
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
1,295✔
1303
    for (int32_t i = 0; i < size; ++i) {
3,285✔
1304
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, i);
1,990✔
1305
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
1,990✔
1306
    }
1307
  }
1308
  if (pSub->consumerHash != NULL) {
1,295!
1309
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
1,295✔
1310
    while (pIter) {
1,986✔
1311
      SMqConsumerEp *pConsumerEp = pIter;
691✔
1312
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
691✔
1313
      for (int32_t i = 0; i < size; ++i) {
2,242✔
1314
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
1,551✔
1315
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
1,551✔
1316
      }
1317
      pIter = taosHashIterate(pSub->consumerHash, pIter);
691✔
1318
    }
1319
  }
1320

1321
  terrno = TSDB_CODE_SUCCESS;
1,295✔
1322

1323
SUB_DECODE_OVER:
1,295✔
1324
  taosMemoryFreeClear(buf);
1,295!
1325
  if (terrno != TSDB_CODE_SUCCESS) {
1,295!
1326
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
×
1327
    taosMemoryFreeClear(pRow);
×
1328
    return NULL;
×
1329
  }
1330

1331
  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
1,295✔
1332
  return pRow;
1,295✔
1333
}
1334

1335
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
515✔
1336
  mTrace("subscribe:%s, perform insert action", pSub != NULL ? pSub->key : "null");
515!
1337
  return 0;
515✔
1338
}
1339

1340
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
1,295✔
1341
  mTrace("subscribe:%s, perform delete action", pSub != NULL ? pSub->key : "null");
1,295!
1342
  tDeleteSubscribeObj(pSub);
1,295✔
1343
  return 0;
1,295✔
1344
}
1345

1346
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
536✔
1347
  if (pOldSub == NULL || pNewSub == NULL) return -1;
536!
1348
  mTrace("subscribe:%s, perform update action", pOldSub->key);
536✔
1349
  taosWLockLatch(&pOldSub->lock);
536✔
1350

1351
  SHashObj *tmp = pOldSub->consumerHash;
536✔
1352
  pOldSub->consumerHash = pNewSub->consumerHash;
536✔
1353
  pNewSub->consumerHash = tmp;
536✔
1354

1355
  SArray *tmp1 = pOldSub->unassignedVgs;
536✔
1356
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
536✔
1357
  pNewSub->unassignedVgs = tmp1;
536✔
1358

1359
  SArray *tmp2 = pOldSub->offsetRows;
536✔
1360
  pOldSub->offsetRows = pNewSub->offsetRows;
536✔
1361
  pNewSub->offsetRows = tmp2;
536✔
1362

1363
  taosWUnLockLatch(&pOldSub->lock);
536✔
1364
  return 0;
536✔
1365
}
1366

1367
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub) {
11,086✔
1368
  if (pMnode == NULL || key == NULL || pSub == NULL){
11,086!
1369
    return TSDB_CODE_INVALID_PARA;
×
1370
  }
1371
  SSdb            *pSdb = pMnode->pSdb;
11,086✔
1372
  *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
11,086✔
1373
  if (*pSub == NULL) {
11,086✔
1374
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
1,000✔
1375
  }
1376
  return 0;
10,086✔
1377
}
1378

1379
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
602✔
1380
  if (pMnode == NULL || topicName == NULL) return 0;
602!
1381
  int32_t num = 0;
602✔
1382
  SSdb   *pSdb = pMnode->pSdb;
602✔
1383

1384
  void            *pIter = NULL;
602✔
1385
  SMqSubscribeObj *pSub = NULL;
602✔
1386
  while (1) {
1,104✔
1387
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
1,706✔
1388
    if (pIter == NULL) break;
1,706✔
1389

1390
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
1,104✔
1391
    char cgroup[TSDB_CGROUP_LEN] = {0};
1,104✔
1392
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
1,104✔
1393
    if (strcmp(topic, topicName) != 0) {
1,104✔
1394
      sdbRelease(pSdb, pSub);
746✔
1395
      continue;
746✔
1396
    }
1397

1398
    num++;
358✔
1399
    sdbRelease(pSdb, pSub);
358✔
1400
  }
1401

1402
  return num;
602✔
1403
}
1404

1405
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
10,086✔
1406
  if (pMnode == NULL || pSub == NULL) return;
10,086!
1407
  SSdb *pSdb = pMnode->pSdb;
10,086✔
1408
  sdbRelease(pSdb, pSub);
10,086✔
1409
}
1410

1411
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
244✔
1412
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) return TSDB_CODE_INVALID_PARA;
244!
1413
  int32_t  code = 0;
244✔
1414
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
244✔
1415
  MND_TMQ_NULL_CHECK(pCommitRaw);
244!
1416
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
244✔
1417
  if (code != 0){
244!
1418
    sdbFreeRaw(pCommitRaw);
×
1419
    goto END;
×
1420
  }
1421
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
244✔
1422
END:
244✔
1423
  return code;
244✔
1424
}
1425

1426
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
323✔
1427
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
323!
1428
  SSdb            *pSdb = pMnode->pSdb;
323✔
1429
  int32_t          code = 0;
323✔
1430
  void            *pIter = NULL;
323✔
1431
  SMqSubscribeObj *pSub = NULL;
323✔
1432
  while (1) {
464✔
1433
    sdbRelease(pSdb, pSub);
787✔
1434
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
787✔
1435
    if (pIter == NULL) break;
787✔
1436

1437
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
464✔
1438
    char cgroup[TSDB_CGROUP_LEN] = {0};
464✔
1439
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
464✔
1440
    if (strcmp(topic, topicName) != 0) {
464✔
1441
      continue;
226✔
1442
    }
1443

1444
    // iter all vnode to delete handle
1445
    if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
238!
1446
      code = TSDB_CODE_MND_IN_REBALANCE;
×
1447
      goto END;
×
1448
    }
1449

1450
    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
238!
1451
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
238!
1452
  }
1453

1454
END:
323✔
1455
  sdbRelease(pSdb, pSub);
323✔
1456
  sdbCancelFetch(pSdb, pIter);
323✔
1457

1458
  TAOS_RETURN(code);
323✔
1459
}
1460

1461
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
288✔
1462
                           const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
1463
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL){
288!
1464
    return TSDB_CODE_INVALID_PARA;
×
1465
  }
1466
  int32_t code = 0;
288✔
1467
  int32_t sz = taosArrayGetSize(vgs);
288✔
1468
  for (int32_t j = 0; j < sz; j++) {
657✔
1469
    SMqVgEp *pVgEp = taosArrayGetP(vgs, j);
369✔
1470
    MND_TMQ_NULL_CHECK(pVgEp);
369!
1471

1472
    SColumnInfoData *pColInfo = NULL;
369✔
1473
    int32_t          cols = 0;
369✔
1474

1475
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
369✔
1476
    MND_TMQ_NULL_CHECK(pColInfo);
369!
1477
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false));
369!
1478

1479
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
369✔
1480
    MND_TMQ_NULL_CHECK(pColInfo);
369!
1481
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false));
369!
1482

1483
    // vg id
1484
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
369✔
1485
    MND_TMQ_NULL_CHECK(pColInfo);
369!
1486
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));
369!
1487

1488
    // consumer id
1489
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
369✔
1490
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
369✔
1491
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
369✔
1492

1493
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
369✔
1494
    MND_TMQ_NULL_CHECK(pColInfo);
369!
1495
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));
369!
1496

1497
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
369✔
1498
    if (user) STR_TO_VARSTR(userStr, user);
369✔
1499
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
369✔
1500
    MND_TMQ_NULL_CHECK(pColInfo);
369!
1501
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));
369!
1502

1503
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
369✔
1504
    if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
369✔
1505
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
369✔
1506
    MND_TMQ_NULL_CHECK(pColInfo);
369!
1507
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));
369!
1508

1509
    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
369!
1510
          varDataVal(cgroup), pVgEp->vgId);
1511

1512
    // offset
1513
    OffsetRows *data = NULL;
369✔
1514
    for (int i = 0; i < taosArrayGetSize(offsetRows); i++) {
1,364✔
1515
      OffsetRows *tmp = taosArrayGet(offsetRows, i);
995✔
1516
      MND_TMQ_NULL_CHECK(tmp);
995!
1517
      if (tmp->vgId != pVgEp->vgId) {
995✔
1518
        // mInfo("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", tmp->vgId, pVgEp->vgId);
1519
        continue;
736✔
1520
      }
1521
      data = tmp;
259✔
1522
    }
1523
    if (data) {
369✔
1524
      // vg id
1525
      char buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
259✔
1526
      (void)tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
259✔
1527
      (void)snprintf(varDataVal(buf) + strlen(varDataVal(buf)),
259✔
1528
                     sizeof(buf) - VARSTR_HEADER_SIZE - strlen(varDataVal(buf)), "/%" PRId64, data->ever);
259✔
1529
      varDataSetLen(buf, strlen(varDataVal(buf)));
259✔
1530
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
259✔
1531
      MND_TMQ_NULL_CHECK(pColInfo);
259!
1532
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false));
259!
1533
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
259✔
1534
      MND_TMQ_NULL_CHECK(pColInfo);
259!
1535
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false));
259!
1536
    } else {
1537
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
110✔
1538
      MND_TMQ_NULL_CHECK(pColInfo);
110!
1539
      colDataSetNULL(pColInfo, *numOfRows);
110!
1540
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
110✔
1541
      MND_TMQ_NULL_CHECK(pColInfo);
110!
1542
      colDataSetNULL(pColInfo, *numOfRows);
110!
1543
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
110!
1544
    }
1545
    (*numOfRows)++;
369✔
1546
  }
1547
  return 0;
288✔
1548
END:
×
1549
  return code;
×
1550
}
1551

1552
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
90✔
1553
  if (pReq == NULL || pShow == NULL || pBlock == NULL){
90!
1554
    return TSDB_CODE_INVALID_PARA;
×
1555
  }
1556
  SMnode          *pMnode = pReq->info.node;
90✔
1557
  SSdb            *pSdb = pMnode->pSdb;
90✔
1558
  int32_t          numOfRows = 0;
90✔
1559
  SMqSubscribeObj *pSub = NULL;
90✔
1560
  int32_t          code = 0;
90✔
1561

1562
  mInfo("mnd show subscriptions begin");
90!
1563

1564
  while (numOfRows < rowsCapacity) {
267!
1565
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
267✔
1566
    if (pShow->pIter == NULL) {
267✔
1567
      break;
90✔
1568
    }
1569

1570
    taosRLockLatch(&pSub->lock);
177✔
1571

1572
    if (numOfRows + pSub->vgNum > rowsCapacity) {
177!
1573
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum))  ;
×
1574
    }
1575

1576
    // topic and cgroup
1577
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
177✔
1578
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
177✔
1579
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
177✔
1580
    varDataSetLen(topic, strlen(varDataVal(topic)));
177✔
1581
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
177✔
1582

1583
    SMqConsumerEp *pConsumerEp = NULL;
177✔
1584
    void          *pIter = NULL;
177✔
1585

1586
    while (1) {
111✔
1587
      pIter = taosHashIterate(pSub->consumerHash, pIter);
288✔
1588
      if (pIter == NULL) break;
288✔
1589
      pConsumerEp = (SMqConsumerEp *)pIter;
111✔
1590

1591
      char          *user = NULL;
111✔
1592
      char          *fqdn = NULL;
111✔
1593
      SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
111✔
1594
      if (pConsumer != NULL) {
111!
1595
        user = pConsumer->user;
111✔
1596
        fqdn = pConsumer->fqdn;
111✔
1597
        sdbRelease(pSdb, pConsumer);
111✔
1598
      }
1599
      MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
111!
1600
                  pConsumerEp->offsetRows));
1601
    }
1602

1603
    MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
177!
1604

1605
    pBlock->info.rows = numOfRows;
177✔
1606

1607
    taosRUnLockLatch(&pSub->lock);
177✔
1608
    sdbRelease(pSdb, pSub);
177✔
1609
  }
1610

1611
  mInfo("mnd end show subscriptions");
90!
1612

1613
  pShow->numOfRows += numOfRows;
90✔
1614
  return numOfRows;
90✔
1615

1616
END:
×
1617
  taosRUnLockLatch(&pSub->lock);
×
1618
  sdbRelease(pSdb, pSub);
×
1619

UNCOV
1620
  return code;
×
1621
}
1622

1623
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
×
1624
  if (pMnode == NULL) {
×
1625
    return;
×
1626
  }
1627
  SSdb *pSdb = pMnode->pSdb;
×
1628
  sdbCancelFetchByType(pSdb, pIter, SDB_SUBSCRIBE);
×
1629
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc