• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.42
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tcompare.h"
25
#include "tname.h"
26

27
#define MND_SUBSCRIBE_VER_NUMBER   3
28
#define MND_SUBSCRIBE_RESERVE_SIZE 64
29

30
//#define MND_CONSUMER_LOST_HB_CNT          6
31

32
static int32_t mqRebInExecCnt = 0;
33

34
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
35
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
36
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
39
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
40
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
41
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
42
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
43
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
44

45
/*
 * Encode pSub into an sdb raw row and attach it to pTrans as a commit log whose
 * status is READY.
 *
 * If the transaction refuses the row, the row is freed here. Otherwise it is
 * presumably owned by the transaction from then on.
 * Returns 0 on success or the first error code encountered.
 */
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    sdbFreeRaw(pRaw);
  }

END:
  return code;
}
59

60
/*
 * Register everything this module owns with the mnode, then install the table
 * into the sdb:
 *   - the SDB_SUBSCRIBE table callbacks (binary keys),
 *   - the TMQ subscribe, delete-sub, timer and drop-cgroup message handlers,
 *   - the retrieve and free-iterator handlers for the subscriptions show table.
 *
 * Returns the result of sdbSetTable.
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  // The free-iterator hook must target the same show table as the retrieve hook
  // above. Registering it under TSDB_MGMT_TABLE_TOPICS left the subscriptions
  // show without a cancel hook, and could overwrite whatever the topic module
  // registers for TOPICS.
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
82

83
/*
 * Allocate a subscription object for subKey and set it up from pTopic:
 *   - copy dbUid, stbUid, subType and withMeta from the topic,
 *   - initialise its vgroup endpoints via mndSchedInitSubEp.
 *
 * On success *pSub holds the new object, which the caller owns.
 * On failure any object stored in *pSub is destroyed and freed, and *pSub is
 * reset to NULL so the caller is never left holding a dangling pointer.
 * NOTE(review): assumes callers pass *pSub initialised to NULL, since
 * tNewSubscribeObj may fail before writing it — confirm at call sites.
 */
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey,
                                     SMqSubscribeObj **pSub) {
  int32_t code = 0;
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
  (*pSub)->dbUid = pTopic->dbUid;
  (*pSub)->stbUid = pTopic->stbUid;
  (*pSub)->subType = pTopic->subType;
  (*pSub)->withMeta = pTopic->withMeta;

  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
  return code;

END:
  if (*pSub != NULL) {
    tDeleteSubscribeObj(*pSub);
    taosMemoryFree(*pSub);
    *pSub = NULL;
  }
  return code;
}
99

100
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
2,974✔
101
                                    SSubplan *pPlan) {
102
  SMqRebVgReq req = {0};
2,974✔
103
  int32_t     code = 0;
2,974✔
104
  SEncoder encoder = {0};
2,974✔
105

106
  req.oldConsumerId = pRebVg->oldConsumerId;
2,974✔
107
  req.newConsumerId = pRebVg->newConsumerId;
2,974✔
108
  req.vgId = pRebVg->pVgEp->vgId;
2,974✔
109
  if (pPlan) {
2,974✔
110
    pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
2,497✔
111
    pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
2,497✔
112
    int32_t msgLen = 0;
2,497✔
113
    MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
2,497!
114
  } else {
115
    req.qmsg = taosStrdup("");
477✔
116
    MND_TMQ_NULL_CHECK(req.qmsg);
477!
117
  }
118
  req.subType = pSub->subType;
2,974✔
119
  req.withMeta = pSub->withMeta;
2,974✔
120
  req.suid = pSub->stbUid;
2,974✔
121
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
2,974✔
122

123
  int32_t tlen = 0;
2,974✔
124
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
2,974!
125
  if (code < 0) {
2,974!
126
    goto END;
×
127
  }
128

129
  tlen += sizeof(SMsgHead);
2,974✔
130
  void *buf = taosMemoryMalloc(tlen);
2,974✔
131
  MND_TMQ_NULL_CHECK(buf);
2,974!
132
  SMsgHead *pMsgHead = (SMsgHead *)buf;
2,974✔
133
  pMsgHead->contLen = htonl(tlen);
2,974✔
134
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
2,974✔
135

136
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
2,974✔
137
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
2,974!
138
  *pBuf = buf;
2,974✔
139
  *pLen = tlen;
2,974✔
140

141
END:
2,974✔
142
  tEncoderClear(&encoder);
2,974✔
143
  taosMemoryFree(req.qmsg);
2,974✔
144
  return code;
2,974✔
145
}
146

147
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
2,974✔
148
                                        const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
149
  int32_t code = 0;
2,974✔
150
  void   *buf  = NULL;
2,974✔
151

152
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
2,974!
153
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
154
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
155
    goto END;
×
156
  }
157

158
  int32_t tlen = 0;
2,974✔
159
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan));
2,974!
160
  int32_t vgId = pRebVg->pVgEp->vgId;
2,974✔
161
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
2,974✔
162
  if (pVgObj == NULL) {
2,974!
163
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
164
    goto END;
×
165
  }
166

167
  STransAction action = {0};
2,974✔
168
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
2,974✔
169
  action.pCont = buf;
2,974✔
170
  action.contLen = tlen;
2,974✔
171
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
2,974✔
172

173
  mndReleaseVgroup(pMnode, pVgObj);
2,974✔
174
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
2,974!
175
  return code;
2,974✔
176

177
END:
×
178
  taosMemoryFree(buf);
×
179
  return code;
×
180
}
181

182
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
3,729✔
183
  int32_t i = 0;
3,729✔
184
  while (key[i] != TMQ_SEPARATOR_CHAR) {
27,811✔
185
    i++;
24,082✔
186
  }
187
  (void)memcpy(cgroup, key, i);
3,729✔
188
  cgroup[i] = 0;
3,729✔
189
  if (fullName) {
3,729✔
190
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
3,319✔
191
  } else {
192
    while (key[i] != '.') {
1,230✔
193
      i++;
820✔
194
    }
195
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
410✔
196
  }
197
}
3,729✔
198

199
/*
 * Look up the SMqRebInfo entry for key in pHash, creating and inserting an empty
 * one (via tNewSMqRebSubscribe) if none exists.
 *
 * The entry is stored by value in the hash. When pReb is non-NULL, it receives
 * a pointer to the entry held inside the hash.
 * Returns 0 on success, otherwise the error from creation, insertion or
 * re-lookup.
 */
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  int32_t     code = 0;
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo == NULL) {
    pRebInfo = tNewSMqRebSubscribe(key);
    if (pRebInfo == NULL) {
      code = terrno;
      goto END;
    }
    // taosHashPut copies the struct by value. On success the hash entry owns the
    // consumer arrays and only the heap shell is freed below. On failure nothing
    // took the arrays, so they must be released here or they leak.
    code = taosHashPut(pHash, key, keyLen, pRebInfo, sizeof(SMqRebInfo));
    if (code != 0) {
      taosArrayDestroy(pRebInfo->newConsumers);
      taosArrayDestroy(pRebInfo->removedConsumers);
    }
    taosMemoryFreeClear(pRebInfo);
    if (code != 0) {
      goto END;
    }
    pRebInfo = taosHashGet(pHash, key, keyLen);
    MND_TMQ_NULL_CHECK(pRebInfo);
  }
  if (pReb) {
    *pReb = pRebInfo;
  }

END:
  return code;
}
223

224
/*
 * Pop the last vgroup endpoint off vgs and record it in pHash, keyed by vgId, as
 * a pending move away from consumerId. The new owner is left as -1 for a later
 * assignment step to fill in.
 * Returns 0 on success, or an error if the array was empty or the insert failed.
 */
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  int32_t   code = 0;
  SMqVgEp **ppTail = (SMqVgEp **)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(ppTail);

  SMqVgEp       *pTail = *ppTail;
  SMqRebOutputVg pending = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pTail};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &pTail->vgId, sizeof(int32_t), &pending, sizeof(SMqRebOutputVg)));
  mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, pTail->vgId, consumerId);

END:
  return code;
}
234

235
/*
 * For every consumer listed in pInput->pRebInfo->removedConsumers that is still
 * present in the subscription's consumerHash:
 *   - move all of its vgroups into pHash as pending moves (via pushVgDataToHash),
 *   - free its vgroup array and remove it from consumerHash,
 *   - append its id to pOutput->removedConsumers.
 * Listed consumers that are not in consumerHash are skipped, and a mismatch is
 * logged.
 * Returns 0 on success or the first error encountered.
 */
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  int32_t code = 0;
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t actualRemoved = 0;
  for (int32_t i = 0; i < numOfRemoved; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t));
    if (pConsumerEp == NULL) {
      continue;
    }

    int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
    for (int32_t j = 0; j < consumerVgNum; j++) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *consumerId, pOutput->pSub->key));
    }

    taosArrayDestroy(pConsumerEp->vgs);
    // If the removal below fails, the entry stays in consumerHash. It must not keep
    // pointing at the array just freed, or later cleanup would free it again.
    pConsumerEp->vgs = NULL;
    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t)));
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, consumerId));
    actualRemoved++;
  }

  if (numOfRemoved != actualRemoved) {
    mError("[rebalance] sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved,
           actualRemoved);
  } else {
    mInfo("[rebalance] sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
  }

END:
  return code;
}
267

268
/*
 * Add every consumer in pInput->pRebInfo->newConsumers to the subscription's
 * consumerHash with an empty vgroup array, and append its id to
 * pOutput->newConsumers.
 * Returns 0 on success or the first error encountered.
 */
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  int32_t code = 0;
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);
    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
    if (code != 0) {
      // The hash did not take the copy, so the array is still ours to free.
      taosArrayDestroy(newConsumerEp.vgs);
      goto END;
    }
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("[rebalance] sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }

END:
  return code;
}
286

287
/*
 * Drain the subscription's unassigned-vgroup list into pHash. Each vgroup
 * becomes a pending assignment with no previous owner (consumer id -1).
 * Returns 0 on success or the first error from pushVgDataToHash.
 */
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  int32_t code = 0;
  SArray *pUnassigned = pOutput->pSub->unassignedVgs;
  int32_t remaining = taosArrayGetSize(pUnassigned);

  while (remaining-- > 0) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pUnassigned, pHash, -1, pOutput->pSub->key));
  }

END:
  return code;
}
296

297
/*
 * Trim each existing consumer down to its target vgroup count. Every consumer in
 * consumerHash is recorded in pOutput->modifyConsumers.
 *
 * Consumers holding more than minVgCnt vgroups are trimmed as follows:
 *   - the first remainderVgCnt of them (in hash iteration order) keep
 *     minVgCnt + 1 vgroups,
 *   - the rest keep minVgCnt.
 * The surplus vgroups are moved into pHash as pending moves.
 *
 * Returns 0 on success or the first error encountered. The hash iterator is
 * released on every path.
 */
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t remainderVgCnt) {
  int32_t code = 0;
  int32_t cnt = 0;
  void   *pIter = NULL;

  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
    if (consumerVgNum > minVgCnt) {
      if (cnt < remainderVgCnt) {
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {  // pop until equal minVg + 1
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
        }
        cnt++;
      } else {
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
        }
      }
    }
  }

END:
  // pIter is non-NULL only when an error interrupted the iteration.
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  return code;
}
329

330
/*
 * Reconcile the subscription's vgroup view with the vgroups that currently
 * exist in its database.
 *
 * Steps:
 *   1. Snapshot an SMqVgEp (epset + vgId) for every SDB_VGROUP belonging to
 *      pOutput->pSub->dbUid.
 *   2. For each consumer in consumerHash:
 *      - drop, and free, any assigned endpoint whose vgId no longer exists;
 *      - remove still-valid vgIds from the snapshot, leaving only vgroups
 *        nobody holds.
 *   3. If the unassigned list is empty and vgroups remain in the snapshot,
 *      append them to the unassigned list. Otherwise discard the snapshot.
 *
 * Returns the number of vgroups found in the database (>= 0) on success, or a
 * (negative) error code on failure. On failure every iterator, row reference
 * and temporary allocation is released.
 */
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  int32_t  code = 0;
  int32_t  totalVgNum = 0;
  SVgObj  *pVgroup = NULL;
  SMqVgEp *pVgEp = NULL;
  void    *pSdbIter = NULL;
  void    *pHashIter = NULL;
  SArray  *newVgs = taosArrayInit(0, POINTER_BYTES);
  MND_TMQ_NULL_CHECK(newVgs);

  // 1. snapshot every vgroup of the subscribed db
  while (1) {
    pSdbIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pSdbIter, (void **)&pVgroup);
    if (pSdbIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      pVgroup = NULL;
      continue;
    }

    totalVgNum++;
    pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(pVgEp);
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
    pVgEp = NULL;  // now owned by newVgs
    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;
  }

  // 2. drop vanished vgroups from consumers; drop still-assigned ones from newVgs
  while (1) {
    pHashIter = taosHashIterate(pOutput->pSub->consumerHash, pHashIter);
    if (pHashIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pHashIter;
    int32_t        j = 0;
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
      SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
      MND_TMQ_NULL_CHECK(pVgEpTmp);
      bool find = false;
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
        SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
        MND_TMQ_NULL_CHECK(pnewVgEp);
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
          tDeleteSMqVgEp(pnewVgEp);
          taosArrayRemove(newVgs, k);
          find = true;
          break;
        }
      }
      if (!find) {
        mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
        tDeleteSMqVgEp(pVgEpTmp);
        taosArrayRemove(pConsumerEp->vgs, j);
        continue;
      }
      j++;
    }
  }

  // 3. hand genuinely new vgroups to the unassigned list only when it is empty
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
    mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
    taosArrayDestroy(newVgs);
  } else {
    taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
  }
  return totalVgNum;

END:
  // Release whatever an interrupted pass was still holding.
  if (pVgroup != NULL) {
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  if (pSdbIter != NULL) {
    sdbCancelFetch(pMnode->pSdb, pSdbIter);
  }
  taosHashCancelIterate(pOutput->pSub->consumerHash, pHashIter);
  taosMemoryFree(pVgEp);
  taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
  return code;
}
404

405
/*
 * Merge the per-vgroup offset/row statistics from the currently persisted
 * subscription (looked up by pInput->pRebInfo->key) into
 * pOutput->pSub->offsetRows.
 *
 * Rules per stored row:
 *   - skipped when the same consumer still holds that vgId in the new
 *     assignment;
 *   - otherwise, if the vgId already exists in the output, rows are added and
 *     offset/ever are overwritten;
 *   - otherwise the row is appended.
 *
 * Returns 0 when no persisted subscription is found (presumably a brand-new
 * subscription). Otherwise returns 0 or the first error.
 * The read latch, the subscribe reference and the hash iterator are released
 * on every path after acquisition.
 */
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if (code != 0) {
    return 0;
  }
  taosRLockLatch(&pSub->lock);
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    if (pOutput->pSub->offsetRows == NULL) {
      code = terrno;
      goto END;
    }
  }

  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(d1);
      bool jump = false;
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
        MND_TMQ_NULL_CHECK(pVgEp);
        if (pVgEp->vgId == d1->vgId) {
          jump = true;
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                pConsumerEp->consumerId, pVgEp->vgId);
          break;
        }
      }
      if (jump) continue;
      bool find = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(d2);
        if (d1->vgId == d2->vgId) {
          d2->rows += d1->rows;
          d2->offset = d1->offset;
          d2->ever = d1->ever;
          find = true;
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
          break;
        }
      }
      if (!find) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
      }
    }
  }

END:
  // Reached on success (pIter == NULL) and on every error after the latch was
  // taken. Previously an error inside the loops left the latch held and the
  // subscription referenced.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  taosRUnLockLatch(&pSub->lock);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
466

467
/*
 * Log the outcome of a rebalance:
 *   - every vgroup move recorded in pOutput->rebVgs,
 *   - per consumer in consumerHash, its final vgroup count and each vgroup.
 * NULL array slots are skipped.
 */
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  const char *subKey = pOutput->pSub->key;
  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", subKey);

  int32_t numOfMoves = taosArrayGetSize(pOutput->rebVgs);
  for (int32_t m = 0; m < numOfMoves; m++) {
    SMqRebOutputVg *pMove = taosArrayGet(pOutput->rebVgs, m);
    if (pMove != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, subKey,
            pMove->pVgEp->vgId, pMove->oldConsumerId, pMove->newConsumerId);
    }
  }

  for (void *pCur = taosHashIterate(pOutput->pSub->consumerHash, NULL); pCur != NULL;
       pCur = taosHashIterate(pOutput->pSub->consumerHash, pCur)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pCur;
    int32_t        vgCnt = taosArrayGetSize(pEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", subKey, pEp->consumerId, vgCnt);
    for (int32_t v = 0; v < vgCnt; v++) {
      SMqVgEp *pVg = taosArrayGetP(pEp->vgs, v);
      if (pVg != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, subKey, pVg->vgId, pEp->consumerId);
      }
    }
  }
}
492

493
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
1,321✔
494
                           int32_t *remainderVgCnt) {
495
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
1,321✔
496
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
1,321✔
497
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
1,321✔
498

499
  // calc num
500
  if (numOfFinal != 0) {
1,321✔
501
    *minVgCnt = totalVgNum / numOfFinal;
838✔
502
    *remainderVgCnt = totalVgNum % numOfFinal;
838✔
503
  } else {
504
    mInfo("[rebalance] sub:%s no consumer subscribe this topic", pSubKey);
483!
505
  }
506
  mInfo(
1,321!
507
      "[rebalance] sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
508
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
509
}
1,321✔
510

511
/*
 * Hand the pending vgroups in pHash out to the consumers in consumerHash.
 *
 * Steps:
 *   1. Fill every consumer up to minVgCnt vgroups.
 *   2. Give one extra vgroup to each consumer sitting at exactly minVgCnt,
 *      until pHash is used up.
 *   3. Copy every pending move into pOutput->rebVgs. If no consumer is left,
 *      also put each vgroup back into the unassigned list.
 *
 * Returns 0 on success. Returns TSDB_CODE_PAR_INTERNAL_ERROR if pending vgroups
 * remain after step 2, or the first allocation error. Both hash iterators are
 * released on every path.
 */
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  SMqRebOutputVg *pRebVg = NULL;
  void           *pAssignIter = NULL;
  void           *pIter = NULL;
  int32_t         code = 0;

  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mError("[rebalance] sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mInfo("[rebalance] sub:%s assign iter is used up", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  pIter = NULL;  // already released; keep END from cancelling it a second time
  if (pAssignIter != NULL) {
    mError("[rebalance]sub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    goto END;
  }
  while (1) {
    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      break;
    }

    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
    }
  }

END:
  // Release any iterator an error left in flight (both are NULL on success).
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  taosHashCancelIterate(pHash, pAssignIter);
  return code;
}
581

582
/*
 * Compute a new vgroup-to-consumer assignment for one subscription and fill
 * pOutput with the result.
 *
 * Steps:
 *   - reconcile the vgroup list,
 *   - release vgroups of removed consumers and of the unassigned list,
 *   - trim over-quota consumers,
 *   - add new consumers,
 *   - assign pending vgroups,
 *   - merge offset statistics,
 *   - log the outcome.
 *
 * The scratch map pHash (vgId -> SMqRebOutputVg) is freed on every path.
 * Returns 0 on success or the first error code.
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  int32_t totalVgNum = processRemoveAddVgs(pMnode, pOutput);
  if (totalVgNum < 0) {
    return totalVgNum;
  }
  const char *pSubKey = pOutput->pSub->key;
  int32_t     minVgCnt = 0;
  int32_t     remainderVgCnt = 0;
  int32_t     code = 0;
  SHashObj   *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pHash);
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
  printRebalanceLog(pOutput);

END:
  // Free the scratch map on success and on every error path.
  taosHashCleanup(pHash);
  return code;
}
606

607
/*
 * For each consumer id in consumers:
 *   - build a temporary SMqConsumerObj of the given update type
 *     (cgroup/topic as given),
 *   - append its commit log to pTrans,
 *   - free the temporary object.
 * Returns 0 on success or the first error. No temporary object is freed twice.
 */
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  int32_t         code = 0;
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Reset immediately. Otherwise a failure on a later iteration reaches END
    // with this already-freed pointer and frees it again.
    pConsumerNew = NULL;
  }

END:
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
624

625
/*
 * Append consumer commit logs for the three groups produced by a rebalance, in
 * this order:
 *   - modified consumers (CONSUMER_UPDATE_REB, no topic),
 *   - new consumers (CONSUMER_ADD_REB),
 *   - removed consumers (CONSUMER_REMOVE_REB).
 * Stops at, and returns, the first non-zero code.
 */
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  int32_t code = presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL);
  if (code == 0) {
    code = presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic);
  }
  if (code == 0) {
    code = presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic);
  }
  return code;
}
633

634
/*
 * Persist one rebalance result as a single "tmq-reb" transaction
 * (retry policy, db-inside conflict scope):
 *   1. one redo action per re-balanced vgroup,
 *   2. a commit log for the updated subscription,
 *   3. commit logs for modified / added / removed consumers,
 *   4. the MQ rebalance start/stop callbacks,
 * then prepare the transaction.
 *
 * A non-empty subscription qmsg is parsed into a subplan once and reused for
 * every vgroup request. The plan and the transaction are released on every path.
 */
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  int32_t          code = 0;
  STrans          *pTrans = NULL;
  struct SSubplan *pPlan = NULL;
  int32_t          vgNum = 0;
  char             topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char             cgroup[TSDB_CGROUP_LEN] = {0};

  if (pOutput->pSub->qmsg[0] != '\0') {
    MND_TMQ_RETURN_CHECK(qStringToSubplan(pOutput->pSub->qmsg, &pPlan));
  }

  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // step 1: redo action for every re-balanced vgroup
  vgNum = taosArrayGetSize(pOutput->rebVgs);
  for (int32_t idx = 0; idx < vgNum; idx++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(pOutput->rebVgs, idx);
    MND_TMQ_NULL_CHECK(pRebVg);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan));
  }

  // step 2: subscription commit log
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // step 3: consumer commit logs
  MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));

  // step 4: callbacks, then hand the transaction over for execution
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  nodesDestroyNode((SNode *)pPlan);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
683

684
static void freeRebalanceItem(void *param) {
1,321✔
685
  SMqRebInfo *pInfo = param;
1,321✔
686
  taosArrayDestroy(pInfo->newConsumers);
1,321✔
687
  taosArrayDestroy(pInfo->removedConsumers);
1,321✔
688
}
1,321✔
689

690
// type = 0 remove  type = 1 add
691
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, char *group, int64_t consumerId) {
2,090✔
692
  int32_t code = 0;
2,090✔
693
  int32_t topicNum = taosArrayGetSize(topicList);
2,090✔
694
  for (int32_t i = 0; i < topicNum; i++) {
3,259✔
695
    char *removedTopic = taosArrayGetP(topicList, i);
1,169✔
696
    MND_TMQ_NULL_CHECK(removedTopic);
1,169!
697
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
1,169✔
698
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", group, TMQ_SEPARATOR, removedTopic);
1,169✔
699
    SMqRebInfo *pRebSub = NULL;
1,169✔
700
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
1,169!
701
    if (type == 0)
1,169✔
702
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &consumerId));
1,096!
703
    else if (type == 1)
621!
704
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &consumerId));
1,242!
705
  }
706

707
END:
2,090✔
708
  return code;
2,090✔
709
}
710

711
/*
 * For each topic the consumer currently holds, look at every vgroup assigned
 * to it in that subscription. When a vgroup can no longer be acquired through
 * mndAcquireVgroup (logged as a vnode split), a rebalance entry is registered
 * in rebSubHash for the subscription key. Failures are logged, not returned.
 */
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);

  for (int32_t t = 0; t < topicCnt; t++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, t);
    if (topic == NULL) {
      continue;
    }

    char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

    SMqSubscribeObj *pSub = NULL;
    if (mndAcquireSubscribeByKey(pMnode, subKey, &pSub) != 0) {
      continue;
    }

    taosRLockLatch(&pSub->lock);

    // iterate all vg assigned to the consumer of that topic
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pConsumerEp != NULL) {
      int32_t vgCnt = taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t v = 0; v < vgCnt; v++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, v);
        if (pVgEp == NULL) {
          continue;
        }

        SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
        if (pVgroup == NULL) {
          int32_t code = mndGetOrCreateRebSub(rebSubHash, subKey, NULL);
          if (code != 0) {
            mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code));
          } else {
            mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
          }
        }
        mndReleaseVgroup(pMnode, pVgroup);
      }
    }

    taosRUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
  }
}
2,502✔
755

756
/*
 * Timer-driven scan of every consumer in the sdb, collecting rebalance work
 * into rebSubHash.
 *
 * Each pass increments the consumer's hbStatus and pollStatus counters, then:
 *  - READY with no current topics: sends TDMT_MND_TMQ_LOST_CONSUMER_CLEAR;
 *  - READY whose heartbeat/poll counters reach sessionTimeoutMs /
 *    maxPollIntervalMs: queues the consumer as removed from all its topics;
 *  - READY otherwise: runs checkForVgroupSplit;
 *  - REBALANCE: queues its rebNewTopics as added and rebRemovedTopics as removed;
 *  - any other status: sends TDMT_MND_TMQ_LOST_CONSUMER_CLEAR.
 *
 * Error handling: the consumer latch is always dropped before bailing out,
 * and when the loop is left early the current consumer reference and the
 * sdb iterator are released at END.
 */
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  void           *pIter = NULL;
  int32_t         code = 0;

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);

    mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
           ", hbstatus:%d, pollStatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
           pConsumer->createTime, hbStatus, pollStatus);

    if (status == MQ_CONSUMER_STATUS_READY) {
      if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
        MND_TMQ_RETURN_CHECK(
            mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
      } else if ((int64_t)hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
                 (int64_t)pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
        // products are computed in 64 bits so a long-idle counter cannot overflow int32
        taosRLockLatch(&pConsumer->lock);
        code = buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
        taosRUnLockLatch(&pConsumer->lock);
        if (code != 0) {
          goto END;
        }
      } else {
        checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
      }
    } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
      taosRLockLatch(&pConsumer->lock);
      code = buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId);
      if (code == 0) {
        code = buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
      }
      // unlock before any early exit so the latch is never left held
      taosRUnLockLatch(&pConsumer->lock);
      if (code != 0) {
        goto END;
      }
    } else {
      MND_TMQ_RETURN_CHECK(
          mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

END:
  if (pIter != NULL) {
    // the loop was left early: drop the current consumer and stop the sdb iteration
    mndReleaseConsumer(pMnode, pConsumer);
    sdbCancelFetch(pSdb, pIter);
  }
  return code;
}
804

805
bool mndRebTryStart() {
32,310✔
806
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
32,310✔
807
  if (old > 0) mInfo("[rebalance] counter old val:%d", old) return old == 0;
32,310!
808
}
809

810
void mndRebCntInc() {
1,088✔
811
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
1,088✔
812
  if (val > 0) mInfo("[rebalance] cnt inc, value:%d", val)
1,088!
813
}
1,088✔
814

815
void mndRebCntDec() {
33,398✔
816
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
33,398✔
817
  if (val > 0) mInfo("[rebalance] cnt sub, value:%d", val)
33,398!
818
}
33,398✔
819

820
/*
 * Release everything owned by a rebalance output: the newConsumers,
 * modifyConsumers and removedConsumers id arrays, the rebVgs array and the
 * subscription object pSub.
 *
 * pSub may still be NULL when buildRebOutput failed before creating or
 * cloning it, so it is only torn down when present. Every field is reset to
 * NULL afterwards so a second call on the same object does not double free.
 */
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  taosArrayDestroy(rebOutput->newConsumers);
  rebOutput->newConsumers = NULL;

  taosArrayDestroy(rebOutput->modifyConsumers);
  rebOutput->modifyConsumers = NULL;

  taosArrayDestroy(rebOutput->removedConsumers);
  rebOutput->removedConsumers = NULL;

  taosArrayDestroy(rebOutput->rebVgs);
  rebOutput->rebVgs = NULL;

  if (rebOutput->pSub != NULL) {
    tDeleteSubscribeObj(rebOutput->pSub);
    taosMemoryFree(rebOutput->pSub);
    rebOutput->pSub = NULL;
  }
}
1,321✔
828

829
/*
 * Allocate the empty arrays of a rebalance output: three int64 consumer-id
 * arrays and one SMqRebOutputVg array. If any allocation fails, whatever was
 * already built is released through clearRebOutput and the error is returned.
 */
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  int32_t code = 0;

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  // every allocation succeeded
  return code;

END:
  // undo the allocations that did succeed before reporting the failure
  clearRebOutput(rebOutput);
  return code;
}
845

846
// This function only works when there are dirty consumers
847
static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
839✔
848
  int32_t code = 0;
839✔
849
  void   *pIter = NULL;
839✔
850
  while (1) {
826✔
851
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,665✔
852
    if (pIter == NULL) {
1,665✔
853
      break;
839✔
854
    }
855

856
    SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
826✔
857
    SMqConsumerObj *pConsumer = NULL;
826✔
858
    code = mndAcquireConsumer(pMnode, pConsumerEp->consumerId, &pConsumer);
826✔
859
    if (code == 0) {
826!
860
      mndReleaseConsumer(pMnode, pConsumer);
826✔
861
      continue;
826✔
862
    }
863
    mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
×
864
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));
×
865

866
    taosArrayDestroy(pConsumerEp->vgs);
×
867
    MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
×
868
  }
869
END:
839✔
870
  return code;
839✔
871
}
872

873
/*
 * Prepare rebOutput->pSub for the rebalance described by rebInput->pRebInfo.
 *
 * - No subscription for the key yet: a new one is created from the topic
 *   (mndCreateSubscription), its dbName is copied from the topic and
 *   rebInput->oldConsumerNum is set to 0.
 * - Existing subscription: it is cloned under its read latch, consumers that
 *   are missing from the sdb are dropped from the clone (checkConsumer) and
 *   oldConsumerNum is set to the clone's consumer count.
 *
 * The acquired subscription reference is released at END on every path
 * (success or error).
 */
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  const char      *key = rebInput->pRebInfo->key;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);

  if (code != 0) {
    // split sub key and extract topic
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(key, topic, cgroup, true);
    SMqTopicObj *pTopic = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);

    rebInput->oldConsumerNum = 0;
    code = mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub);
    if (code != 0) {
      mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
    } else {
      (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    }
    taosRUnLockLatch(&pTopic->lock);
    mndReleaseTopic(pMnode, pTopic);
    if (code != 0) {
      goto END;
    }

    mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
  } else {
    taosRLockLatch(&pSub->lock);
    code = tCloneSubscribeObj(pSub, &rebOutput->pSub);
    if (code == 0) {
      code = checkConsumer(pMnode, rebOutput->pSub);
    }
    if (code == 0) {
      rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
    }
    taosRUnLockLatch(&pSub->lock);
    if (code != 0) {
      goto END;
    }

    mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
  }

END:
  // pSub is NULL when the subscription did not exist; otherwise its reference must be dropped
  if (pSub != NULL) {
    mndReleaseSubscribe(pMnode, pSub);
  }
  return code;
}
923

924
/*
 * Handler of the mq rebalance timer message.
 *
 * Only one rebalance runs at a time (mndRebTryStart / mndRebCntDec). The
 * consumers are scanned to collect the subscriptions needing a rebalance;
 * for each one a rebalance output is built, computed (mndDoRebalance) and
 * persisted (mndPersistRebResult). A failing subscription is logged and the
 * loop continues with the next one; the returned code is that of the last
 * step executed.
 */
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  int32_t   code = 0;
  void     *pIter = NULL;
  SMnode   *pMnode = pMsg->info.node;
  SHashObj *rebSubHash = NULL;

  mDebug("[rebalance] start to process mq timer");
  if (!mndRebTryStart()) {
    mInfo("[rebalance] mq rebalance already in progress, do nothing");
    return code;
  }

  rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);
  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("[rebalance] mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
  }

  for (pIter = taosHashIterate(rebSubHash, pIter); pIter != NULL; pIter = taosHashIterate(rebSubHash, pIter)) {
    SMqRebInputObj  rebInput = {0};
    SMqRebOutputObj rebOutput = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
    rebInput.pRebInfo = (SMqRebInfo *)pIter;

    // each stage only runs when the previous one succeeded
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else {
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
      if (code != 0) {
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
      } else {
        code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
        if (code != 0) {
          mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
        }
      }
    }

    clearRebOutput(&rebOutput);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("[rebalance] mq rebalance completed successfully, wait trans finish");
  }

END:
  taosHashCancelIterate(rebSubHash, pIter);
  taosHashCleanup(rebSubHash);
  mndRebCntDec();

  TAOS_RETURN(code);
}
987

988
/*
 * Append one TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every vgroup
 * of the subscription's database (pSub->dbUid), with consumerId -1 and
 * TSDB_CODE_MND_VGROUP_NOT_EXIST accepted as a reply code.
 *
 * Each vgroup is released exactly once. If the action cannot be appended,
 * the request buffer is freed here and the error is returned.
 */
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  int32_t code = 0;
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // already released: keep the END label from releasing it a second time
    pVgObj = NULL;

    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      // assumes the transaction did not take ownership of pCont on failure,
      // like the other mndTransAppendRedoAction callers in the mnode — confirm
      taosMemoryFree(pReq);
      goto END;
    }
  }

END:
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1025

1026
/*
 * Return TSDB_CODE_MND_CGROUP_USED if any consumer of consumer group `cgroup`
 * still lists `topic` in its assignedTopics, 0 otherwise.
 * pTrans is not used.
 */
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  int             code = 0;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    // checkTopic is only consulted for consumers of the same group
    bool sameGroup = (strcmp(cgroup, pConsumer->cgroup) == 0);
    if (sameGroup && checkTopic(pConsumer->assignedTopics, topic)) {
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
             topic, pConsumer->consumerId, pConsumer->cgroup);
      code = TSDB_CODE_MND_CGROUP_USED;
      goto END;
    }

    sdbRelease(pSdb, pConsumer);
  }

END:
  sdbRelease(pSdb, pConsumer);
  sdbCancelFetch(pSdb, pIter);
  return code;
}
1057

1058
/*
 * Handle a drop-consumer-group request for (cgroup, topic).
 *
 * - Subscription missing: returns 0 when igNotExists is set, otherwise
 *   TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST.
 * - Subscription still has consumers: TSDB_CODE_MND_CGROUP_USED.
 * - Otherwise a "drop-cgroup" transaction is prepared that sends delete-sub
 *   requests to the db's vnodes and commits the subscription as dropped;
 *   TSDB_CODE_ACTION_IN_PROGRESS is returned on success.
 *
 * The subscription latch and reference are only released when they were
 * actually taken: a request that fails to deserialize reaches END with
 * pSub == NULL and must not touch pSub->lock.
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  SMnode          *pMnode = pMsg->info.node;
  SMDropCgroupReq  dropReq = {0};
  STrans          *pTrans = NULL;
  int32_t          code = TSDB_CODE_ACTION_IN_PROGRESS;
  SMqSubscribeObj *pSub = NULL;

  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
  char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code != 0) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    } else {
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
      return code;
    }
  }

  // from here on pSub is acquired and write-latched until END
  taosWLockLatch(&pSub->lock);
  if (taosHashGetSize(pSub->consumerHash) != 0) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  if (pSub != NULL) {
    taosWUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
  }
  mndTransDrop(pTrans);

  if (code != 0) {
    mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic);
    TAOS_RETURN(code);
  }
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
}
1108

1109
/* Cleanup hook of the subscribe module; currently a no-op. */
void mndCleanupSubscribe(SMnode *pMnode) { (void)pMnode; }
1,995✔
1110

1111
/*
 * Serialize a subscription into an SDB_SUBSCRIBE raw record with layout
 *   int32 tlen | tlen bytes from tEncodeSubscribeObj | reserved bytes.
 * Returns the raw record, or NULL (with terrno set) on failure.
 *
 * pRaw is declared and NULL-initialized before the first goto: the early
 * exit taken when the encoded size is not positive used to jump over its
 * initialization and hand an indeterminate pointer to mError/sdbFreeRaw.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  int32_t  code = 0;  // kept: presumably referenced by the SDB_SET_* macros — confirm
  int32_t  lino = 0;
  SSdbRaw *pRaw = NULL;
  void    *buf = NULL;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1150

1151
/*
 * Deserialize an SDB_SUBSCRIBE raw record (soft versions 1 to
 * MND_SUBSCRIBE_VER_NUMBER) into a newly allocated SSdbRow holding an
 * SMqSubscribeObj. Afterwards, the epset of every unassigned vgroup and of
 * every consumer-assigned vgroup is refreshed with tmsgUpdateDnodeEpSet.
 * Returns the row, or NULL (with terrno set) on failure.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;
  int8_t           sver = 0;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) goto SUB_DECODE_OVER;

  // update epset saved in mnode: unassigned vgroups first
  if (pSub->unassignedVgs != NULL) {
    int32_t numOfVgs = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t k = 0; k < numOfVgs; ++k) {
      SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, k);
      tmsgUpdateDnodeEpSet(&pVgEp->epSet);
    }
  }

  // ...then the vgroups held by each consumer
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = pIter;
      int32_t        numOfVgs = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t k = 0; k < numOfVgs; ++k) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, k);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}
1219

1220
/* sdb insert callback for subscriptions: only emits a trace log and reports success. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  mTrace("subscribe:%s, perform insert action", pSub->key);
  return code;
}
1224

1225
/*
 * sdb delete callback for subscriptions: logs, then hands pSub to
 * tDeleteSubscribeObj. The SMqSubscribeObj storage itself is not freed here.
 */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  int32_t code = 0;
  mTrace("subscribe:%s, perform delete action", pSub->key);
  tDeleteSubscribeObj(pSub);
  return code;
}
1230

1231
/*
 * sdb update callback: while holding the old object's write latch, exchange
 * consumerHash, unassignedVgs and offsetRows between pOldSub and pNewSub. The
 * old object ends up with the new state and pNewSub with the previous one.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  mTrace("subscribe:%s, perform update action", pOldSub->key);
  taosWLockLatch(&pOldSub->lock);

  SHashObj *prevConsumers = pOldSub->consumerHash;
  SArray   *prevUnassigned = pOldSub->unassignedVgs;
  SArray   *prevOffsets = pOldSub->offsetRows;

  pOldSub->consumerHash = pNewSub->consumerHash;
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
  pOldSub->offsetRows = pNewSub->offsetRows;

  pNewSub->consumerHash = prevConsumers;
  pNewSub->unassignedVgs = prevUnassigned;
  pNewSub->offsetRows = prevOffsets;

  taosWUnLockLatch(&pOldSub->lock);
  return 0;
}
1250

1251
/*
 * Look up a subscription by key in the sdb.
 * On success *pSub holds an acquired reference (to be dropped with
 * mndReleaseSubscribe) and 0 is returned; otherwise *pSub is NULL and
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST is returned.
 */
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj **pSub) {
  *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  return (*pSub != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1259

1260
/*
 * Count the SDB_SUBSCRIBE records whose key refers to topicName, i.e. the
 * number of consumer groups subscribed to that topic.
 */
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          groupCnt = 0;

  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) == 0) {
      groupCnt++;
    }
    sdbRelease(pSdb, pSub);
  }

  return groupCnt;
}
1284

1285
/* Drop a subscription reference obtained from mndAcquireSubscribeByKey. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { sdbRelease(pMnode->pSdb, pSub); }
10,275✔
1289

1290
/*
 * Append to pTrans a commit log that marks pSub as SDB_STATUS_DROPPED.
 * If the log cannot be appended, the encoded raw is freed here.
 */
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pCommitRaw);

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  } else {
    sdbFreeRaw(pCommitRaw);
  }

END:
  return code;
}
1303

1304
/*
 * Add to pTrans the actions that drop every subscription on topicName:
 * delete-sub requests to the vnodes of the db plus a "dropped" commit log.
 * Fails with TSDB_CODE_MND_IN_REBALANCE when one of those subscriptions still
 * has consumers.
 */
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          code = 0;
  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;

  for (;;) {
    // release the object of the previous pass (pSub is still NULL on the first one)
    sdbRelease(pSdb, pSub);
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) {
      break;
    }

    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) == 0) {
      // a subscription still owned by consumers cannot be dropped
      if (taosHashGetSize(pSub->consumerHash) != 0) {
        code = TSDB_CODE_MND_IN_REBALANCE;
        goto END;
      }

      // iter all vnode to delete handle
      MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
      MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
    }
  }

END:
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1337

1338
/*
 * Append one row per vgroup in `vgs` to pBlock, starting at *numOfRows, and
 * advance *numOfRows for each row written.
 *
 * Column order: topic, consumer group, vgId, consumer id (hex var-string,
 * NULL when consumerId is -1), user (NULL when absent), fqdn (NULL when
 * absent), offset ("<offset>/<ever>") and rows. The last two come from the
 * offsetRows entry for the vgroup (the last matching entry wins) and are NULL
 * when none matches. `topic` and `cgroup` are passed in as var-strings.
 */
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
                           const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  int32_t code = 0;
  int32_t numOfVgs = taosArrayGetSize(vgs);

  for (int32_t vgIdx = 0; vgIdx < numOfVgs; vgIdx++) {
    SMqVgEp *pVgEp = taosArrayGetP(vgs, vgIdx);
    MND_TMQ_NULL_CHECK(pVgEp);

    int32_t          col = 0;
    SColumnInfoData *pCol = NULL;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)cgroup, false));

    // vg id
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pVgEp->vgId, false));

    // consumer id, rendered as a hex var-string
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)consumerIdHex, consumerId == -1));

    // user
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user != NULL) {
      STR_TO_VARSTR(userStr, user);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, userStr, user == NULL));

    // fqdn
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn != NULL) {
      STR_TO_VARSTR(fqdnStr, fqdn);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // locate the offset entry of this vgroup; the scan does not stop early, so the last match wins
    OffsetRows *pOffset = NULL;
    size_t      numOfOffsets = taosArrayGetSize(offsetRows);
    for (size_t i = 0; i < numOfOffsets; i++) {
      OffsetRows *pCand = taosArrayGet(offsetRows, i);
      MND_TMQ_NULL_CHECK(pCand);
      if (pCand->vgId == pVgEp->vgId) {
        pOffset = pCand;
      } else {
        mInfo("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", pCand->vgId, pVgEp->vgId);
      }
    }

    if (pOffset == NULL) {
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    } else {
      // offset text: formatted offset followed by "/<ever>"
      char buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &pOffset->offset);
      size_t used = strlen(varDataVal(buf));
      (void)snprintf(varDataVal(buf) + used, sizeof(buf) - VARSTR_HEADER_SIZE - used, "/%" PRId64, pOffset->ever);
      varDataSetLen(buf, strlen(varDataVal(buf)));
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)buf, false));

      // rows
      pCol = taosArrayGet(pBlock->pDataBlock, col++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pOffset->rows, false));
    }

    (*numOfRows)++;
  }
  return 0;

END:
  return code;
}
1425

1426
/*
 * Fill pBlock with subscription rows, resuming the SDB_SUBSCRIBE scan from
 * pShow->pIter. Each subscription yields one row per vgroup: first the
 * vgroups of each consumer, then the unassigned ones (consumer id -1).
 * Returns the number of rows produced, or an error code.
 *
 * Two resource rules are enforced:
 * - A consumer's user/fqdn are read from the acquired consumer object, so
 *   the reference is held until buildResult has copied them.
 * - Every error happens while pSub is acquired and read-latched; END
 *   releases the latch, the reference and the consumer-hash iterator.
 *   pShow->pIter is left for the show framework to cancel (presumably via
 *   mndCancelGetNextSubscribe — confirm).
 */
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;  // iterator over pSub->consumerHash
  int32_t          code = 0;

  mInfo("mnd show subscriptions begin");

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    taosRLockLatch(&pSub->lock);
    pIter = NULL;

    if (numOfRows + pSub->vgNum > rowsCapacity) {
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum));
    }

    // topic and cgroup
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
    varDataSetLen(topic, strlen(varDataVal(topic)));
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

    while (1) {
      pIter = taosHashIterate(pSub->consumerHash, pIter);
      if (pIter == NULL) break;
      SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;

      char           *user = NULL;
      char           *fqdn = NULL;
      SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
      if (pConsumer != NULL) {
        user = pConsumer->user;
        fqdn = pConsumer->fqdn;
      }
      code = buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
                         pConsumerEp->offsetRows);
      // released only after buildResult has consumed user/fqdn
      sdbRelease(pSdb, pConsumer);
      if (code != 0) {
        goto END;
      }
    }

    MND_TMQ_RETURN_CHECK(
        buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

    pBlock->info.rows = numOfRows;

    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

  mInfo("mnd end show subscriptions");

  pShow->numOfRows += numOfRows;
  return numOfRows;

END:
  // pIter is NULL unless the failure happened while walking consumerHash
  taosHashCancelIterate(pSub->consumerHash, pIter);
  taosRUnLockLatch(&pSub->lock);
  sdbRelease(pSdb, pSub);
  return code;
}
1490

1491
/* Cancel an unfinished SDB_SUBSCRIBE fetch iterator (e.g. one left in a show object). */
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) { sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE); }
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc