• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.95
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tcompare.h"
25
#include "tname.h"
26

27
#define MND_SUBSCRIBE_VER_NUMBER   3
28
#define MND_SUBSCRIBE_RESERVE_SIZE 64
29

30
//#define MND_CONSUMER_LOST_HB_CNT          6
31

32
static int32_t mqRebInExecCnt = 0;
33

34
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
35
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
36
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
39
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
40
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
41
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
42
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
43
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
44

45
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
134✔
46
  int32_t  code = 0;
134✔
47
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
134✔
48
  MND_TMQ_NULL_CHECK(pCommitRaw);
134!
49
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
134✔
50
  if (code != 0) {
134!
51
    sdbFreeRaw(pCommitRaw);
×
52
    goto END;
×
53
  }
54
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
134✔
55

56
END:
134✔
57
  return code;
134✔
58
}
59

60
int32_t mndInitSubscribe(SMnode *pMnode) {
716✔
61
  SSdbTable table = {
716✔
62
      .sdbType = SDB_SUBSCRIBE,
63
      .keyType = SDB_KEY_BINARY,
64
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
65
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
66
      .insertFp = (SdbInsertFp)mndSubActionInsert,
67
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
68
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
69
  };
70

71
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
716✔
72
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
716✔
73
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
716✔
74
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
716✔
75
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);
716✔
76

77
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
716✔
78
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
716✔
79

80
  return sdbSetTable(pMnode->pSdb, table);
716✔
81
}
82

83
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey, SMqSubscribeObj** pSub) {
66✔
84
  int32_t code = 0;
66✔
85
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
66!
86
  (*pSub)->dbUid = pTopic->dbUid;
66✔
87
  (*pSub)->stbUid = pTopic->stbUid;
66✔
88
  (*pSub)->subType = pTopic->subType;
66✔
89
  (*pSub)->withMeta = pTopic->withMeta;
66✔
90

91
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
66!
92
  return code;
66✔
93

94
END:
×
95
  tDeleteSubscribeObj(*pSub);
×
96
  taosMemoryFree(*pSub);
×
97
  return code;
×
98
}
99

100
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
259✔
101
                                    SSubplan *pPlan) {
102
  SMqRebVgReq req = {0};
259✔
103
  int32_t     code = 0;
259✔
104
  SEncoder encoder = {0};
259✔
105

106
  req.oldConsumerId = pRebVg->oldConsumerId;
259✔
107
  req.newConsumerId = pRebVg->newConsumerId;
259✔
108
  req.vgId = pRebVg->pVgEp->vgId;
259✔
109
  if (pPlan) {
259✔
110
    pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
241✔
111
    pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
241✔
112
    int32_t msgLen = 0;
241✔
113
    MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
241!
114
  } else {
115
    req.qmsg = taosStrdup("");
18✔
116
    MND_TMQ_NULL_CHECK(req.qmsg);
18!
117
  }
118
  req.subType = pSub->subType;
259✔
119
  req.withMeta = pSub->withMeta;
259✔
120
  req.suid = pSub->stbUid;
259✔
121
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
259✔
122

123
  int32_t tlen = 0;
259✔
124
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
259!
125
  if (code < 0) {
259!
126
    goto END;
×
127
  }
128

129
  tlen += sizeof(SMsgHead);
259✔
130
  void *buf = taosMemoryMalloc(tlen);
259✔
131
  MND_TMQ_NULL_CHECK(buf);
259!
132
  SMsgHead *pMsgHead = (SMsgHead *)buf;
259✔
133
  pMsgHead->contLen = htonl(tlen);
259✔
134
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
259✔
135

136
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
259✔
137
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
259!
138
  *pBuf = buf;
259✔
139
  *pLen = tlen;
259✔
140

141
END:
259✔
142
  tEncoderClear(&encoder);
259✔
143
  taosMemoryFree(req.qmsg);
259✔
144
  return code;
259✔
145
}
146

147
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
259✔
148
                                        const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
149
  int32_t code = 0;
259✔
150
  void   *buf  = NULL;
259✔
151

152
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
259!
153
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
154
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
155
    goto END;
×
156
  }
157

158
  int32_t tlen = 0;
259✔
159
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan));
259!
160
  int32_t vgId = pRebVg->pVgEp->vgId;
259✔
161
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
259✔
162
  if (pVgObj == NULL) {
259!
163
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
164
    goto END;
×
165
  }
166

167
  STransAction action = {0};
259✔
168
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
259✔
169
  action.pCont = buf;
259✔
170
  action.contLen = tlen;
259✔
171
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
259✔
172

173
  mndReleaseVgroup(pMnode, pVgObj);
259✔
174
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
259!
175
  return code;
259✔
176

177
END:
×
178
  taosMemoryFree(buf);
×
179
  return code;
×
180
}
181

182
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
401✔
183
  int32_t i = 0;
401✔
184
  while (key[i] != TMQ_SEPARATOR_CHAR) {
2,298✔
185
    i++;
1,897✔
186
  }
187
  (void)memcpy(cgroup, key, i);
401✔
188
  cgroup[i] = 0;
401✔
189
  if (fullName) {
401✔
190
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
396✔
191
  } else {
192
    while (key[i] != '.') {
15✔
193
      i++;
10✔
194
    }
195
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
5✔
196
  }
197
}
401✔
198

199
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
156✔
200
  int32_t code = 0;
156✔
201
  SMqRebInfo* pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
156✔
202
  if (pRebInfo == NULL) {
156✔
203
    pRebInfo = tNewSMqRebSubscribe(key);
141✔
204
    if (pRebInfo == NULL) {
141!
205
      code = terrno;
×
206
      goto END;
×
207
    }
208
    code = taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
141✔
209
    taosMemoryFreeClear(pRebInfo);
141!
210
    if (code != 0) {
141!
211
      goto END;
×
212
    }
213
    pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
141✔
214
    MND_TMQ_NULL_CHECK(pRebInfo);
141!
215
  }
216
  if (pReb){
156!
217
    *pReb = pRebInfo;
156✔
218
  }
219

220
END:
×
221
  return code;
156✔
222
}
223

224
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
266✔
225
  int32_t         code = 0;
266✔
226
  SMqVgEp       **pVgEp = (SMqVgEp **)taosArrayPop(vgs);
266✔
227
  MND_TMQ_NULL_CHECK(pVgEp);
266!
228
  SMqRebOutputVg outputVg = {consumerId, -1, *pVgEp};
266✔
229
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &(*pVgEp)->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)));
266!
230
  mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, (*pVgEp)->vgId, consumerId);
266!
231
END:
×
232
  return code;
266✔
233
}
234

235
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
141✔
236
  int32_t code = 0;
141✔
237
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
141✔
238
  int32_t actualRemoved = 0;
141✔
239
  for (int32_t i = 0; i < numOfRemoved; i++) {
215✔
240
    int64_t*      consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
74✔
241
    MND_TMQ_NULL_CHECK(consumerId);
74!
242
    SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t));
74✔
243
    if (pConsumerEp == NULL) {
74!
244
      continue;
×
245
    }
246

247
    int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
74✔
248
    for (int32_t j = 0; j < consumerVgNum; j++) {
205✔
249
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, *consumerId, pOutput->pSub->key));
131!
250
    }
251

252
    taosArrayDestroy(pConsumerEp->vgs);
74✔
253
    MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t)));
74!
254
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, consumerId));
148!
255
    actualRemoved++;
74✔
256
  }
257

258
  if (numOfRemoved != actualRemoved) {
141!
259
    mError("[rebalance] sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved,
×
260
           actualRemoved);
261
  } else {
262
    mInfo("[rebalance] sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
141!
263
  }
264
END:
×
265
  return code;
141✔
266
}
267

268
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
141✔
269
  int32_t code = 0;
141✔
270
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);
141✔
271

272
  for (int32_t i = 0; i < numOfNewConsumers; i++) {
223✔
273
    int64_t* consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
82✔
274
    MND_TMQ_NULL_CHECK(consumerId);
82!
275
    SMqConsumerEp newConsumerEp = {0};
82✔
276
    newConsumerEp.consumerId = *consumerId;
82✔
277
    newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
82✔
278
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);
82!
279
    MND_TMQ_RETURN_CHECK(taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)));
82!
280
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
164!
281
    mInfo("[rebalance] sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
82!
282
  }
283
END:
141✔
284
  return code;
141✔
285
}
286

287
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
141✔
288
  int32_t code = 0;
141✔
289
  int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
141✔
290
  for (int32_t i = 0; i < numOfVgroups; i++) {
276✔
291
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
135!
292
  }
293
END:
141✔
294
  return code;
141✔
295
}
296

297
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
141✔
298
                                     int32_t remainderVgCnt) {
299
  int32_t code = 0;
141✔
300
  int32_t cnt = 0;
141✔
301
  void   *pIter = NULL;
141✔
302

303
  while (1) {
9✔
304
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
150✔
305
    if (pIter == NULL) {
150✔
306
      break;
141✔
307
    }
308

309
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
9✔
310
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
9✔
311

312
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
18!
313
    if (consumerVgNum > minVgCnt) {
9!
314
      if (cnt < remainderVgCnt) {
×
315
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {  // pop until equal minVg + 1
×
316
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
×
317
        }
318
        cnt++;
×
319
      } else {
320
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
×
321
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
×
322
        }
323
      }
324
    }
325
  }
326
END:
141✔
327
  return code;
141✔
328
}
329

330
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
141✔
331
  int32_t code = 0;
141✔
332
  int32_t totalVgNum = 0;
141✔
333
  SVgObj *pVgroup = NULL;
141✔
334
  SMqVgEp *pVgEp = NULL;
141✔
335
  void   *pIter = NULL;
141✔
336
  SArray *newVgs = taosArrayInit(0, POINTER_BYTES);
141✔
337
  MND_TMQ_NULL_CHECK(newVgs);
141!
338
  while (1) {
339
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
671✔
340
    if (pIter == NULL) {
671✔
341
      break;
141✔
342
    }
343

344
    if (!mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
530✔
345
      sdbRelease(pMnode->pSdb, pVgroup);
252✔
346
      continue;
252✔
347
    }
348

349
    totalVgNum++;
278✔
350
    pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
278✔
351
    MND_TMQ_NULL_CHECK(pVgEp);
278!
352
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
278✔
353
    pVgEp->vgId = pVgroup->vgId;
278✔
354
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
278!
355
    pVgEp = NULL;
278✔
356
    sdbRelease(pMnode->pSdb, pVgroup);
278✔
357
  }
358

359
  pIter = NULL;
141✔
360
  while (1) {
83✔
361
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
224✔
362
    if (pIter == NULL) break;
224✔
363
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
83✔
364
    int32_t j = 0;
83✔
365
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
226✔
366
      SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
143✔
367
      MND_TMQ_NULL_CHECK(pVgEpTmp);
143!
368
      bool     find = false;
143✔
369
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
155!
370
        SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
155✔
371
        MND_TMQ_NULL_CHECK(pnewVgEp);
155!
372
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
155✔
373
          tDeleteSMqVgEp(pnewVgEp);
143✔
374
          taosArrayRemove(newVgs, k);
143✔
375
          find = true;
143✔
376
          break;
143✔
377
        }
378
      }
379
      if (!find) {
143!
380
        mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
×
381
        tDeleteSMqVgEp(pVgEpTmp);
×
382
        taosArrayRemove(pConsumerEp->vgs, j);
×
383
        continue;
×
384
      }
385
      j++;
143✔
386
    }
387
  }
388

389
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
141!
390
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
×
391
    mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
×
392
    taosArrayDestroy(newVgs);
×
393
  } else {
394
    taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
141✔
395
  }
396
  return totalVgNum;
141✔
397

398
END:
×
399
  sdbRelease(pMnode->pSdb, pVgroup);
×
400
  taosMemoryFree(pVgEp);
×
401
  taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
×
402
  return code;
×
403
}
404

405
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
141✔
406
  SMqSubscribeObj *pSub = NULL;
141✔
407
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
141✔
408
  if( code != 0){
141✔
409
    return 0;
66✔
410
  }
411
  taosRLockLatch(&pSub->lock);
75✔
412
  if (pOutput->pSub->offsetRows == NULL) {
75✔
413
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
58✔
414
    if(pOutput->pSub->offsetRows == NULL) {
58!
415
      taosRUnLockLatch(&pSub->lock);
×
416
      code = terrno;
×
417
      goto END;
×
418
    }
419
  }
420
  void *pIter = NULL;
75✔
421
  while (1) {
83✔
422
    pIter = taosHashIterate(pSub->consumerHash, pIter);
158✔
423
    if (pIter == NULL) break;
158✔
424
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
83✔
425
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
83✔
426

427
    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
214✔
428
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
131✔
429
      MND_TMQ_NULL_CHECK(d1);
131!
430
      bool        jump = false;
131✔
431
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
131!
UNCOV
432
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
×
UNCOV
433
        MND_TMQ_NULL_CHECK(pVgEp);
×
UNCOV
434
        if (pVgEp->vgId == d1->vgId) {
×
UNCOV
435
          jump = true;
×
UNCOV
436
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
×
437
                pConsumerEp->consumerId, pVgEp->vgId);
UNCOV
438
          break;
×
439
        }
440
      }
441
      if (jump) continue;
131!
442
      bool find = false;
131✔
443
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
245✔
444
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
133✔
445
        MND_TMQ_NULL_CHECK(d2);
133!
446
        if (d1->vgId == d2->vgId) {
133✔
447
          d2->rows += d1->rows;
19✔
448
          d2->offset = d1->offset;
19✔
449
          d2->ever = d1->ever;
19✔
450
          find = true;
19✔
451
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
19!
452
          break;
19✔
453
        }
454
      }
455
      if (!find) {
131✔
456
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
224!
457
      }
458
    }
459
  }
460
  taosRUnLockLatch(&pSub->lock);
75✔
461
  mndReleaseSubscribe(pMnode, pSub);
75✔
462

463
END:
75✔
464
  return code;
75✔
465
}
466

467
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
141✔
468
  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
141!
469
  for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
407✔
470
    SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
266✔
471
    if (pOutputRebVg == NULL) continue;
266!
472
    mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
266!
473
          pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
474
  }
475

476
  void *pIter = NULL;
141✔
477
  while (1) {
91✔
478
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
232✔
479
    if (pIter == NULL) break;
232✔
480
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
91✔
481
    int32_t        sz = taosArrayGetSize(pConsumerEp->vgs);
91✔
482
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
91!
483
          pConsumerEp->consumerId, sz);
484
    for (int32_t i = 0; i < sz; i++) {
253✔
485
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
162✔
486
      if (pVgEp == NULL) continue;
162!
487
      mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
162!
488
            pConsumerEp->consumerId);
489
    }
490
  }
491
}
141✔
492

493
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
141✔
494
                           int32_t *remainderVgCnt) {
495
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
141✔
496
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
141✔
497
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
141✔
498

499
  // calc num
500
  if (numOfFinal != 0) {
141✔
501
    *minVgCnt = totalVgNum / numOfFinal;
79✔
502
    *remainderVgCnt = totalVgNum % numOfFinal;
79✔
503
  } else {
504
    mInfo("[rebalance] sub:%s no consumer subscribe this topic", pSubKey);
62!
505
  }
506
  mInfo(
141!
507
      "[rebalance] sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
508
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
509
}
141✔
510

511
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
141✔
512
  SMqRebOutputVg *pRebVg = NULL;
141✔
513
  void           *pAssignIter = NULL;
141✔
514
  void           *pIter = NULL;
141✔
515
  int32_t         code = 0;
141✔
516

517
  while (1) {
91✔
518
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
232✔
519
    if (pIter == NULL) {
232✔
520
      break;
141✔
521
    }
522
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
91✔
523
    while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
238✔
524
      pAssignIter = taosHashIterate(pHash, pAssignIter);
147✔
525
      if (pAssignIter == NULL) {
147!
526
        mError("[rebalance] sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
×
527
        break;
×
528
      }
529

530
      pRebVg = (SMqRebOutputVg *)pAssignIter;
147✔
531
      pRebVg->newConsumerId = pConsumerEp->consumerId;
147✔
532
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
294!
533
      mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId,
147!
534
            pConsumerEp->consumerId);
535
    }
536
  }
537

538
  while (1) {
3✔
539
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
144✔
540
    if (pIter == NULL) {
144✔
541
      break;
62✔
542
    }
543
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
82✔
544
    if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
82!
545
      pAssignIter = taosHashIterate(pHash, pAssignIter);
82✔
546
      if (pAssignIter == NULL) {
82✔
547
        mInfo("[rebalance] sub:%s assign iter is used up", pOutput->pSub->key);
79!
548
        break;
79✔
549
      }
550

551
      pRebVg = (SMqRebOutputVg *)pAssignIter;
3✔
552
      pRebVg->newConsumerId = pConsumerEp->consumerId;
3✔
553
      MND_TMQ_NULL_CHECK(taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp));
6!
554
      mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId,
3!
555
            pConsumerEp->consumerId);
556
    }
557
  }
558

559
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
141✔
560
  if (pAssignIter != NULL) {
141!
561
    mError("[rebalance]sub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
×
562
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
×
563
    goto END;
×
564
  }
565
  while (1) {
266✔
566
    pAssignIter = taosHashIterate(pHash, pAssignIter);
407✔
567
    if (pAssignIter == NULL) {
407✔
568
      break;
141✔
569
    }
570

571
    SMqRebOutputVg *pRebOutput = (SMqRebOutputVg *)pAssignIter;
266✔
572
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pRebOutput));
532!
573
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {            // if all consumer is removed
266✔
574
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp));  // put all vg into unassigned
232!
575
    }
576
  }
577

578
END:
141✔
579
  return code;
141✔
580
}
581

582
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
141✔
583
  int32_t     totalVgNum = processRemoveAddVgs(pMnode, pOutput);
141✔
584
  if (totalVgNum < 0){
141!
585
    return totalVgNum;
×
586
  }
587
  const char *pSubKey = pOutput->pSub->key;
141✔
588
  int32_t     minVgCnt = 0;
141✔
589
  int32_t     remainderVgCnt = 0;
141✔
590
  int32_t     code = 0;
141✔
591
  SHashObj   *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
141✔
592
  MND_TMQ_NULL_CHECK(pHash);
141!
593
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
141!
594
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
141!
595
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
141✔
596
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
141!
597
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
141!
598
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
141!
599
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
141!
600
  printRebalanceLog(pOutput);
141✔
601
  taosHashCleanup(pHash);
141✔
602

603
END:
141✔
604
  return code;
141✔
605
}
606

607
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
402✔
608
  int32_t         code = 0;
402✔
609
  SMqConsumerObj *pConsumerNew = NULL;
402✔
610
  int32_t         consumerNum = taosArrayGetSize(consumers);
402✔
611
  for (int32_t i = 0; i < consumerNum; i++) {
560✔
612
    int64_t* consumerId = (int64_t *)taosArrayGet(consumers, i);
158✔
613
    MND_TMQ_NULL_CHECK(consumerId);
158!
614
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
158!
615
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
158!
616
    tDeleteSMqConsumerObj(pConsumerNew);
158✔
617
  }
618
  pConsumerNew = NULL;
402✔
619

620
END:
402✔
621
  tDeleteSMqConsumerObj(pConsumerNew);
402✔
622
  return code;
402✔
623
}
624

625
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
134✔
626
  int32_t code = 0;
134✔
627
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL));
134!
628
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic));
134!
629
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic));
134!
630
END:
134✔
631
  return code;
134✔
632
}
633

634
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
141✔
635
  struct SSubplan *pPlan = NULL;
141✔
636
  int32_t          code = 0;
141✔
637
  STrans          *pTrans = NULL;
141✔
638

639
  if (strcmp(pOutput->pSub->qmsg, "") != 0) {
141✔
640
    MND_TMQ_RETURN_CHECK(qStringToSubplan(pOutput->pSub->qmsg, &pPlan));
124!
641
  }
642

643
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
141✔
644
  char cgroup[TSDB_CGROUP_LEN] = {0};
141✔
645
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
141✔
646

647
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
141✔
648
  if (pTrans == NULL) {
141!
649
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
650
    if (terrno != 0) code = terrno;
×
651
    goto END;
×
652
  }
653

654
  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
141✔
655
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
141✔
656

657
  // 1. redo action: action to all vg
658
  const SArray *rebVgs = pOutput->rebVgs;
134✔
659
  int32_t       vgNum = taosArrayGetSize(rebVgs);
134✔
660
  for (int32_t i = 0; i < vgNum; i++) {
393✔
661
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
259✔
662
    MND_TMQ_NULL_CHECK(pRebVg);
259!
663
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan));
259!
664
  }
665

666
  // 2. commit log: subscribe and vg assignment
667
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));
134!
668

669
  // 3. commit log: consumer to update status and epoch
670
  MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
134!
671

672
  // 4. set cb
673
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
134✔
674

675
  // 5. execution
676
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
134!
677

678
END:
134✔
679
  nodesDestroyNode((SNode *)pPlan);
141✔
680
  mndTransDrop(pTrans);
141✔
681
  TAOS_RETURN(code);
141✔
682
}
683

684
static void freeRebalanceItem(void *param) {
141✔
685
  SMqRebInfo *pInfo = param;
141✔
686
  taosArrayDestroy(pInfo->newConsumers);
141✔
687
  taosArrayDestroy(pInfo->removedConsumers);
141✔
688
}
141✔
689

690
// type = 0 remove  type = 1 add
691
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, char *group, int64_t consumerId) {
192✔
692
  int32_t code = 0;
192✔
693
  int32_t topicNum = taosArrayGetSize(topicList);
192✔
694
  for (int32_t i = 0; i < topicNum; i++) {
348✔
695
    char *removedTopic = taosArrayGetP(topicList, i);
156✔
696
    MND_TMQ_NULL_CHECK(removedTopic);
156!
697
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
156✔
698
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", group, TMQ_SEPARATOR, removedTopic);
156✔
699
    SMqRebInfo *pRebSub = NULL;
156✔
700
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
156!
701
    if (type == 0)
156✔
702
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &consumerId));
148!
703
    else if (type == 1)
82!
704
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &consumerId));
164!
705
  }
706

707
END:
192✔
708
  return code;
192✔
709
}
710

711
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
40✔
712
  int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics);
40✔
713
  for (int32_t i = 0; i < newTopicNum; i++) {
102✔
714
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
62✔
715
    if (topic == NULL){
62!
716
      continue;
×
717
    }
718
    SMqSubscribeObj *pSub = NULL;
62✔
719
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
62✔
720
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
62✔
721
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
62✔
722
    if (code != 0) {
62!
723
      continue;
×
724
    }
725
    taosRLockLatch(&pSub->lock);
62✔
726

727
    // iterate all vg assigned to the consumer of that topic
728
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
62✔
729
    if (pConsumerEp == NULL){
62!
730
      taosRUnLockLatch(&pSub->lock);
×
731
      mndReleaseSubscribe(pMnode, pSub);
×
732
      continue;
×
733
    }
734
    int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
62✔
735
    for (int32_t j = 0; j < vgNum; j++) {
190✔
736
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
128✔
737
      if (pVgEp == NULL) {
128!
738
        continue;
×
739
      }
740
      SVgObj  *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
128✔
741
      if (!pVgroup) {
128!
742
        code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
×
743
        if (code != 0){
×
744
          mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code))
×
745
        }else{
746
          mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
×
747
        }
748
      }
749
      mndReleaseVgroup(pMnode, pVgroup);
128✔
750
    }
751
    taosRUnLockLatch(&pSub->lock);
62✔
752
    mndReleaseSubscribe(pMnode, pSub);
62✔
753
  }
754
}
40✔
755

756
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
21,481✔
757
  SMnode         *pMnode = pMsg->info.node;
21,481✔
758
  SSdb           *pSdb = pMnode->pSdb;
21,481✔
759
  SMqConsumerObj *pConsumer = NULL;
21,481✔
760
  void           *pIter = NULL;
21,481✔
761
  int32_t         code = 0;
21,481✔
762

763
  // iterate all consumers, find all modification
764
  while (1) {
168✔
765
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
21,649✔
766
    if (pIter == NULL) {
21,649✔
767
      break;
21,481✔
768
    }
769

770
    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
168✔
771
    int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
168✔
772
    int32_t status = atomic_load_32(&pConsumer->status);
168✔
773

774
    mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
168!
775
           ", hbstatus:%d, pollStatus:%d",
776
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
777
           pConsumer->createTime, hbStatus, pollStatus);
778

779
    if (status == MQ_CONSUMER_STATUS_READY) {
168✔
780
      if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
72✔
781
        MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
32!
782
      } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
40!
783
                 pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
40!
784
        taosRLockLatch(&pConsumer->lock);
×
785
        MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
×
786
        taosRUnLockLatch(&pConsumer->lock);
×
787
      } else {
788
        checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
40✔
789
      }
790
    } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
96!
791
      taosRLockLatch(&pConsumer->lock);
96✔
792
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId));
96!
793
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
96!
794
      taosRUnLockLatch(&pConsumer->lock);
96✔
795
    } else {
796
      MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
×
797
    }
798

799
    mndReleaseConsumer(pMnode, pConsumer);
168✔
800
  }
801
END:
21,481✔
802
  return code;
21,481✔
803
}
804

805
bool mndRebTryStart() {
21,481✔
806
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
21,481✔
807
  if (old > 0) mInfo("[rebalance] counter old val:%d", old) return old == 0;
21,481!
808
}
809

810
void mndRebCntInc() {
136✔
811
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
136✔
812
  if (val > 0) mInfo("[rebalance] cnt inc, value:%d", val)
136!
813
}
136✔
814

815
void mndRebCntDec() {
21,617✔
816
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
21,617✔
817
  if (val > 0) mInfo("[rebalance] cnt sub, value:%d", val)
21,617!
818
}
21,617✔
819

820
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
141✔
821
  taosArrayDestroy(rebOutput->newConsumers);
141✔
822
  taosArrayDestroy(rebOutput->modifyConsumers);
141✔
823
  taosArrayDestroy(rebOutput->removedConsumers);
141✔
824
  taosArrayDestroy(rebOutput->rebVgs);
141✔
825
  tDeleteSubscribeObj(rebOutput->pSub);
141✔
826
  taosMemoryFree(rebOutput->pSub);
141✔
827
}
141✔
828

829
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
141✔
830
  int32_t code = 0;
141✔
831
  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
141✔
832
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);
141!
833
  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
141✔
834
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);
141!
835
  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
141✔
836
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);
141!
837
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
141✔
838
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);
141!
839
  return code;
141✔
840

841
END:
×
842
  clearRebOutput(rebOutput);
×
843
  return code;
×
844
}
845

846
// This function only works when there are dirty consumers
847
static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
75✔
848
  int32_t code = 0;
75✔
849
  void   *pIter = NULL;
75✔
850
  while (1) {
83✔
851
    pIter = taosHashIterate(pSub->consumerHash, pIter);
158✔
852
    if (pIter == NULL) {
158✔
853
      break;
75✔
854
    }
855

856
    SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
83✔
857
    SMqConsumerObj *pConsumer = NULL;
83✔
858
    code = mndAcquireConsumer(pMnode, pConsumerEp->consumerId, &pConsumer);
83✔
859
    if (code == 0) {
83!
860
      mndReleaseConsumer(pMnode, pConsumer);
83✔
861
      continue;
83✔
862
    }
863
    mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
×
864
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));
×
865

866
    taosArrayDestroy(pConsumerEp->vgs);
×
867
    MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
×
868
  }
869
END:
75✔
870
  return code;
75✔
871
}
872

873
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
141✔
874
  const char      *key = rebInput->pRebInfo->key;
141✔
875
  SMqSubscribeObj *pSub = NULL;
141✔
876
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
141✔
877

878
  if (code != 0) {
141✔
879
    // split sub key and extract topic
880
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
66✔
881
    char cgroup[TSDB_CGROUP_LEN] = {0};
66✔
882
    mndSplitSubscribeKey(key, topic, cgroup, true);
66✔
883
    SMqTopicObj *pTopic = NULL;
66✔
884
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
66!
885
    taosRLockLatch(&pTopic->lock);
66✔
886

887
    rebInput->oldConsumerNum = 0;
66✔
888
    code = mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub);
66✔
889
    if (code != 0) {
66!
890
      mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
×
891
      taosRUnLockLatch(&pTopic->lock);
×
892
      mndReleaseTopic(pMnode, pTopic);
×
893
      return code;
×
894
    }
895

896
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
66✔
897
    taosRUnLockLatch(&pTopic->lock);
66✔
898
    mndReleaseTopic(pMnode, pTopic);
66✔
899

900
    mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
66!
901
  } else {
902
    taosRLockLatch(&pSub->lock);
75✔
903
    code = tCloneSubscribeObj(pSub, &rebOutput->pSub);
75✔
904
    if(code != 0){
75!
905
      taosRUnLockLatch(&pSub->lock);
×
906
      goto END;
×
907
    }
908
    code = checkConsumer(pMnode, rebOutput->pSub);
75✔
909
    if(code != 0){
75!
910
      taosRUnLockLatch(&pSub->lock);
×
911
      goto END;
×
912
    }
913
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
75✔
914
    taosRUnLockLatch(&pSub->lock);
75✔
915

916
    mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
75!
917
    mndReleaseSubscribe(pMnode, pSub);
75✔
918
  }
919

920
END:
141✔
921
  return code;
141✔
922
}
923

924
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
21,481✔
925
  int     code = 0;
21,481✔
926
  void   *pIter = NULL;
21,481✔
927
  SMnode *pMnode = pMsg->info.node;
21,481✔
928
  mDebug("[rebalance] start to process mq timer");
21,481✔
929
  if (!mndRebTryStart()) {
21,481!
930
    mInfo("[rebalance] mq rebalance already in progress, do nothing");
×
931
    return code;
×
932
  }
933

934
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
21,481✔
935
  MND_TMQ_NULL_CHECK(rebSubHash);
21,481!
936

937
  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
21,481✔
938

939
  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
21,481!
940
  if (taosHashGetSize(rebSubHash) > 0) {
21,481✔
941
    mInfo("[rebalance] mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
89!
942
  }
943

944
  while (1) {
141✔
945
    pIter = taosHashIterate(rebSubHash, pIter);
21,622✔
946
    if (pIter == NULL) {
21,622✔
947
      break;
21,481✔
948
    }
949

950
    SMqRebInputObj  rebInput = {0};
141✔
951
    SMqRebOutputObj rebOutput = {0};
141✔
952
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
141!
953
    rebInput.pRebInfo = (SMqRebInfo *)pIter;
141✔
954
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
141✔
955
    if (code != 0) {
141!
956
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code))
×
957
    }
958

959
    if (code == 0){
141!
960
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
141✔
961
      if (code != 0) {
141!
962
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code))
×
963
      }
964
    }
965

966
    if (code == 0){
141!
967
      code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
141✔
968
      if (code != 0) {
141✔
969
        mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code))
7!
970
      }
971
    }
972

973
    clearRebOutput(&rebOutput);
141✔
974
  }
975

976
  if (taosHashGetSize(rebSubHash) > 0) {
21,481✔
977
    mInfo("[rebalance] mq rebalance completed successfully, wait trans finish")
89!
978
  }
979

980
END:
21,392✔
981
  taosHashCancelIterate(rebSubHash, pIter);
21,481✔
982
  taosHashCleanup(rebSubHash);
21,481✔
983
  mndRebCntDec();
21,481✔
984

985
  TAOS_RETURN(code);
21,481✔
986
}
987

988
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
5✔
989
  void   *pIter = NULL;
5✔
990
  SVgObj *pVgObj = NULL;
5✔
991
  int32_t code = 0;
5✔
992
  while (1) {
12✔
993
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
17✔
994
    if (pIter == NULL) {
17✔
995
      break;
5✔
996
    }
997

998
    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
12✔
999
      sdbRelease(pMnode->pSdb, pVgObj);
7✔
1000
      continue;
7✔
1001
    }
1002
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
5✔
1003
    MND_TMQ_NULL_CHECK(pReq);
5!
1004
    pReq->head.vgId = htonl(pVgObj->vgId);
5✔
1005
    pReq->vgId = pVgObj->vgId;
5✔
1006
    pReq->consumerId = -1;
5✔
1007
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
5✔
1008

1009
    STransAction action = {0};
5✔
1010
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
5✔
1011
    action.pCont = pReq;
5✔
1012
    action.contLen = sizeof(SMqVDeleteReq);
5✔
1013
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
5✔
1014
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;
5✔
1015

1016
    sdbRelease(pMnode->pSdb, pVgObj);
5✔
1017
    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
5!
1018
  }
1019

1020
END:
5✔
1021
  sdbRelease(pMnode->pSdb, pVgObj);
5✔
1022
  sdbCancelFetch(pMnode->pSdb, pIter);
5✔
1023
  return code;
5✔
1024
}
1025

1026
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
×
1027
  void           *pIter = NULL;
×
1028
  SMqConsumerObj *pConsumer = NULL;
×
1029
  int             code = 0;
×
1030
  while (1) {
×
1031
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
×
1032
    if (pIter == NULL) {
×
1033
      break;
×
1034
    }
1035

1036
    if (strcmp(cgroup, pConsumer->cgroup) != 0) {
×
1037
      sdbRelease(pMnode->pSdb, pConsumer);
×
1038
      continue;
×
1039
    }
1040

1041
    bool found = checkTopic(pConsumer->assignedTopics, topic);
×
1042
    if (found){
×
1043
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
×
1044
             topic, pConsumer->consumerId, pConsumer->cgroup);
1045
      code = TSDB_CODE_MND_CGROUP_USED;
×
1046
      goto END;
×
1047
    }
1048

1049
    sdbRelease(pMnode->pSdb, pConsumer);
×
1050
  }
1051

1052
END:
×
1053
  sdbRelease(pMnode->pSdb, pConsumer);
×
1054
  sdbCancelFetch(pMnode->pSdb, pIter);
×
1055
  return code;
×
1056
}
1057

1058
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
×
1059
  SMnode         *pMnode = pMsg->info.node;
×
1060
  SMDropCgroupReq dropReq = {0};
×
1061
  STrans         *pTrans = NULL;
×
1062
  int32_t         code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1063
  SMqSubscribeObj *pSub = NULL;
×
1064

1065
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
×
1066
  char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
×
1067
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
×
1068
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
×
1069
  if (code != 0) {
×
1070
    if (dropReq.igNotExists) {
×
1071
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
×
1072
      return 0;
×
1073
    } else {
1074
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
×
1075
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
×
1076
      return code;
×
1077
    }
1078
  }
1079

1080
  taosWLockLatch(&pSub->lock);
×
1081
  if (taosHashGetSize(pSub->consumerHash) != 0) {
×
1082
    code = TSDB_CODE_MND_CGROUP_USED;
×
1083
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
×
1084
    goto END;
×
1085
  }
1086

1087
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
×
1088
  MND_TMQ_NULL_CHECK(pTrans);
×
1089
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
×
1090
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
×
1091
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
×
1092
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
×
1093
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic));
×
1094
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
×
1095
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
×
1096

1097
END:
×
1098
  taosWUnLockLatch(&pSub->lock);
×
1099
  mndReleaseSubscribe(pMnode, pSub);
×
1100
  mndTransDrop(pTrans);
×
1101

1102
  if (code != 0) {
×
1103
    mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic);
×
1104
    TAOS_RETURN(code);
×
1105
  }
1106
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
×
1107
}
1108

1109
void mndCleanupSubscribe(SMnode *pMnode) {}
715✔
1110

1111
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
203✔
1112
  int32_t code = 0;
203✔
1113
  int32_t lino = 0;
203✔
1114
  terrno = TSDB_CODE_OUT_OF_MEMORY;
203✔
1115
  void   *buf = NULL;
203✔
1116
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
203✔
1117
  if (tlen <= 0) goto SUB_ENCODE_OVER;
203!
1118
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
203✔
1119

1120
  SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
203✔
1121
  if (pRaw == NULL) goto SUB_ENCODE_OVER;
203!
1122

1123
  buf = taosMemoryMalloc(tlen);
203✔
1124
  if (buf == NULL) goto SUB_ENCODE_OVER;
203!
1125

1126
  void *abuf = buf;
203✔
1127
  if (tEncodeSubscribeObj(&abuf, pSub) < 0){
203!
1128
    goto SUB_ENCODE_OVER;
×
1129
  }
1130

1131
  int32_t dataPos = 0;
203✔
1132
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
203!
1133
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
203!
1134
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
203!
1135
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
203!
1136

1137
  terrno = TSDB_CODE_SUCCESS;
203✔
1138

1139
SUB_ENCODE_OVER:
203✔
1140
  taosMemoryFreeClear(buf);
203!
1141
  if (terrno != TSDB_CODE_SUCCESS) {
203!
1142
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
×
1143
    sdbFreeRaw(pRaw);
×
1144
    return NULL;
×
1145
  }
1146

1147
  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
203!
1148
  return pRaw;
203✔
1149
}
1150

1151
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
142✔
1152
  int32_t code = 0;
142✔
1153
  int32_t lino = 0;
142✔
1154
  terrno = TSDB_CODE_OUT_OF_MEMORY;
142✔
1155
  SSdbRow         *pRow = NULL;
142✔
1156
  SMqSubscribeObj *pSub = NULL;
142✔
1157
  void            *buf = NULL;
142✔
1158

1159
  int8_t sver = 0;
142✔
1160
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
142!
1161

1162
  if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
142!
1163
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
1164
    goto SUB_DECODE_OVER;
×
1165
  }
1166

1167
  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
142✔
1168
  if (pRow == NULL) goto SUB_DECODE_OVER;
142!
1169

1170
  pSub = sdbGetRowObj(pRow);
142✔
1171
  if (pSub == NULL) goto SUB_DECODE_OVER;
142!
1172

1173
  int32_t dataPos = 0;
142✔
1174
  int32_t tlen;
1175
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
142!
1176
  buf = taosMemoryMalloc(tlen);
142✔
1177
  if (buf == NULL) goto SUB_DECODE_OVER;
142!
1178
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
142!
1179
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
142!
1180

1181
  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
142!
1182
    goto SUB_DECODE_OVER;
×
1183
  }
1184

1185
  // update epset saved in mnode
1186
  if (pSub->unassignedVgs != NULL) {
142!
1187
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
142✔
1188
    for (int32_t i = 0; i < size; ++i) {
257✔
1189
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, i);
115✔
1190
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
115✔
1191
    }
1192
  }
1193
  if (pSub->consumerHash != NULL) {
142!
1194
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
142✔
1195
    while (pIter) {
235✔
1196
      SMqConsumerEp *pConsumerEp = pIter;
93✔
1197
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
93✔
1198
      for (int32_t i = 0; i < size; ++i) {
263✔
1199
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
170✔
1200
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
170✔
1201
      }
1202
      pIter = taosHashIterate(pSub->consumerHash, pIter);
93✔
1203
    }
1204
  }
1205

1206
  terrno = TSDB_CODE_SUCCESS;
142✔
1207

1208
SUB_DECODE_OVER:
142✔
1209
  taosMemoryFreeClear(buf);
142!
1210
  if (terrno != TSDB_CODE_SUCCESS) {
142!
1211
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
×
1212
    taosMemoryFreeClear(pRow);
×
1213
    return NULL;
×
1214
  }
1215

1216
  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
142!
1217
  return pRow;
142✔
1218
}
1219

1220
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
69✔
1221
  mTrace("subscribe:%s, perform insert action", pSub->key);
69!
1222
  return 0;
69✔
1223
}
1224

1225
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
142✔
1226
  mTrace("subscribe:%s, perform delete action", pSub->key);
142!
1227
  tDeleteSubscribeObj(pSub);
142✔
1228
  return 0;
142✔
1229
}
1230

1231
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
68✔
1232
  mTrace("subscribe:%s, perform update action", pOldSub->key);
68!
1233
  taosWLockLatch(&pOldSub->lock);
68✔
1234

1235
  SHashObj *tmp = pOldSub->consumerHash;
68✔
1236
  pOldSub->consumerHash = pNewSub->consumerHash;
68✔
1237
  pNewSub->consumerHash = tmp;
68✔
1238

1239
  SArray *tmp1 = pOldSub->unassignedVgs;
68✔
1240
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
68✔
1241
  pNewSub->unassignedVgs = tmp1;
68✔
1242

1243
  SArray *tmp2 = pOldSub->offsetRows;
68✔
1244
  pOldSub->offsetRows = pNewSub->offsetRows;
68✔
1245
  pNewSub->offsetRows = tmp2;
68✔
1246

1247
  taosWUnLockLatch(&pOldSub->lock);
68✔
1248
  return 0;
68✔
1249
}
1250

1251
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub) {
599✔
1252
  SSdb            *pSdb = pMnode->pSdb;
599✔
1253
  *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
599✔
1254
  if (*pSub == NULL) {
599✔
1255
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
132✔
1256
  }
1257
  return 0;
467✔
1258
}
1259

1260
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
82✔
1261
  int32_t num = 0;
82✔
1262
  SSdb   *pSdb = pMnode->pSdb;
82✔
1263

1264
  void            *pIter = NULL;
82✔
1265
  SMqSubscribeObj *pSub = NULL;
82✔
1266
  while (1) {
184✔
1267
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
266✔
1268
    if (pIter == NULL) break;
266✔
1269

1270
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
184✔
1271
    char cgroup[TSDB_CGROUP_LEN] = {0};
184✔
1272
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
184✔
1273
    if (strcmp(topic, topicName) != 0) {
184✔
1274
      sdbRelease(pSdb, pSub);
180✔
1275
      continue;
180✔
1276
    }
1277

1278
    num++;
4✔
1279
    sdbRelease(pSdb, pSub);
4✔
1280
  }
1281

1282
  return num;
82✔
1283
}
1284

1285
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
467✔
1286
  SSdb *pSdb = pMnode->pSdb;
467✔
1287
  sdbRelease(pSdb, pSub);
467✔
1288
}
467✔
1289

1290
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
5✔
1291
  int32_t  code = 0;
5✔
1292
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
5✔
1293
  MND_TMQ_NULL_CHECK(pCommitRaw);
5!
1294
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
5✔
1295
  if (code != 0){
5!
1296
    sdbFreeRaw(pCommitRaw);
×
1297
    goto END;
×
1298
  }
1299
  code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
5✔
1300
END:
5✔
1301
  return code;
5✔
1302
}
1303

1304
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
9✔
1305
  SSdb            *pSdb = pMnode->pSdb;
9✔
1306
  int32_t          code = 0;
9✔
1307
  void            *pIter = NULL;
9✔
1308
  SMqSubscribeObj *pSub = NULL;
9✔
1309
  while (1) {
5✔
1310
    sdbRelease(pSdb, pSub);
14✔
1311
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
14✔
1312
    if (pIter == NULL) break;
14✔
1313

1314
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
5✔
1315
    char cgroup[TSDB_CGROUP_LEN] = {0};
5✔
1316
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
5✔
1317
    if (strcmp(topic, topicName) != 0) {
5!
1318
      continue;
×
1319
    }
1320

1321
    // iter all vnode to delete handle
1322
    if (taosHashGetSize(pSub->consumerHash) != 0) {
5!
1323
      code = TSDB_CODE_MND_IN_REBALANCE;
×
1324
      goto END;
×
1325
    }
1326

1327
    MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
5!
1328
    MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
5!
1329
  }
1330

1331
END:
9✔
1332
  sdbRelease(pSdb, pSub);
9✔
1333
  sdbCancelFetch(pSdb, pIter);
9✔
1334

1335
  TAOS_RETURN(code);
9✔
1336
}
1337

1338
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
5✔
1339
                           const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
1340
  int32_t code = 0;
5✔
1341
  int32_t sz = taosArrayGetSize(vgs);
5✔
1342
  for (int32_t j = 0; j < sz; j++) {
10✔
1343
    SMqVgEp *pVgEp = taosArrayGetP(vgs, j);
5✔
1344
    MND_TMQ_NULL_CHECK(pVgEp);
5!
1345

1346
    SColumnInfoData *pColInfo = NULL;
5✔
1347
    int32_t          cols = 0;
5✔
1348

1349
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5✔
1350
    MND_TMQ_NULL_CHECK(pColInfo);
5!
1351
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false));
5!
1352

1353
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5✔
1354
    MND_TMQ_NULL_CHECK(pColInfo);
5!
1355
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false));
5!
1356

1357
    // vg id
1358
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5✔
1359
    MND_TMQ_NULL_CHECK(pColInfo);
5!
1360
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));
5!
1361

1362
    // consumer id
1363
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
5✔
1364
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
5✔
1365
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
5✔
1366

1367
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5✔
1368
    MND_TMQ_NULL_CHECK(pColInfo);
5!
1369
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));
5!
1370

1371
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
5✔
1372
    if (user) STR_TO_VARSTR(userStr, user);
5!
1373
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5✔
1374
    MND_TMQ_NULL_CHECK(pColInfo);
5!
1375
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));
5!
1376

1377
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
5✔
1378
    if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
5!
1379
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5✔
1380
    MND_TMQ_NULL_CHECK(pColInfo);
5!
1381
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));
5!
1382

1383
    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
5!
1384
          varDataVal(cgroup), pVgEp->vgId);
1385

1386
    // offset
1387
    OffsetRows *data = NULL;
5✔
1388
    for (int i = 0; i < taosArrayGetSize(offsetRows); i++) {
10✔
1389
      OffsetRows *tmp = taosArrayGet(offsetRows, i);
5✔
1390
      MND_TMQ_NULL_CHECK(tmp);
5!
1391
      if (tmp->vgId != pVgEp->vgId) {
5!
UNCOV
1392
        mInfo("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", tmp->vgId, pVgEp->vgId);
×
UNCOV
1393
        continue;
×
1394
      }
1395
      data = tmp;
5✔
1396
    }
1397
    if (data) {
5!
1398
      // vg id
1399
      char buf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
5✔
1400
      (void)tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
5✔
1401
      (void)snprintf(varDataVal(buf) + strlen(varDataVal(buf)),
5✔
1402
                     sizeof(buf) - VARSTR_HEADER_SIZE - strlen(varDataVal(buf)), "/%" PRId64, data->ever);
5✔
1403
      varDataSetLen(buf, strlen(varDataVal(buf)));
5✔
1404
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5✔
1405
      MND_TMQ_NULL_CHECK(pColInfo);
5!
1406
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false));
5!
1407
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
5✔
1408
      MND_TMQ_NULL_CHECK(pColInfo);
5!
1409
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false));
5!
1410
    } else {
1411
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1412
      MND_TMQ_NULL_CHECK(pColInfo);
×
1413
      colDataSetNULL(pColInfo, *numOfRows);
×
1414
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1415
      MND_TMQ_NULL_CHECK(pColInfo);
×
1416
      colDataSetNULL(pColInfo, *numOfRows);
×
1417
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
×
1418
    }
1419
    (*numOfRows)++;
5✔
1420
  }
1421
  return 0;
5✔
1422
END:
×
1423
  return code;
×
1424
}
1425

1426
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
19✔
1427
  SMnode          *pMnode = pReq->info.node;
19✔
1428
  SSdb            *pSdb = pMnode->pSdb;
19✔
1429
  int32_t          numOfRows = 0;
19✔
1430
  SMqSubscribeObj *pSub = NULL;
19✔
1431
  int32_t          code = 0;
19✔
1432

1433
  mInfo("mnd show subscriptions begin");
19!
1434

1435
  while (numOfRows < rowsCapacity) {
24!
1436
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
24✔
1437
    if (pShow->pIter == NULL) {
24✔
1438
      break;
19✔
1439
    }
1440

1441
    taosRLockLatch(&pSub->lock);
5✔
1442

1443
    if (numOfRows + pSub->vgNum > rowsCapacity) {
5!
1444
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum))  ;
×
1445
    }
1446

1447
    // topic and cgroup
1448
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
5✔
1449
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
5✔
1450
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
5✔
1451
    varDataSetLen(topic, strlen(varDataVal(topic)));
5✔
1452
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
5✔
1453

1454
    SMqConsumerEp *pConsumerEp = NULL;
5✔
1455
    void          *pIter = NULL;
5✔
1456

1457
    while (1) {
×
1458
      pIter = taosHashIterate(pSub->consumerHash, pIter);
5✔
1459
      if (pIter == NULL) break;
5!
1460
      pConsumerEp = (SMqConsumerEp *)pIter;
×
1461

1462
      char          *user = NULL;
×
1463
      char          *fqdn = NULL;
×
1464
      SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
×
1465
      if (pConsumer != NULL) {
×
1466
        user = pConsumer->user;
×
1467
        fqdn = pConsumer->fqdn;
×
1468
        sdbRelease(pSdb, pConsumer);
×
1469
      }
1470
      MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
×
1471
                  pConsumerEp->offsetRows));
1472
    }
1473

1474
    MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
5!
1475

1476
    pBlock->info.rows = numOfRows;
5✔
1477

1478
    taosRUnLockLatch(&pSub->lock);
5✔
1479
    sdbRelease(pSdb, pSub);
5✔
1480
  }
1481

1482
  mInfo("mnd end show subscriptions");
19!
1483

1484
  pShow->numOfRows += numOfRows;
19✔
1485
  return numOfRows;
19✔
1486

1487
END:
×
1488
  return code;
×
1489
}
1490

1491
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
×
1492
  SSdb *pSdb = pMnode->pSdb;
×
1493
  sdbCancelFetchByType(pSdb, pIter, SDB_SUBSCRIBE);
×
1494
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc