• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.14
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndScheduler.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tcompare.h"
25
#include "tname.h"
26

27
#define MND_SUBSCRIBE_VER_NUMBER   3
28
#define MND_SUBSCRIBE_RESERVE_SIZE 64
29

30
//#define MND_CONSUMER_LOST_HB_CNT          6
31

32
static int32_t mqRebInExecCnt = 0;
33

34
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
35
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
36
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
37
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
39
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
40
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
41
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
42
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
43
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
44

UNCOV
45
// Encode pSub into an sdb raw record and attach it to pTrans as a commit log.
//
// Returns TSDB_CODE_INVALID_PARA when either argument is NULL, otherwise the
// result of the append / set-status calls. The raw record is freed here only
// when appending it to the transaction fails.
// MND_TMQ_NULL_CHECK is defined outside this block; it appears to set `code`
// and jump to END when its argument is NULL.
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (!pTrans || !pSub) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    sdbFreeRaw(pRaw);
  }

END:
  return code;
}
62

63
// Register the subscribe sdb table, the TMQ message handlers and the
// "show subscriptions" retrieve / free-iterator handlers on pMnode.
//
// Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise the result of
// sdbSetTable().
int32_t mndInitSubscribe(SMnode *pMnode) {
  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // Both show handlers must be keyed by the same table type: the iterator
  // that mndRetrieveSubscribe opens for SUBSCRIPTIONS is the one that
  // mndCancelGetNextSubscribe has to close. Registering the free-iter handle
  // under TOPICS (as before) left SUBSCRIPTIONS without one.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
88

UNCOV
89
// Create a new subscribe object for subKey, copy the topic attributes into it
// and let the scheduler initialise its vgroup endpoints.
//
// On success *pSub holds the new object (owned by the caller) and 0 is
// returned. On failure the partially built object is destroyed and *pSub is
// reset to NULL so the caller cannot touch or free it again.
// Returns TSDB_CODE_INVALID_PARA when any argument is NULL.
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey, SMqSubscribeObj** pSub) {
  if(pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  // Start from a known value so the cleanup path never sees an
  // uninitialised pointer if tNewSubscribeObj fails before assigning it.
  *pSub = NULL;
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
  (*pSub)->dbUid = pTopic->dbUid;
  (*pSub)->stbUid = pTopic->stbUid;
  (*pSub)->subType = pTopic->subType;
  (*pSub)->withMeta = pTopic->withMeta;

  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
  return code;

END:
  if (*pSub != NULL) {
    tDeleteSubscribeObj(*pSub);
    taosMemoryFree(*pSub);
    *pSub = NULL;  // do not hand a dangling pointer back to the caller
  }
  return code;
}
108

UNCOV
109
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg,
×
110
                                    SSubplan *pPlan) {
UNCOV
111
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
×
112
    return TSDB_CODE_INVALID_PARA;
×
113
  }
UNCOV
114
  SMqRebVgReq req = {0};
×
UNCOV
115
  int32_t     code = 0;
×
UNCOV
116
  SEncoder encoder = {0};
×
117

UNCOV
118
  req.oldConsumerId = pRebVg->oldConsumerId;
×
UNCOV
119
  req.newConsumerId = pRebVg->newConsumerId;
×
UNCOV
120
  req.vgId = pRebVg->pVgEp->vgId;
×
UNCOV
121
  if (pPlan) {
×
UNCOV
122
    pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
×
UNCOV
123
    pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
×
UNCOV
124
    int32_t msgLen = 0;
×
UNCOV
125
    MND_TMQ_RETURN_CHECK(qSubPlanToString(pPlan, &req.qmsg, &msgLen));
×
126
  } else {
UNCOV
127
    req.qmsg = taosStrdup("");
×
UNCOV
128
    MND_TMQ_NULL_CHECK(req.qmsg);
×
129
  }
UNCOV
130
  req.subType = pSub->subType;
×
UNCOV
131
  req.withMeta = pSub->withMeta;
×
UNCOV
132
  req.suid = pSub->stbUid;
×
UNCOV
133
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
×
134

UNCOV
135
  int32_t tlen = 0;
×
UNCOV
136
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
×
UNCOV
137
  if (code < 0) {
×
138
    goto END;
×
139
  }
140

UNCOV
141
  tlen += sizeof(SMsgHead);
×
UNCOV
142
  void *buf = taosMemoryMalloc(tlen);
×
UNCOV
143
  MND_TMQ_NULL_CHECK(buf);
×
UNCOV
144
  SMsgHead *pMsgHead = (SMsgHead *)buf;
×
UNCOV
145
  pMsgHead->contLen = htonl(tlen);
×
UNCOV
146
  pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
×
147

UNCOV
148
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
×
UNCOV
149
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
×
UNCOV
150
  *pBuf = buf;
×
UNCOV
151
  *pLen = tlen;
×
152

UNCOV
153
END:
×
UNCOV
154
  tEncoderClear(&encoder);
×
UNCOV
155
  taosMemoryFree(req.qmsg);
×
UNCOV
156
  return code;
×
157
}
158

UNCOV
159
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
×
160
                                        const SMqRebOutputVg *pRebVg, SSubplan *pPlan) {
UNCOV
161
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
×
162
    return TSDB_CODE_INVALID_PARA;
×
163
  }
UNCOV
164
  int32_t code = 0;
×
UNCOV
165
  void   *buf  = NULL;
×
166

UNCOV
167
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
×
168
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
169
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
170
    goto END;
×
171
  }
172

UNCOV
173
  int32_t tlen = 0;
×
UNCOV
174
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan));
×
UNCOV
175
  int32_t vgId = pRebVg->pVgEp->vgId;
×
UNCOV
176
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
×
UNCOV
177
  if (pVgObj == NULL) {
×
178
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
179
    goto END;
×
180
  }
181

UNCOV
182
  STransAction action = {0};
×
UNCOV
183
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
×
UNCOV
184
  action.pCont = buf;
×
UNCOV
185
  action.contLen = tlen;
×
UNCOV
186
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
×
187

UNCOV
188
  mndReleaseVgroup(pMnode, pVgObj);
×
UNCOV
189
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
×
UNCOV
190
  return code;
×
191

192
END:
×
193
  taosMemoryFree(buf);
×
194
  return code;
×
195
}
196

UNCOV
197
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
×
UNCOV
198
  if (key == NULL || topic == NULL || cgroup == NULL) {
×
199
    return;
×
200
  }
UNCOV
201
  int32_t i = 0;
×
UNCOV
202
  while (key[i] != TMQ_SEPARATOR_CHAR) {
×
UNCOV
203
    i++;
×
204
  }
UNCOV
205
  (void)memcpy(cgroup, key, i);
×
UNCOV
206
  cgroup[i] = 0;
×
UNCOV
207
  if (fullName) {
×
UNCOV
208
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
×
209
  } else {
UNCOV
210
    while (key[i] != '.') {
×
UNCOV
211
      i++;
×
212
    }
UNCOV
213
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
×
214
  }
215
}
216

UNCOV
217
// Look up the rebalance info stored in pHash under key (the key is hashed
// including its NUL terminator); create and insert a fresh entry when none
// exists.
//
// pReb is optional; when non-NULL it receives a pointer to the entry that
// lives inside the hash table. Returns TSDB_CODE_INVALID_PARA when pHash or
// key is NULL, terrno when the new entry cannot be created, or the
// taosHashPut result.
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (!pHash || !key) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t     code = 0;
  SMqRebInfo *pEntry = taosHashGet(pHash, key, strlen(key) + 1);

  if (pEntry == NULL) {
    SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
    if (pFresh == NULL) {
      code = terrno;
      goto END;
    }

    // The hash stores a copy of the struct, so the temporary shell is
    // released right after the put regardless of its outcome.
    code = taosHashPut(pHash, key, strlen(key) + 1, pFresh, sizeof(SMqRebInfo));
    taosMemoryFreeClear(pFresh);
    if (code != 0) {
      goto END;
    }

    pEntry = taosHashGet(pHash, key, strlen(key) + 1);
    MND_TMQ_NULL_CHECK(pEntry);
  }

  if (pReb) {
    *pReb = pEntry;
  }

END:
  return code;
}
244

UNCOV
245
// Pop the last vgroup endpoint from vgs and record it in pHash (keyed by
// vgId) as a rebalance output entry built from {consumerId, -1, endpoint}.
//
// key is used for logging only. Returns TSDB_CODE_INVALID_PARA when vgs,
// pHash or key is NULL; otherwise the outcome of the pop / hash put.
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (!vgs || !pHash || !key) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t   code = 0;
  SMqVgEp **ppPopped = (SMqVgEp **)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(ppPopped);

  SMqVgEp       *pEp = *ppPopped;
  SMqRebOutputVg entry = {consumerId, -1, pEp};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &pEp->vgId, sizeof(int32_t), &entry, sizeof(SMqRebOutputVg)));
  mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, pEp->vgId, consumerId);

END:
  return code;
}
258

UNCOV
259
// For every consumer id listed in pInput->pRebInfo->removedConsumers that is
// still present in the output subscription: move all of its vgroups into
// pHash, drop it from the consumer hash and record its id in
// pOutput->removedConsumers.
//
// Ids that are not found in the consumer hash are skipped; a mismatch between
// requested and actually removed consumers is only logged.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or the first failing
// callee's code.
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (!pHash || !pOutput || !pInput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t requested = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
  int32_t dropped = 0;

  for (int32_t idx = 0; idx < requested; ++idx) {
    int64_t *pId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    MND_TMQ_NULL_CHECK(pId);

    SMqConsumerEp *pEp = taosHashGet(pOutput->pSub->consumerHash, pId, sizeof(int64_t));
    if (pEp != NULL) {
      // Each call pops one vgroup, so run it once per vgroup currently held.
      for (int32_t left = taosArrayGetSize(pEp->vgs); left > 0; --left) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, *pId, pOutput->pSub->key));
      }

      taosArrayDestroy(pEp->vgs);
      MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, pId, sizeof(int64_t)));
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, pId));
      dropped++;
    }
  }

  if (requested == dropped) {
    mInfo("[rebalance] sub:%s removed %d consumers", pOutput->pSub->key, requested);
  } else {
    mError("[rebalance] sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, requested,
           dropped);
  }

END:
  return code;
}
294

UNCOV
295
// Insert every consumer id from pInput->pRebInfo->newConsumers into the
// output subscription's consumer hash with an empty vgroup list, and record
// the id in pOutput->newConsumers.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or the first failing
// callee's code.
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t* consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);

    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
    if (code != 0) {
      // The hash did not take the entry, so the freshly created vgroup array
      // is still ours to release.
      taosArrayDestroy(newConsumerEp.vgs);
      goto END;
    }
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("[rebalance] sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }
END:
  return code;
}
316

UNCOV
317
// Move every vgroup currently in pOutput->pSub->unassignedVgs into pHash,
// recorded with consumer id -1.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or the first failing
// pushVgDataToHash code.
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (!pOutput || !pHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  // One pop per vgroup that was in the list when we started.
  for (int32_t left = taosArrayGetSize(pOutput->pSub->unassignedVgs); left > 0; --left) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
  }

END:
  return code;
}
329

UNCOV
330
// Walk all consumers of the output subscription, record each one in
// pOutput->modifyConsumers, and trim consumers that hold more than their
// share: the first remainderVgCnt over-full consumers keep minVgCnt + 1
// vgroups, the rest keep minVgCnt. Trimmed vgroups are moved into pHash.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or the first failing
// callee's code. The hash iterator is cancelled when the walk is abandoned
// on error.
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                     int32_t remainderVgCnt) {
  if (pOutput == NULL || pHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t cnt = 0;
  void   *pIter = NULL;

  while (1) {
    pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    int32_t        consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId));
    if (consumerVgNum > minVgCnt) {
      if (cnt < remainderVgCnt) {
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {  // pop until equal minVg + 1
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
        }
        cnt++;
      } else {
        while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
          MND_TMQ_RETURN_CHECK(pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key));
        }
      }
    }
  }
END:
  // pIter is still set only when an error jumped out of the loop; an
  // iterator that was not run to the end has to be cancelled explicitly.
  if (pIter != NULL) {
    taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  }
  return code;
}
365

UNCOV
366
// Reconcile the subscription's vgroup assignment with the vgroups that
// currently exist in the subscribed database.
//
// 1. Collect an endpoint for every vgroup of the database (newVgs).
// 2. For each consumer, drop endpoints whose vgroup no longer exists and
//    remove still-existing vgroups from newVgs.
// 3. If the subscription has no unassigned vgroups, whatever is left in
//    newVgs is appended to the unassigned list; otherwise it is discarded.
//
// Returns the number of vgroups found in the database on success, or an
// error code on failure (the caller treats negative values as errors).
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t  code = 0;
  int32_t  totalVgNum = 0;
  SVgObj  *pVgroup = NULL;
  SMqVgEp *pVgEp = NULL;
  void    *pSdbIter = NULL;   // cursor over SDB_VGROUP
  void    *pHashIter = NULL;  // cursor over the consumer hash
  SArray  *newVgs = taosArrayInit(0, POINTER_BYTES);
  MND_TMQ_NULL_CHECK(newVgs);
  while (1) {
    pSdbIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pSdbIter, (void **)&pVgroup);
    if (pSdbIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      pVgroup = NULL;
      continue;
    }

    totalVgNum++;
    pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(pVgEp);
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
    pVgEp = NULL;  // now owned by newVgs
    sdbRelease(pMnode->pSdb, pVgroup);
    // Forget the released object so the error path below cannot release it
    // a second time.
    pVgroup = NULL;
  }

  while (1) {
    pHashIter = taosHashIterate(pOutput->pSub->consumerHash, pHashIter);
    if (pHashIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pHashIter;
    int32_t j = 0;
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
      SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
      MND_TMQ_NULL_CHECK(pVgEpTmp);
      bool     find = false;
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
        SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
        MND_TMQ_NULL_CHECK(pnewVgEp);
        if (pVgEpTmp->vgId == pnewVgEp->vgId) {
          // Already assigned to this consumer: not a "new" vgroup.
          tDeleteSMqVgEp(pnewVgEp);
          taosArrayRemove(newVgs, k);
          find = true;
          break;
        }
      }
      if (!find) {
        // The vgroup no longer exists in the database; drop the endpoint and
        // re-examine the element that slid into index j.
        mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
        tDeleteSMqVgEp(pVgEpTmp);
        taosArrayRemove(pConsumerEp->vgs, j);
        continue;
      }
      j++;
    }
  }

  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
    mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
    taosArrayDestroy(newVgs);  // elements are now owned by unassignedVgs
  } else {
    taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
  }
  return totalVgNum;

END:
  // An error inside the consumer walk leaves the hash cursor open.
  if (pHashIter != NULL) {
    taosHashCancelIterate(pOutput->pSub->consumerHash, pHashIter);
  }
  // NOTE(review): when the error happens inside the vgroup scan the sdb
  // cursor (pSdbIter) is left open as before; cancel it here if the sdb API
  // offers a cancel call - confirm.
  if (pVgroup != NULL) {
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  taosMemoryFree(pVgEp);
  taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
  return code;
}
443

UNCOV
444
// Fold the per-consumer offset rows of the currently stored subscription into
// pOutput->pSub->offsetRows.
//
// For every offset row of every stored consumer: rows whose vgroup is still
// assigned to the same consumer in the output are skipped; otherwise the row
// is merged into the matching vgId entry of the output list (rows summed,
// offset/ever overwritten) or appended when no entry exists yet.
//
// Returns 0 when the stored subscription cannot be acquired (nothing to
// merge), TSDB_CODE_INVALID_PARA for NULL arguments, or the failing step's
// code. The read latch, the subscribe reference and the hash iterator are
// released on every exit path taken after they were acquired.
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if( code != 0){
    return 0;
  }
  taosRLockLatch(&pSub->lock);
  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    if(pOutput->pSub->offsetRows == NULL) {
      code = terrno;
      goto END;
    }
  }
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
      OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(d1);
      bool        jump = false;
      for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
        SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
        MND_TMQ_NULL_CHECK(pVgEp);
        if (pVgEp->vgId == d1->vgId) {
          jump = true;
          mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                pConsumerEp->consumerId, pVgEp->vgId);
          break;
        }
      }
      if (jump) continue;
      bool find = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(d2);
        if (d1->vgId == d2->vgId) {
          d2->rows += d1->rows;
          d2->offset = d1->offset;
          d2->ever = d1->ever;
          find = true;
          mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, d2->vgId, d2->rows, d1->rows);
          break;
        }
      }
      if (!find) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, d1));
      }
    }
  }

END:
  // Everything below must also run for the error jumps above; previously
  // those paths returned with the latch held and the reference not released.
  if (pIter != NULL) {
    taosHashCancelIterate(pSub->consumerHash, pIter);
  }
  taosRUnLockLatch(&pSub->lock);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
508

UNCOV
509
// Log the outcome of a rebalance: first every moved vgroup with its old and
// new consumer, then the final vgroup list of each consumer. No-op when
// pOutput is NULL; NULL array elements are skipped.
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (!pOutput) {
    return;
  }

  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);

  // Part 1: the individual vgroup moves.
  for (int32_t idx = 0; idx < taosArrayGetSize(pOutput->rebVgs); ++idx) {
    SMqRebOutputVg *pMove = taosArrayGet(pOutput->rebVgs, idx);
    if (pMove != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
            pMove->pVgEp->vgId, pMove->oldConsumerId, pMove->newConsumerId);
    }
  }

  // Part 2: the resulting configuration per consumer.
  void *pCursor = NULL;
  while ((pCursor = taosHashIterate(pOutput->pSub->consumerHash, pCursor)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pCursor;
    int32_t        vgCount = taosArrayGetSize(pEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
          pEp->consumerId, vgCount);

    for (int32_t idx = 0; idx < vgCount; ++idx) {
      SMqVgEp *pVg = taosArrayGetP(pEp->vgs, idx);
      if (pVg != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVg->vgId,
              pEp->consumerId);
      }
    }
  }
}
535

UNCOV
536
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
×
537
                           int32_t *remainderVgCnt) {
UNCOV
538
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
×
539
    return;
×
540
  }
UNCOV
541
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
×
UNCOV
542
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
×
UNCOV
543
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
×
544

545
  // calc num
UNCOV
546
  if (numOfFinal != 0) {
×
UNCOV
547
    *minVgCnt = totalVgNum / numOfFinal;
×
UNCOV
548
    *remainderVgCnt = totalVgNum % numOfFinal;
×
549
  } else {
UNCOV
550
    mInfo("[rebalance] sub:%s no consumer subscribe this topic", pSubKey);
×
551
  }
UNCOV
552
  mInfo(
×
553
      "[rebalance] sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d",
554
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
555
}
556

UNCOV
557
// Hand the vgroups collected in pHash out to the consumers of the output
// subscription.
//
// Pass 1: every consumer is filled up to minVgCnt vgroups.
// Pass 2: consumers holding exactly minVgCnt get one more vgroup each until
//         pHash is used up.
// Finally every entry of pHash is copied into pOutput->rebVgs; when the
// subscription has no consumer at all, the endpoints are also put back into
// the unassigned list.
//
// The cursor over pHash (pTake) is shared by pass 1 and pass 2, so pass 2
// continues where pass 1 stopped.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments,
// TSDB_CODE_PAR_INTERNAL_ERROR when vgroups are left over after pass 2, or
// the code set by a failed array push.
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (!pOutput || !pHash) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t         code = 0;
  void           *pWalk = NULL;  // cursor over the consumer hash
  void           *pTake = NULL;  // cursor over pHash (vgroups to hand out)
  SMqRebOutputVg *pSlot = NULL;

  // Pass 1: bring every consumer up to the minimum.
  while ((pWalk = taosHashIterate(pOutput->pSub->consumerHash, pWalk)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pWalk;
    while (taosArrayGetSize(pEp->vgs) < minVgCnt) {
      pTake = taosHashIterate(pHash, pTake);
      if (pTake == NULL) {
        mError("[rebalance] sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;
      }

      pSlot = (SMqRebOutputVg *)pTake;
      pSlot->newConsumerId = pEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pSlot->pVgEp));
      mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pSlot->pVgEp->vgId,
            pEp->consumerId);
    }
  }

  // Pass 2: spread the remainder, one extra vgroup per consumer.
  while ((pWalk = taosHashIterate(pOutput->pSub->consumerHash, pWalk)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pWalk;
    if (taosArrayGetSize(pEp->vgs) == minVgCnt) {
      pTake = taosHashIterate(pHash, pTake);
      if (pTake == NULL) {
        mInfo("[rebalance] sub:%s assign iter is used up", pOutput->pSub->key);
        break;
      }

      pSlot = (SMqRebOutputVg *)pTake;
      pSlot->newConsumerId = pEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pSlot->pVgEp));
      mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pSlot->pVgEp->vgId,
            pEp->consumerId);
    }
  }

  // Pass 2 may have stopped early; close the consumer cursor in that case.
  taosHashCancelIterate(pOutput->pSub->consumerHash, pWalk);
  if (pTake != NULL) {
    mError("[rebalance]sub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_PAR_INTERNAL_ERROR;
    goto END;
  }

  // Publish every vgroup move.
  while ((pTake = taosHashIterate(pHash, pTake)) != NULL) {
    SMqRebOutputVg *pMove = (SMqRebOutputVg *)pTake;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pMove));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {            // if all consumer is removed
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->unassignedVgs, &pMove->pVgEp));  // put all vg into unassigned
    }
  }

END:
  return code;
}
630

UNCOV
631
// Run one rebalance calculation for the subscription in pOutput.
//
// Steps: reconcile vgroups with the database, collect the vgroups of removed
// consumers and the unassigned ones into a scratch hash, compute the target
// share per consumer, trim over-full consumers, add new consumers, hand the
// collected vgroups out again, merge offset rows and log the result.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// first failing step's code. The scratch hash is released on every path.
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (pMnode == NULL || pInput == NULL || pOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     totalVgNum = processRemoveAddVgs(pMnode, pOutput);
  if (totalVgNum < 0){
    return totalVgNum;
  }
  const char *pSubKey = pOutput->pSub->key;
  int32_t     minVgCnt = 0;
  int32_t     remainderVgCnt = 0;
  int32_t     code = 0;
  SHashObj   *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pHash);
  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));
  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);
  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));
  printRebalanceLog(pOutput);

END:
  // Cleanup sits after the label so failed steps no longer leak the hash.
  if (pHash != NULL) {
    taosHashCleanup(pHash);
  }
  return code;
}
658

UNCOV
659
// For every consumer id in `consumers`, build a temporary consumer object of
// the given update type and append it to pTrans as a commit log.
//
// topic may be NULL (the caller passes NULL for modified consumers).
// Returns TSDB_CODE_INVALID_PARA when pTrans, consumers or cgroup is NULL, or
// the first failing callee's code.
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         code = 0;
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t* consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Reset right away: if a later iteration fails before a new object is
    // created, END must not delete this already-deleted object again.
    pConsumerNew = NULL;
  }

END:
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
679

UNCOV
680
// Append consumer commit logs to pTrans for the three consumer lists of a
// rebalance result, in this order: modified (CONSUMER_UPDATE_REB, without
// topic), new (CONSUMER_ADD_REB) and removed (CONSUMER_REMOVE_REB).
//
// Returns TSDB_CODE_INVALID_PARA when any argument is NULL, or the first
// failing presistConsumerByType code.
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (!pTrans || !pOutput || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;

  // Modified consumers carry no topic.
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL));

  // Added and removed consumers are tagged with the topic.
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic));
  MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic));

END:
  return code;
}
691

UNCOV
692
// Persist the result of one rebalance as a "tmq-reb" transaction.
//
// The transaction is built in five steps: redo actions for every rebalanced
// vgroup, the subscribe commit log, the consumer commit logs, the start/stop
// callbacks, and finally mndTransPrepare().
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise the
// first error code produced while building or preparing the transaction.
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (!pMnode || !pMsg || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  STrans          *pTrans = NULL;
  struct SSubplan *pPlan = NULL;
  char             topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char             cgroup[TSDB_CGROUP_LEN] = {0};

  // an empty qmsg means there is no subplan to deserialize
  if (strcmp(pOutput->pSub->qmsg, "") != 0) {
    MND_TMQ_RETURN_CHECK(qStringToSubplan(pOutput->pSub->qmsg, &pPlan));
  }

  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    // prefer the error left in terrno, fall back to a generic code otherwise
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // 1. redo action: action to all vg
  const SArray *rebVgs = pOutput->rebVgs;
  int32_t       rebVgCnt = taosArrayGetSize(rebVgs);
  for (int32_t idx = 0; idx < rebVgCnt; idx++) {
    SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, idx);
    MND_TMQ_NULL_CHECK(pRebVg);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan));
  }

  // 2. commit log: subscribe and vg assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // 3. commit log: consumer to update status and epoch
  MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));

  // 4. set cb
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);

  // 5. execution
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  // the plan and the transaction are dropped on success and on failure alike
  nodesDestroyNode((SNode *)pPlan);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
744

UNCOV
745
static void freeRebalanceItem(void *param) {
×
UNCOV
746
  if (param == NULL) return;
×
UNCOV
747
  SMqRebInfo *pInfo = param;
×
UNCOV
748
  taosArrayDestroy(pInfo->newConsumers);
×
UNCOV
749
  taosArrayDestroy(pInfo->removedConsumers);
×
750
}
751

752
// type = 0 remove  type = 1 add
UNCOV
753
// Record `consumerId` in the rebalance info of every "<group><sep><topic>" key
// built from topicList.
//
// type == 0 appends the id to removedConsumers, type == 1 appends it to
// newConsumers; any other value only makes sure the rebalance entry exists.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise the
// error code set by the MND_TMQ_* check macros.
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, char *group, int64_t consumerId) {
  if (!rebSubHash || !topicList || !group) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t topicCnt = taosArrayGetSize(topicList);

  for (int32_t idx = 0; idx < topicCnt; idx++) {
    char *topicName = taosArrayGetP(topicList, idx);
    MND_TMQ_NULL_CHECK(topicName);

    char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", group, TMQ_SEPARATOR, topicName);

    SMqRebInfo *pRebInfo = NULL;
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, subKey, &pRebInfo));

    switch (type) {
      case 0: {
        MND_TMQ_NULL_CHECK(taosArrayPush(pRebInfo->removedConsumers, &consumerId));
        break;
      }
      case 1: {
        MND_TMQ_NULL_CHECK(taosArrayPush(pRebInfo->newConsumers, &consumerId));
        break;
      }
      default:
        break;
    }
  }

END:
  return code;
}
775

UNCOV
776
// For every topic the consumer currently subscribes to, look at the vgroups
// assigned to it. When a vgroup can no longer be acquired, a rebalance entry
// for that subscription key is requested in rebSubHash.
// Errors are only logged; the function never fails.
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (!pMnode || !pConsumer || !rebSubHash) {
    return;
  }

  int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t t = 0; t < topicCnt; t++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, t);
    if (topic == NULL) {
      continue;
    }

    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

    SMqSubscribeObj *pSub = NULL;
    int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
    if (code != 0) {
      continue;
    }

    taosRLockLatch(&pSub->lock);

    // iterate all vg assigned to the consumer of that topic
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    int32_t        vgCnt = (pConsumerEp != NULL) ? taosArrayGetSize(pConsumerEp->vgs) : 0;
    for (int32_t v = 0; v < vgCnt; v++) {
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, v);
      if (pVgEp == NULL) {
        continue;
      }

      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
      if (pVgroup == NULL) {
        // the vgroup is gone: ask for a rebalance of this subscription
        code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
        if (code == 0) {
          mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
        } else {
          mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code));
        }
      }
      mndReleaseVgroup(pMnode, pVgroup);
    }

    taosRUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
  }
}
823

824
// Walk all consumers in the sdb and collect, in rebSubHash, every subscription
// that needs a rebalance.
//
// Each visit increments the consumer's hbStatus and pollStatus counters. Then:
//  - READY with no current topics: a LOST_CONSUMER_CLEAR message is sent;
//  - READY and heartbeat/poll timed out: the consumer is recorded as removed
//    from all of its current topics;
//  - READY otherwise: its vgroups are checked for splits;
//  - REBALANCE: its rebNewTopics / rebRemovedTopics are recorded;
//  - any other status: a LOST_CONSUMER_CLEAR message is sent.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise the
// first error code. On error the consumer lock is released, the consumer being
// visited is released and the sdb fetch is cancelled before returning.
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (pMsg == NULL || rebSubHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SMqConsumerObj *pConsumer = NULL;
  void           *pIter = NULL;
  int32_t         code = 0;

  // iterate all consumers, find all modification
  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
    int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
    int32_t status = atomic_load_32(&pConsumer->status);

    mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
           ", hbstatus:%d, pollStatus:%d",
           pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
           pConsumer->createTime, hbStatus, pollStatus);

    if (status == MQ_CONSUMER_STATUS_READY) {
      // widen before multiplying so a large counter cannot overflow int32
      int64_t hbElapsedMs = (int64_t)hbStatus * tsMqRebalanceInterval * 1000;
      int64_t pollElapsedMs = (int64_t)pollStatus * tsMqRebalanceInterval * 1000;

      if (taosArrayGetSize(pConsumer->currentTopics) == 0) {  // unsubscribe or close
        MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
      } else if (hbElapsedMs >= pConsumer->sessionTimeoutMs || pollElapsedMs >= pConsumer->maxPollIntervalMs) {
        // unlock before checking the result so an error exit never leaves the latch held
        taosRLockLatch(&pConsumer->lock);
        code = buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
        taosRUnLockLatch(&pConsumer->lock);
        if (code != 0) {
          goto END;
        }
      } else {
        checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
      }
    } else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
      taosRLockLatch(&pConsumer->lock);
      code = buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId);
      if (code == 0) {
        code = buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
      }
      taosRUnLockLatch(&pConsumer->lock);
      if (code != 0) {
        goto END;
      }
    } else {
      MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
    }

    mndReleaseConsumer(pMnode, pConsumer);
  }

END:
  // pIter is non-NULL here only when the loop was left through an error while a
  // consumer was still held: release that consumer and cancel the fetch.
  if (pIter != NULL) {
    mndReleaseConsumer(pMnode, pConsumer);
    sdbCancelFetch(pSdb, pIter);
  }
  return code;
}
875

876
bool mndRebTryStart() {
809✔
877
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
809✔
878
  if (old > 0) mInfo("[rebalance] counter old val:%d", old) return old == 0;
809!
879
}
880

UNCOV
881
void mndRebCntInc() {
×
UNCOV
882
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
×
UNCOV
883
  if (val > 0) mInfo("[rebalance] cnt inc, value:%d", val)
×
UNCOV
884
}
×
885

886
void mndRebCntDec() {
809✔
887
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
809✔
888
  if (val > 0) mInfo("[rebalance] cnt sub, value:%d", val)
809!
889
}
809✔
890

UNCOV
891
// Release everything owned by a rebalance output: the four arrays and the
// subscribe object. The SMqRebOutputObj itself is not freed.
//
// Every member is reset to NULL after it is released, so the struct holds no
// dangling pointers and a second call on the same struct is harmless.
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput == NULL) {
    return;
  }
  taosArrayDestroy(rebOutput->newConsumers);
  rebOutput->newConsumers = NULL;
  taosArrayDestroy(rebOutput->modifyConsumers);
  rebOutput->modifyConsumers = NULL;
  taosArrayDestroy(rebOutput->removedConsumers);
  rebOutput->removedConsumers = NULL;
  taosArrayDestroy(rebOutput->rebVgs);
  rebOutput->rebVgs = NULL;
  // the contents are torn down first, then the object memory is freed
  tDeleteSubscribeObj(rebOutput->pSub);
  taosMemoryFree(rebOutput->pSub);
  rebOutput->pSub = NULL;
}
902

UNCOV
903
// Allocate the four arrays of a rebalance output: three arrays of int64_t
// consumer ids and one array of SMqRebOutputVg.
//
// Returns 0 on success and TSDB_CODE_INVALID_PARA for a NULL argument. When an
// allocation fails, everything allocated so far is released through
// clearRebOutput() and the error code set by MND_TMQ_NULL_CHECK is returned.
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (!rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  // success: the caller now owns the arrays
  return code;

END:
  // failure: undo the partial initialization
  clearRebOutput(rebOutput);
  return code;
}
922

923
// This function only works when there are dirty consumers
UNCOV
924
// Remove "dirty" consumers from a subscribe object: for every entry in
// pSub->consumerHash whose consumer cannot be acquired from the sdb, its vgroups
// are appended to pSub->unassignedVgs and the entry is removed from the hash.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise the
// error code set by the MND_TMQ_* check macros. The hash iteration is cancelled
// on every exit path.
static int32_t checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  void   *pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) {
      break;
    }

    SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerObj *pConsumer = NULL;
    code = mndAcquireConsumer(pMnode, pConsumerEp->consumerId, &pConsumer);
    if (code == 0) {
      // consumer still exists: nothing to repair for this entry
      mndReleaseConsumer(pMnode, pConsumer);
      continue;
    }
    mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
    // hand the vgroups back to the unassigned pool before dropping the entry
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs));

    taosArrayDestroy(pConsumerEp->vgs);
    MND_TMQ_RETURN_CHECK(taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)));
  }
END:
  // pIter is non-NULL only when the loop was left through an error; the
  // in-flight iteration must then be cancelled (with NULL this is the same call
  // pattern mndProcessRebalanceReq uses after its loop has finished).
  taosHashCancelIterate(pSub->consumerHash, pIter);
  return code;
}
952

UNCOV
953
// Prepare rebOutput->pSub for the subscription key stored in rebInput->pRebInfo.
//
// If the subscription does not exist yet, a new one is created from the topic
// (oldConsumerNum = 0). Otherwise the existing one is cloned under its read
// lock, dirty consumers are removed from the clone by checkConsumer(), and
// oldConsumerNum is set to the number of consumers left in the clone.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise the
// first error code. An acquired subscribe object is released on every path.
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (pMnode == NULL || rebInput == NULL || rebOutput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  const char      *key = rebInput->pRebInfo->key;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);

  if (code != 0) {
    // split sub key and extract topic
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(key, topic, cgroup, true);
    SMqTopicObj *pTopic = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);

    rebInput->oldConsumerNum = 0;
    code = mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub);
    if (code != 0) {
      mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code));
      taosRUnLockLatch(&pTopic->lock);
      mndReleaseTopic(pMnode, pTopic);
      return code;
    }

    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    taosRUnLockLatch(&pTopic->lock);
    mndReleaseTopic(pMnode, pTopic);

    mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
  } else {
    // clone and clean up under the read lock; the lock is dropped exactly once,
    // whether or not one of the steps failed
    taosRLockLatch(&pSub->lock);
    code = tCloneSubscribeObj(pSub, &rebOutput->pSub);
    if (code == 0) {
      code = checkConsumer(pMnode, rebOutput->pSub);
    }
    if (code == 0) {
      rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
    }
    taosRUnLockLatch(&pSub->lock);
    if (code != 0) {
      goto END;
    }

    mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
  }

END:
  // Previously the clone/checkConsumer error exits skipped this release.
  // mndReleaseSubscribe() ignores NULL, which covers the create path.
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
1006

1007
// Handler of the periodic mq rebalance request.
//
// If no other rebalance is running, all consumers are scanned for changes
// (mndCheckConsumer). For each subscription that needs work, the output is
// built, the rebalance computed, and the result persisted as a transaction.
// A failure for one subscription is logged and the loop moves on to the next.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL message, 0 when a rebalance is
// already in progress, otherwise the code left by the last step executed.
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (!pMsg) {
    return TSDB_CODE_INVALID_PARA;
  }

  int     code = 0;
  void   *pIter = NULL;
  SMnode *pMnode = pMsg->info.node;

  mDebug("[rebalance] start to process mq timer");
  if (!mndRebTryStart()) {
    mInfo("[rebalance] mq rebalance already in progress, do nothing");
    return code;
  }

  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);

  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("[rebalance] mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
  }

  // one iteration per subscription that needs a rebalance
  while ((pIter = taosHashIterate(rebSubHash, pIter)) != NULL) {
    SMqRebInputObj  rebInput = {0};
    SMqRebOutputObj rebOutput = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&rebOutput));
    rebInput.pRebInfo = (SMqRebInfo *)pIter;

    // each stage runs only if the previous one succeeded
    code = buildRebOutput(pMnode, &rebInput, &rebOutput);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code));
    } else {
      code = mndDoRebalance(pMnode, &rebInput, &rebOutput);
      if (code != 0) {
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code));
      } else {
        code = mndPersistRebResult(pMnode, pMsg, &rebOutput);
        if (code != 0) {
          mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code));
        }
      }
    }

    clearRebOutput(&rebOutput);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("[rebalance] mq rebalance completed successfully, wait trans finish");
  }

END:
  taosHashCancelIterate(rebSubHash, pIter);
  taosHashCleanup(rebSubHash);
  // undo the counter change made by mndRebTryStart()
  mndRebCntDec();

  TAOS_RETURN(code);
}
1073

UNCOV
1074
// Append a TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every vgroup that
// belongs to the database of the subscription (pSub->dbUid).
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise the
// first error code. Each fetched vgroup is released exactly once and the sdb
// fetch is cancelled on every exit path.
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  int32_t code = 0;
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }
    SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      // NOTE(review): assumes the transaction does not take ownership of pCont
      // when the append fails, so the request is freed here - confirm against
      // mndTransAppendRedoAction().
      taosMemoryFree(pReq);
      // pVgObj is still held; END releases it exactly once
      goto END;
    }

    sdbRelease(pMnode->pSdb, pVgObj);
    // cleared so that END can never release the same vgroup a second time
    pVgObj = NULL;
  }

END:
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1114

UNCOV
1115
// Check that no consumer of group `cgroup` still has `topic` among its assigned
// topics.
//
// Returns 0 when no such consumer exists, TSDB_CODE_MND_CGROUP_USED when one is
// found, and TSDB_CODE_INVALID_PARA on a NULL argument. pTrans is only checked
// for NULL and is not otherwise used.
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
  if (!pMnode || !pTrans || !cgroup || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }

  int             code = 0;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;

  for (;;) {
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    // only consumers of the requested group are inspected
    if (strcmp(cgroup, pConsumer->cgroup) == 0) {
      bool found = checkTopic(pConsumer->assignedTopics, topic);
      if (found) {
        mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
               topic, pConsumer->consumerId, pConsumer->cgroup);
        code = TSDB_CODE_MND_CGROUP_USED;
        // the consumer is still held here; END releases it
        goto END;
      }
    }

    sdbRelease(pMnode->pSdb, pConsumer);
  }

END:
  sdbRelease(pMnode->pSdb, pConsumer);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1149

UNCOV
1150
// Handler of the "drop consumer group" request.
//
// The subscription "<cgroup><sep><topic>" is looked up. It can only be dropped
// when it has no consumers left. A "drop-cgroup" transaction is then built that
// deletes the subscription on the vnodes and writes the drop commit log.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
// 0 when the subscription does not exist and igNotExists is set,
// TSDB_CODE_INVALID_PARA for a NULL message, otherwise an error code.
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pMsg->info.node;
  SMDropCgroupReq  dropReq = {0};
  STrans          *pTrans = NULL;
  int32_t          code = TSDB_CODE_ACTION_IN_PROGRESS;
  SMqSubscribeObj *pSub = NULL;

  // a failure here jumps to END while pSub is still NULL; END copes with that
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
  char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);
  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code != 0) {
    if (dropReq.igNotExists) {
      mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
      return 0;
    } else {
      code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
      mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code));
      return code;
    }
  }

  taosWLockLatch(&pSub->lock);
  if (taosHashGetSize(pSub->consumerHash) != 0) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
  mndTransSetDbName(pTrans, pSub->dbName, NULL);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic));
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  // pSub is NULL when the request could not be deserialized: no lock was taken
  // and nothing was acquired, so unlocking would dereference NULL.
  if (pSub != NULL) {
    taosWUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
  }
  mndTransDrop(pTrans);

  if (code != 0) {
    mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic);
    TAOS_RETURN(code);
  }
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
}
1203

1204
// Cleanup hook of the subscribe module; there is nothing to release.
void mndCleanupSubscribe(SMnode *pMnode) {
}
13✔
1205

UNCOV
1206
// Serialize a subscribe object into a newly allocated sdb raw record.
//
// Layout of the record: int32 length, the encoded object, then
// MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
//
// Returns the raw record on success. On failure returns NULL with terrno set
// (it is preset to TSDB_CODE_OUT_OF_MEMORY and cleared only on success).
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  int32_t  code = 0;
  int32_t  lino = 0;
  void    *buf = NULL;
  // Must be initialized before the first goto: the error path below logs pRaw
  // and passes it to sdbFreeRaw(), and the tlen <= 0 exit is taken before the
  // record has been allocated.
  SSdbRaw *pRaw = NULL;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // first pass with a NULL buffer only computes the encoded length
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // second pass writes into buf; abuf is the cursor handed to the encoder
  void *abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1248

UNCOV
1249
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
×
UNCOV
1250
  if (pRaw == NULL) {
×
1251
    return NULL;
×
1252
  }
UNCOV
1253
  int32_t code = 0;
×
UNCOV
1254
  int32_t lino = 0;
×
UNCOV
1255
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1256
  SSdbRow         *pRow = NULL;
×
UNCOV
1257
  SMqSubscribeObj *pSub = NULL;
×
UNCOV
1258
  void            *buf = NULL;
×
1259

UNCOV
1260
  int8_t sver = 0;
×
UNCOV
1261
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
×
1262

UNCOV
1263
  if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
×
1264
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
1265
    goto SUB_DECODE_OVER;
×
1266
  }
1267

UNCOV
1268
  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
×
UNCOV
1269
  if (pRow == NULL) goto SUB_DECODE_OVER;
×
1270

UNCOV
1271
  pSub = sdbGetRowObj(pRow);
×
UNCOV
1272
  if (pSub == NULL) goto SUB_DECODE_OVER;
×
1273

UNCOV
1274
  int32_t dataPos = 0;
×
1275
  int32_t tlen;
UNCOV
1276
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
×
UNCOV
1277
  buf = taosMemoryMalloc(tlen);
×
UNCOV
1278
  if (buf == NULL) goto SUB_DECODE_OVER;
×
UNCOV
1279
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
×
UNCOV
1280
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
×
1281

UNCOV
1282
  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
×
1283
    goto SUB_DECODE_OVER;
×
1284
  }
1285

1286
  // update epset saved in mnode
UNCOV
1287
  if (pSub->unassignedVgs != NULL) {
×
UNCOV
1288
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
×
UNCOV
1289
    for (int32_t i = 0; i < size; ++i) {
×
UNCOV
1290
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, i);
×
UNCOV
1291
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
×
1292
    }
1293
  }
UNCOV
1294
  if (pSub->consumerHash != NULL) {
×
UNCOV
1295
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
×
UNCOV
1296
    while (pIter) {
×
UNCOV
1297
      SMqConsumerEp *pConsumerEp = pIter;
×
UNCOV
1298
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
×
UNCOV
1299
      for (int32_t i = 0; i < size; ++i) {
×
UNCOV
1300
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
×
UNCOV
1301
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
×
1302
      }
UNCOV
1303
      pIter = taosHashIterate(pSub->consumerHash, pIter);
×
1304
    }
1305
  }
1306

UNCOV
1307
  terrno = TSDB_CODE_SUCCESS;
×
1308

UNCOV
1309
SUB_DECODE_OVER:
×
UNCOV
1310
  taosMemoryFreeClear(buf);
×
UNCOV
1311
  if (terrno != TSDB_CODE_SUCCESS) {
×
1312
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
×
1313
    taosMemoryFreeClear(pRow);
×
1314
    return NULL;
×
1315
  }
1316

UNCOV
1317
  mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
×
UNCOV
1318
  return pRow;
×
1319
}
1320

UNCOV
1321
// sdb insert callback for subscribe objects: only traces the key.
// Always returns 0.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mTrace("subscribe:%s, perform insert action", subKey);
  return 0;
}
1325

UNCOV
1326
// sdb delete callback for subscribe objects: traces the key and tears the
// object down with tDeleteSubscribeObj(). Always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mTrace("subscribe:%s, perform delete action", subKey);
  tDeleteSubscribeObj(pSub);
  return 0;
}
1331

UNCOV
1332
// sdb update callback for subscribe objects.
//
// Under the write lock of the old object, consumerHash, unassignedVgs and
// offsetRows are swapped between the old and the new object. The old object
// ends up with the new data and the new object with the previous data.
//
// Returns 0 on success, -1 when either object is NULL.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (!pOldSub || !pNewSub) {
    return -1;
  }
  mTrace("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);

  // remember what the old object currently holds ...
  SHashObj *prevConsumerHash = pOldSub->consumerHash;
  SArray   *prevUnassignedVgs = pOldSub->unassignedVgs;
  SArray   *prevOffsetRows = pOldSub->offsetRows;

  // ... move the new members into the old object ...
  pOldSub->consumerHash = pNewSub->consumerHash;
  pOldSub->unassignedVgs = pNewSub->unassignedVgs;
  pOldSub->offsetRows = pNewSub->offsetRows;

  // ... and hand the previous members over to the new object
  pNewSub->consumerHash = prevConsumerHash;
  pNewSub->unassignedVgs = prevUnassignedVgs;
  pNewSub->offsetRows = prevOffsetRows;

  taosWUnLockLatch(&pOldSub->lock);
  return 0;
}
1352

UNCOV
1353
// Acquire the subscribe object stored under `key` in the sdb.
//
// On success *pSub holds the acquired object and 0 is returned; the caller
// releases it with mndReleaseSubscribe(). Returns
// TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST (with *pSub == NULL) when the key is not
// found and TSDB_CODE_INVALID_PARA on a NULL argument.
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub) {
  if (!pMnode || !key || !pSub) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  return (*pSub != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1364

UNCOV
1365
// Count the subscriptions whose key contains the topic `topicName`.
// Returns 0 when either argument is NULL.
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (!pMnode || !topicName) {
    return 0;
  }

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          groupCnt = 0;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) {
      break;
    }

    // split the key into its topic and consumer group parts
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

    if (strcmp(topic, topicName) == 0) {
      groupCnt++;
    }
    sdbRelease(pSdb, pSub);
  }

  return groupCnt;
}
1390

UNCOV
1391
// Release a subscribe object obtained from the sdb. NULL arguments are ignored.
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1396

UNCOV
1397
// Encode pSub, append it to pTrans as a commit log and mark the raw record as
// SDB_STATUS_DROPPED.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise the
// error code of the failing step. The raw record is freed here only when
// appending it to the transaction fails.
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (!pMnode || !pTrans || !pSub) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pCommitRaw);

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
  } else {
    sdbFreeRaw(pCommitRaw);
  }

END:
  return code;
}
1411

UNCOV
1412
// Add to pTrans everything needed to drop all subscriptions of `topicName`:
// the delete-sub actions for the vnodes and the drop commit log of each
// subscription.
//
// Returns 0 on success, TSDB_CODE_MND_IN_REBALANCE when a matching subscription
// still has consumers, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise the
// first error code.
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
  if (!pMnode || !pTrans || !topicName) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  int32_t          code = 0;

  for (;;) {
    // release the object of the previous round before fetching the next one
    sdbRelease(pSdb, pSub);
    pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
    if (pIter == NULL) {
      break;
    }

    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

    if (strcmp(topic, topicName) == 0) {
      // iter all vnode to delete handle
      if (taosHashGetSize(pSub->consumerHash) != 0) {
        code = TSDB_CODE_MND_IN_REBALANCE;
        goto END;
      }

      MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
      MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
    }
  }

END:
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1446

UNCOV
1447
/**
 * Append one "show subscriptions" row to `pBlock` for every vgroup endpoint in `vgs`.
 *
 * Columns are filled in this order: topic, consumer group, vgroup id, consumer id
 * (as "0x..." text), user, fqdn, offset text ("<offset>/<ever>"), rows.
 * The consumer id column is NULL when `consumerId` is -1; user/fqdn are NULL when the
 * corresponding argument is NULL; offset and rows are NULL when `offsetRows` holds no
 * entry for the vgroup. When several entries share the vgroup id, the last one is used.
 *
 * @param pBlock      result block whose pDataBlock columns are written.
 * @param numOfRows   in/out row cursor; incremented once per row written.
 * @param consumerId  owning consumer id, or -1 for unassigned vgroups.
 * @param user        consumer user name, may be NULL.
 * @param fqdn        consumer fqdn, may be NULL.
 * @param topic       topic value; read through varDataVal for logging, so presumably a
 *                    length-prefixed buffer -- confirm against the caller.
 * @param cgroup      consumer group value, same representation as `topic`.
 * @param vgs         array of SMqVgEp pointers.
 * @param offsetRows  array of OffsetRows entries searched by vgroup id.
 * @return 0 on success, TSDB_CODE_INVALID_PARA when pBlock/numOfRows/topic/cgroup is NULL,
 *         otherwise the code set by the failing check.
 */
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
                           const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (pBlock == NULL || numOfRows == NULL || topic == NULL || cgroup == NULL){
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t vgCount = taosArrayGetSize(vgs);

  for (int32_t vgIdx = 0; vgIdx < vgCount; vgIdx++) {
    SMqVgEp *pVgEp = taosArrayGetP(vgs, vgIdx);
    MND_TMQ_NULL_CHECK(pVgEp);

    SColumnInfoData *pCol = NULL;
    int32_t          colIdx = 0;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, cgroup, false));

    // vgroup id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pVgEp->vgId, false));

    // consumer id, rendered as hexadecimal text; NULL for the unassigned (-1) case
    char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, consumerIdHex, consumerId == -1));

    // user
    char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user) STR_TO_VARSTR(userStr, user);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, userStr, user == NULL));

    // fqdn
    char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, fqdnStr, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // Look up the offset entry for this vgroup; the scan does not stop at the first
    // hit, so a later entry with the same vgroup id replaces an earlier one.
    OffsetRows *pMatched = NULL;
    for (int rowIdx = 0; rowIdx < taosArrayGetSize(offsetRows); rowIdx++) {
      OffsetRows *pRow = taosArrayGet(offsetRows, rowIdx);
      MND_TMQ_NULL_CHECK(pRow);
      if (pRow->vgId == pVgEp->vgId) {
        pMatched = pRow;
      } else {
        mInfo("mnd show subscriptions: do not find vgId:%d, %d in offsetRows", pRow->vgId, pVgEp->vgId);
      }
    }

    if (pMatched == NULL) {
      // No offset known for this vgroup: both the offset and rows columns are NULL.
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    } else {
      // offset, formatted as "<offset>/<ever>"
      char offsetBuf[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(offsetBuf), TSDB_OFFSET_LEN, &pMatched->offset);
      (void)snprintf(varDataVal(offsetBuf) + strlen(varDataVal(offsetBuf)),
                     sizeof(offsetBuf) - VARSTR_HEADER_SIZE - strlen(varDataVal(offsetBuf)), "/%" PRId64, pMatched->ever);
      varDataSetLen(offsetBuf, strlen(varDataVal(offsetBuf)));
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, offsetBuf, false));

      // rows
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pMatched->rows, false));
    }

    (*numOfRows)++;
  }
  return 0;

END:
  return code;
}
1537

UNCOV
1538
/**
 * Fill `pBlock` with "show subscriptions" rows, resuming from `pShow->pIter`.
 *
 * For every subscription fetched from the sdb, one row is produced per vgroup of each
 * consumer in its consumer hash, followed by one row per unassigned vgroup (consumer
 * id -1, no user/fqdn). The subscription's read latch is held while its rows are built.
 *
 * @param pReq          request; `pReq->info.node` supplies the mnode.
 * @param pShow         show state; `pIter` is advanced and `numOfRows` accumulated.
 * @param pBlock        destination block; capacity is grown when a subscription's
 *                      vgroups would not fit into `rowsCapacity`.
 * @param rowsCapacity  fetching stops once this many rows have been produced.
 * @return the number of rows written on success, TSDB_CODE_INVALID_PARA on a NULL
 *         argument, or the failing helper's code.
 */
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL){
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  int32_t          numOfRows = 0;
  SMqSubscribeObj *pSub = NULL;
  int32_t          code = 0;

  mInfo("mnd show subscriptions begin");

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    taosRLockLatch(&pSub->lock);

    if (numOfRows + pSub->vgNum > rowsCapacity) {
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum));
    }

    // topic and cgroup, split out of the subscribe key into length-prefixed buffers
    char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
    mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
    varDataSetLen(topic, strlen(varDataVal(topic)));
    varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

    SMqConsumerEp *pConsumerEp = NULL;
    void          *pIter = NULL;

    // NOTE(review): leaving this loop through END does not cancel the consumerHash
    // iteration; confirm whether taosHashIterate needs an explicit cancel on early exit.
    while (1) {
      pIter = taosHashIterate(pSub->consumerHash, pIter);
      if (pIter == NULL) break;
      pConsumerEp = (SMqConsumerEp *)pIter;

      char           *user = NULL;
      char           *fqdn = NULL;
      SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
      if (pConsumer != NULL) {
        user = pConsumer->user;
        fqdn = pConsumer->fqdn;
      }

      // user/fqdn point into pConsumer, so the reference must be held until the rows
      // have been written; releasing it earlier leaves them dangling if the consumer
      // is removed concurrently.
      int32_t buildCode = buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup,
                                      pConsumerEp->vgs, pConsumerEp->offsetRows);
      if (pConsumer != NULL) {
        sdbRelease(pSdb, pConsumer);
      }
      MND_TMQ_RETURN_CHECK(buildCode);
    }

    // vgroups not assigned to any consumer
    MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

    pBlock->info.rows = numOfRows;

    taosRUnLockLatch(&pSub->lock);
    sdbRelease(pSdb, pSub);
  }

  mInfo("mnd end show subscriptions");

  pShow->numOfRows += numOfRows;
  return numOfRows;

END:
  // Every jump to END happens while pSub is fetched and read-latched.
  taosRUnLockLatch(&pSub->lock);
  sdbRelease(pSdb, pSub);

  return code;
}
1608

1609
/**
 * Cancel an in-progress SDB_SUBSCRIBE fetch.
 *
 * @param pMnode  mnode handle; the call is a no-op when NULL.
 * @param pIter   iterator handed to sdbCancelFetchByType unchanged.
 */
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc