• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.38
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tcompare.h"
25
#include "tname.h"
26

27
#define MND_SUBSCRIBE_VER_NUMBER   3
28
#define MND_SUBSCRIBE_RESERVE_SIZE 64
29

30
//#define MND_CONSUMER_LOST_HB_CNT          6
31

32
static int32_t mqRebInExecCnt = 0;
33

34
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
35
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
36
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
39
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
40
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
41
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
42
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
43
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
44

45
// Encodes pSub into an SDB raw record, appends it to pTrans as a commit log
// and marks the record SDB_STATUS_READY.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the first
// non-zero code reported by the helpers (0 on success). The raw record is
// freed here only when appending it to the transaction fails.
// MND_TMQ_NULL_CHECK is defined outside this view; it relies on the local
// `code` and the `END` label.
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  int32_t  code = 0;
  SSdbRaw *pRaw = NULL;

  if (pTrans == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    sdbFreeRaw(pRaw);
  }

END:
  return code;
}
62

63
// Registers everything the subscribe module needs on pMnode:
//   - the SDB_SUBSCRIBE table with its encode/decode/insert/update/delete callbacks,
//   - the TMQ message handlers (rebalance timer, drop-cgroup and the vnode responses),
//   - the retrieve / free-iterator callbacks for the subscriptions show table.
//
// Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise whatever
// sdbSetTable() returns.
int32_t mndInitSubscribe(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  // The iterator-cancel callback must be registered for the same show table as
  // the retrieve callback above. It used to be registered under
  // TSDB_MGMT_TABLE_TOPICS, which left the subscriptions table without one.
  // NOTE(review): confirm the topic module registers its own handler for
  // TSDB_MGMT_TABLE_TOPICS.
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
88

89
// Creates a new subscribe object for subKey, copies the topic attributes
// (dbUid, stbUid, subType, withMeta) into it and lets mndSchedInitSubEp()
// initialise its endpoints.
//
// On success returns 0 and *pSub holds the new object (owned by the caller).
// On failure returns a non-zero code and *pSub is NULL: anything that was
// allocated is destroyed here, so the caller never sees a dangling pointer.
// MND_TMQ_RETURN_CHECK is defined outside this view; it relies on the local
// `code` and the `END` label.
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey, SMqSubscribeObj** pSub) {
  if(pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;

  // Start from a known state so the cleanup below never touches an
  // uninitialised pointer if the constructor fails before assigning it.
  *pSub = NULL;

  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
  (*pSub)->dbUid = pTopic->dbUid;
  (*pSub)->stbUid = pTopic->stbUid;
  (*pSub)->subType = pTopic->subType;
  (*pSub)->withMeta = pTopic->withMeta;

  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
  return code;

END:
  if (*pSub != NULL) {
    tDeleteSubscribeObj(*pSub);
    taosMemoryFree(*pSub);
    *pSub = NULL;  // do not hand a freed pointer back to the caller
  }
  return code;
}
108

109
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
2,629✔
110
                                    SSubplan *pPlan) {
111
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
2,629!
112
    return TSDB_CODE_INVALID_PARA;
×
113
  }
114
  SMqRebVgReq req = {0};
2,629✔
115
  int32_t     code = 0;
2,629✔
116
  SEncoder encoder = {0};
2,629✔
117

118
  req.oldConsumerId = pRebVg->oldConsumerId;
2,629✔
119
  req.newConsumerId = pRebVg->newConsumerId;
2,629✔
120
  req.vgId = pRebVg->pVgEp->vgId;
2,629✔
121
  if (pPlan) {
2,629✔
122
    pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
2,163✔
123
    pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
2,163✔
124
    int32_t msgLen = 0;
2,163✔
125
    MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
2,163!
126
  } else {
127
    req.qmsg = taosStrdup("");
466!
128
    MND_TMQ_NULL_CHECK(req.qmsg);
466!
129
  }
130
  req.subType = pSub->subType;
2,629✔
131
  req.withMeta = pSub->withMeta;
2,629✔
132
  req.suid = pSub->stbUid;
2,629✔
133
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
2,629✔
134

135
  int32_t tlen = 0;
2,629✔
136
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
2,629!
137
  if (code < 0) {
2,629!
138
    goto END;
×
139
  }
140

141
  tlen += sizeof(SMsgHead);
2,629✔
142
  void *buf = taosMemoryMalloc(tlen);
2,629!
143
  MND_TMQ_NULL_CHECK(buf);
2,629!
144
  SMsgHead *pMsgHead = (SMsgHead *)buf;
2,629✔
145
  pMsgHead->contLen = htonl(tlen);
2,629✔
146
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
2,629✔
147

148
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
2,629✔
149
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
2,629!
150
  *pBuf = buf;
2,629✔
151
  *pLen = tlen;
2,629✔
152

153
END:
2,629✔
154
  tEncoderClear(&encoder);
2,629✔
155
  taosMemoryFree(req.qmsg);
2,629!
156
  return code;
2,629✔
157
}
158

159
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
2,631✔
160
                                        const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
161
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
2,631!
162
    return TSDB_CODE_INVALID_PARA;
×
163
  }
164
  int32_t code = 0;
2,631✔
165
  void   *buf  = NULL;
2,631✔
166

167
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
2,631✔
168
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
2!
169
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
170
    goto END;
×
171
  }
172

173
  int32_t tlen = 0;
2,629✔
174
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan));
2,629!
175
  int32_t vgId = pRebVg->pVgEp->vgId;
2,629✔
176
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
2,629✔
177
  if (pVgObj == NULL) {
2,629!
178
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
179
    goto END;
×
180
  }
181

182
  STransAction action = {0};
2,629✔
183
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
2,629✔
184
  action.pCont = buf;
2,629✔
185
  action.contLen = tlen;
2,629✔
186
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
2,629✔
187

188
  mndReleaseVgroup(pMnode, pVgObj);
2,629✔
189
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
2,629!
190
  return code;
2,629✔
191

192
END:
×
193
  taosMemoryFree(buf);
×
194
  return code;
×
195
}
196

197
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
3,480✔
198
  if (key == NULL || topic == NULL || cgroup == NULL) {
3,480!
199
    return;
×
200
  }
201
  int32_t i = 0;
3,480✔
202
  while (key[i] != TMQ_SEPARATOR_CHAR) {
24,217✔
203
    i++;
20,737✔
204
  }
205
  (void)memcpy(cgroup, key, i);
3,480✔
206
  cgroup[i] = 0;
3,480✔
207
  if (fullName) {
3,480✔
208
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
3,305✔
209
  } else {
210
    while (key[i] != '.') {
525✔
211
      i++;
350✔
212
    }
213
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
175✔
214
  }
215
}
216

217
// Looks up the rebalance info stored under key in pHash, creating and
// inserting a fresh entry when none exists yet. The key is hashed including
// its terminating NUL.
//
// pReb is optional; when non-NULL it receives the pointer returned by
// taosHashGet() for that key. Returns 0 on success or a non-zero code.
// MND_TMQ_NULL_CHECK is defined outside this view; it relies on the local
// `code` and the `END` label.
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t     code = 0;
  size_t      keyLen = strlen(key) + 1;
  SMqRebInfo *pEntry = taosHashGet(pHash, key, keyLen);

  if (pEntry == NULL) {
    SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
    if (pFresh == NULL) {
      code = terrno;
      goto END;
    }

    // The hash is given sizeof(SMqRebInfo) bytes of the new object, so the
    // temporary can be released whether or not the insert succeeded.
    code = taosHashPut(pHash, key, keyLen, pFresh, sizeof(SMqRebInfo));
    taosMemoryFreeClear(pFresh);
    if (code != 0) {
      goto END;
    }

    pEntry = taosHashGet(pHash, key, keyLen);
    MND_TMQ_NULL_CHECK(pEntry);
  }

  if (pReb) {
    *pReb = pEntry;
  }

END:
  return code;
}
244

245
// Pops the last vgroup endpoint from vgs and records it in pHash, keyed by its
// vgId, as a pending move whose first id field is consumerId and whose second
// id field is -1.
//
// Returns 0 on success or a non-zero code when the array is empty or the hash
// insert fails.
// MND_TMQ_RETURN_CHECK / MND_TMQ_NULL_CHECK are defined outside this view;
// they rely on the local `code` and the `END` label.
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  int32_t code = 0;

  if (vgs == NULL || pHash == NULL || key == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqVgEp **ppVg = (SMqVgEp **)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(ppVg);

  SMqVgEp       *pVg = *ppVg;
  SMqRebOutputVg pending = {consumerId, -1, pVg};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &pVg->vgId, sizeof(int32_t), &pending, sizeof(SMqRebOutputVg)));
  mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, pVg->vgId, consumerId);

END:
  return code;
}
258

259
// For every consumer id listed in pInput->pRebInfo->removedConsumers that is
// still present in the subscription's consumer hash: moves all of its vgroups
// into pHash, destroys its vg array, removes it from the consumer hash and
// appends its id to pOutput->removedConsumers.
//
// Ids that are not in the consumer hash are skipped; a mismatch between the
// requested and the actually removed count is only logged.
// Returns 0 on success or the first non-zero code.
// MND_TMQ_RETURN_CHECK / MND_TMQ_NULL_CHECK are defined outside this view;
// they rely on the local `code` and the `END` label.
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (pHash == NULL || pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t requested = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t removed = 0;

  for (int32_t idx = 0; idx < requested; ++idx) {
    int64_t *pId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    MND_TMQ_NULL_CHECK(pId);

    SMqConsumerEp *pEp = taosHashGet(pOutput->pSub->consumerHash, pId, sizeof(int64_t));
    if (pEp != NULL) {
      // Each call pops exactly one vgroup, so run it once per vgroup owned.
      int32_t pending = taosArrayGetSize(pEp->vgs);
      while (pending-- > 0) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, *pId, pOutput->pSub->key));
      }

      taosArrayDestroy(pEp->vgs);
      MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, pId, sizeof(int64_t)));
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, pId));
      removed++;
    }
  }

  if (requested == removed) {
    mInfo("[rebalance] sub:%s removed %d consumers", pOutput->pSub->key, requested);
  } else {
    mError("[rebalance] sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, requested,
           removed);
  }

END:
  return code;
}
294

295
// Adds every consumer id listed in pInput->pRebInfo->newConsumers to the
// subscription's consumer hash with an empty vgroup array, and records the id
// in pOutput->newConsumers.
//
// Returns 0 on success or the first non-zero code. If inserting into the
// consumer hash fails, the vgroup array created for that consumer is destroyed
// here (the hash never took a copy of the struct that points at it).
// MND_TMQ_NULL_CHECK is defined outside this view; it relies on the local
// `code` and the `END` label.
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t* consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);

    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
    if (code != 0) {
      // Nothing else references the array yet; free it before bailing out.
      taosArrayDestroy(newConsumerEp.vgs);
      goto END;
    }

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("[rebalance] sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }
END:
  return code;
}
316

317
// Moves every vgroup currently in pOutput->pSub->unassignedVgs into pHash,
// passing -1 as the consumer id.
//
// Returns 0 on success or the first non-zero code from pushVgDataToHash().
// MND_TMQ_RETURN_CHECK is defined outside this view; it relies on the local
// `code` and the `END` label.
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  int32_t code = 0;

  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Every call pops one entry, so run it once per entry present at the start.
  int32_t pending = taosArrayGetSize(pOutput->pSub->unassignedVgs);
  while (pending-- > 0) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
  }

END:
  return code;
}
329

330
// Walks the consumers still present in the subscription, records each of them
// in pOutput->modifyConsumers and strips surplus vgroups into pHash:
//   - the first remainderVgCnt consumers that own more than minVgCnt vgroups
//     are trimmed to minVgCnt + 1,
//   - every other consumer owning more than minVgCnt is trimmed to minVgCnt.
//
// Returns 0 on success or the first non-zero code. When the walk is abandoned
// early, the consumer-hash iterator is cancelled before returning.
// MND_TMQ_RETURN_CHECK / MND_TMQ_NULL_CHECK are defined outside this view;
// they rely on the local `code` and the `END` label.
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                     int32_t remainderVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t cnt = 0;  // consumers already allowed to keep minVgCnt + 1 vgroups
  void   *pIter = NULL;

  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
    if (consumerVgNum <= minVgCnt) {
      continue;
    }

    bool    keepsExtra = (cnt < remainderVgCnt);
    int32_t target = keepsExtra ? (minVgCnt + 1) : minVgCnt;
    while (taosArrayGetSize(pConsumerEp->vgs) > target) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
    }
    if (keepsExtra) {
      cnt++;
    }
  }

END:
  // pIter is non-NULL only when an error interrupted the walk.
  if (pIter != NULL) {
    taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  }
  return code;
}
365

366
// Reconciles the vgroups referenced by the subscription with the vgroups that
// currently belong to its database (pOutput->pSub->dbUid):
//   1. collects an SMqVgEp for every vgroup of that database,
//   2. for each consumer, drops vgroups that no longer exist and removes from
//      the collected list those the consumer already owns,
//   3. if the subscription has no unassigned vgroups and some collected ones
//      are left, appends them to pSub->unassignedVgs; otherwise frees them.
//
// Returns the number of vgroups found for the database on success, or the
// error code set by the failing check (the caller treats negative values as
// errors).
// MND_TMQ_NULL_CHECK is defined outside this view; it relies on the local
// `code` and the `END` label.
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t  code = 0;
  int32_t  totalVgNum = 0;
  SVgObj  *pVgroup = NULL;
  SMqVgEp *pVgEp = NULL;
  void    *pSdbIter = NULL;
  void    *pHashIter = NULL;
  SArray  *newVgs = taosArrayInit(0, POINTER_BYTES);
  MND_TMQ_NULL_CHECK(newVgs);

  // NOTE(review): an error inside this loop leaves the sdb iterator open; no
  // cancel API for it is visible in this file, so that is not addressed here.
  while (1) {
    pSdbIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pSdbIter, (void **)&pVgroup);
    if (pSdbIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      pVgroup = NULL;
      continue;
    }

    totalVgNum++;
    pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(pVgEp);
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
    pVgEp = NULL;  // now owned by newVgs
    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;  // already released; END must not release it again
  }

  while (1) {
    pHashIter = taosHashIterate(pOutput->pSub->consumerHash, pHashIter);
    if (pHashIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pHashIter;
    int32_t j = 0;
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
      SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
      MND_TMQ_NULL_CHECK(pVgEpTmp);
      bool     find = false;
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
        SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
        MND_TMQ_NULL_CHECK(pnewVgEp);
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
          // Already owned by this consumer: not a new vgroup.
          tDeleteSMqVgEp(pnewVgEp);
          taosArrayRemove(newVgs, k);
          find = true;
          break;
        }
      }
      if (!find) {
        // The vgroup is gone from the database: drop it from the consumer.
        mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
        tDeleteSMqVgEp(pVgEpTmp);
        taosArrayRemove(pConsumerEp->vgs, j);
        continue;  // the next element shifted into slot j
      }
      j++;
    }
  }

  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
    mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
    taosArrayDestroy(newVgs);  // elements now belong to unassignedVgs
  } else {
    taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
  }
  return totalVgNum;

END:
  // Cancel the consumer-hash walk if an error interrupted it.
  if (pHashIter != NULL) {
    taosHashCancelIterate(pOutput->pSub->consumerHash, pHashIter);
  }
  // Release the vgroup only if it is still held (error inside the first loop).
  if (pVgroup != NULL) {
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  taosMemoryFree(pVgEp);
  taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
  return code;
}
443

444
// Folds the per-consumer offset rows of the currently stored subscription
// (looked up by pInput->pRebInfo->key) into pOutput->pSub->offsetRows.
//
// For each offset row of each stored consumer:
//   - it is skipped when the same consumer still owns that vgroup in pOutput,
//   - otherwise it is merged into the output row with the same vgId (rows are
//     summed; offset and ever are overwritten), or appended if none exists.
//
// Returns 0 when the subscription cannot be acquired (nothing to merge) or on
// success, otherwise a non-zero code. The read latch, the acquired subscribe
// object and any live hash iterator are released on every exit path.
// MND_TMQ_NULL_CHECK is defined outside this view; it relies on the local
// `code` and the `END` label.
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if( code != 0){
    return 0;
  }

  // From here on every exit goes through END, which unlocks and releases pSub.
  taosRLockLatch(&pSub->lock);
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    if(pOutput->pSub->offsetRows == NULL) {
      code = terrno;
      goto END;
    }
  }

  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(d1);
      bool        jump = false;
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
        MND_TMQ_NULL_CHECK(pVgEp);
        if (pVgEp->vgId == d1->vgId) {
          jump = true;
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                pConsumerEp->consumerId, pVgEp->vgId);
          break;
        }
      }
      if (jump) continue;
      bool find = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(d2);
        if (d1->vgId == d2->vgId) {
          d2->rows += d1->rows;
          d2->offset = d1->offset;
          d2->ever = d1->ever;
          find = true;
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
          break;
        }
      }
      if (!find) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
      }
    }
  }

END:
  // pIter is non-NULL only when an error interrupted the walk.
  if (pIter != NULL) {
    taosHashCancelIterate(pSub->consumerHash, pIter);
  }
  taosRUnLockLatch(&pSub->lock);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
508

509
// Logs the outcome of a rebalance: first every moved vgroup with its old and
// new consumer id, then the final vgroup list of every consumer. Does nothing
// when pOutput is NULL; NULL array entries are skipped.
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (pOutput == NULL) {
    return;
  }

  const char *subKey = pOutput->pSub->key;
  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", subKey);

  int32_t movedCnt = taosArrayGetSize(pOutput->rebVgs);
  for (int32_t m = 0; m < movedCnt; ++m) {
    SMqRebOutputVg *pMoved = taosArrayGet(pOutput->rebVgs, m);
    if (pMoved != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, subKey,
            pMoved->pVgEp->vgId, pMoved->oldConsumerId, pMoved->newConsumerId);
    }
  }

  for (void *pIter = taosHashIterate(pOutput->pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        vgCnt = taosArrayGetSize(pEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", subKey,
          pEp->consumerId, vgCnt);

    for (int32_t v = 0; v < vgCnt; ++v) {
      SMqVgEp *pVg = taosArrayGetP(pEp->vgs, v);
      if (pVg != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, subKey, pVg->vgId,
              pEp->consumerId);
      }
    }
  }
}
535

536
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
1,297✔
537
                           int32_t *remainderVgCnt) {
538
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
1,297!
539
    return;
×
540
  }
541
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
1,297✔
542
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
1,297✔
543
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
1,297✔
544

545
  // calc num
546
  if (numOfFinal != 0) {
1,297✔
547
    *minVgCnt = totalVgNum / numOfFinal;
830✔
548
    *remainderVgCnt = totalVgNum % numOfFinal;
830✔
549
  } else {
550
    mInfo("[rebalance] sub:%s no consumer subscribe this topic", pSubKey);
467!
551
  }
552
  mInfo(
1,297!
553
      "[rebalance] sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
554
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
555
}
556

557
// Hands the pending vgroups collected in pHash out to the consumers of the
// subscription, in three passes:
//   1. every consumer is filled up to minVgCnt vgroups,
//   2. consumers holding exactly minVgCnt get one more each until pHash is
//      exhausted,
//   3. every entry of pHash is copied to pOutput->rebVgs; if no consumer is
//      left, its vgroup is also put back into pSub->unassignedVgs.
// A single cursor (pAssignIter) walks pHash across passes 1 and 2, so each
// pending vgroup is handed out at most once.
//
// Returns 0 on success, TSDB_CODE_PAR_INTERNAL_ERROR if pHash still has
// entries after pass 2, or the code set by a failing check. Iterators that are
// still live on an error exit are cancelled before returning.
// MND_TMQ_NULL_CHECK is defined outside this view; it relies on the local
// `code` and the `END` label.
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqRebOutputVg *pRebVg = NULL;
  void           *pAssignIter = NULL;
  void           *pIter = NULL;
  int32_t         code = 0;

  // Pass 1: bring every consumer up to minVgCnt.
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mError("[rebalance] sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  // Pass 2: distribute the remainder, one extra vgroup per consumer.
  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mInfo("[rebalance] sub:%s assign iter is used up", pOutput->pSub->key);
        break;
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pConsumerEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
      mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId,
            pConsumerEp->consumerId);
    }
  }

  // Pass 2 may have stopped in the middle of the consumer hash.
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  pIter = NULL;  // already cancelled; keep END from cancelling it twice
  if (pAssignIter != NULL) {
    mError("[rebalance]sub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    goto END;
  }

  // Pass 3: publish every pending move.
  while (1) {
    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      break;
    }

    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {            // if all consumer is removed
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
    }
  }

END:
  // Both pointers are NULL on the success path; they are non-NULL only when an
  // error interrupted one of the walks above.
  if (pIter != NULL) {
    taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  }
  if (pAssignIter != NULL) {
    taosHashCancelIterate(pHash, pAssignIter);
  }
  return code;
}
630

631
// Runs one rebalance calculation for the subscription in pOutput->pSub:
// reconciles its vgroups with the database, collects the vgroups to move
// (from removed consumers, the unassigned list and over-loaded consumers) in a
// temporary hash keyed by vgId, adds the new consumers, redistributes the
// collected vgroups and finally merges the offset rows.
//
// Returns 0 on success, a negative value straight from processRemoveAddVgs(),
// or the first non-zero code of a later step. The temporary hash is destroyed
// on every exit path after it has been created.
// MND_TMQ_RETURN_CHECK / MND_TMQ_NULL_CHECK are defined outside this view;
// they rely on the local `code` and the `END` label.
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     totalVgNum = processRemoveAddVgs(pMnode, pOutput);
  if (totalVgNum < 0){
    return totalVgNum;
  }
  const char *pSubKey = pOutput->pSub->key;
  int32_t     minVgCnt = 0;
  int32_t     remainderVgCnt = 0;
  int32_t     code = 0;
  SHashObj   *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pHash);
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
  printRebalanceLog(pOutput);

END:
  // Reached on success and on failure, so the hash is never leaked.
  if (pHash != NULL) {
    taosHashCleanup(pHash);
  }
  return code;
}
658

659
// For every consumer id in `consumers`, builds a temporary consumer object of
// the given update `type` (for cgroup / topic) and appends it to pTrans via
// mndSetConsumerCommitLogs(). The temporary object is deleted after each
// append.
//
// topic may be NULL (it is passed through unchecked).
// Returns 0 on success or the first non-zero code.
// MND_TMQ_RETURN_CHECK / MND_TMQ_NULL_CHECK are defined outside this view;
// they rely on the local `code` and the `END` label.
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         code = 0;
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t* consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Reset right away: if a later iteration fails before assigning a new
    // object, END must not delete this one a second time.
    pConsumerNew = NULL;
  }

END:
  // Non-NULL only when appending the commit log failed for the current object.
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
679

680
// Appends the consumer updates of a rebalance result to pTrans, in this order:
// modified consumers (CONSUMER_UPDATE_REB, no topic), new consumers
// (CONSUMER_ADD_REB) and removed consumers (CONSUMER_REMOVE_REB).
//
// Returns 0 on success or the first non-zero code; later groups are not
// processed after a failure.
// MND_TMQ_RETURN_CHECK is defined outside this view; it relies on the local
// `code` and the `END` label.
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (pTrans == NULL || pOutput == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;

  // One row per consumer group, in the order they must be persisted.
  const struct {
    SArray *ids;
    int8_t  type;
    char   *topicName;
  } steps[] = {
      {pOutput->modifyConsumers, CONSUMER_UPDATE_REB, NULL},
      {pOutput->newConsumers, CONSUMER_ADD_REB, topic},
      {pOutput->removedConsumers, CONSUMER_REMOVE_REB, topic},
  };

  for (size_t s = 0; s < sizeof(steps) / sizeof(steps[0]); ++s) {
    MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, steps[s].ids, steps[s].type, cgroup, steps[s].topicName));
  }

END:
  return code;
}
691

692
// Turn one rebalance output into a "tmq-reb" transaction and prepare it.
//
// Steps, in order:
//   - parse pOutput->pSub->qmsg into a sub-plan when it is not empty
//   - create the transaction and check it for conflicts
//   - 1. append a redo action for every entry of pOutput->rebVgs
//   - 2. append the subscribe commit log
//   - 3. append the consumer commit logs
//   - 4. register the MQ rebalance start/stop callbacks
//   - 5. prepare the transaction
//
// The sub-plan and the transaction handle are always dropped before
// returning. Returns 0 on success or the code of the failing step.
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pMsg == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  struct SSubplan *pPlan = NULL;
  STrans          *pTrans = NULL;
  char             topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char             cgroup[TSDB_CGROUP_LEN] = {0};

  // an empty qmsg means there is nothing to parse
  if (pOutput->pSub->qmsg[0] != '\0') {
    MND_TMQ_RETURN_CHECK(qStringToSubplan(pOutput->pSub->qmsg, &pPlan));
  }

  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    // prefer the thread error code when one was recorded
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // 1. redo action: action to all vg
  const SArray *pRebVgs = pOutput->rebVgs;
  int32_t       total = taosArrayGetSize(pRebVgs);
  int32_t       idx = 0;
  while (idx < total) {
    SMqRebOutputVg *pEntry = taosArrayGet(pRebVgs, idx);
    MND_TMQ_NULL_CHECK(pEntry);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pEntry, pPlan));
    idx++;
  }

  // 2. commit log: subscribe and vg assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // 3. commit log: consumer to update status and epoch
  MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));

  // 4. set cb
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);

  // 5. execution
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  nodesDestroyNode((SNode *)pPlan);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
744

745
static void freeRebalanceItem(void *param) {
1,297✔
746
  if (param == NULL) return;
1,297!
747
  SMqRebInfo *pInfo = param;
1,297✔
748
  taosArrayDestroy(pInfo->newConsumers);
1,297✔
749
  taosArrayDestroy(pInfo->removedConsumers);
1,297✔
750
}
751

752
// type = 0 remove  type = 1 add
753
// Record consumerId in the rebalance entry of every topic in topicList.
//
// For each topic the subscribe key "<group><TMQ_SEPARATOR><topic>" is built
// and its SMqRebInfo is fetched (or created) in rebSubHash. The consumer id is
// then pushed to removedConsumers when type == 0 or to newConsumers when
// type == 1; any other type value adds nothing.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when rebSubHash, topicList or
// group is NULL, or the code of the first failing step.
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, char *group, int64_t consumerId) {
  if (rebSubHash == NULL || topicList == NULL || group == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t total = taosArrayGetSize(topicList);
  for (int32_t idx = 0; idx < total; ++idx) {
    char *topicName = taosArrayGetP(topicList, idx);
    MND_TMQ_NULL_CHECK(topicName);

    char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", group, TMQ_SEPARATOR, topicName);

    SMqRebInfo *pRebInfo = NULL;
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, subKey, &pRebInfo));

    switch (type) {
      case 0:  // consumer leaves this topic
        MND_TMQ_NULL_CHECK(taosArrayPush(pRebInfo->removedConsumers, &consumerId));
        break;
      case 1:  // consumer joins this topic
        MND_TMQ_NULL_CHECK(taosArrayPush(pRebInfo->newConsumers, &consumerId));
        break;
      default:
        break;
    }
  }

END:
  return code;
}
775

776
// Look for vgroups assigned to pConsumer that can no longer be acquired.
//
// For every topic in pConsumer->currentTopics the subscription is acquired and
// read-locked, and each vgroup assigned to this consumer is probed with
// mndAcquireVgroup(). When a vgroup cannot be acquired, an entry for the
// subscribe key is requested in rebSubHash so that a rebalance is triggered.
// Topics without a subscription, or subscriptions that do not list this
// consumer, are skipped. Nothing is done when any argument is NULL.
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (pMnode == NULL || pConsumer == NULL || rebSubHash == NULL) {
    return;
  }

  int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t t = 0; t < topicCnt; ++t) {
    char *topicName = taosArrayGetP(pConsumer->currentTopics, t);
    if (topicName == NULL) {
      continue;
    }

    char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topicName);

    SMqSubscribeObj *pSub = NULL;
    if (mndAcquireSubscribeByKey(pMnode, subKey, &pSub) != 0) {
      continue;
    }

    taosRLockLatch(&pSub->lock);

    // iterate all vg assigned to the consumer of that topic
    SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pEp != NULL) {
      int32_t vgCnt = taosArrayGetSize(pEp->vgs);
      for (int32_t v = 0; v < vgCnt; ++v) {
        SMqVgEp *pVgEp = taosArrayGetP(pEp->vgs, v);
        if (pVgEp == NULL) {
          continue;
        }

        SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
        if (pVgroup == NULL) {
          int32_t ret = mndGetOrCreateRebSub(rebSubHash, subKey, NULL);
          if (ret != 0) {
            mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(ret));
          } else {
            mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
          }
        }
        // called for both outcomes, exactly as the probe result came back
        mndReleaseVgroup(pMnode, pVgroup);
      }
    }

    taosRUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
  }
}
823

824
// Walk all consumers in the sdb and collect the subscriptions that need a
// rebalance into rebSubHash.
//
// Each visit increments the consumer's hbStatus and pollStatus counters.
//   - READY, no current topics: a TDMT_MND_TMQ_LOST_CONSUMER_CLEAR message is
//     sent for the consumer.
//   - READY, heartbeat or poll counter past its timeout: the consumer is
//     recorded as removed from all of its current topics.
//   - READY otherwise: its vgroups are checked with checkForVgroupSplit().
//   - REBALANCE: rebNewTopics are recorded as additions and rebRemovedTopics
//     as removals.
//   - any other status: a TDMT_MND_TMQ_LOST_CONSUMER_CLEAR message is sent.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, or the code
// of the first failing step. On failure the consumer latch is unlocked, the
// consumer reference is released and the sdb fetch is cancelled before
// returning.
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (pMsg == NULL || rebSubHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  void           *pIter = NULL;
  int32_t         code = 0;

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);

    mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
           ", hbstatus:%d, pollStatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
           pConsumer->createTime, hbStatus, pollStatus);

    if (status == MQ_CONSUMER_STATUS_READY) {
      if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
        code = mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
      } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
                 pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
        taosRLockLatch(&pConsumer->lock);
        code = buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
        // unlock unconditionally so a failure cannot leave the latch held
        taosRUnLockLatch(&pConsumer->lock);
      } else {
        checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
      }
    } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
      taosRLockLatch(&pConsumer->lock);
      code = buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId);
      if (code == 0) {
        code = buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
      }
      taosRUnLockLatch(&pConsumer->lock);
    } else {
      code = mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
    }

    // The reference taken by sdbFetch is dropped on success and failure alike.
    mndReleaseConsumer(pMnode, pConsumer);
    pConsumer = NULL;

    if (code != 0) {
      // leaving the scan early: close the still-open fetch iterator
      sdbCancelFetch(pSdb, pIter);
      break;
    }
  }

  return code;
}
875

876
bool mndRebTryStart() {
34,121✔
877
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
34,121✔
878
  if (old > 0) mInfo("[rebalance] counter old val:%d", old) return old == 0;
34,121!
879
}
880

881
void mndRebCntInc() {
1,032✔
882
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
1,032✔
883
  if (val > 0) mInfo("[rebalance] cnt inc, value:%d", val)
1,032!
884
}
1,032✔
885

886
void mndRebCntDec() {
35,153✔
887
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
35,153✔
888
  if (val > 0) mInfo("[rebalance] cnt sub, value:%d", val)
35,153!
889
}
35,153✔
890

891
// Release everything owned by a rebalance output: the four arrays and the
// subscribe object (its contents first, then the object itself).
// The members are not reset afterwards. A NULL argument is ignored.
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput != NULL) {
    taosArrayDestroy(rebOutput->newConsumers);
    taosArrayDestroy(rebOutput->modifyConsumers);
    taosArrayDestroy(rebOutput->removedConsumers);
    taosArrayDestroy(rebOutput->rebVgs);

    tDeleteSubscribeObj(rebOutput->pSub);
    taosMemoryFree(rebOutput->pSub);
  }
}
902

903
// Allocate the four (initially empty) arrays of a rebalance output:
// newConsumers, removedConsumers and modifyConsumers hold int64_t ids,
// rebVgs holds SMqRebOutputVg entries.
//
// Returns 0 on success and TSDB_CODE_INVALID_PARA for a NULL argument. When an
// allocation fails, the output is passed to clearRebOutput() and the error
// code set by the check macro is returned.
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  // all four arrays exist
  return code;

END:
  // failure: give back whatever was allocated so far
  clearRebOutput(rebOutput);
  return code;
}
922

923
// This function only works when there are dirty consumers
924
// Drop consumers from pSub->consumerHash that can no longer be acquired from
// the sdb ("dirty" consumers).
//
// For every entry of the hash the consumer is looked up with
// mndAcquireConsumer(). When the lookup fails, the entry's vgroups are
// appended to pSub->unassignedVgs, its vgs array is destroyed and the entry is
// removed from the hash.
//
// Returns 0 when the whole hash was visited, TSDB_CODE_INVALID_PARA on a NULL
// argument, or the code of the first failing step. The hash iterator is
// cancelled on every exit path.
static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  void   *pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerObj *pConsumer = NULL;
    code = mndAcquireConsumer(pMnode, pConsumerEp->consumerId, &pConsumer);
    if (code == 0) {
      // consumer still exists, nothing to repair
      mndReleaseConsumer(pMnode, pConsumer);
      continue;
    }
    mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));

    taosArrayDestroy(pConsumerEp->vgs);
    MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
  }
END:
  // pIter is NULL after a complete walk and non-NULL when a check above bailed
  // out mid-iteration; in the latter case the iterator must be closed here.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  return code;
}
952

953
// Fill rebOutput->pSub with the subscription that the rebalance works on and
// set rebInput->oldConsumerNum.
//
// - No subscription exists for the key yet: the topic is looked up, a new
//   subscription is created with mndCreateSubscription(), the topic's db name
//   is copied into it and oldConsumerNum is set to 0.
// - A subscription exists: it is cloned under its read latch, the clone is
//   cleaned with checkConsumer() and oldConsumerNum is set to the number of
//   consumers in the clone.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, or the code
// of the failing step. The acquired subscription is released on every path.
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  const char      *key = rebInput->pRebInfo->key;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);

  if (code != 0) {
    // split sub key and extract topic
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(key, topic, cgroup, true);
    SMqTopicObj *pTopic = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);

    rebInput->oldConsumerNum = 0;
    code = mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub);
    if (code != 0) {
      mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);
      return code;
    }

    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    taosRUnLockLatch(&pTopic->lock);
    mndReleaseTopic(pMnode, pTopic);

    mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
  } else {
    taosRLockLatch(&pSub->lock);
    code = tCloneSubscribeObj(pSub, &rebOutput->pSub);
    if (code == 0) {
      code = checkConsumer(pMnode, rebOutput->pSub);
    }
    if (code == 0) {
      rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
    }
    taosRUnLockLatch(&pSub->lock);

    if (code == 0) {
      mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
    }
    // Release the reference on failure too; otherwise a failed clone or check
    // would keep the subscription pinned.
    mndReleaseSubscribe(pMnode, pSub);
  }

END:
  return code;
}
1006

1007
// Handler for the MQ rebalance request.
//
// When mndRebTryStart() reports that a rebalance is already running, the
// request is ignored and 0 is returned. Otherwise:
//   - mndCheckConsumer() collects the subscriptions to rebalance in a hash
//   - for each collected entry the output is built, the rebalance is computed
//     and the result is persisted; a failure in one of these steps is logged
//     and the remaining steps for that entry are skipped
//   - the hash is released and mndRebCntDec() is called
//
// The value returned is the code left by the last step that ran.
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int     code = 0;
  void   *pIter = NULL;
  SMnode *pMnode = pMsg->info.node;

  mDebug("[rebalance] start to process mq timer");
  if (!mndRebTryStart()) {
    mInfo("[rebalance] mq rebalance already in progress, do nothing");
    return code;
  }

  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);

  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("[rebalance] mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
  }

  while ((pIter = taosHashIterate(rebSubHash, pIter)) != NULL) {
    SMqRebInputObj  rebInput = {0};
    SMqRebOutputObj rebOutput = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
    rebInput.pRebInfo = (SMqRebInfo *)pIter;

    // each stage runs only when the previous one succeeded
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else if ((code = mndDoRebalance(pMnode, &rebInput, &rebOutput)) != 0) {
      mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
    } else if ((code = mndPersistRebResult(pMnode, pMsg, &rebOutput)) != 0) {
      mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
    }

    clearRebOutput(&rebOutput);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("[rebalance] mq rebalance completed successfully, wait trans finish");
  }

END:
  taosHashCancelIterate(rebSubHash, pIter);
  taosHashCleanup(rebSubHash);
  mndRebCntDec();

  TAOS_RETURN(code);
}
1073

1074
// Append one TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every vgroup
// that belongs to the database of pSub (pSub->dbUid).
//
// Each request carries the vgroup id, consumerId = -1 and the subscribe key.
// TSDB_CODE_MND_VGROUP_NOT_EXIST is registered as an acceptable result code
// of the action.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, or the code
// of the first failing step. Each vgroup reference is released exactly once
// and the sdb fetch is cancelled on every exit path.
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  int32_t code = 0;
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // Forget the pointer: the cleanup at END releases pVgObj again, which
    // would be a second release if the append below fails.
    pVgObj = NULL;

    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      // NOTE(review): assumes the transaction takes ownership of pCont only
      // when the append succeeds - confirm against mndTransAppendRedoAction.
      taosMemoryFree(pReq);
      goto END;
    }
  }

END:
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1114

1115
// Check whether any consumer of group `cgroup` still has `topic` among its
// assigned topics.
//
// Returns 0 when no such consumer exists, TSDB_CODE_MND_CGROUP_USED when one
// is found (after logging it), or TSDB_CODE_INVALID_PARA on a NULL argument.
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int             code = 0;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;

  for (;;) {
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    // only consumers of the requested group are inspected for the topic
    if (strcmp(cgroup, pConsumer->cgroup) == 0 && checkTopic(pConsumer->assignedTopics, topic)) {
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
             topic, pConsumer->consumerId, pConsumer->cgroup);
      code = TSDB_CODE_MND_CGROUP_USED;
      goto END;
    }

    sdbRelease(pMnode->pSdb, pConsumer);
  }

END:
  sdbRelease(pMnode->pSdb, pConsumer);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1149

1150
// Handler for the "drop consumer group" request.
//
// The request is deserialized, the subscription for "<cgroup><sep><topic>" is
// acquired and write-locked, and - provided no consumer is attached to it - a
// "drop-cgroup" transaction is built that deletes the subscription on the
// vnodes and in the sdb.
//
// Returns:
//   - TSDB_CODE_ACTION_IN_PROGRESS when the transaction was prepared
//   - 0 when the subscription does not exist and igNotExists is set
//   - TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when it does not exist otherwise
//   - TSDB_CODE_MND_CGROUP_USED when consumers are still attached
//   - TSDB_CODE_INVALID_PARA for a NULL message, or the failing step's code
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pMsg->info.node;
  SMDropCgroupReq  dropReq = {0};
  STrans          *pTrans = NULL;
  int32_t          code = TSDB_CODE_ACTION_IN_PROGRESS;
  SMqSubscribeObj *pSub = NULL;

  // Return directly on a malformed request: nothing has been acquired or
  // locked yet, and the cleanup at END dereferences pSub.
  code = tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq);
  if (code != 0) {
    mError("failed to deserialize drop cgroup request since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code != 0) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    } else {
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
      return code;
    }
  }

  // From here on pSub is non-NULL and locked, so every exit goes through END.
  taosWLockLatch(&pSub->lock);
  if (taosHashGetSize(pSub->consumerHash) != 0) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  taosWUnLockLatch(&pSub->lock);
  mndReleaseSubscribe(pMnode, pSub);
  mndTransDrop(pTrans);

  if (code != 0) {
    mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic);
    TAOS_RETURN(code);
  }
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
}
1203

1204
// Cleanup hook of the subscribe module; the body performs no work.
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}
1,747✔
1205

1206
// Serialize a subscribe object into a freshly allocated sdb raw record.
//
// Raw layout: int32 payload length, the encoded payload, then
// MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
//
// Returns the raw record on success. On failure the record (if any) is freed,
// terrno holds the reason and NULL is returned. A NULL pSub returns NULL
// without touching terrno.
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  // NOTE(review): code/lino are not referenced directly here; presumably the
  // SDB_SET_* macros use them - confirm before removing.
  int32_t  code = 0;
  int32_t  lino = 0;
  void    *buf = NULL;
  // Declared and initialised before the first goto: the cleanup label logs and
  // frees pRaw, so it must hold a defined value on every path that reaches it.
  SSdbRaw *pRaw = NULL;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // first pass with a NULL buffer only computes the encoded length
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // the encoder advances the cursor it is given, so hand it a copy of buf
  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1248

1249
// Rebuild a subscribe object from an sdb raw record.
//
// The soft version of the record must lie in [1, MND_SUBSCRIBE_VER_NUMBER].
// After decoding, the endpoint set of every vgroup entry (in unassignedVgs and
// in each consumer's vgs) is passed to tmsgUpdateDnodeEpSet().
//
// Returns the new row on success. On failure terrno holds the reason, the row
// memory is freed and NULL is returned. A NULL pRaw returns NULL.
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }

  // NOTE(review): code/lino are not referenced directly here; presumably the
  // SDB_GET_* macros use them - confirm before removing.
  int32_t          code = 0;
  int32_t          lino = 0;
  SSdbRow         *pRow = NULL;
  SMqSubscribeObj *pSub = NULL;
  void            *buf = NULL;
  int8_t           sver = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;

  if (sver < 1 || sver > MND_SUBSCRIBE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto SUB_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
  if (pRow == NULL) goto SUB_DECODE_OVER;

  pSub = sdbGetRowObj(pRow);
  if (pSub == NULL) goto SUB_DECODE_OVER;

  int32_t dataPos = 0;
  int32_t tlen;
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);

  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
    goto SUB_DECODE_OVER;
  }

  // update epset saved in mnode
  if (pSub->unassignedVgs != NULL) {
    int32_t vgCnt = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
    for (int32_t v = 0; v < vgCnt; ++v) {
      SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, v);
      tmsgUpdateDnodeEpSet(&pVgEp->epSet);
    }
  }
  if (pSub->consumerHash != NULL) {
    for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
         pIter = taosHashIterate(pSub->consumerHash, pIter)) {
      SMqConsumerEp *pEp = pIter;
      int32_t        vgCnt = (int32_t)taosArrayGetSize(pEp->vgs);
      for (int32_t v = 0; v < vgCnt; ++v) {
        SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pEp->vgs, v);
        tmsgUpdateDnodeEpSet(&pVgEp->epSet);
      }
    }
  }

  terrno = TSDB_CODE_SUCCESS;

SUB_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRow;
}
1320

1321
// sdb insert hook for subscribe objects: only traces the key, always returns 0.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *keyForLog = (pSub == NULL) ? "null" : pSub->key;
  mTrace("subscribe:%s, perform insert action", keyForLog);
  return 0;
}
1325

1326
// sdb delete hook for subscribe objects: traces the key, then releases the
// object's contents with tDeleteSubscribeObj(). Always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *keyForLog = (pSub == NULL) ? "null" : pSub->key;
  mTrace("subscribe:%s, perform delete action", keyForLog);
  tDeleteSubscribeObj(pSub);
  return 0;
}
1331

1332
// sdb update hook for subscribe objects.
//
// Under the write latch of the old object, its consumerHash, unassignedVgs and
// offsetRows are exchanged with those of the new object, so the old object
// ends up with the new state and the new object carries the previous one.
// Returns 0, or -1 when either argument is NULL.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (pOldSub == NULL || pNewSub == NULL) {
    return -1;
  }
  mTrace("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);
  {
    SHashObj *prevHash = pOldSub->consumerHash;
    pOldSub->consumerHash = pNewSub->consumerHash;
    pNewSub->consumerHash = prevHash;
  }
  {
    SArray *prevUnassigned = pOldSub->unassignedVgs;
    pOldSub->unassignedVgs = pNewSub->unassignedVgs;
    pNewSub->unassignedVgs = prevUnassigned;
  }
  {
    SArray *prevOffsets = pOldSub->offsetRows;
    pOldSub->offsetRows = pNewSub->offsetRows;
    pNewSub->offsetRows = prevOffsets;
  }
  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}
1352

1353
// Acquire the subscribe object stored under `key` and return it through *pSub.
//
// Returns 0 when found, TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the sdb has no
// such object (*pSub is NULL then), or TSDB_CODE_INVALID_PARA when any
// argument is NULL. A successful acquire is paired with mndReleaseSubscribe().
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub) {
  if (pMnode == NULL || key == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqSubscribeObj *pFound = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  *pSub = pFound;
  return (pFound == NULL) ? TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST : 0;
}
1364

1365
// Count the subscribe objects whose key resolves to the topic `topicName`.
// Returns 0 when pMnode or topicName is NULL.
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (pMnode == NULL || topicName == NULL) {
    return 0;
  }

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          matched = 0;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) {
      break;
    }

    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) == 0) {
      matched++;
    }

    // every fetched object is released, whether it matched or not
    sdbRelease(pSdb, pSub);
  }

  return matched;
}
1390

1391
// Release a subscribe object back to the sdb. Does nothing when either
// argument is NULL.
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1396

1397
// Append a commit log to pTrans that marks pSub as dropped.
//
// The subscribe object is encoded, appended to the transaction's commit logs
// and then given the status SDB_STATUS_DROPPED. When the append fails the raw
// record is freed here.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, or the code
// of the failing step.
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pTrans == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  } else {
    sdbFreeRaw(pRaw);
  }

END:
  return code;
}
1411

1412
// Add the actions needed to drop every subscription of `topicName` to pTrans.
//
// For each matching subscribe object a delete request is queued for its vnodes
// and a "dropped" commit log is appended. When a matching subscription still
// has consumers, the walk stops with TSDB_CODE_MND_IN_REBALANCE.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, or the code
// of the failing step.
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;

  for (;;) {
    // drop the object of the previous round before fetching the next one
    sdbRelease(pSdb, pSub);
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) {
      break;
    }

    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    if (strcmp(topic, topicName) != 0) {
      continue;
    }

    // iter all vnode to delete handle
    if (taosHashGetSize(pSub->consumerHash) != 0) {
      code = TSDB_CODE_MND_IN_REBALANCE;
      goto END;
    }

    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  }

END:
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1446

1447
/**
 * Append one result row per vgroup endpoint in `vgs` to pBlock.
 *
 * Columns are written in this order: topic, consumer group, vgroup id,
 * consumer id (hex text, NULL when consumerId == -1), user (NULL when user is
 * NULL), fqdn (NULL when fqdn is NULL), offset text ("<offset>/<ever>"), rows.
 * The last two columns are NULL when offsetRows has no entry for the vgroup.
 *
 * @param pBlock      destination data block.
 * @param numOfRows   in/out row cursor; incremented once per written row.
 * @param consumerId  consumer id shown for the rows, or -1 for none.
 * @param user        plain C string or NULL.
 * @param fqdn        plain C string or NULL.
 * @param topic       passed to colDataSetVal as-is and logged through
 *                    varDataVal(); presumably a var-string buffer, as built by
 *                    the caller in mndRetrieveSubscribe.
 * @param cgroup      same representation as topic.
 * @param vgs         array of SMqVgEp pointers, one row each.
 * @param offsetRows  array of OffsetRows searched by vgId.
 * @return 0 on success, TSDB_CODE_INVALID_PARA when a required pointer is
 *         NULL, otherwise the code set by the failing check.
 */
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
                           const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (!pBlock || !numOfRows || !topic || !cgroup) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t numOfVgs = taosArrayGetSize(vgs);

  for (int32_t vgIdx = 0; vgIdx < numOfVgs; vgIdx++) {
    SMqVgEp *pVg = taosArrayGetP(vgs, vgIdx);
    MND_TMQ_NULL_CHECK(pVg);

    int32_t          colIdx = 0;
    SColumnInfoData *pCol = NULL;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)cgroup, false));

    // vg id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pVg->vgId, false));

    // consumer id, rendered as hex text into a var-string buffer
    char  consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
    char *hexText = varDataVal(consumerIdHex);
    (void)snprintf(hexText, TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(consumerIdHex, strlen(hexText));

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)consumerIdHex, consumerId == -1));

    // user
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user) {
      STR_TO_VARSTR(userStr, user);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, userStr, user == NULL));

    // fqdn
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn) {
      STR_TO_VARSTR(fqdnStr, fqdn);
    }
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVg->vgId);

    // offset: scan the whole array; the last entry with a matching vgId wins
    OffsetRows *pMatch = NULL;
    for (int k = 0; k < taosArrayGetSize(offsetRows); k++) {
      OffsetRows *pEntry = taosArrayGet(offsetRows, k);
      MND_TMQ_NULL_CHECK(pEntry);
      if (pEntry->vgId == pVg->vgId) {
        pMatch = pEntry;
      } else {
        mInfo("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", pEntry->vgId, pVg->vgId);
      }
    }

    if (pMatch == NULL) {
      // No offset info for this vgroup: both trailing columns are NULL.
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVg->vgId);
    } else {
      // Offset column: formatted offset followed by "/<ever>".
      char  offsetStr[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      char *offsetText = varDataVal(offsetStr);
      (void)tFormatOffset(offsetText, TSDB_OFFSET_LEN, &pMatch->offset);
      (void)snprintf(offsetText + strlen(offsetText),
                     sizeof(offsetStr) - VARSTR_HEADER_SIZE - strlen(offsetText), "/%" PRId64, pMatch->ever);
      varDataSetLen(offsetStr, strlen(offsetText));
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)offsetStr, false));

      // Rows column.
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pMatch->rows, false));
    }

    (*numOfRows)++;
  }
  return 0;

END:
  return code;
}
1537

1538
/**
 * Retrieve rows for "show subscriptions".
 *
 * Iterates SDB_SUBSCRIBE objects, resuming from pShow->pIter, and for each
 * subscription emits rows through buildResult(): first the vgroups assigned
 * to each consumer in the consumer hash, then the unassigned vgroups
 * (consumer id -1, no user or fqdn). Each subscription is read under its
 * read latch.
 *
 * @param pReq          request; pReq->info.node supplies the mnode.
 * @param pShow         show context; pIter is advanced and numOfRows is
 *                      increased by the number of rows produced.
 * @param pBlock        destination block; its capacity is grown when one
 *                      subscription would exceed rowsCapacity.
 * @param rowsCapacity  iteration stops once this many rows have been produced.
 * @return the number of rows produced on success, TSDB_CODE_INVALID_PARA on a
 *         NULL argument, or the code of the failing step.
 */
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL){
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;

  mInfo("mnd show subscriptions begin");

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    taosRLockLatch(&pSub->lock);

    // A subscription is emitted in full, so grow the block if it does not fit.
    if (numOfRows + pSub->vgNum > rowsCapacity) {
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum));
    }

    // topic and cgroup
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
    varDataSetLen(topic, strlen(varDataVal(topic)));
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

    SMqConsumerEp *pConsumerEp = NULL;
    void          *pIter = NULL;

    while (1) {
      pIter = taosHashIterate(pSub->consumerHash, pIter);
      if (pIter == NULL) break;
      pConsumerEp = (SMqConsumerEp *)pIter;

      char           *user = NULL;
      char           *fqdn = NULL;
      SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
      if (pConsumer != NULL) {
        user = pConsumer->user;
        fqdn = pConsumer->fqdn;
      }

      // user/fqdn point into pConsumer, so the reference is held until
      // buildResult() has finished reading them and is released on both the
      // success and the failure path.
      code = buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
                         pConsumerEp->offsetRows);
      if (pConsumer != NULL) {
        sdbRelease(pSdb, pConsumer);
      }
      if (code != 0) {
        // NOTE(review): the hash iteration is abandoned here without an
        // explicit cancel - confirm whether the hash API requires one.
        goto END;
      }
    }

    // Vgroups not assigned to any consumer are reported with consumer id -1.
    MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

    pBlock->info.rows = numOfRows;

    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

  mInfo("mnd end show subscriptions");

  pShow->numOfRows += numOfRows;
  return numOfRows;

END:
  // Every jump to END happens with pSub fetched and its read latch held.
  taosRUnLockLatch(&pSub->lock);
  sdbRelease(pSdb, pSub);

  return code;
}
1608

1609
/**
 * Cancel an in-progress SDB_SUBSCRIBE fetch.
 *
 * @param pMnode  mnode handle; the call does nothing when it is NULL.
 * @param pIter   iterator handed to sdbCancelFetchByType().
 */
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc