• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4488

12 Jul 2025 07:47AM UTC coverage: 62.207% (-0.7%) from 62.948%
#4488

push

travis-ci

web-flow
docs: update stream docs (#31822)

157961 of 324087 branches covered (48.74%)

Branch coverage included in aggregate %.

244465 of 322830 relevant lines covered (75.73%)

6561668.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.48
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tcompare.h"
25
#include "tname.h"
26

27
#define MND_SUBSCRIBE_VER_NUMBER   3
28
#define MND_SUBSCRIBE_RESERVE_SIZE 64
29

30
//#define MND_CONSUMER_LOST_HB_CNT          6
31

32
static int32_t mqRebInExecCnt = 0;
33

34
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
35
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
36
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
39
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
40
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
41
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
42
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
43
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
44

45
/*
 * Encode pSub and append it to pTrans as a commit log entry marked
 * SDB_STATUS_READY.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the
 * error reported by the encode / append / set-status step.
 */
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pTrans == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    // the raw row was not appended to the transaction, so it is released here
    sdbFreeRaw(pRaw);
  }

END:
  return code;
}
62

63
/*
 * Register the subscribe SDB table, the TMQ message handlers and the
 * "show subscriptions" retrieve / free-iterator handlers on pMnode.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise the result of
 * sdbSetTable().
 */
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // The retrieve handler and the free-iterator handler must be registered for
  // the same show table; the cancel callback was previously registered under
  // TSDB_MGMT_TABLE_TOPICS, which does not match the retrieve registration.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
88

89
/*
 * Allocate a new subscribe object for subKey, copy the topic attributes into
 * it and initialise its vgroup endpoints through mndSchedInitSubEp().
 *
 * On success *pSub owns the new object. On failure the partially built object
 * is destroyed and *pSub is reset to NULL so the caller never sees a dangling
 * pointer.
 */
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey, SMqSubscribeObj** pSub) {
  if(pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  // start from a known state so the cleanup path never frees a stale value
  *pSub = NULL;
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
  (*pSub)->dbUid = pTopic->dbUid;
  (*pSub)->stbUid = pTopic->stbUid;
  (*pSub)->subType = pTopic->subType;
  (*pSub)->withMeta = pTopic->withMeta;

  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
  return code;

END:
  tDeleteSubscribeObj(*pSub);
  taosMemoryFree(*pSub);
  *pSub = NULL;
  return code;
}
108

109
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
2,759✔
110
                                    SSubplan *pPlan) {
111
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
2,759!
112
    return TSDB_CODE_INVALID_PARA;
×
113
  }
114
  SMqRebVgReq req = {0};
2,759✔
115
  int32_t     code = 0;
2,759✔
116
  SEncoder encoder = {0};
2,759✔
117

118
  req.oldConsumerId = pRebVg->oldConsumerId;
2,759✔
119
  req.newConsumerId = pRebVg->newConsumerId;
2,759✔
120
  req.vgId = pRebVg->pVgEp->vgId;
2,759✔
121
  if (pPlan) {
2,759✔
122
    pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
2,288✔
123
    pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
2,288✔
124
    int32_t msgLen = 0;
2,288✔
125
    MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
2,288!
126
  } else {
127
    req.qmsg = taosStrdup("");
471!
128
    MND_TMQ_NULL_CHECK(req.qmsg);
471!
129
  }
130
  req.subType = pSub->subType;
2,759✔
131
  req.withMeta = pSub->withMeta;
2,759✔
132
  req.suid = pSub->stbUid;
2,759✔
133
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
2,759✔
134

135
  int32_t tlen = 0;
2,759✔
136
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
2,759!
137
  if (code < 0) {
2,759!
138
    goto END;
×
139
  }
140

141
  tlen += sizeof(SMsgHead);
2,759✔
142
  void *buf = taosMemoryMalloc(tlen);
2,759!
143
  MND_TMQ_NULL_CHECK(buf);
2,759!
144
  SMsgHead *pMsgHead = (SMsgHead *)buf;
2,759✔
145
  pMsgHead->contLen = htonl(tlen);
2,759✔
146
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
2,759✔
147

148
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
2,759✔
149
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
2,759!
150
  *pBuf = buf;
2,759✔
151
  *pLen = tlen;
2,759✔
152

153
END:
2,759✔
154
  tEncoderClear(&encoder);
2,759✔
155
  taosMemoryFree(req.qmsg);
2,759!
156
  return code;
2,759✔
157
}
158

159
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
2,761✔
160
                                        const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
161
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
2,761!
162
    return TSDB_CODE_INVALID_PARA;
×
163
  }
164
  int32_t code = 0;
2,761✔
165
  void   *buf  = NULL;
2,761✔
166

167
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
2,761✔
168
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
2!
169
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
170
    goto END;
×
171
  }
172

173
  int32_t tlen = 0;
2,759✔
174
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan));
2,759!
175
  int32_t vgId = pRebVg->pVgEp->vgId;
2,759✔
176
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
2,759✔
177
  if (pVgObj == NULL) {
2,759!
178
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
179
    goto END;
×
180
  }
181

182
  STransAction action = {0};
2,759✔
183
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
2,759✔
184
  action.pCont = buf;
2,759✔
185
  action.contLen = tlen;
2,759✔
186
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
2,759✔
187

188
  mndReleaseVgroup(pMnode, pVgObj);
2,759✔
189
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
2,759!
190
  return code;
2,759✔
191

192
END:
×
193
  taosMemoryFree(buf);
×
194
  return code;
×
195
}
196

197
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
3,600✔
198
  if (key == NULL || topic == NULL || cgroup == NULL) {
3,600!
199
    return;
×
200
  }
201
  int32_t i = 0;
3,600✔
202
  while (key[i] != TMQ_SEPARATOR_CHAR) {
26,139✔
203
    i++;
22,539✔
204
  }
205
  (void)memcpy(cgroup, key, i);
3,600✔
206
  cgroup[i] = 0;
3,600✔
207
  if (fullName) {
3,600✔
208
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
3,348✔
209
  } else {
210
    while (key[i] != '.') {
756✔
211
      i++;
504✔
212
    }
213
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
252✔
214
  }
215
}
216

217
/*
 * Look up the rebalance info stored in pHash under key (the terminating NUL
 * is part of the hash key), creating and inserting a fresh entry when none
 * exists.
 *
 * pReb  optional; on success receives a pointer to the entry stored inside
 *       the hash table.
 *
 * Returns 0 on success or an error code. When the insert fails the freshly
 * created entry, including its consumer arrays, is released.
 */
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     code = 0;
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, keyLen);
  if (pRebInfo == NULL) {
    pRebInfo = tNewSMqRebSubscribe(key);
    if (pRebInfo == NULL) {
      code = terrno;
      goto END;
    }
    code = taosHashPut(pHash, key, keyLen, pRebInfo, sizeof(SMqRebInfo));
    if (code != 0) {
      // nothing was stored, so the arrays are still owned by this temporary
      taosArrayDestroy(pRebInfo->newConsumers);
      taosArrayDestroy(pRebInfo->removedConsumers);
      taosMemoryFreeClear(pRebInfo);
      goto END;
    }
    // the table keeps its own copy of the struct; only the shell is freed
    taosMemoryFreeClear(pRebInfo);
    pRebInfo = taosHashGet(pHash, key, keyLen);
    MND_TMQ_NULL_CHECK(pRebInfo);
  }
  if (pReb){
    *pReb = pRebInfo;
  }

END:
  return code;
}
244

245
/*
 * Pop the last vgroup endpoint from vgs and record it in pHash, keyed by its
 * vgId, as a pending move away from consumerId (the new consumer is left as
 * -1 until assignment).
 *
 * key is the subscribe key and is used for logging only.
 */
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (vgs == NULL || pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t   code = 0;
  SMqVgEp **ppSlot = (SMqVgEp **)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(ppSlot);

  SMqRebOutputVg pending = {consumerId, -1, *ppSlot};
  code = taosHashPut(pHash, &(*ppSlot)->vgId, sizeof(int32_t), &pending, sizeof(SMqRebOutputVg));
  if (code != 0) {
    goto END;
  }
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, (*ppSlot)->vgId, consumerId);

END:
  return code;
}
258

259
/*
 * For every consumer listed in pInput->pRebInfo->removedConsumers that is
 * still present in the subscription: move all of its vgroups into pHash,
 * drop it from the consumer hash and remember its id in
 * pOutput->removedConsumers.
 *
 * Consumers that are not found are skipped; a mismatch between requested and
 * actually removed consumers is only logged.
 */
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  SArray *pRemoved = pInput->pRebInfo->removedConsumers;
  int32_t numOfRemoved = taosArrayGetSize(pRemoved);
  int32_t actualRemoved = 0;

  for (int32_t idx = 0; idx < numOfRemoved; ++idx) {
    int64_t *pId = (int64_t *)taosArrayGet(pRemoved, idx);
    MND_TMQ_NULL_CHECK(pId);

    SMqConsumerEp *pEp = taosHashGet(pOutput->pSub->consumerHash, pId, sizeof(int64_t));
    if (pEp != NULL) {
      // hand every vgroup of this consumer back to the pending pool
      int32_t remaining = taosArrayGetSize(pEp->vgs);
      while (remaining > 0) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, *pId, pOutput->pSub->key));
        remaining--;
      }

      taosArrayDestroy(pEp->vgs);
      MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, pId, sizeof(int64_t)));
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, pId));
      actualRemoved++;
    }
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
  } else {
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved,
           actualRemoved);
  }

END:
  return code;
}
294

295
/*
 * Add every consumer listed in pInput->pRebInfo->newConsumers to the
 * subscription's consumer hash with an empty vgroup list, and record its id
 * in pOutput->newConsumers.
 *
 * Returns 0 on success or the first error encountered. The vgroup array of a
 * consumer that could not be inserted is released before returning.
 */
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t* consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);
    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
    if (code != 0) {
      // the entry was not stored, so the array would otherwise be lost
      taosArrayDestroy(newConsumerEp.vgs);
      goto END;
    }
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }
END:
  return code;
}
316

317
/*
 * Move every vgroup currently in pOutput->pSub->unassignedVgs into pHash as a
 * pending assignment with no previous consumer (-1).
 */
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  // the count is taken once up front; each call pops one entry from the array
  int32_t pending = taosArrayGetSize(pOutput->pSub->unassignedVgs);
  while (pending > 0) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
    pending--;
  }

END:
  return code;
}
329

330
/*
 * Walk the remaining consumers of the subscription, record each one in
 * pOutput->modifyConsumers, and strip surplus vgroups into pHash.
 *
 * The first remainderVgCnt consumers that hold more than minVgCnt vgroups may
 * keep minVgCnt + 1; all others are trimmed down to minVgCnt.
 *
 * Returns 0 on success or the first error; the hash iterator is cancelled
 * when the walk is abandoned early.
 */
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                     int32_t remainderVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t cnt = 0;
  void   *pIter = NULL;

  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
    if (consumerVgNum > minVgCnt) {
      int32_t keep = minVgCnt;
      if (cnt < remainderVgCnt) {
        // this consumer takes one of the "remainder" slots
        keep = minVgCnt + 1;
        cnt++;
      }
      while (taosArrayGetSize(pConsumerEp->vgs) > keep) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
      }
    }
  }
END:
  if (pIter != NULL) {
    // only reached on an early exit from the loop above
    taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  }
  return code;
}
365

366
/*
 * Reconcile the subscription's vgroup assignments with the vgroups that
 * currently exist in the subscribed database.
 *
 *  1. Collect an endpoint for every non-mount vgroup of the database.
 *  2. For each consumer, drop assigned vgroups that no longer exist and remove
 *     still-existing ones from the collected list.
 *  3. If the subscription has no unassigned vgroups and some collected
 *     vgroups are left over, they become the new unassigned set.
 *
 * Returns the number of vgroups in the database (>= 0) on success, or the
 * error code set by the failing check. Error exits cancel any iterator still
 * in progress.
 */
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t  code = 0;
  int32_t  totalVgNum = 0;
  SVgObj  *pVgroup = NULL;
  SMqVgEp *pVgEp = NULL;
  void    *pSdbIter = NULL;
  void    *pHashIter = NULL;
  SArray  *newVgs = taosArrayInit(0, POINTER_BYTES);
  MND_TMQ_NULL_CHECK(newVgs);

  while (1) {
    pSdbIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pSdbIter, (void **)&pVgroup);
    if (pSdbIter == NULL) {
      break;
    }
    if (pVgroup->mountVgId || !mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      pVgroup = NULL;
      continue;
    }

    totalVgNum++;
    pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(pVgEp);
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
    pVgEp = NULL;  // now owned by newVgs
    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;  // keep the cleanup path from touching a released object
  }

  while (1) {
    pHashIter = taosHashIterate(pOutput->pSub->consumerHash, pHashIter);
    if (pHashIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pHashIter;
    int32_t j = 0;
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
      SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
      MND_TMQ_NULL_CHECK(pVgEpTmp);
      bool     find = false;
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
        SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
        MND_TMQ_NULL_CHECK(pnewVgEp);
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
          // already assigned: it is not a new vgroup
          tDeleteSMqVgEp(pnewVgEp);
          taosArrayRemove(newVgs, k);
          find = true;
          break;
        }
      }
      if (!find) {
        // the vgroup no longer exists in the database
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
        tDeleteSMqVgEp(pVgEpTmp);
        taosArrayRemove(pConsumerEp->vgs, j);
        continue;
      }
      j++;
    }
  }

  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
    // elements were handed over to unassignedVgs; free the container only
    taosArrayDestroy(newVgs);
  } else {
    taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
  }
  return totalVgNum;

END:
  if (pSdbIter != NULL) {
    sdbCancelFetch(pMnode->pSdb, pSdbIter);
  }
  if (pHashIter != NULL) {
    taosHashCancelIterate(pOutput->pSub->consumerHash, pHashIter);
  }
  sdbRelease(pMnode->pSdb, pVgroup);
  taosMemoryFree(pVgEp);
  taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
  return code;
}
447

448
/*
 * Fold the per-consumer offset rows of the currently stored subscription into
 * pOutput->pSub->offsetRows.
 *
 * For each offset row of each stored consumer:
 *   - it is skipped when the same consumer still owns that vgroup in the new
 *     layout;
 *   - otherwise it is merged into the matching vgId entry of the output
 *     (rows accumulated, offset/ever overwritten), or appended when no entry
 *     exists yet.
 *
 * Returns 0 when the subscription does not exist yet or on success, otherwise
 * an error code. The subscription latch and reference are released on every
 * exit path.
 */
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if( code != 0){
    return 0;
  }
  taosRLockLatch(&pSub->lock);
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    if(pOutput->pSub->offsetRows == NULL) {
      code = terrno;
      goto END;
    }
  }
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(d1);
      bool        jump = false;
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
        MND_TMQ_NULL_CHECK(pVgEp);
        if (pVgEp->vgId == d1->vgId) {
          jump = true;
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                pConsumerEp->consumerId, pVgEp->vgId);
          break;
        }
      }
      if (jump) continue;
      bool find = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(d2);
        if (d1->vgId == d2->vgId) {
          d2->rows += d1->rows;
          d2->offset = d1->offset;
          d2->ever = d1->ever;
          find = true;
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
          break;
        }
      }
      if (!find) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
      }
    }
  }

END:
  // Shared cleanup: every exit after the acquire must drop the iterator, the
  // read latch and the subscription reference.
  if (pIter != NULL) {
    taosHashCancelIterate(pSub->consumerHash, pIter);
  }
  taosRUnLockLatch(&pSub->lock);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
512

513
/*
 * Log the outcome of a rebalance: first every vgroup move recorded in
 * pOutput->rebVgs, then the final vgroup list of every consumer.
 * Purely informational; no state is modified.
 */
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (pOutput == NULL) {
    return;
  }

  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);

  for (int32_t idx = 0; idx < taosArrayGetSize(pOutput->rebVgs); idx++) {
    SMqRebOutputVg *pMove = taosArrayGet(pOutput->rebVgs, idx);
    if (pMove != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
            pMove->pVgEp->vgId, pMove->oldConsumerId, pMove->newConsumerId);
    }
  }

  for (void *pIter = taosHashIterate(pOutput->pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        sz = taosArrayGetSize(pEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
          pEp->consumerId, sz);

    for (int32_t v = 0; v < sz; v++) {
      SMqVgEp *pVg = taosArrayGetP(pEp->vgs, v);
      if (pVg != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVg->vgId,
              pEp->consumerId);
      }
    }
  }
}
539

540
/*
 * Compute how many vgroups each consumer should hold after the rebalance.
 *
 * The final consumer count is oldConsumerNum + added - removed. When it is
 * non-zero, *minVgCnt receives totalVgNum / count and *remainderVgCnt receives
 * totalVgNum % count; when it is zero both outputs are left untouched.
 */
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
                           int32_t *remainderVgCnt) {
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
    return;
  }

  int32_t removed = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t added = taosArrayGetSize(pInput->pRebInfo->newConsumers);
  int32_t finalCnt = pInput->oldConsumerNum + added - removed;

  if (finalCnt == 0) {
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
  } else {
    *minVgCnt = totalVgNum / finalCnt;
    *remainderVgCnt = totalVgNum % finalCnt;
  }

  mInfo(
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
      pSubKey, totalVgNum, pInput->oldConsumerNum, added, removed, *minVgCnt, *remainderVgCnt);
}
560

561
/*
 * Distribute the pending vgroups collected in pHash among the consumers of
 * the subscription.
 *
 *  Pass 1: top every consumer up to minVgCnt vgroups.
 *  Pass 2: give one extra vgroup to consumers sitting exactly at minVgCnt
 *          until the pending pool is exhausted.
 *  Pass 3: copy every pending entry into pOutput->rebVgs; when no consumer is
 *          left, the vgroups also go back into the unassigned list.
 *
 * Returns 0 on success, TSDB_CODE_PAR_INTERNAL_ERROR when pending vgroups
 * remain after pass 2, or the error set by a failed check. Iterators still in
 * progress are cancelled on every exit.
 */
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqRebOutputVg *pRebVg = NULL;
  void           *pAssignIter = NULL;
  void           *pIter = NULL;
  int32_t         code = 0;

  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  // pass 2 may stop in the middle of the consumer walk
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  pIter = NULL;
  if (pAssignIter != NULL) {
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    goto END;
  }
  while (1) {
    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      break;
    }

    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {            // if all consumer is removed
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
    }
  }

END:
  // Both iterators are NULL on the normal path; they are only live here when
  // one of the loops above was abandoned.
  if (pIter != NULL) {
    taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  }
  if (pAssignIter != NULL) {
    taosHashCancelIterate(pHash, pAssignIter);
  }
  return code;
}
634

635
/*
 * Run one rebalance calculation for a subscription.
 *
 * The steps run in order: reconcile vgroups with the database, collect the
 * vgroups of removed consumers and the unassigned ones into a temporary hash,
 * compute the per-consumer quota, trim over-quota consumers, add new
 * consumers, assign the pending vgroups, merge offset rows and log the
 * result.
 *
 * Returns 0 on success or the first error code; the temporary hash is
 * released on every path.
 */
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     totalVgNum = processRemoveAddVgs(pMnode, pOutput);
  if (totalVgNum < 0){
    return totalVgNum;
  }
  const char *pSubKey = pOutput->pSub->key;
  int32_t     minVgCnt = 0;
  int32_t     remainderVgCnt = 0;
  int32_t     code = 0;
  // vgId -> SMqRebOutputVg for every vgroup waiting to be (re)assigned
  SHashObj   *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pHash);
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
  printRebalanceLog(pOutput);

END:
  if (pHash != NULL) {
    taosHashCleanup(pHash);
  }
  return code;
}
662

663
/*
 * For every consumer id in consumers, build a temporary consumer object of
 * the given update type and append it to pTrans as a commit log.
 *
 * cgroup / topic are forwarded to tNewSMqConsumerObj(); topic may be NULL.
 *
 * Returns 0 on success or the first error code. The temporary object is
 * released exactly once, whichever path is taken.
 */
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         code = 0;
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t* consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // reset so a failure in a later iteration cannot release it again at END
    pConsumerNew = NULL;
  }

END:
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
683

684
/*
 * Append commit logs for all consumers touched by a rebalance, in this order:
 * modified consumers (CONSUMER_UPDATE_REB, no topic), new consumers
 * (CONSUMER_ADD_REB) and removed consumers (CONSUMER_REMOVE_REB).
 *
 * Stops at, and returns, the first error.
 */
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (pTrans == NULL || pOutput == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;

  code = presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL);
  if (code != 0) {
    return code;
  }

  code = presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic);
  if (code != 0) {
    return code;
  }

  code = presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic);
  return code;
}
695

696
/*
 * Persist the outcome of a rebalance in a single "tmq-reb" transaction:
 *   1. one redo action per re-balanced vgroup,
 *   2. a commit log for the subscription itself,
 *   3. commit logs for every affected consumer,
 *   4. the MQ rebalance start/stop callbacks,
 *   5. transaction prepare.
 *
 * The sub-plan is only decoded when the subscription carries a non-empty
 * qmsg. The decoded plan and the transaction handle are always released
 * before returning.
 */
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pMsg == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  STrans          *pTrans = NULL;
  struct SSubplan *pPlan = NULL;
  char             topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char             cgroup[TSDB_CGROUP_LEN] = {0};

  if (pOutput->pSub->qmsg[0] != '\0') {
    MND_TMQ_RETURN_CHECK(qStringToSubplan(pOutput->pSub->qmsg, &pPlan));
  }

  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // 1. redo action: action to all vg
  const SArray *pMoves = pOutput->rebVgs;
  int32_t       numMoves = taosArrayGetSize(pMoves);
  for (int32_t idx = 0; idx < numMoves; ++idx) {
    SMqRebOutputVg *pMove = taosArrayGet(pMoves, idx);
    MND_TMQ_NULL_CHECK(pMove);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pMove, pPlan));
  }

  // 2. commit log: subscribe and vg assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // 3. commit log: consumer to update status and epoch
  MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));

  // 4. set cb
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);

  // 5. execution
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  nodesDestroyNode((SNode *)pPlan);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
748

749
static void freeRebalanceItem(void *param) {
1,341✔
750
  if (param == NULL) return;
1,341!
751
  SMqRebInfo *pInfo = param;
1,341✔
752
  taosArrayDestroy(pInfo->newConsumers);
1,341✔
753
  taosArrayDestroy(pInfo->removedConsumers);
1,341✔
754
}
755

756
// type = 0 remove  type = 1 add
757
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
2,065✔
758
  if (rebSubHash == NULL || topicList == NULL) {
2,065!
759
    return TSDB_CODE_INVALID_PARA;
×
760
  }
761
  taosRLockLatch(&pConsumer->lock);
2,065✔
762
  int32_t code = 0;
2,065✔
763
  int32_t topicNum = taosArrayGetSize(topicList);
2,065✔
764
  for (int32_t i = 0; i < topicNum; i++) {
3,227✔
765
    char *removedTopic = taosArrayGetP(topicList, i);
1,162✔
766
    MND_TMQ_NULL_CHECK(removedTopic);
1,162!
767
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
1,162✔
768
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
1,162✔
769
    SMqRebInfo *pRebSub = NULL;
1,162✔
770
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
1,162!
771
    if (type == 0)
1,162✔
772
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
1,104!
773
    else if (type == 1)
610!
774
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
1,220!
775
  }
776

777
END:
2,065✔
778
  taosRUnLockLatch(&pConsumer->lock);
2,065✔
779
  return code;
2,065✔
780
}
781

782
/*
 * For each topic the consumer currently holds, inspect the vgroups assigned to
 * it in the matching subscription. When a vgroup can no longer be acquired, a
 * rebalance entry for the subscription key is requested in rebSubHash.
 * Topics or subscriptions that cannot be resolved are skipped silently.
 */
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (pMnode == NULL || pConsumer == NULL || rebSubHash == NULL) {
    return;
  }
  int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t t = 0; t < topicCnt; t++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, t);
    if (topic == NULL) {
      continue;
    }

    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

    SMqSubscribeObj *pSub = NULL;
    int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
    if (code != 0) {
      continue;
    }
    taosRLockLatch(&pSub->lock);

    // walk every vg assigned to this consumer for the topic (none when the consumer has no entry)
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pConsumerEp != NULL) {
      int32_t vgCnt = taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t v = 0; v < vgCnt; v++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, v);
        if (pVgEp == NULL) {
          continue;
        }
        SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
        if (pVgroup == NULL) {
          code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
          if (code == 0) {
            mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
          } else {
            mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code));
          }
        }
        mndReleaseVgroup(pMnode, pVgroup);
      }
    }

    taosRUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
  }
}
829

830
/*
 * Walk every consumer in the sdb and collect the subscriptions that need a
 * rebalance into rebSubHash.
 *
 * Each visit increments the consumer's hbStatus and pollStatus counters, then:
 *   - READY with no current topics: sends TDMT_MND_TMQ_LOST_CONSUMER_CLEAR;
 *   - READY with heartbeat or poll counters past their timeouts: records the
 *     consumer as removed for all its current topics;
 *   - READY otherwise: checks its assigned vgroups for a split;
 *   - REBALANCE: records its rebNewTopics as added and rebRemovedTopics as removed;
 *   - any other status: sends TDMT_MND_TMQ_LOST_CONSUMER_CLEAR.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the code of
 * the first failing step. On failure the consumer being visited is released and
 * the sdb fetch is cancelled before returning.
 */
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (pMsg == NULL || rebSubHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  void           *pIter = NULL;
  int32_t         code = 0;

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);

    mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
           ", hbstatus:%d, pollStatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
           pConsumer->createTime, hbStatus, pollStatus);

    if (status == MQ_CONSUMER_STATUS_READY) {
      if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
        MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
      } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
                 pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
        mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
           ", hb lost cnt:%d, or long time no poll cnt:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
           pConsumer->createTime, hbStatus, pollStatus);
        MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
      } else {
        checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
      }
    } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
    } else {
      MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
    }

    mndReleaseConsumer(pMnode, pConsumer);
    pConsumer = NULL;  // already released; keeps the END cleanup from releasing it twice
  }

END:
  // Reached with a live consumer/iterator only when a check above jumped out of the loop.
  if (pConsumer != NULL) {
    mndReleaseConsumer(pMnode, pConsumer);
  }
  if (pIter != NULL) {
    sdbCancelFetch(pSdb, pIter);
  }
  return code;
}
881

882
bool mndRebTryStart() {
29,450✔
883
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
29,450✔
884
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
29,450!
885
}
886

887
void mndRebCntInc() {
1,050✔
888
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
1,050✔
889
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
1,050!
890
}
1,050✔
891

892
void mndRebCntDec() {
30,500✔
893
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
30,500✔
894
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
30,500!
895
}
30,500✔
896

897
/*
 * Release everything a rebalance output owns: its four arrays and the
 * subscription object (contents first, then the object itself).
 * The fields are not reset afterwards. A NULL argument is ignored.
 */
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput != NULL) {
    taosArrayDestroy(rebOutput->newConsumers);
    taosArrayDestroy(rebOutput->modifyConsumers);
    taosArrayDestroy(rebOutput->removedConsumers);
    taosArrayDestroy(rebOutput->rebVgs);

    tDeleteSubscribeObj(rebOutput->pSub);
    taosMemoryFree(rebOutput->pSub);
  }
}
908

909
/*
 * Allocate the four empty arrays of a rebalance output (new, removed and
 * modified consumer ids, plus the per-vg rebalance records).
 *
 * Returns 0 on success or TSDB_CODE_INVALID_PARA for a NULL argument. When an
 * allocation fails, the output is passed to clearRebOutput() and the code set
 * by MND_TMQ_NULL_CHECK is returned.
 */
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  return code;

END:
  // an allocation failed: drop whatever was created so far
  clearRebOutput(rebOutput);
  return code;
}
928

929
// This function only works when there are dirty consumers
930
static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
833✔
931
  if (pMnode == NULL || pSub == NULL) {
833!
932
    return TSDB_CODE_INVALID_PARA;
×
933
  }
934
  int32_t code = 0;
833✔
935
  void   *pIter = NULL;
833✔
936
  while (1) {
861✔
937
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,694✔
938
    if (pIter == NULL) {
1,694✔
939
      break;
833✔
940
    }
941

942
    SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
861✔
943
    SMqConsumerObj *pConsumer = NULL;
861✔
944
    code = mndAcquireConsumer(pMnode, pConsumerEp->consumerId, &pConsumer);
861✔
945
    if (code == 0) {
861!
946
      mndReleaseConsumer(pMnode, pConsumer);
861✔
947
      continue;
861✔
948
    }
949
    mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
×
950
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));
×
951

952
    taosArrayDestroy(pConsumerEp->vgs);
×
953
    MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
×
954
  }
955
END:
833✔
956
  return code;
833✔
957
}
958

959
/*
 * Prepare rebOutput->pSub for the subscription key carried by rebInput->pRebInfo.
 *
 * - No subscription exists for the key: the topic is resolved from the key, a
 *   new subscription is created from it, the topic's db name is copied in and
 *   oldConsumerNum is set to 0.
 * - A subscription exists: it is cloned under its read latch, the clone is
 *   passed through checkConsumer(), and oldConsumerNum is set to the clone's
 *   consumer count.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the code of
 * the failing step. Every acquired topic/subscription reference is released
 * before returning, on failure paths as well.
 */
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  const char      *key = rebInput->pRebInfo->key;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);

  if (code != 0) {
    // split sub key and extract topic
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(key, topic, cgroup, true);
    SMqTopicObj *pTopic = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);

    rebInput->oldConsumerNum = 0;
    code = mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub);
    if (code != 0) {
      mError("tmq rebalance mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);
      return code;
    }

    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    taosRUnLockLatch(&pTopic->lock);
    mndReleaseTopic(pMnode, pTopic);

    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
  } else {
    taosRLockLatch(&pSub->lock);
    code = tCloneSubscribeObj(pSub, &rebOutput->pSub);
    if (code == 0) {
      code = checkConsumer(pMnode, rebOutput->pSub);
    }
    if (code == 0) {
      rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
    }
    taosRUnLockLatch(&pSub->lock);
    // Release on success AND failure: the reference was taken by
    // mndAcquireSubscribeByKey() above and nothing else drops it.
    mndReleaseSubscribe(pMnode, pSub);
    if (code != 0) {
      goto END;
    }

    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
  }

END:
  return code;
}
1012

1013
/*
 * Handle a rebalance request.
 *
 * Claims the rebalance slot (returning 0 immediately when one is already in
 * progress), collects the subscriptions needing a rebalance via
 * mndCheckConsumer(), then for each one builds the output, runs the rebalance
 * and persists the result. A failure in one subscription is logged and the loop
 * moves on; the output is cleared after every iteration.
 *
 * The hash is cleaned up and the rebalance counter decremented on the way out.
 * Returns the last code produced.
 */
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int     code = 0;
  void   *pIter = NULL;
  SMnode *pMnode = pMsg->info.node;
  PRINT_LOG_START;
  if (!mndRebTryStart()) {
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
    return code;
  }

  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);

  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
  }

  for (pIter = taosHashIterate(rebSubHash, pIter); pIter != NULL; pIter = taosHashIterate(rebSubHash, pIter)) {
    SMqRebInputObj  rebInput = {0};
    SMqRebOutputObj rebOutput = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
    rebInput.pRebInfo = (SMqRebInfo *)pIter;

    // three stages; each runs only when the previous one succeeded
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else {
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
      if (code != 0) {
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
      } else {
        code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
        if (code != 0) {
          mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
        }
      }
    }

    clearRebOutput(&rebOutput);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish")
  }

END:
  taosHashCancelIterate(rebSubHash, pIter);
  taosHashCleanup(rebSubHash);
  mndRebCntDec();

  PRINT_LOG_END(code);
  TAOS_RETURN(code);
}
1080

1081
/*
 * Append one TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every vgroup
 * that belongs to the subscription's database (pSub->dbUid). Vgroups with a
 * non-zero mountVgId and vgroups of other databases are skipped.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the code of
 * the failing step. Each fetched vgroup is released exactly once and the sdb
 * fetch is cancelled on exit.
 */
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  int32_t code = 0;
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }
    if (pVgObj->mountVgId) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }

    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // Clear the pointer: if the append below fails, END must not release the
    // same vgroup a second time.
    pVgObj = NULL;
    // NOTE(review): if mndTransAppendRedoAction() does not take ownership of
    // action.pCont on failure, pReq leaks on this error path - confirm and free it here.
    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
  }

END:
  // pVgObj is non-NULL here only when the pReq allocation failed mid-iteration.
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1125

1126
/*
 * Visit every consumer whose cgroup equals `cgroup`.
 *
 * - deleteConsumer == true: a drop log for the consumer is appended to pTrans.
 * - deleteConsumer == false: if the consumer's assignedTopics contain `topic`,
 *   the scan stops with TSDB_CODE_MND_CGROUP_USED.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the code of
 * the failing step.
 */
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int             code = 0;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  SMqConsumerObj *pDropObj = NULL;

  for (;;) {
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    // consumers of other groups fall straight through to the release below
    if (strcmp(cgroup, pConsumer->cgroup) == 0) {
      if (deleteConsumer) {
        MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pDropObj));
        MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pDropObj));
        tDeleteSMqConsumerObj(pDropObj);
        pDropObj = NULL;
      } else if (checkTopic(pConsumer->assignedTopics, topic)) {
        mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
               topic, pConsumer->consumerId, pConsumer->cgroup);
        code = TSDB_CODE_MND_CGROUP_USED;
        goto END;
      }
    }

    sdbRelease(pMnode->pSdb, pConsumer);
  }

END:
  tDeleteSMqConsumerObj(pDropObj);
  sdbRelease(pMnode->pSdb, pConsumer);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1170

1171
/*
 * Handle a "drop consumer group" request.
 *
 * Deserializes the request, looks up the subscription keyed by
 * "<cgroup><TMQ_SEPARATOR><topic>" and, under the subscription's write latch,
 * builds a "drop-cgroup" transaction that deletes the subscription on the
 * vnodes, checks (or, with force, drops) the group's consumers, and appends
 * the subscription drop log.
 *
 * Returns:
 *   - 0 when the subscription does not exist and igNotExists is set;
 *   - TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when it does not exist otherwise;
 *   - TSDB_CODE_MND_CGROUP_USED when consumers remain and force is not set;
 *   - TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared;
 *   - the code of any other failing step.
 */
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  SMDropCgroupReq dropReq = {0};
  STrans         *pTrans = NULL;
  int32_t         code = TSDB_CODE_ACTION_IN_PROGRESS;
  SMqSubscribeObj *pSub = NULL;
  char            key[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  PRINT_LOG_START
  // A failure here jumps to END while pSub is still NULL and unlocked.
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code != 0) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    } else {
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
      return code;
    }
  }

  taosWLockLatch(&pSub->lock);
  if (!dropReq.force && taosHashGetSize(pSub->consumerHash) != 0) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic, dropReq.force));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  // pSub is non-NULL exactly when the write latch was taken above; unlocking
  // through a NULL pSub would dereference NULL and release a latch never held.
  if (pSub != NULL) {
    taosWUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
  }
  mndTransDrop(pTrans);
  PRINT_LOG_END(code);

  if (code != 0) {
    TAOS_RETURN(code);
  }
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
}
1225

1226
// Cleanup entry point for the subscribe module; the body is empty, so calling it performs no work.
void mndCleanupSubscribe(SMnode *pMnode) {}
2,466✔
1227

1228
/*
 * Serialize a subscription object into a newly allocated sdb raw record laid
 * out as: int32 payload length, the encoded payload, then
 * MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record on success. On failure returns NULL with terrno left
 * at a non-success value; any partially built raw record is freed.
 */
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  void    *buf = NULL;
  // Declared and initialised before the first goto: the cleanup label logs and
  // frees pRaw, so it must never be reached with an indeterminate value.
  SSdbRaw *pRaw = NULL;
  void    *abuf = NULL;
  int32_t  size = 0;
  int32_t  dataPos = 0;

  // first pass with a NULL buffer only measures the encoded length
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0){
    goto SUB_ENCODE_OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1270

1271
/*
 * Rebuild a subscription row from an sdb raw record.
 *
 * Rejects software versions outside [1, MND_SUBSCRIBE_VER_NUMBER], reads the
 * length-prefixed payload, decodes it into the row's object, then passes the
 * epSet of every vg (unassigned and per-consumer) through tmsgUpdateDnodeEpSet().
 *
 * Returns the row on success. On failure returns NULL with terrno left at a
 * non-success value and the row storage released.
 */
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;
  int32_t          dataPos = 0;
  int32_t          tlen = 0;
  int8_t           sver = 0;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
    goto SUB_DECODE_OVER;
  }

  // update epset saved in mnode
  if (pSub->unassignedVgs != NULL) {
    int32_t unassignedCnt = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t i = 0; i < unassignedCnt; ++i) {
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, i);
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
    }
  }
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pConsumerEp = pIter;
      int32_t        vgCnt = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
      for (int32_t i = 0; i < vgCnt; ++i) {
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}
1342

1343
/* sdb insert hook for subscriptions: only traces the key (or "null") and returns 0. */
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  mTrace("subscribe:%s, perform insert action", pSub != NULL ? pSub->key : "null");
  return 0;
}
1347

1348
/* sdb delete hook for subscriptions: traces the key, releases the object's contents and returns 0. */
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  mTrace("subscribe:%s, perform delete action", pSub != NULL ? pSub->key : "null");
  tDeleteSubscribeObj(pSub);
  return 0;
}
1353

1354
/*
 * sdb update hook for subscriptions: under the old object's write latch, swap
 * consumerHash, unassignedVgs and offsetRows between the old and new objects,
 * so the old object carries the new state and the new object holds the
 * previous containers.
 *
 * Returns 0 on success, -1 when either object is NULL.
 */
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (pOldSub == NULL || pNewSub == NULL) return -1;
  mTrace("subscribe:%s, perform update action", pOldSub->key);
  taosWLockLatch(&pOldSub->lock);

  // remember what the old object held ...
  SHashObj *prevConsumers = pOldSub->consumerHash;
  SArray   *prevUnassigned = pOldSub->unassignedVgs;
  SArray   *prevOffsets = pOldSub->offsetRows;

  // ... install the new state ...
  pOldSub->consumerHash = pNewSub->consumerHash;
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
  pOldSub->offsetRows = pNewSub->offsetRows;

  // ... and hand the previous containers to the new object
  pNewSub->consumerHash = prevConsumers;
  pNewSub->unassignedVgs = prevUnassigned;
  pNewSub->offsetRows = prevOffsets;

  taosWUnLockLatch(&pOldSub->lock);
  return 0;
}
1374

1375
/*
 * Acquire the subscription stored under `key` and store it in *pSub.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or
 * TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST (with *pSub set to NULL) when no such
 * subscription is found.
 */
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub) {
  if (pMnode == NULL || key == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  return (*pSub != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1386

1387
/*
 * Count the subscriptions whose key resolves to topic `topicName`.
 * Returns 0 when either argument is NULL.
 */
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (pMnode == NULL || topicName == NULL) return 0;

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          matched = 0;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) == 0) {
      matched++;
    }
    sdbRelease(pSdb, pSub);
  }

  return matched;
}
1412

1413
/* Release a subscription reference back to the sdb; does nothing when either argument is NULL. */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1418

1419
/*
 * Encode pSub, append the raw record to pTrans as a commit log and mark it
 * SDB_STATUS_DROPPED. If the append fails, the raw record is freed here.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the code of
 * the failing step.
 */
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) return TSDB_CODE_INVALID_PARA;

  int32_t  code = 0;
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  } else {
    sdbFreeRaw(pRaw);
  }

END:
  return code;
}
1433

1434
/*
 * For every subscription whose key resolves to `topicName`, append the vnode
 * delete actions and the drop commit log to pTrans.
 *
 * Without `force`, a subscription that still has consumers stops the scan with
 * TSDB_CODE_MND_IN_REBALANCE. Returns 0 on success, TSDB_CODE_INVALID_PARA on
 * NULL arguments, or the code of the failing step.
 */
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          code = 0;

  for (;;) {
    // drop the reference from the previous round (NULL on the first one)
    sdbRelease(pSdb, pSub);
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) break;

    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

    if (strcmp(topic, topicName) == 0) {
      // iter all vnode to delete handle
      if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
        code = TSDB_CODE_MND_IN_REBALANCE;
        goto END;
      }

      MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
      MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
    }
  }

END:
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1468

1469
// Append one result row per entry of `vgs` to pBlock, starting at row index
// *numOfRows and incrementing *numOfRows after each row.
//
// Columns are written in this order: topic, cgroup, vgId, consumer id (hex
// string, NULL when consumerId == -1), user (NULL when `user` is NULL),
// fqdn (NULL when `fqdn` is NULL), offset text "<offset>/<ever>", rows.
// The last two columns are NULL when `offsetRows` has no entry for the vgId.
//
// `topic` and `cgroup` are written to the block as-is and passed through
// varDataVal() for logging, so they are expected to be var-string buffers.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL required argument,
// or the code set by the failing MND_TMQ_* check.
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
                           const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t vgCount = taosArrayGetSize(vgs);

  for (int32_t vgIdx = 0; vgIdx < vgCount; ++vgIdx) {
    SMqVgEp *pVgEp = taosArrayGetP(vgs, vgIdx);
    MND_TMQ_NULL_CHECK(pVgEp);

    int32_t          colIdx = 0;
    SColumnInfoData *pCol = NULL;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)cgroup, false));

    // vg id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pVgEp->vgId, false));

    // consumer id, rendered as a hex var-string
    char  consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
    char *hexText = varDataVal(consumerIdHex);
    (void)snprintf(hexText, TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(consumerIdHex, strlen(hexText));

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)consumerIdHex, consumerId == -1));

    // user
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user != NULL) {
      STR_TO_VARSTR(userStr, user);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, userStr, user == NULL));

    // fqdn
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn != NULL) {
      STR_TO_VARSTR(fqdnStr, fqdn);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // offset: scan the whole array; the last entry with a matching vgId wins
    OffsetRows *pMatch = NULL;
    for (int k = 0; k < taosArrayGetSize(offsetRows); k++) {
      OffsetRows *pCand = taosArrayGet(offsetRows, k);
      MND_TMQ_NULL_CHECK(pCand);
      if (pCand->vgId == pVgEp->vgId) {
        pMatch = pCand;
      }
    }

    if (pMatch == NULL) {
      // No offset recorded for this vgroup: both trailing columns are NULL.
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    } else {
      // "<formatted offset>/<ever>" as a var-string
      char  buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      char *offsetText = varDataVal(buf);
      (void)tFormatOffset(offsetText, TSDB_OFFSET_LEN, &pMatch->offset);
      (void)snprintf(offsetText + strlen(offsetText), sizeof(buf) - VARSTR_HEADER_SIZE - strlen(offsetText),
                     "/%" PRId64, pMatch->ever);
      varDataSetLen(buf, strlen(offsetText));

      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)buf, false));

      // rows
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pMatch->rows, false));
    }

    (*numOfRows)++;
  }
  return 0;

END:
  return code;
}
1559

1560
// Fill pBlock with "show subscriptions" rows.
//
// Subscriptions are fetched from the sdb through pShow->pIter until at least
// rowsCapacity rows have been produced or the iterator is exhausted. For each
// subscription, held under its read latch, one row is emitted per vgroup of
// every consumer in consumerHash, followed by one row per unassigned vgroup
// (consumer id -1, NULL user/fqdn).
//
// Returns the number of rows written on success (also added to
// pShow->numOfRows), TSDB_CODE_INVALID_PARA on a NULL argument, or the error
// code of the failing step.
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;

  mInfo("mnd show subscriptions begin");

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    taosRLockLatch(&pSub->lock);

    // A single subscription may add up to vgNum rows; grow the block first if
    // that would exceed the requested capacity.
    if (numOfRows + pSub->vgNum > rowsCapacity) {
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum));
    }

    // topic and cgroup
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
    varDataSetLen(topic, strlen(varDataVal(topic)));
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

    SMqConsumerEp *pConsumerEp = NULL;
    void          *pIter = NULL;

    while (1) {
      pIter = taosHashIterate(pSub->consumerHash, pIter);
      if (pIter == NULL) break;
      pConsumerEp = (SMqConsumerEp *)pIter;

      char           *user = NULL;
      char           *fqdn = NULL;
      SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
      if (pConsumer != NULL) {
        user = pConsumer->user;
        fqdn = pConsumer->fqdn;
      }

      // user/fqdn point into pConsumer, so the reference must be held until
      // buildResult has copied them into the block. sdbRelease is also invoked
      // on a NULL object in mndDropSubByTopic, so the unconditional call is
      // assumed to be safe when the consumer was not found.
      code = buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
                         pConsumerEp->offsetRows);
      sdbRelease(pSdb, pConsumer);
      if (code != 0) {
        // Leaving the hash iteration early: cancel it before bailing out.
        taosHashCancelIterate(pSub->consumerHash, pIter);
        goto END;
      }
    }

    MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

    pBlock->info.rows = numOfRows;

    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

  mInfo("mnd end show subscriptions");

  pShow->numOfRows += numOfRows;
  return numOfRows;

END:
  // Every jump to END happens while pSub is fetched and read-locked.
  taosRUnLockLatch(&pSub->lock);
  sdbRelease(pSdb, pSub);

  return code;
}
1630

1631
// Cancel an in-progress SDB_SUBSCRIBE fetch identified by pIter.
// Does nothing when pMnode is NULL.
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc