• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4986

15 Mar 2026 08:32AM UTC coverage: 37.305% (-31.3%) from 68.601%
#4986

push

travis-ci

tomchon
test: keep docs and unit test

125478 of 336361 relevant lines covered (37.3%)

1134847.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.22
/source/dnode/mnode/impl/src/mndSubscribe.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndSubscribe.h"
18
#include "mndConsumer.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndTopic.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_SUBSCRIBE_VER_NUMBER   3
29
#define MND_SUBSCRIBE_RESERVE_SIZE 64
30

31
//#define MND_CONSUMER_LOST_HB_CNT          6
32

33
static int32_t mqRebInExecCnt = 0;
34

35
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
36
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
37
static int32_t  mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
38
static int32_t  mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
39
static int32_t  mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
40
static int32_t  mndProcessRebalanceReq(SRpcMsg *pMsg);
41
static int32_t  mndProcessDropCgroupReq(SRpcMsg *pMsg);
42
static int32_t  mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
43
static void     mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
44
static int32_t  mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash);
45

46
// Encode pSub with mndSubActionEncode() and append the resulting raw record to
// pTrans as a commit log; on success the record is marked SDB_STATUS_READY.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of the
// failing step (0 when every step succeeded). If the append fails the raw
// record is released here with sdbFreeRaw().
static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) {
  if (!pTrans || !pSub) return TSDB_CODE_INVALID_PARA;

  int32_t  code = 0;
  int32_t  lino = 0;
  SSdbRaw *pRaw = NULL;
  PRINT_LOG_START

  pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_READY);
  } else {
    // the record was not appended, so it must be released by this function
    sdbFreeRaw(pRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
66

67
// Register the subscribe sdb table, the TMQ subscribe/rebalance message
// handlers and the "show subscriptions" handlers on pMnode.
// Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise the result of
// sdbSetTable().
int32_t mndInitSubscribe(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSdbTable table = {
      .sdbType = SDB_SUBSCRIBE,
      .keyType = SDB_KEY_BINARY,
      .encodeFp = (SdbEncodeFp)mndSubActionEncode,
      .decodeFp = (SdbDecodeFp)mndSubActionDecode,
      .insertFp = (SdbInsertFp)mndSubActionInsert,
      .updateFp = (SdbUpdateFp)mndSubActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSubActionDelete,
  };

  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);

  // Both show handlers must be registered under the same table id. The
  // free-iterator handler used to be registered under TSDB_MGMT_TABLE_TOPICS,
  // which left the subscriptions table without a cancel handler.
  // NOTE(review): that registration presumably also replaced whatever handler
  // the topic module installs for TSDB_MGMT_TABLE_TOPICS - confirm.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndCancelGetNextSubscribe);

  return sdbSetTable(pMnode->pSdb, table);
}
92

93
// Walk every vgroup in the sdb and, for each one that belongs to the topic's
// database (mndVgroupInDb), append an SMqVgEp {vgId, epSet} to
// pSub->unassignedVgs and bump pSub->vgNum.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or terrno
// when the array push fails (the sdb iterator is cancelled in that case).
int32_t mndSchedInitSubEp(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
  // public entry point: validate like the other functions in this file do
  if (pMnode == NULL || pTopic == NULL || pSub == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp pVgEp = {0};
    pVgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp.vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL) {
      code = terrno;
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    // count the vgroup only once it is really stored, so vgNum never gets
    // ahead of the array when the push fails
    pSub->vgNum++;
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp.vgId);
    sdbRelease(pSdb, pVgroup);
  }

  return code;
}
128

129
static int32_t mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey,
×
130
                                     SMqSubscribeObj **pSub) {
131
  if (pMnode == NULL || pTopic == NULL || subKey == NULL || pSub == NULL) {
×
132
    return TSDB_CODE_INVALID_PARA;
×
133
  }
134
  int32_t lino = 0;
×
135
  int32_t code = 0;
×
136
  PRINT_LOG_START
×
137
  MND_TMQ_RETURN_CHECK(tNewSubscribeObj(subKey, pSub));
×
138
  (*pSub)->dbUid = pTopic->dbUid;
×
139
  (*pSub)->stbUid = pTopic->stbUid;
×
140
  (*pSub)->subType = pTopic->subType;
×
141
  (*pSub)->withMeta = pTopic->withMeta;
×
142

143
  MND_TMQ_RETURN_CHECK(mndSchedInitSubEp(pMnode, pTopic, *pSub));
×
144

145
END:
×
146
  PRINT_LOG_END
×
147
  return code;
×
148
}
149

150
static void mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
×
151
  if (key == NULL || topic == NULL || cgroup == NULL) {
×
152
    return;
×
153
  }
154
  int32_t i = 0;
×
155
  while (key[i] != TMQ_SEPARATOR_CHAR) {
×
156
    i++;
×
157
  }
158
  (void)memcpy(cgroup, key, i);
×
159
  cgroup[i] = 0;
×
160
  if (fullName) {
×
161
    tstrncpy(topic, &key[i + 1], TSDB_TOPIC_FNAME_LEN);
×
162
  } else {
163
    while (key[i] != '.') {
×
164
      i++;
×
165
    }
166
    tstrncpy(topic, &key[i + 1], TSDB_CGROUP_LEN);
×
167
  }
168
}
169

170
static int32_t mndBuildSubChangeReq(SMnode *pMnode, void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
×
171
                                    const SMqRebOutputVg *pRebVg) {
172
  if (pSub == NULL || pRebVg == NULL || pBuf == NULL || pLen == NULL) {
×
173
    return TSDB_CODE_INVALID_PARA;
×
174
  }
175
  SMqRebVgReq  req = {0};
×
176
  int32_t      code = 0;
×
177
  int32_t      lino = 0;
×
178
  SEncoder     encoder = {0};
×
179
  SMqTopicObj *pTopic = NULL;
×
180
  void *       buf = NULL;
×
181

182
  PRINT_LOG_START
×
183
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
×
184
  char cgroup[TSDB_CGROUP_LEN] = {0};
×
185
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
×
186
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
×
187
  taosRLockLatch(&pTopic->lock);
×
188
  req.oldConsumerId = pRebVg->oldConsumerId;
×
189
  req.newConsumerId = pRebVg->newConsumerId;
×
190
  req.vgId = pRebVg->pVgEp.vgId;
×
191
  req.qmsg = pTopic->physicalPlan;
×
192
  req.schema = pTopic->schema;
×
193
  req.subType = pSub->subType;
×
194
  req.withMeta = pSub->withMeta;
×
195
  req.suid = pSub->stbUid;
×
196
  tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
×
197

198
  int32_t tlen = 0;
×
199
  tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, code);
×
200
  if (code < 0) {
×
201
    goto END;
×
202
  }
203

204
  tlen += sizeof(SMsgHead);
×
205
  buf = taosMemoryMalloc(tlen);
×
206
  MND_TMQ_NULL_CHECK(buf);
×
207
  SMsgHead *pMsgHead = (SMsgHead *)buf;
×
208
  pMsgHead->contLen = htonl(tlen);
×
209
  pMsgHead->vgId = htonl(pRebVg->pVgEp.vgId);
×
210

211
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen);
×
212
  MND_TMQ_RETURN_CHECK(tEncodeSMqRebVgReq(&encoder, &req));
×
213
  *pBuf = buf;
×
214
  buf = NULL;
×
215
  *pLen = tlen;
×
216

217
END:
×
218
  PRINT_LOG_END
×
219
  taosMemoryFree(buf);
×
220
  if (pTopic != NULL) {
×
221
    taosRUnLockLatch(&pTopic->lock);
×
222
  }
223
  mndReleaseTopic(pMnode, pTopic);
×
224
  tEncoderClear(&encoder);
×
225
  return code;
×
226
}
227

228
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
×
229
                                        const SMqRebOutputVg *pRebVg) {
230
  if (pMnode == NULL || pTrans == NULL || pSub == NULL || pRebVg == NULL) {
×
231
    return TSDB_CODE_INVALID_PARA;
×
232
  }
233
  int32_t code = 0;
×
234
  int32_t lino = 0;
×
235
  void *  buf = NULL;
×
236
  PRINT_LOG_START
×
237
  if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
×
238
    if (pRebVg->oldConsumerId == -1) return 0;  // drop stream, no consumer, while split vnode,all consumerId is -1
×
239
    code = TSDB_CODE_MND_INVALID_SUB_OPTION;
×
240
    goto END;
×
241
  }
242

243
  int32_t tlen = 0;
×
244
  MND_TMQ_RETURN_CHECK(mndBuildSubChangeReq(pMnode, &buf, &tlen, pSub, pRebVg));
×
245
  int32_t vgId = pRebVg->pVgEp.vgId;
×
246
  SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
×
247
  if (pVgObj == NULL) {
×
248
    code = TSDB_CODE_MND_VGROUP_NOT_EXIST;
×
249
    goto END;
×
250
  }
251

252
  STransAction action = {0};
×
253
  action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
×
254
  action.pCont = buf;
×
255
  buf = NULL;
×
256
  action.contLen = tlen;
×
257
  action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
×
258

259
  mndReleaseVgroup(pMnode, pVgObj);
×
260
  MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
×
261

262
END:
×
263
  PRINT_LOG_END
×
264
  taosMemoryFree(buf);
×
265
  return code;
×
266
}
267

268
static void freeRebalanceItem(void *param) {
×
269
  if (param == NULL) return;
×
270
  SMqRebInfo *pInfo = param;
×
271
  taosArrayDestroy(pInfo->newConsumers);
×
272
  taosArrayDestroy(pInfo->removedConsumers);
×
273
}
274

275
// Look up the SMqRebInfo stored in pHash under key (the key is hashed
// including its terminating NUL). When it is missing, a fresh entry is built
// with tNewSMqRebSubscribe(), copied into the hash, and looked up again so the
// returned pointer refers to the copy kept by the hash.
//   pReb  optional; receives the entry when not NULL.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for a NULL hash/key, terrno
// when the entry cannot be created, or the taosHashPut() error.
static int32_t mndGetOrCreateRebSub(SHashObj *pHash, const char *key, SMqRebInfo **pReb) {
  if (!pHash || !key) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
  if (pRebInfo == NULL) {
    SMqRebInfo *pFresh = tNewSMqRebSubscribe(key);
    if (pFresh == NULL) {
      code = terrno;
      goto END;
    }

    code = taosHashPut(pHash, key, strlen(key) + 1, pFresh, sizeof(SMqRebInfo));
    if (code != 0) {
      // the hash did not take the content, so its arrays go away as well
      freeRebalanceItem(pFresh);
    }
    // the temporary shell is released in both cases
    taosMemoryFreeClear(pFresh);
    if (code != 0) {
      goto END;
    }

    pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
    MND_TMQ_NULL_CHECK(pRebInfo);
  }

  if (pReb != NULL) {
    *pReb = pRebInfo;
  }

END:
  PRINT_LOG_END
  return code;
}
308

309
// Pop the last SMqVgEp from vgs and record it in pHash, keyed by its vgId, as
// an SMqRebOutputVg initialised with {consumerId, -1, <popped vg>}.
//   key  subscription key, used only for the log line.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// failing step's code (including the case where nothing could be popped).
static int32_t pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char *key) {
  if (!vgs || !pHash || !key) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SMqVgEp *pPopped = (SMqVgEp *)taosArrayPop(vgs);
  MND_TMQ_NULL_CHECK(pPopped);

  SMqRebOutputVg outputVg = {consumerId, -1, *pPopped};
  MND_TMQ_RETURN_CHECK(taosHashPut(pHash, &pPopped->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)));
  mInfo("tmq rebalance sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, key, pPopped->vgId, consumerId);

END:
  PRINT_LOG_END
  return code;
}
326

327
// For every consumer id listed in pInput->pRebInfo->removedConsumers that is
// still present in the output subscription's consumer hash: move all of its
// vgroups into pHash (pushVgDataToHash), drop it from the consumer hash and
// append its id to pOutput->removedConsumers. Ids that are not in the hash are
// skipped; a mismatch between requested and actual removals is only logged.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// failing step's code.
static int32_t processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
  if (!pHash || !pOutput || !pInput) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t actualRemoved = 0;
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);

  for (int32_t idx = 0; idx < numOfRemoved; idx++) {
    int64_t *pId = (int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, idx);
    MND_TMQ_NULL_CHECK(pId);

    SMqConsumerEp *pEp = taosHashGet(pOutput->pSub->consumerHash, pId, sizeof(int64_t));
    if (pEp != NULL) {
      // each call pops one vgroup, so call once per vgroup currently owned
      int32_t remaining = taosArrayGetSize(pEp->vgs);
      while (remaining-- > 0) {
        MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, *pId, pOutput->pSub->key));
      }

      MND_TMQ_RETURN_CHECK(taosHashRemove(pOutput->pSub->consumerHash, pId, sizeof(int64_t)));
      MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->removedConsumers, pId));
      actualRemoved++;
    }
  }

  if (numOfRemoved == actualRemoved) {
    mInfo("tmq rebalance sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved);
  } else {
    mError("tmq rebalance sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key,
           numOfRemoved, actualRemoved);
  }

END:
  PRINT_LOG_END
  return code;
}
364

365
// For every consumer id in pInput->pRebInfo->newConsumers: insert a consumer
// endpoint with an empty vgroup array into the output subscription's consumer
// hash and append the id to pOutput->newConsumers.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// failing step's code.
static int32_t processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
  if (pOutput == NULL || pInput == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);

  for (int32_t i = 0; i < numOfNewConsumers; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    SMqConsumerEp newConsumerEp = {0};
    newConsumerEp.consumerId = *consumerId;
    newConsumerEp.vgs = taosArrayInit(0, sizeof(SMqVgEp));
    MND_TMQ_NULL_CHECK(newConsumerEp.vgs);

    // Report the code returned by taosHashPut itself, as the other call sites
    // in this file do. Reading terrno after the cleanup could yield 0 (or a
    // value changed by the cleanup) and hide the failure from the caller.
    code = taosHashPut(pOutput->pSub->consumerHash, consumerId, sizeof(int64_t), &newConsumerEp,
                       sizeof(SMqConsumerEp));
    if (code != 0) {
      freeSMqConsumerEp(&newConsumerEp);
      goto END;
    }
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->newConsumers, consumerId));
    mInfo("tmq rebalance sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, *consumerId);
  }
END:
  PRINT_LOG_END
  return code;
}
394

395
// Move every vgroup currently in pOutput->pSub->unassignedVgs into pHash with
// consumer id -1 (one pushVgDataToHash() call per vgroup).
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// first failing push's code.
static int32_t processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
  if (!pOutput || !pHash) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  // the count is taken once up front; each push removes one element
  int32_t pending = taosArrayGetSize(pOutput->pSub->unassignedVgs);
  while (pending-- > 0) {
    MND_TMQ_RETURN_CHECK(pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key));
  }

END:
  PRINT_LOG_END
  return code;
}
410

411
// Visit every consumer in the output subscription's consumer hash: record its
// id in pOutput->modifyConsumers and, when it owns more than minVgCnt vgroups,
// move the surplus into pHash. The first remainderVgCnt over-loaded consumers
// are trimmed to minVgCnt + 1 vgroups, the others to minVgCnt.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// failing step's code. The hash iterator is cancelled on every exit path.
static int32_t processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
                                        int32_t remainderVgCnt) {
  if (!pOutput || !pHash) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t cnt = 0;  // consumers that were allowed to keep one extra vgroup
  void   *pIter = NULL;
  PRINT_LOG_START

  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        owned = taosArrayGetSize(pEp->vgs);

    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->modifyConsumers, &pEp->consumerId));
    if (owned <= minVgCnt) {
      continue;
    }

    int32_t keep = minVgCnt;
    bool    extraAllowed = (cnt < remainderVgCnt);
    if (extraAllowed) {
      keep = minVgCnt + 1;
    }
    while (taosArrayGetSize(pEp->vgs) > keep) {
      MND_TMQ_RETURN_CHECK(pushVgDataToHash(pEp->vgs, pHash, pEp->consumerId, pOutput->pSub->key));
    }
    if (extraAllowed) {
      cnt++;
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  return code;
}
450

451
// Reconcile the vgroups owned by consumers with the vgroups that currently
// exist for the subscription's database.
//   1. Collect every non-mount vgroup of the database into a scratch list.
//   2. For each vgroup owned by a consumer: if it is in the scratch list,
//      strike it from the list; otherwise drop it from the consumer.
//   3. What is left in the scratch list is owned by nobody; it is added to
//      pSub->unassignedVgs, but only when that array is currently empty.
// Returns the number of vgroups found in step 1 on success; on failure the
// (non-zero) error code is returned instead. TSDB_CODE_INVALID_PARA is
// returned for NULL arguments.
// NOTE(review): the caller tells the two apart with a "< 0" test, so error
// codes are presumably negative - confirm.
static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pOutput) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t totalVgNum = 0;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  void   *pIterHash = NULL;
  SArray *newVgs = taosArrayInit(0, sizeof(SMqVgEp));
  MND_TMQ_NULL_CHECK(newVgs);

  // step 1: snapshot of the database's vgroups
  while ((pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup)) != NULL) {
    if (!pVgroup->mountVgId && mndVgroupInDb(pVgroup, pOutput->pSub->dbUid)) {
      totalVgNum++;
      SMqVgEp vgEp = {0};
      vgEp.epSet = mndGetVgroupEpset(pMnode, pVgroup);
      vgEp.vgId = pVgroup->vgId;
      MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &vgEp));
    }
    sdbRelease(pMnode->pSdb, pVgroup);
  }

  // step 2: match consumer-owned vgroups against the snapshot
  while ((pIterHash = taosHashIterate(pOutput->pSub->consumerHash, pIterHash)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIterHash;
    int32_t        j = 0;
    while (j < taosArrayGetSize(pConsumerEp->vgs)) {
      SMqVgEp *pOwned = taosArrayGet(pConsumerEp->vgs, j);
      MND_TMQ_NULL_CHECK(pOwned);

      int32_t matchIdx = -1;
      for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
        SMqVgEp *pCandidate = taosArrayGet(newVgs, k);
        MND_TMQ_NULL_CHECK(pCandidate);
        if (pOwned->vgId == pCandidate->vgId) {
          matchIdx = k;
          break;
        }
      }

      if (matchIdx >= 0) {
        taosArrayRemove(newVgs, matchIdx);
        j++;
      } else {
        // vgroup no longer exists: drop it and re-examine the same index
        mInfo("tmq rebalance processRemoveAddVgs old vgId:%d", pOwned->vgId);
        taosArrayRemove(pConsumerEp->vgs, j);
      }
    }
  }

  // step 3: publish the vgroups nobody owns
  if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
    MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
    mInfo("tmq rebalance processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
  }

END:
  sdbRelease(pMnode->pSdb, pVgroup);
  sdbCancelFetch(pMnode->pSdb, pIter);
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIterHash);
  taosArrayDestroy(newVgs);
  if (code != 0) {
    mError("tmq rebalance processRemoveAddVgs failed, code:%d", code);
    return code;
  }
  mInfo("tmq rebalance processRemoveAddVgs completed, total vg num:%d", totalVgNum);
  return totalVgNum;
}
531

532
// Fold the per-consumer offset rows of the stored subscription (looked up by
// pInput->pRebInfo->key) into pOutput->pSub->offsetRows.
// A consumer's row for a vgroup is skipped when the same consumer still owns
// that vgroup in the output subscription. Otherwise the row is merged into the
// output row with the same vgId (rows are summed, offset and ever are
// overwritten) or appended when no such row exists yet.
// Returns 0 when the stored subscription cannot be acquired (nothing to fold),
// TSDB_CODE_INVALID_PARA for NULL arguments, or the failing step's code.
// The stored subscription is read-locked for the duration of the walk.
static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pInput || !pOutput) return TSDB_CODE_INVALID_PARA;

  void            *pIter = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub);  // put all offset rows
  if (code != 0) {
    return 0;
  }
  taosRLockLatch(&pSub->lock);
  PRINT_LOG_START

  if (pOutput->pSub->offsetRows == NULL) {
    pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
    MND_TMQ_NULL_CHECK(pOutput->pSub->offsetRows);
  }

  while ((pIter = taosHashIterate(pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));

    for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
      OffsetRows *pSrc = taosArrayGet(pConsumerEp->offsetRows, j);
      MND_TMQ_NULL_CHECK(pSrc);

      // does the same consumer keep this vgroup after the rebalance?
      bool unchanged = false;
      if (pConsumerEpNew != NULL) {
        for (int i = 0; i < taosArrayGetSize(pConsumerEpNew->vgs); i++) {
          SMqVgEp *pVgEp = taosArrayGet(pConsumerEpNew->vgs, i);
          MND_TMQ_NULL_CHECK(pVgEp);
          if (pVgEp->vgId == pSrc->vgId) {
            unchanged = true;
            mInfo("pSub->offsetRows jump, because consumer id:0x%" PRIx64 " and vgId:%d not change",
                  pConsumerEp->consumerId, pVgEp->vgId);
            break;
          }
        }
      }
      if (unchanged) {
        continue;
      }

      // merge into an existing output row for the vgroup, if there is one
      bool merged = false;
      for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
        OffsetRows *pDst = taosArrayGet(pOutput->pSub->offsetRows, i);
        MND_TMQ_NULL_CHECK(pDst);
        if (pSrc->vgId != pDst->vgId) {
          continue;
        }
        pDst->rows += pSrc->rows;
        pDst->offset = pSrc->offset;
        pDst->ever = pSrc->ever;
        merged = true;
        mInfo("pSub->offsetRows add vgId:%d, after:%" PRId64 ", before:%" PRId64, pDst->vgId, pDst->rows, pSrc->rows);
        break;
      }

      if (!merged) {
        MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->pSub->offsetRows, pSrc));
      }
    }
  }

END:
  taosRUnLockLatch(&pSub->lock);
  taosHashCancelIterate(pSub->consumerHash, pIter);
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END
  return code;
}
596

597
// Log the outcome of a rebalance: first every moved vgroup in
// pOutput->rebVgs, then the final consumer -> vgroup layout taken from the
// output subscription's consumer hash. Logging only; nothing is modified.
static void printRebalanceLog(SMqRebOutputObj *pOutput) {
  if (!pOutput) return;

  mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);

  for (int32_t idx = 0; idx < taosArrayGetSize(pOutput->rebVgs); idx++) {
    SMqRebOutputVg *pMoved = taosArrayGet(pOutput->rebVgs, idx);
    if (pMoved != NULL) {
      mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
            pMoved->pVgEp.vgId, pMoved->oldConsumerId, pMoved->newConsumerId);
    }
  }

  void *pIter = NULL;
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    int32_t        sz = taosArrayGetSize(pEp->vgs);
    mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key,
          pEp->consumerId, sz);

    for (int32_t idx = 0; idx < sz; idx++) {
      SMqVgEp *pVgEp = taosArrayGet(pEp->vgs, idx);
      if (pVgEp != NULL) {
        mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
              pEp->consumerId);
      }
    }
  }
}
623

624
static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, int32_t *minVgCnt,
×
625
                           int32_t *remainderVgCnt) {
626
  if (pInput == NULL || pSubKey == NULL || minVgCnt == NULL || remainderVgCnt == NULL) {
×
627
    return;
×
628
  }
629
  int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
×
630
  int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
×
631
  int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
×
632

633
  // calc num
634
  if (numOfFinal != 0) {
×
635
    *minVgCnt = totalVgNum / numOfFinal;
×
636
    *remainderVgCnt = totalVgNum % numOfFinal;
×
637
  } else {
638
    mInfo("tmq rebalance sub:%s no consumer subscribe this topic", pSubKey);
×
639
  }
640
  mInfo(
×
641
      "tmq rebalance sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d "
642
      "remainderVg:%d",
643
      pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt);
644
}
645

646
// Hand the vgroups collected in pHash out to the consumers of the output
// subscription, in three passes:
//   1. top every consumer up to minVgCnt vgroups;
//   2. give one further vgroup to consumers sitting at exactly minVgCnt until
//      pHash is exhausted;
//   3. copy every pHash entry into pOutput->rebVgs and, when no consumer is
//      left at all, also into pSub->unassignedVgs.
// Passes 1 and 2 share one cursor over pHash; each entry taken gets its
// newConsumerId set to the receiving consumer.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
// TSDB_CODE_MND_INTERNAL_ERROR when the cursor is still live after pass 2, or
// the failing step's code. Both iterators are cancelled on every exit path.
static int32_t assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt) {
  if (!pOutput || !pHash) return TSDB_CODE_INVALID_PARA;

  SMqRebOutputVg *pRebVg = NULL;
  void           *pAssignIter = NULL;  // cursor over pHash, shared by passes 1 and 2
  void           *pIter = NULL;        // cursor over the consumer hash
  int32_t         code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START

  // pass 1: bring everybody up to the minimum
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    while (taosArrayGetSize(pEp->vgs) < minVgCnt) {
      pAssignIter = taosHashIterate(pHash, pAssignIter);
      if (pAssignIter == NULL) {
        mError("tmq rebalance sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
        break;  // leaves only the inner loop
      }

      pRebVg = (SMqRebOutputVg *)pAssignIter;
      pRebVg->newConsumerId = pEp->consumerId;
      MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pRebVg->pVgEp));
      mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp.vgId,
            pEp->consumerId);
    }
  }

  // pass 2: distribute the remainder, one extra vgroup per consumer
  while ((pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter)) != NULL) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    if (taosArrayGetSize(pEp->vgs) != minVgCnt) {
      continue;
    }

    pAssignIter = taosHashIterate(pHash, pAssignIter);
    if (pAssignIter == NULL) {
      mInfo("tmq rebalance sub:%s assign iter is used up", pOutput->pSub->key);
      break;  // stops the whole pass
    }

    pRebVg = (SMqRebOutputVg *)pAssignIter;
    pRebVg->newConsumerId = pEp->consumerId;
    MND_TMQ_NULL_CHECK(taosArrayPush(pEp->vgs, &pRebVg->pVgEp));
    mInfo("tmq rebalance mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp.vgId,
          pEp->consumerId);
  }

  if (pAssignIter != NULL) {
    mError("tmq rebalancesub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
    code = TSDB_CODE_MND_INTERNAL_ERROR;
    goto END;
  }

  // pass 3: publish the result (the cursor is NULL here, so this starts over)
  while ((pAssignIter = taosHashIterate(pHash, pAssignIter)) != NULL) {
    SMqRebOutputVg *pEntry = (SMqRebOutputVg *)pAssignIter;
    MND_TMQ_NULL_CHECK(taosArrayPush(pOutput->rebVgs, pEntry));
    if (taosHashGetSize(pOutput->pSub->consumerHash) == 0) {  // if all consumer is removed
      MND_TMQ_NULL_CHECK(
          taosArrayPush(pOutput->pSub->unassignedVgs, &pEntry->pVgEp));  // put all vg into unassigned
    }
  }

END:
  taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
  taosHashCancelIterate(pHash, pAssignIter);
  PRINT_LOG_END
  return code;
}
724

725
// Run one rebalance calculation for the subscription in pOutput.
// Order of the steps: reconcile vgroups with the database, collect the
// vgroups of removed consumers and the unassigned ones into a scratch hash,
// compute the per-consumer quota, trim over-loaded consumers, register new
// consumers, hand the collected vgroups out, fold offset rows, and log the
// result.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, the
// negative value reported by processRemoveAddVgs(), or the failing step's
// code.
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
  if (!pMnode || !pInput || !pOutput) return TSDB_CODE_INVALID_PARA;

  int32_t totalVgNum = processRemoveAddVgs(pMnode, pOutput);
  if (totalVgNum < 0) return totalVgNum;

  const char *pSubKey = pOutput->pSub->key;
  SHashObj   *pHash = NULL;  // vgId -> SMqRebOutputVg, vgroups waiting for an owner
  int32_t     remainderVgCnt = 0;
  int32_t     minVgCnt = 0;
  int32_t     code = 0;
  int32_t     lino = 0;
  PRINT_LOG_START

  pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(pHash);

  MND_TMQ_RETURN_CHECK(processRemovedConsumers(pOutput, pHash, pInput));
  MND_TMQ_RETURN_CHECK(processUnassignedVgroups(pOutput, pHash));

  calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt);

  MND_TMQ_RETURN_CHECK(processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt));
  MND_TMQ_RETURN_CHECK(processNewConsumers(pOutput, pInput));
  MND_TMQ_RETURN_CHECK(assignVgroups(pOutput, pHash, minVgCnt));
  MND_TMQ_RETURN_CHECK(processSubOffsetRows(pMnode, pInput, pOutput));

  printRebalanceLog(pOutput);

END:
  taosHashCleanup(pHash);
  PRINT_LOG_END
  return code;
}
755

756
// For every consumer id in consumers: build a temporary consumer object of
// the given update type (tNewSMqConsumerObj), append it to pTrans as a commit
// log (mndSetConsumerCommitLogs) and delete the temporary again.
//   topic  passed through to tNewSMqConsumerObj; callers in this file pass
//          NULL for the CONSUMER_UPDATE_REB type.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL pTrans/consumers/
// cgroup, or the failing step's code.
static int32_t presistConsumerByType(STrans *pTrans, SArray *consumers, int8_t type, char *cgroup, char *topic) {
  if (pTrans == NULL || consumers == NULL || cgroup == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START
  SMqConsumerObj *pConsumerNew = NULL;
  int32_t         consumerNum = taosArrayGetSize(consumers);
  for (int32_t i = 0; i < consumerNum; i++) {
    int64_t *consumerId = (int64_t *)taosArrayGet(consumers, i);
    MND_TMQ_NULL_CHECK(consumerId);
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(*consumerId, cgroup, type, topic, NULL, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
    tDeleteSMqConsumerObj(pConsumerNew);
    // Forget the object right away: if a check in the next iteration jumps to
    // END before a new object is stored, the cleanup there must not delete
    // this one a second time.
    pConsumerNew = NULL;
  }

END:
  PRINT_LOG_END
  // only non-NULL when mndSetConsumerCommitLogs failed for the current object
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
779

780
// Append consumer commit logs to pTrans for the three consumer lists of a
// rebalance result, in this order: modified consumers (CONSUMER_UPDATE_REB,
// no topic), new consumers (CONSUMER_ADD_REB) and removed consumers
// (CONSUMER_REMOVE_REB). Stops at the first failure.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// failing step's code.
static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic) {
  if (!pTrans || !pOutput || !cgroup || !topic) return TSDB_CODE_INVALID_PARA;

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  const struct {
    SArray *consumers;
    int8_t  type;
    char   *topic;
  } steps[] = {
      {pOutput->modifyConsumers, CONSUMER_UPDATE_REB, NULL},
      {pOutput->newConsumers, CONSUMER_ADD_REB, topic},
      {pOutput->removedConsumers, CONSUMER_REMOVE_REB, topic},
  };

  for (size_t s = 0; s < sizeof(steps) / sizeof(steps[0]); s++) {
    MND_TMQ_RETURN_CHECK(presistConsumerByType(pTrans, steps[s].consumers, steps[s].type, cgroup, steps[s].topic));
  }

END:
  PRINT_LOG_END
  return code;
}
794

795
// Persist one rebalance result through a "tmq-reb" transaction.
//
// Steps, in order: create the transaction, check for conflicts, add one redo
// action per entry of pOutput->rebVgs, add the subscription commit log, add the
// consumer commit logs (skipped when pOutput->isReload is set), register the
// MQ_REB start/stop callbacks and finally call mndTransPrepare.
// The transaction handle is dropped before returning on every path.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the
// first error encountered.
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
  if (!pMnode || !pMsg || !pOutput) {
    return TSDB_CODE_INVALID_PARA;
  }
  STrans *pTrans = NULL;
  int32_t lino = 0;
  int32_t code = 0;
  PRINT_LOG_START

  // the subscribe key carries both the consumer group and the topic
  char cgroup[TSDB_CGROUP_LEN] = {0};
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
  if (pTrans == NULL) {
    // prefer the thread-local error when one was recorded
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto END;
  }

  mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key);
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

  // redo actions: one request per vgroup taking part in the rebalance
  const SArray *vgList = pOutput->rebVgs;
  int32_t       vgCnt = taosArrayGetSize(vgList);
  for (int32_t idx = 0; idx < vgCnt; ++idx) {
    SMqRebOutputVg *pEntry = taosArrayGet(vgList, idx);
    MND_TMQ_NULL_CHECK(pEntry);
    MND_TMQ_RETURN_CHECK(mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pEntry));
  }

  // commit log: the subscription object with its vgroup assignment
  MND_TMQ_RETURN_CHECK(mndSetSubCommitLogs(pTrans, pOutput->pSub));

  // commit log: consumers, only for a normal (non-reload) rebalance
  if (!pOutput->isReload) {
    MND_TMQ_RETURN_CHECK(mndPresistConsumer(pTrans, pOutput, cgroup, topic));
  }

  // start/stop callbacks of the rebalance transaction
  mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);

  // hand the transaction over for execution
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  mndTransDrop(pTrans);
  PRINT_LOG_END
  TAOS_RETURN(code);
}
846

847
// type = 0 remove  type = 1 add
848
static int32_t buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, SMqConsumerObj *pConsumer) {
×
849
  if (rebSubHash == NULL || topicList == NULL) {
×
850
    return TSDB_CODE_INVALID_PARA;
×
851
  }
852
  int32_t code = 0;
×
853
  int32_t lino = 0;
×
854
  PRINT_LOG_START
×
855
  int32_t topicNum = taosArrayGetSize(topicList);
×
856
  for (int32_t i = 0; i < topicNum; i++) {
×
857
    char *removedTopic = taosArrayGetP(topicList, i);
×
858
    MND_TMQ_NULL_CHECK(removedTopic);
×
859
    char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
×
860
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, removedTopic);
×
861
    SMqRebInfo *pRebSub = NULL;
×
862
    MND_TMQ_RETURN_CHECK(mndGetOrCreateRebSub(rebSubHash, key, &pRebSub));
×
863
    if (type == 0)
×
864
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId));
×
865
    else if (type == 1)
×
866
      MND_TMQ_NULL_CHECK(taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId));
×
867
  }
868

869
END:
×
870
  PRINT_LOG_END
×
871
  return code;
×
872
}
873

874
// Check the vgroups assigned to pConsumer for one topic.
//
// The subscription "<cgroup><TMQ_SEPARATOR><topic>" is acquired and read-locked,
// then every vgroup listed for this consumer is looked up with mndAcquireVgroup.
// When a vgroup can no longer be acquired, an entry for the subscription key is
// requested from rebSubHash via mndGetOrCreateRebSub and the outcome is logged.
// Errors are logged only; nothing is returned to the caller.
static void checkOneTopic(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash, const char *topic) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMqSubscribeObj *pSub = NULL;
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

  MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, key, &pSub));
  taosRLockLatch(&pSub->lock);

  // vgroups currently assigned to this consumer within the subscription
  SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pEp);

  int32_t vgCnt = taosArrayGetSize(pEp->vgs);
  for (int32_t idx = 0; idx < vgCnt; ++idx) {
    SMqVgEp *pVgEp = taosArrayGet(pEp->vgs, idx);
    if (pVgEp != NULL) {
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
      if (pVgroup == NULL) {
        code = mndGetOrCreateRebSub(rebSubHash, key, NULL);
        if (code == 0) {
          mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
        } else {
          mError("failed to mndGetOrCreateRebSub vgroup:%d, error:%s", pVgEp->vgId, tstrerror(code))
        }
      }
      mndReleaseVgroup(pMnode, pVgroup);
    }
  }

END:
  // the latch is held only when the subscription was acquired
  if (pSub != NULL) {
    taosRUnLockLatch(&pSub->lock);
  }
  mndReleaseSubscribe(pMnode, pSub);
}
910

911
// Run checkOneTopic for every non-NULL entry of pConsumer->currentTopics.
// Does nothing when any argument is NULL.
static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj *rebSubHash) {
  if (!pMnode || !pConsumer || !rebSubHash) {
    return;
  }

  int32_t topicCnt = taosArrayGetSize(pConsumer->currentTopics);
  for (int32_t idx = 0; idx < topicCnt; ++idx) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, idx);
    if (topic != NULL) {
      checkOneTopic(pMnode, pConsumer, rebSubHash, topic);
    }
  }
}
924

925
// Decide whether a consumer has to be treated as offline.
//
// hbStatus and pollStatus are tick counters; each tick is weighted with
// tsMqRebalanceInterval * 1000 and compared against the consumer's
// sessionTimeoutMs and maxPollIntervalMs respectively.
// The products are evaluated in 64 bits so that a large tick count cannot
// overflow the signed 32-bit range (which would be undefined behaviour).
//
// Returns true when either limit has been reached.
static bool isOffLine(int32_t hbStatus, int32_t pollStatus, SMqConsumerObj *pConsumer) {
  const int64_t tickMs = (int64_t)tsMqRebalanceInterval * 1000;
  return (int64_t)hbStatus * tickMs >= (int64_t)pConsumer->sessionTimeoutMs ||
         (int64_t)pollStatus * tickMs >= (int64_t)pConsumer->maxPollIntervalMs;
}
929

930
// Evaluate one consumer during a rebalance check.
//
// Under the consumer's read latch the heartbeat and poll counters are each
// incremented by one and the status is loaded. Then:
//   - READY with no current topics            -> send LOST_CONSUMER_CLEAR
//   - READY and offline (see isOffLine)       -> schedule removal from all current topics
//   - READY otherwise                         -> checkForVgroupSplit
//   - REBALANCE and not offline               -> schedule rebNewTopics (add) and rebRemovedTopics (remove)
//   - anything else                           -> send LOST_CONSUMER_CLEAR
//
// Returns 0 or the first error reported by the called helpers.
static int32_t checkOneConsumer(SMqConsumerObj *pConsumer, SMnode *pMnode, SRpcMsg *pMsg, SHashObj *rebSubHash) {
  int32_t lino = 0;
  int32_t code = 0;
  bool    sendClear = false;
  PRINT_LOG_START
  taosRLockLatch(&pConsumer->lock);

  int32_t hbCnt = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
  int32_t pollCnt = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
  int32_t state = atomic_load_32(&pConsumer->status);

  mDebug("tmq rebalance check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
         ", hbstatus:%d, pollStatus:%d",
         pConsumer->consumerId, state, mndConsumerStatusName(state), pConsumer->subscribeTime, pConsumer->createTime,
         hbCnt, pollCnt);

  if (state == MQ_CONSUMER_STATUS_READY) {
    if (taosArrayGetSize(pConsumer->currentTopics) == 0) {
      // unsubscribe or close
      sendClear = true;
    } else if (isOffLine(hbCnt, pollCnt, pConsumer)) {
      mInfo("tmq rebalance for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
            ", hb lost cnt:%d, or long time no poll cnt:%d",
            pConsumer->consumerId, state, mndConsumerStatusName(state), pConsumer->subscribeTime,
            pConsumer->createTime, hbCnt, pollCnt);
      MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer));
    } else {
      checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
    }
  } else if (state == MQ_CONSUMER_STATUS_REBALANCE && !isOffLine(hbCnt, pollCnt, pConsumer)) {
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer));
    MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer));
  } else {
    sendClear = true;
  }

  if (sendClear) {
    MND_TMQ_RETURN_CHECK(
        mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  PRINT_LOG_END
  return code;
}
971

972
// Walk all SDB_CONSUMER rows and run checkOneConsumer on each of them,
// collecting the required changes in rebSubHash.
//
// Iteration stops at the first failing consumer; the fetch cursor is cancelled
// and the consumer still referenced is released before returning.
//
// Returns TSDB_CODE_INVALID_PARA when pMsg or rebSubHash is NULL, otherwise 0
// or the error from checkOneConsumer.
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
  if (!pMsg || !rebSubHash) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t         lino = 0;
  int32_t         code = 0;
  void           *pCursor = NULL;
  SMqConsumerObj *pConsumer = NULL;
  SMnode         *pMnode = pMsg->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  PRINT_LOG_START

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_CONSUMER, pCursor, (void **)&pConsumer);
    if (!pCursor) {
      break;
    }
    MND_TMQ_RETURN_CHECK(checkOneConsumer(pConsumer, pMnode, pMsg, rebSubHash));
    mndReleaseConsumer(pMnode, pConsumer);
  }

END:
  PRINT_LOG_END
  sdbCancelFetch(pSdb, pCursor);
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}
999

1000
bool mndRebTryStart() {
37✔
1001
  int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
37✔
1002
  if (old > 0) mInfo("tmq rebalance counter old val:%d", old) return old == 0;
37✔
1003
}
1004

1005
void mndRebCntInc() {
×
1006
  int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
×
1007
  if (val > 0) mInfo("tmq rebalance cnt inc, value:%d", val)
×
1008
}
×
1009

1010
void mndRebCntDec() {
37✔
1011
  int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
37✔
1012
  if (val > 0) mInfo("tmq rebalance cnt sub, value:%d", val)
37✔
1013
}
37✔
1014

1015
// Release everything owned by a rebalance output and reset its pointers to NULL.
// The four arrays are destroyed; pSub is torn down with tDeleteSubscribeObj and
// then freed. A NULL rebOutput is ignored.
static void clearRebOutput(SMqRebOutputObj *rebOutput) {
  if (rebOutput != NULL) {
    taosArrayDestroy(rebOutput->newConsumers);
    taosArrayDestroy(rebOutput->modifyConsumers);
    taosArrayDestroy(rebOutput->removedConsumers);
    taosArrayDestroy(rebOutput->rebVgs);
    rebOutput->newConsumers = NULL;
    rebOutput->modifyConsumers = NULL;
    rebOutput->removedConsumers = NULL;
    rebOutput->rebVgs = NULL;

    // the subscription object: contents first, then the allocation itself
    tDeleteSubscribeObj(rebOutput->pSub);
    taosMemoryFree(rebOutput->pSub);
    rebOutput->pSub = NULL;
  }
}
1031

1032
// Allocate the four arrays of a rebalance output (new / removed / modified
// consumer ids and the vgroup list).
//
// When any allocation fails, the whole output is cleared again through
// clearRebOutput before the error is returned.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL rebOutput, otherwise 0 or the
// allocation error.
static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
  if (!rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t lino = 0;
  int32_t code = 0;
  // what END has to clean up; cleared once every allocation succeeded
  SMqRebOutputObj *pUndo = rebOutput;
  PRINT_LOG_START

  rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->newConsumers);

  rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->removedConsumers);

  rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t));
  MND_TMQ_NULL_CHECK(rebOutput->modifyConsumers);

  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  pUndo = NULL;

END:
  PRINT_LOG_END
  clearRebOutput(pUndo);
  return code;
}
1054

1055
// Fill rebOutput->pSub for the subscription key stored in rebInput->pRebInfo.
//
// When the subscription already exists it is cloned and rebInput->oldConsumerNum
// is set to the number of entries in its consumer hash. Otherwise the topic is
// acquired and read-locked, a fresh subscription is created from it, the
// topic's db name is copied in and oldConsumerNum is set to 0.
// The topic latch/reference and the subscription reference are released before
// returning.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the
// first error encountered.
static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
  if (!pMnode || !rebInput || !rebOutput) {
    return TSDB_CODE_INVALID_PARA;
  }
  const char      *key = rebInput->pRebInfo->key;
  SMqTopicObj     *pTopic = NULL;
  SMqSubscribeObj *pSub = NULL;
  int32_t          lino = 0;
  int32_t          code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  PRINT_LOG_START

  if (code == 0) {
    // existing subscription: work on a private copy
    MND_TMQ_RETURN_CHECK(tCloneSubscribeObj(pSub, &rebOutput->pSub));
    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);

    mInfo("tmq rebalance sub topic:%s has %d consumers sub till now", key,
          taosHashGetSize(rebOutput->pSub->consumerHash));
  } else {
    // no subscription yet: derive the topic from the key and build one
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroup[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(key, topic, cgroup, true);
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
    taosRLockLatch(&pTopic->lock);
    rebInput->oldConsumerNum = 0;
    MND_TMQ_RETURN_CHECK(mndCreateSubscription(pMnode, pTopic, key, &rebOutput->pSub));
    (void)memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
    mInfo("tmq rebalance sub topic:%s has no consumers sub yet", key);
  }

END:
  PRINT_LOG_END
  // the topic latch is taken right after a successful acquire
  if (pTopic != NULL) {
    taosRUnLockLatch(&pTopic->lock);
  }
  mndReleaseTopic(pMnode, pTopic);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}
1094

1095
// Build rebOutput->rebVgs from the current assignment of pSub.
//
// Every vgroup of every consumer in pSub->consumerHash is appended with
// oldConsumerId = -1 and newConsumerId = the owning consumer, i.e. the vgroup
// stays where it is.
//
// Returns 0 or the error of the first failed allocation / lookup; a pending
// hash iteration is cancelled before returning.
static int32_t collectVgs(SMqRebOutputObj *rebOutput, SMqSubscribeObj *pSub) {
  int32_t lino = 0;
  int32_t code = 0;
  void   *pCursor = NULL;

  PRINT_LOG_START
  rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
  MND_TMQ_NULL_CHECK(rebOutput->rebVgs);

  for (;;) {
    pCursor = taosHashIterate(pSub->consumerHash, pCursor);
    if (!pCursor) {
      break;
    }
    SMqConsumerEp *pEp = (SMqConsumerEp *)pCursor;

    for (int32_t idx = 0; idx < taosArrayGetSize(pEp->vgs); ++idx) {
      SMqVgEp *pVgEp = taosArrayGet(pEp->vgs, idx);
      MND_TMQ_NULL_CHECK(pVgEp);

      SMqRebOutputVg *pSlot = taosArrayReserve(rebOutput->rebVgs, 1);
      MND_TMQ_NULL_CHECK(pSlot);

      pSlot->pVgEp = *pVgEp;
      pSlot->oldConsumerId = -1;
      pSlot->newConsumerId = pEp->consumerId;
      mDebug("sub:%s reload rebalance vgId:%d remains on consumer:0x%" PRIx64, pSub->key, pSlot->pVgEp.vgId,
             pSlot->newConsumerId);
    }
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(pSub->consumerHash, pCursor);
  return code;
}
1129

1130
// Reload-rebalance a single subscription.
//
// Under the subscription's read latch: when the subscription's topic is not
// present in topicsToReload nothing is done. Otherwise a reload output is
// built around pSub itself (isReload = true, vgroups collected by collectVgs)
// and persisted with mndPersistRebResult.
// pSub is only borrowed by the output, so it is detached again before the
// output is cleared.
//
// Returns 0 or the error from collectVgs / mndPersistRebResult.
static int32_t rebalanceOneSub(SRpcMsg *pMsg, SMqSubscribeObj *pSub) {
  int32_t         lino = 0;
  int32_t         code = 0;
  SMqRebOutputObj output = {0};
  SMnode         *pMnode = pMsg->info.node;

  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  // split the key to learn which topic this subscription belongs to
  char cgroup[TSDB_CGROUP_LEN] = {0};
  char topic[TSDB_TOPIC_FNAME_LEN] = {0};
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
  if (taosHashGet(topicsToReload, topic, strlen(topic)) == NULL) {
    mDebug("%s topic:%s no need reload rebalance", __func__, topic);
    goto END;
  }

  output.isReload = true;
  output.pSub = pSub;
  MND_TMQ_RETURN_CHECK(collectVgs(&output, pSub));
  code = mndPersistRebResult(pMnode, pMsg, &output);
  if (code != 0) {
    mError("%s error,msg:%s", __func__, tstrerror(code))
  }

END:
  taosRUnLockLatch(&pSub->lock);
  output.pSub = NULL;  // avoid double free
  clearRebOutput(&output);
  PRINT_LOG_END
  return code;
}
1164

1165
// Run rebalanceOneSub over every SDB_SUBSCRIBE row.
//
// topicsToReload is cleared only after all rows were processed successfully;
// on the first failure the loop is left and the error is returned. The fetch
// cursor is cancelled and the row still referenced is released on every path.
static int32_t reloadRebalance(SRpcMsg *pMsg) {
  int32_t          lino = 0;
  int32_t          code = 0;
  SMqSubscribeObj *pSub = NULL;
  SMnode          *pMnode = pMsg->info.node;
  SSdb            *pSdb = pMnode->pSdb;

  PRINT_LOG_START
  mInfo("reloadRebalance start, total topics to reload:%d", taosHashGetSize(topicsToReload));
  void *pCursor = NULL;
  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_SUBSCRIBE, pCursor, (void **)&pSub);
    if (!pCursor) {
      break;
    }

    MND_TMQ_RETURN_CHECK(rebalanceOneSub(pMsg, pSub));
    sdbRelease(pSdb, pSub);
  }
  taosHashClear(topicsToReload);

END:
  sdbCancelFetch(pSdb, pCursor);
  sdbRelease(pSdb, pSub);
  PRINT_LOG_END

  return code;
}
1193

1194
// Regular rebalance pass.
//
// 1. mndCheckConsumer collects, per subscription key, the consumers to add or
//    remove into a temporary hash.
// 2. For each entry an output is initialised and then passed through
//    buildRebOutput -> mndDoRebalance -> mndPersistRebResult; the chain stops
//    at the first failing step, which is logged, and the loop continues with
//    the next entry.
//
// Returns the code of the last processed entry, or the error that aborted the
// pass (hash allocation, consumer check, output initialisation).
static int32_t normalRebalance(SRpcMsg *pMsg) {
  int32_t lino = 0;
  int     code = 0;

  SMnode *pMnode = pMsg->info.node;
  void   *pCursor = NULL;

  PRINT_LOG_START
  SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  MND_TMQ_NULL_CHECK(rebSubHash);

  taosHashSetFreeFp(rebSubHash, freeRebalanceItem);

  MND_TMQ_RETURN_CHECK(mndCheckConsumer(pMsg, rebSubHash));
  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
  }

  for (;;) {
    pCursor = taosHashIterate(rebSubHash, pCursor);
    if (!pCursor) {
      break;
    }

    SMqRebOutputObj output = {0};
    SMqRebInputObj  input = {0};
    MND_TMQ_RETURN_CHECK(initRebOutput(&output));
    input.pRebInfo = (SMqRebInfo *)pCursor;

    code = buildRebOutput(pMnode, &input, &output);
    if (code != 0) {
      mError("mq rebalance buildRebOutput, msg:%s", tstrerror(code))
    } else {
      code = mndDoRebalance(pMnode, &input, &output);
      if (code != 0) {
        mError("mq rebalance do rebalance error, msg:%s", tstrerror(code))
      } else {
        code = mndPersistRebResult(pMnode, pMsg, &output);
        if (code != 0) {
          mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code))
        }
      }
    }

    clearRebOutput(&output);
  }

  if (taosHashGetSize(rebSubHash) > 0) {
    mInfo("tmq rebalance mq rebalance completed successfully, wait trans finish")
  }

END:
  PRINT_LOG_END
  taosHashCancelIterate(rebSubHash, pCursor);
  taosHashCleanup(rebSubHash);
  return code;
}
1254

1255
// Message handler for a rebalance request.
//
// Only one rebalance may run at a time: when mndRebTryStart reports that one
// is already in progress the request is a no-op. Otherwise a reload rebalance
// is executed when topicsToReload is non-empty and a normal rebalance when it
// is empty; the in-progress counter is decremented afterwards.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL message, otherwise the code of the
// executed rebalance (0 when skipped).
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int     code = 0;
  int32_t lino = 0;

  PRINT_LOG_START;
  if (!mndRebTryStart()) {
    mInfo("tmq rebalance mq rebalance already in progress, do nothing");
    goto END;
  }

  if (taosHashGetSize(topicsToReload) > 0) {
    code = reloadRebalance(pMsg);
  } else {
    code = normalRebalance(pMsg);
  }

  // pairs with the successful mndRebTryStart above
  mndRebCntDec();

END:
  PRINT_LOG_END
  TAOS_RETURN(code);
}
1282

1283
// Add one TDMT_VND_TMQ_DELETE_SUB redo action to pTrans for every vgroup that
// belongs to the subscription's database (pSub->dbUid). Vgroups with a non-zero
// mountVgId are skipped.
//
// Ownership: the request buffer belongs to this function until
// mndTransAppendRedoAction succeeded; on any failure before that it is freed
// here. NOTE(review): this assumes mndTransAppendRedoAction does not keep
// pCont when it fails - confirm against mndTrans.c.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL, otherwise 0 or the
// first error encountered.
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
  if (pMnode == NULL || pSub == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void          *pIter = NULL;
  SVgObj        *pVgObj = NULL;
  SMqVDeleteReq *pReq = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) {
      break;
    }
    if (pVgObj->mountVgId || !mndVgroupInDb(pVgObj, pSub->dbUid)) {
      sdbRelease(pMnode->pSdb, pVgObj);
      pVgObj = NULL;
      continue;
    }

    pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
    MND_TMQ_NULL_CHECK(pReq);
    pReq->head.vgId = htonl(pVgObj->vgId);
    pReq->vgId = pVgObj->vgId;
    pReq->consumerId = -1;
    (void)memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
    action.pCont = pReq;
    action.contLen = sizeof(SMqVDeleteReq);
    action.msgType = TDMT_VND_TMQ_DELETE_SUB;
    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

    sdbRelease(pMnode->pSdb, pVgObj);
    // already released: END must not release this vgroup a second time
    pVgObj = NULL;
    MND_TMQ_RETURN_CHECK(mndTransAppendRedoAction(pTrans, &action));
    // the transaction now references the buffer
    pReq = NULL;
  }

END:
  PRINT_LOG_END
  // non-NULL only when the action was never appended
  taosMemoryFreeClear(pReq);
  sdbRelease(pMnode->pSdb, pVgObj);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1330

1331
// Handle one consumer while a consumer group is being dropped from a topic.
//
// Consumers of other groups are ignored. For a consumer of `cgroup` that lists
// topicName in assignedTopics, rebRemovedTopics or rebNewTopics:
//   - deleteConsumer == true : a CONSUMER_CLEAR object is added to pTrans as drop log
//   - deleteConsumer == false: the drop is refused with TSDB_CODE_MND_CGROUP_USED
// The consumer's read latch is held for the whole check.
static int32_t checkoutOneConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName,
                                   char *cgroup) {
  int32_t         lino = 0;
  int32_t         code = 0;
  SMqConsumerObj *pClearObj = NULL;

  taosRLockLatch(&pConsumer->lock);

  if (strcmp(cgroup, pConsumer->cgroup) != 0) {
    goto END;
  }

  // all three lists are always consulted
  bool inAssigned = checkTopic(pConsumer->assignedTopics, topicName);
  bool inRemoved = checkTopic(pConsumer->rebRemovedTopics, topicName);
  bool inNew = checkTopic(pConsumer->rebNewTopics, topicName);
  if (!(inAssigned || inRemoved || inNew)) {
    goto END;
  }

  if (!deleteConsumer) {
    mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
           pConsumer->consumerId, pConsumer->cgroup);
    code = TSDB_CODE_MND_CGROUP_USED;
    goto END;
  }

  MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pClearObj));
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pClearObj));
  tDeleteSMqConsumerObj(pClearObj);
  pClearObj = NULL;

END:
  tDeleteSMqConsumerObj(pClearObj);
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
1365

1366
// Run checkoutOneConsumer over every SDB_CONSUMER row for the given consumer
// group and topic. Iteration stops at the first error.
//
// Returns TSDB_CODE_INVALID_PARA when a pointer argument is NULL, otherwise 0
// or the error from checkoutOneConsumer.
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
  if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void *          pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;
  int             code = 0;
  int32_t         lino = 0;
  PRINT_LOG_START
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }
    MND_TMQ_RETURN_CHECK(checkoutOneConsumer(pTrans, pConsumer, deleteConsumer, topic, cgroup));
    sdbRelease(pMnode->pSdb, pConsumer);
  }

END:
  // pairs with PRINT_LOG_START, as in the other functions of this file
  PRINT_LOG_END
  sdbRelease(pMnode->pSdb, pConsumer);
  sdbCancelFetch(pMnode->pSdb, pIter);
  return code;
}
1389

1390
// Drop one consumer group subscription through a "drop-cgroup" transaction.
//
// Without dropReq->force the drop is refused (TSDB_CODE_MND_CGROUP_USED) while
// the subscription still has consumers. Otherwise the transaction receives the
// vnode delete actions, the consumer handling for the group, and the drop
// commit log of the subscription, and is then prepared.
// The subscription's read latch is held throughout; the transaction handle is
// dropped before returning.
static int32_t dropCgroup(SMqSubscribeObj *pSub, SRpcMsg *pMsg, SMDropCgroupReq *dropReq) {
  int32_t lino = 0;
  int32_t code = 0;
  SMnode *pMnode = pMsg->info.node;
  STrans *pTrans = NULL;
  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  if (!dropReq->force && taosHashGetSize(pSub->consumerHash) != 0) {
    code = TSDB_CODE_MND_CGROUP_USED;
    mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq->cgroup, dropReq->topic, tstrerror(code));
    goto END;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pMsg, "drop-cgroup");
  MND_TMQ_NULL_CHECK(pTrans);
  mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq->cgroup, dropReq->topic);
  mndTransSetDbName(pTrans, pSub->dbName, NULL);

  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
  // redo actions towards the vnodes
  MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
  // consumers of this group that still reference the topic
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq->cgroup, dropReq->topic, dropReq->force));
  // commit log removing the subscription itself
  MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

END:
  taosRUnLockLatch(&pSub->lock);
  mndTransDrop(pTrans);
  PRINT_LOG_END
  return code;
}
1419

1420
// Message handler for "drop consumer group".
//
// The request is deserialized, the subscription "<cgroup><TMQ_SEPARATOR><topic>"
// is acquired and passed to dropCgroup. A missing subscription yields 0 when
// dropReq.igNotExists is set and TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST otherwise.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL message, the error code on failure
// and TSDB_CODE_ACTION_IN_PROGRESS once the drop has been started.
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
  if (!pMsg) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          lino = 0;
  int32_t          code = 0;
  SMqSubscribeObj *pSub = NULL;
  SMDropCgroupReq  dropReq = {0};
  SMnode          *pMnode = pMsg->info.node;
  char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};

  PRINT_LOG_START
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq));
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", dropReq.cgroup, TMQ_SEPARATOR, dropReq.topic);

  code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
  if (code == 0) {
    MND_TMQ_RETURN_CHECK(dropCgroup(pSub, pMsg, &dropReq));
  } else if (dropReq.igNotExists) {
    mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
    mndReleaseSubscribe(pMnode, pSub);
    return 0;
  } else {
    code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
    goto END;
  }

END:
  mndReleaseSubscribe(pMnode, pSub);
  PRINT_LOG_END

  if (code != 0) {
    TAOS_RETURN(code);
  }
  TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS);
}
1457

1458
// Cleanup hook of the subscribe module; intentionally does nothing.
void mndCleanupSubscribe(SMnode *pMnode) {
  (void)pMnode;
}
1459

1460
// Serialize a subscription object into a newly allocated SDB raw record.
//
// Layout written to the record: int32 payload length, the payload produced by
// tEncodeSubscribeObj, then MND_SUBSCRIBE_RESERVE_SIZE reserved bytes.
//
// Returns the record, or NULL on failure with terrno left non-zero
// (TSDB_CODE_OUT_OF_MEMORY unless a callee changed it). A NULL pSub returns
// NULL without touching terrno.
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
  if (pSub == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  void    *buf = NULL;
  void    *abuf = NULL;
  // Declared and initialised before the first goto: the cleanup label reads
  // pRaw, so it must never be reached with an indeterminate value.
  SSdbRaw *pRaw = NULL;
  int32_t  size = 0;
  int32_t  dataPos = 0;

  // first pass with a NULL buffer only computes the encoded length
  int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
  if (tlen <= 0) goto SUB_ENCODE_OVER;
  size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
  if (pRaw == NULL) goto SUB_ENCODE_OVER;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto SUB_ENCODE_OVER;

  // the encoder advances the pointer it is given, so work on a copy
  abuf = buf;
  if (tEncodeSubscribeObj(&abuf, pSub) < 0) {
    goto SUB_ENCODE_OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

SUB_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mDebug("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
  return pRaw;
}
1502

1503
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
×
1504
  if (pRaw == NULL) {
×
1505
    return NULL;
×
1506
  }
1507
  int32_t code = 0;
×
1508
  int32_t lino = 0;
×
1509
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1510
  SSdbRow *        pRow = NULL;
×
1511
  SMqSubscribeObj *pSub = NULL;
×
1512
  void *           buf = NULL;
×
1513

1514
  int8_t sver = 0;
×
1515
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
×
1516

1517
  if (sver > MND_SUBSCRIBE_VER_NUMBER || sver < 1) {
×
1518
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
1519
    goto SUB_DECODE_OVER;
×
1520
  }
1521

1522
  pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
×
1523
  if (pRow == NULL) goto SUB_DECODE_OVER;
×
1524

1525
  pSub = sdbGetRowObj(pRow);
×
1526
  if (pSub == NULL) goto SUB_DECODE_OVER;
×
1527

1528
  int32_t dataPos = 0;
×
1529
  int32_t tlen;
1530
  SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
×
1531
  buf = taosMemoryMalloc(tlen);
×
1532
  if (buf == NULL) goto SUB_DECODE_OVER;
×
1533
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
×
1534
  SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
×
1535

1536
  if (tDecodeSubscribeObj(buf, pSub, sver) == NULL) {
×
1537
    goto SUB_DECODE_OVER;
×
1538
  }
1539

1540
  // update epset saved in mnode
1541
  if (pSub->unassignedVgs != NULL) {
×
1542
    int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
×
1543
    for (int32_t i = 0; i < size; ++i) {
×
1544
      SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGet(pSub->unassignedVgs, i);
×
1545
      tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
×
1546
    }
1547
  }
1548
  if (pSub->consumerHash != NULL) {
×
1549
    void *pIter = taosHashIterate(pSub->consumerHash, NULL);
×
1550
    while (pIter) {
×
1551
      SMqConsumerEp *pConsumerEp = pIter;
×
1552
      int32_t        size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
×
1553
      for (int32_t i = 0; i < size; ++i) {
×
1554
        SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGet(pConsumerEp->vgs, i);
×
1555
        tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
×
1556
      }
1557
      pIter = taosHashIterate(pSub->consumerHash, pIter);
×
1558
    }
1559
  }
1560

1561
  terrno = TSDB_CODE_SUCCESS;
×
1562

1563
SUB_DECODE_OVER:
×
1564
  taosMemoryFreeClear(buf);
×
1565
  if (terrno != TSDB_CODE_SUCCESS) {
×
1566
    mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
×
1567
    taosMemoryFreeClear(pRow);
×
1568
    return NULL;
×
1569
  }
1570

1571
  mDebug("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
×
1572
  return pRow;
×
1573
}
1574

1575
// SDB insert hook for subscriptions: only logs the key ("null" for a NULL row).
// Always returns 0.
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform insert action", subKey);
  return 0;
}
1579

1580
// SDB delete hook for subscriptions: logs the key ("null" for a NULL row) and
// passes the row to tDeleteSubscribeObj. Always returns 0.
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
  const char *subKey = (pSub == NULL) ? "null" : pSub->key;
  mDebug("subscribe:%s, perform delete action", subKey);
  tDeleteSubscribeObj(pSub);
  return 0;
}
1585

1586
// SDB update hook for subscriptions.
//
// Under the old row's write latch, consumerHash, unassignedVgs and offsetRows
// are exchanged between the old and the new row, so the old row ends up with
// the new content and the new row holds the previous one.
//
// Returns TSDB_CODE_INVALID_PARA when either row is NULL, otherwise 0.
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
  if (!pOldSub || !pNewSub) {
    return TSDB_CODE_INVALID_PARA;
  }
  mDebug("subscribe:%s, perform update action", pOldSub->key);

  taosWLockLatch(&pOldSub->lock);
  {
    TSWAP(pOldSub->consumerHash, pNewSub->consumerHash);
    TSWAP(pOldSub->unassignedVgs, pNewSub->unassignedVgs);
    TSWAP(pOldSub->offsetRows, pNewSub->offsetRows);
  }
  taosWUnLockLatch(&pOldSub->lock);

  return 0;
}
1598

1599
// Look up a subscription by key with sdbAcquire and store it in *pSub.
//
// Returns TSDB_CODE_INVALID_PARA when an argument is NULL,
// TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when nothing was found (*pSub is NULL
// then), and 0 on success.
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj **pSub) {
  if (!pMnode || !key || !pSub) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pSub = sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
  return (*pSub != NULL) ? 0 : TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
1610

1611
// Count the SDB_SUBSCRIBE rows whose key refers to topicName.
// Each row's key is split under the row's read latch; the topic part is then
// compared with topicName.
//
// Returns 0 when pMnode or topicName is NULL, otherwise the number of matches.
int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
  if (!pMnode || !topicName) return 0;

  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pCursor = NULL;
  int32_t          matched = 0;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_SUBSCRIBE, pCursor, (void **)&pSub);
    if (!pCursor) break;

    char cgroup[TSDB_CGROUP_LEN] = {0};
    char topic[TSDB_TOPIC_FNAME_LEN] = {0};
    taosRLockLatch(&pSub->lock);
    mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
    taosRUnLockLatch(&pSub->lock);

    if (strcmp(topic, topicName) == 0) {
      matched++;
    }
    sdbRelease(pSdb, pSub);
  }

  return matched;
}
1638

1639
/*
 * Release a subscribe object back to the SDB via sdbRelease().
 * Does nothing when either argument is NULL.
 */
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
  if (pMnode != NULL && pSub != NULL) {
    sdbRelease(pMnode->pSdb, pSub);
  }
}
1644

1645
/*
 * Append a "dropped" commit log for pSub to the transaction.
 *
 * Encodes pSub into a raw SDB record, appends it to pTrans as a commit log,
 * and then marks the record SDB_STATUS_DROPPED. If appending fails the raw
 * record is freed here.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise
 * the error produced by the failing step.
 */
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
  if (!pMnode || !pTrans || !pSub) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  SSdbRaw *pRaw = mndSubActionEncode(pSub);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
  } else {
    // The record was not appended, so it is freed on this path.
    sdbFreeRaw(pRaw);
  }

END:
  PRINT_LOG_END
  return code;
}
1662

1663
/*
 * Add the actions needed to drop one subscription to pTrans, provided the
 * subscription belongs to topicName.
 *
 * Subscriptions of other topics are skipped (returns 0). Unless `force` is
 * set, a subscription that still has entries in its consumer hash is refused
 * with TSDB_CODE_MND_IN_REBALANCE. Otherwise the vnode delete requests and
 * the drop commit log are appended to the transaction.
 *
 * pSub's read latch is held for the whole call.
 */
static int32_t dropOneSub(SMqSubscribeObj *pSub, SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  int32_t code = 0;
  int32_t lino = 0;
  char    topic[TSDB_TOPIC_FNAME_LEN] = {0};
  char    cgroup[TSDB_CGROUP_LEN] = {0};

  taosRLockLatch(&pSub->lock);
  mndSplitSubscribeKey(pSub->key, topic, cgroup, true);

  if (strcmp(topic, topicName) == 0) {
    if (force || taosHashGetSize(pSub->consumerHash) == 0) {
      // send the delete request to the vnodes, then record the drop
      MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
      MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
    } else {
      code = TSDB_CODE_MND_IN_REBALANCE;
    }
  }

END:
  taosRUnLockLatch(&pSub->lock);

  return code;
}
1688

1689
/*
 * Add drop actions to pTrans for every subscription of topicName.
 *
 * Iterates all SDB_SUBSCRIBE entries and delegates to dropOneSub(), which
 * ignores entries of other topics. The first failure stops the iteration;
 * the entry in hand is then released and the fetch cancelled in the cleanup
 * section.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on a NULL argument, otherwise
 * the error reported by dropOneSub().
 */
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
  if (!pMnode || !pTrans || !topicName) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  SSdb            *pSdb = pMnode->pSdb;
  SMqSubscribeObj *pSub = NULL;
  void            *pIter = NULL;
  PRINT_LOG_START

  while ((pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub)) != NULL) {
    MND_TMQ_RETURN_CHECK(dropOneSub(pSub, pMnode, pTrans, topicName, force));
    sdbRelease(pSdb, pSub);
  }

END:
  PRINT_LOG_END
  // Shared cleanup for both the normal and the error exit.
  sdbRelease(pSdb, pSub);
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
1712

1713
/*
 * Emit one result row per vgroup in `vgs` into pBlock.
 *
 * Columns written, in order: topic, consumer group, vgroup id, consumer id
 * (hex string; NULL when consumerId == -1), user (NULL when `user` is NULL),
 * fqdn (NULL when `fqdn` is NULL), offset string and row count (both NULL
 * when `offsetRows` has no entry for the vgroup).
 *
 * `topic` and `cgroup` are passed straight to colDataSetVal() and are read
 * through varDataVal() for logging, i.e. they are expected in VARSTR form.
 * *numOfRows is the row index to write next and is advanced per row.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when pBlock, numOfRows, topic
 * or cgroup is NULL, otherwise the first error hit while filling columns.
 */
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *user,
                           const char *fqdn, const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
  if (!pBlock || !numOfRows || !topic || !cgroup) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  PRINT_LOG_START

  int32_t vgCnt = taosArrayGetSize(vgs);
  for (int32_t vgIdx = 0; vgIdx < vgCnt; ++vgIdx) {
    SMqVgEp *pVgEp = taosArrayGet(vgs, vgIdx);
    MND_TMQ_NULL_CHECK(pVgEp);

    int32_t          colIdx = 0;
    SColumnInfoData *pCol = NULL;

    // topic
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)topic, false));

    // consumer group
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)cgroup, false));

    // vgroup id
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pVgEp->vgId, false));

    // consumer id, rendered as a hex VARSTR
    char hexId[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(varDataVal(hexId), TSDB_CONSUMER_ID_LEN - VARSTR_HEADER_SIZE, "0x%" PRIx64, consumerId);
    varDataSetLen(hexId, strlen(varDataVal(hexId)));

    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)hexId, consumerId == -1));

    // user
    char userVar[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    if (user != NULL) STR_TO_VARSTR(userVar, user);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, userVar, user == NULL));

    // fqdn
    char fqdnVar[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
    if (fqdn != NULL) STR_TO_VARSTR(fqdnVar, fqdn);
    pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
    MND_TMQ_NULL_CHECK(pCol);
    MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, fqdnVar, fqdn == NULL));

    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
          varDataVal(cgroup), pVgEp->vgId);

    // Find the offset entry of this vgroup. The scan deliberately runs to the
    // end of the array, so the last matching entry is the one reported.
    OffsetRows *pMatch = NULL;
    for (int i = 0; i < taosArrayGetSize(offsetRows); i++) {
      OffsetRows *pCand = taosArrayGet(offsetRows, i);
      MND_TMQ_NULL_CHECK(pCand);
      if (pCand->vgId == pVgEp->vgId) {
        pMatch = pCand;
      }
    }

    if (pMatch == NULL) {
      // no offset known for this vgroup: both offset columns are NULL
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      colDataSetNULL(pCol, *numOfRows);
      mInfo("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
    } else {
      // offset column: "<formatted offset>/<ever>"
      char offsetVar[TSDB_OFFSET_LEN * 2 + VARSTR_HEADER_SIZE] = {0};
      (void)tFormatOffset(varDataVal(offsetVar), TSDB_OFFSET_LEN, &pMatch->offset);
      (void)snprintf(varDataVal(offsetVar) + strlen(varDataVal(offsetVar)),
                     sizeof(offsetVar) - VARSTR_HEADER_SIZE - strlen(varDataVal(offsetVar)), "/%" PRId64,
                     pMatch->ever);
      varDataSetLen(offsetVar, strlen(varDataVal(offsetVar)));
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)offsetVar, false));

      // rows column
      pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
      MND_TMQ_NULL_CHECK(pCol);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pCol, *numOfRows, (const char *)&pMatch->rows, false));
    }

    (*numOfRows)++;
  }

END:
  PRINT_LOG_END
  return code;
}
1806

1807
/*
 * Append the rows of one subscription to a "show subscriptions" result block.
 *
 * One row is produced per vgroup assigned to each consumer of pSub, followed
 * by one row per unassigned vgroup. When `showAll` is false, a consumer's
 * rows are emitted only if the operating user passes the topic privilege
 * check or owns the consumer; the unassigned-vgroup rows are always emitted.
 *
 * pSub's read latch is held for the whole call. On success pBlock->info.rows
 * is set to *numOfRows.
 *
 * Returns 0 on success, otherwise the first error encountered.
 */
static int32_t retrieveSub(SRpcMsg *pReq, SMqSubscribeObj *pSub, SUserObj *pOperUser, bool showAll, SSDataBlock *pBlock,
                           int32_t *numOfRows, int32_t rowsCapacity) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SMqConsumerEp *pConsumerEp = NULL;
  SMqTopicObj   *pTopic = NULL;
  void          *pIter = NULL;
  bool           showTopic = false;
  PRINT_LOG_START

  taosRLockLatch(&pSub->lock);
  if (*numOfRows + pSub->vgNum > rowsCapacity) {
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + pSub->vgNum));
  }

  // topic and cgroup in VARSTR form, as written to the result columns
  char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
  mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
  varDataSetLen(topic, strlen(varDataVal(topic)));
  varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

  if (!showAll) {
    // The lookup needs a plain C string. `topic` above is a VARSTR whose first
    // bytes are the length header, so it must not be passed as a name. The key
    // is split again with fullName=true, the same form the other functions in
    // this file compare against topic names.
    // NOTE(review): assumes the fullName=true form is the topic's SDB key - confirm.
    char topicFName[TSDB_TOPIC_FNAME_LEN] = {0};
    char cgroupName[TSDB_CGROUP_LEN] = {0};
    mndSplitSubscribeKey(pSub->key, topicFName, cgroupName, true);

    (void)mndAcquireTopic(pMnode, topicFName, &pTopic);
    if (pTopic) {
      SName name = {0};  // 1.topic1
      if (0 == tNameFromString(&name, pTopic->name, T_NAME_ACCT | T_NAME_DB)) {
        if (0 == mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_SUBSCRIPTION_SHOW, PRIV_OBJ_TOPIC, pTopic->ownerId,
                                          pTopic->db, name.dbname)) {
          showTopic = true;
        }
      }
    }
  }

  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;

    char           *user = NULL;
    char           *fqdn = NULL;
    bool            subscribeOwner = false;
    SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
    if (pConsumer != NULL) {
      user = pConsumer->user;
      fqdn = pConsumer->fqdn;
      if (strncmp(pConsumer->user, pOperUser->name, TSDB_USER_LEN) == 0) {
        subscribeOwner = true;
      }
    }

    if (!showAll && !showTopic && !subscribeOwner) {
      if (pConsumer != NULL) sdbRelease(pSdb, pConsumer);
      continue;
    }

    // `user` and `fqdn` point into pConsumer, so the consumer stays acquired
    // until buildResult() has copied them into the block.
    int32_t ret = buildResult(pBlock, numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup,
                              pConsumerEp->vgs, pConsumerEp->offsetRows);
    if (pConsumer != NULL) sdbRelease(pSdb, pConsumer);
    MND_TMQ_RETURN_CHECK(ret);
  }

  // vgroups not assigned to any consumer: consumer id -1, no user/fqdn
  MND_TMQ_RETURN_CHECK(
      buildResult(pBlock, numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));

  pBlock->info.rows = *numOfRows;

END:
  // Cancel the iteration while the read latch is still held: the update
  // action swaps pSub->consumerHash under the write latch, so after unlocking
  // pIter may no longer belong to the hash stored in pSub.
  taosHashCancelIterate(pSub->consumerHash, pIter);
  mndReleaseTopic(pMnode, pTopic);
  taosRUnLockLatch(&pSub->lock);
  PRINT_LOG_END
  return code;
}
1880

1881
/*
 * Retrieve handler for "show subscriptions".
 *
 * Resolves the requesting user, determines whether that user may see every
 * subscription (system-level PRIV_SUBSCRIPTION_SHOW check), then fetches
 * subscribe objects through pShow->pIter and lets retrieveSub() append their
 * rows until the iteration ends or at least rowsCapacity rows were produced.
 *
 * Returns the number of rows produced by this call on success,
 * TSDB_CODE_INVALID_PARA on a NULL argument, or the failing step's error
 * code otherwise.
 */
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (!pReq || !pShow || !pBlock) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  int32_t          numOfRows = 0;
  bool             showAll = false;
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SUserObj        *pOperUser = NULL;
  SMqSubscribeObj *pSub = NULL;
  char             objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};

  mInfo("mnd show subscriptions begin");
  MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));

  // "<acctId>.*" is the object pattern used for the system-level check
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pOperUser->acctId);
  showAll = (0 == mndCheckSysObjPrivilege(pMnode, pOperUser, RPC_MSG_TOKEN(pReq), PRIV_SUBSCRIPTION_SHOW,
                                          PRIV_OBJ_TOPIC, 0, objFName, "*"));

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
    if (pShow->pIter == NULL) {
      break;
    }

    MND_TMQ_RETURN_CHECK(retrieveSub(pReq, pSub, pOperUser, showAll, pBlock, &numOfRows, rowsCapacity));

    // Reset after release so the cleanup section does not release it again.
    sdbRelease(pSdb, pSub);
    pSub = NULL;
  }
  mInfo("mnd end show subscriptions");
  pShow->numOfRows += numOfRows;

END:
  // NOTE(review): the fetch is cancelled on the success path as well, even when
  // the loop stopped because the capacity was reached and pShow->pIter is still
  // set - confirm this is intended for resumed retrieval.
  sdbCancelFetch(pSdb, pShow->pIter);
  sdbRelease(pSdb, pSub);
  mndReleaseUser(pMnode, pOperUser);

  if (code == 0) {
    mDebug("mnd show subscriptions success, rows:%d", numOfRows);
    return numOfRows;
  }

  mError("mnd show subscriptions failed, msg:%s", tstrerror(code));
  TAOS_RETURN(code);
}
1928

1929
/*
 * Cancel an in-progress SDB_SUBSCRIBE fetch identified by pIter.
 * Does nothing when pMnode is NULL.
 */
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_SUBSCRIBE);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc