• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

taosdata / TDengine / #3951

28 Apr 2025 05:42AM UTC coverage: 62.445% (-0.4%) from 62.853%
#3951

push

travis-ci

web-flow
fix: mnode-status-case (#30871)

155256 of 317429 branches covered (48.91%)

Branch coverage included in aggregate %.

240626 of 316539 relevant lines covered (76.02%)

6221630.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.01
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_CONSUMER_VER_NUMBER   3
29
#define MND_CONSUMER_RESERVE_SIZE 64
30

31
#define MND_MAX_GROUP_PER_TOPIC 100
32

33
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
34
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
35
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
36
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
37
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
38

39
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
40
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
41
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
42
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
43

44
int32_t mndInitConsumer(SMnode *pMnode) {
1,982✔
45
  SSdbTable table = {
1,982✔
46
      .sdbType = SDB_CONSUMER,
47
      .keyType = SDB_KEY_INT64,
48
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
49
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
50
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
51
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
52
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
53
  };
54

55
  if (pMnode == NULL){
1,982!
56
    return TSDB_CODE_INVALID_PARA;
×
57
  }
58
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
1,982✔
59
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
1,982✔
60
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
1,982✔
61
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
1,982✔
62

63
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
1,982✔
64
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);
1,982✔
65

66
  return sdbSetTable(pMnode->pSdb, table);
1,982✔
67
}
68

69
void mndCleanupConsumer(SMnode *pMnode) {}
1,981✔
70

71
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
387✔
72
  if (pMnode == NULL || info == NULL) {
387!
73
    return TSDB_CODE_INVALID_PARA;
×
74
  }
75
  int32_t code = 0;
387✔
76
  void   *msg  = rpcMallocCont(sizeof(int64_t));
387✔
77
  MND_TMQ_NULL_CHECK(msg);
387!
78

79
  *(int64_t*)msg = consumerId;
387✔
80
  SRpcMsg rpcMsg = {
387✔
81
      .msgType = msgType,
82
      .pCont = msg,
83
      .contLen = sizeof(int64_t),
84
      .info = *info,
85
  };
86

87
  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
387!
88
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
387!
89
  return code;
387✔
90

91
END:
×
92
  taosMemoryFree(msg);
×
93
  return code;
×
94
}
95

96
static int32_t validateTopics(STrans* pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) {
971✔
97
  if (pTrans == NULL || subscribe == NULL || pMnode == NULL || pUser == NULL) {
971!
98
    return TSDB_CODE_INVALID_PARA;
×
99
  }
100
  SMqTopicObj *pTopic = NULL;
971✔
101
  int32_t      code = 0;
971✔
102

103
  int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames);
971✔
104
  for (int32_t i = 0; i < numOfTopics; i++) {
1,584✔
105
    char *pOneTopic = taosArrayGetP(subscribe->topicNames, i);
616✔
106
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
619!
107
    MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic));
616!
108
    MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));
616!
109

110
    if (subscribe->enableReplay) {
616✔
111
      if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
7✔
112
        code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
2✔
113
        goto END;
2✔
114
      } else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
5!
115
        SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
4✔
116
        if (pDb == NULL) {
4!
117
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
118
          goto END;
×
119
        }
120
        if (pDb->cfg.numOfVgroups != 1) {
4✔
121
          mndReleaseDb(pMnode, pDb);
1✔
122
          code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
1✔
123
          goto END;
1✔
124
        }
125
        mndReleaseDb(pMnode, pDb);
3✔
126
      }
127
    }
128
    char  key[TSDB_CONSUMER_ID_LEN] = {0};
613✔
129
    (void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%"PRIx64, subscribe->consumerId);
613✔
130
    mndTransSetDbName(pTrans, pTopic->db, key);
613✔
131
    MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
613!
132
    mndReleaseTopic(pMnode, pTopic);
613✔
133
  }
134
  return 0;
968✔
135

136
END:
3✔
137
  mndReleaseTopic(pMnode, pTopic);
3✔
138
  return code;
3✔
139
}
140

141
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
387✔
142
  if (pMsg == NULL || pMsg->pCont == NULL) {
387!
143
    return TSDB_CODE_INVALID_PARA;
×
144
  }
145
  int32_t              code = 0;
387✔
146
  SMnode              *pMnode = pMsg->info.node;
387✔
147
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
387✔
148
  SMqConsumerObj      *pConsumerNew = NULL;
387✔
149
  STrans              *pTrans = NULL;
387✔
150
  SMqConsumerObj      *pConsumer = NULL;
387✔
151

152
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClearMsg->consumerId, &pConsumer));
387!
153
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
387!
154
        mndConsumerStatusName(pConsumer->status));
155

156
  MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
387!
157
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
387✔
158
  MND_TMQ_NULL_CHECK(pTrans);
387!
159
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
387!
160
  code = mndTransPrepare(pMnode, pTrans);
387✔
161

162
END:
387✔
163
  mndReleaseConsumer(pMnode, pConsumer);
387✔
164
  tDeleteSMqConsumerObj(pConsumerNew);
387✔
165
  mndTransDrop(pTrans);
387✔
166
  return code;
387✔
167
}
168

169
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
2,866✔
170
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL || user == NULL) {
2,866!
171
    return TSDB_CODE_INVALID_PARA;
×
172
  }
173
  int32_t code = 0;
2,866✔
174
  rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
2,866✔
175
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);
2,866!
176
  for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
5,557✔
177
    char        *topic = taosArrayGetP(pConsumer->currentTopics, i);
2,691✔
178
    SMqTopicObj *pTopic = NULL;
2,691✔
179
    code = mndAcquireTopic(pMnode, topic, &pTopic);
2,691✔
180
    if (code != TDB_CODE_SUCCESS) {
2,691!
181
      continue;
×
182
    }
183
    STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
2,691✔
184
    MND_TMQ_NULL_CHECK(data);
2,691!
185
    tstrncpy(data->topic, topic, TSDB_TOPIC_FNAME_LEN);
2,691✔
186
    if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
5,382!
187
        grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
2,691✔
188
      data->noPrivilege = 1;
×
189
    } else {
190
      data->noPrivilege = 0;
2,691✔
191
    }
192
    mndReleaseTopic(pMnode, pTopic);
2,691✔
193
  }
194
END:
2,866✔
195
  return code;
2,866✔
196
}
197

198
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
2,866✔
199
  if (pMnode == NULL || req == NULL || pConsumer == NULL){
2,866!
200
    return;
×
201
  }
202
  for (int i = 0; i < taosArrayGetSize(req->topics); i++) {
5,547✔
203
    TopicOffsetRows *data = taosArrayGet(req->topics, i);
2,681✔
204
    if (data == NULL){
2,681!
205
      continue;
×
206
    }
207
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
2,681!
208

209
    SMqSubscribeObj *pSub = NULL;
2,681✔
210
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
2,681✔
211
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, data->topicName);
2,681✔
212
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
2,681✔
213
    if (code != 0) {
2,681!
214
      mError("failed to acquire subscribe by key:%s, code:%d", key, code);
×
215
      continue;
×
216
    }
217
    taosWLockLatch(&pSub->lock);
2,681✔
218
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
2,681✔
219
    if (pConsumerEp) {
2,681✔
220
      taosArrayDestroy(pConsumerEp->offsetRows);
2,634✔
221
      pConsumerEp->offsetRows = data->offsetRows;
2,634✔
222
      data->offsetRows = NULL;
2,634✔
223
    }
224
    taosWUnLockLatch(&pSub->lock);
2,681✔
225

226
    mndReleaseSubscribe(pMnode, pSub);
2,681✔
227
  }
228
}
229

230
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
2,866✔
231
  if (pMsg == NULL || rsp == NULL){
2,866!
232
    return TSDB_CODE_INVALID_PARA;
×
233
  }
234
  int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
2,866✔
235
  if (tlen <= 0){
2,866!
236
    return TSDB_CODE_TMQ_INVALID_MSG;
×
237
  }
238
  void   *buf = rpcMallocCont(tlen);
2,866✔
239
  if (buf == NULL) {
2,866!
240
    return terrno;
×
241
  }
242

243
  if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){
2,866!
244
    rpcFreeCont(buf);
×
245
    return TSDB_CODE_TMQ_INVALID_MSG;
×
246
  }
247
  pMsg->info.rsp = buf;
2,866✔
248
  pMsg->info.rspLen = tlen;
2,866✔
249
  return 0;
2,866✔
250
}
251

252
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
2,880✔
253
  if (pMsg == NULL) {
2,880!
254
    return TSDB_CODE_INVALID_PARA;
×
255
  }
256
  PRINT_LOG_START
2,880!
257
  int32_t         code = 0;
2,880✔
258
  SMnode         *pMnode = pMsg->info.node;
2,880✔
259
  SMqHbReq        req = {0};
2,880✔
260
  SMqHbRsp        rsp = {0};
2,880✔
261
  SMqConsumerObj *pConsumer = NULL;
2,880✔
262
  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));
2,880!
263
  int64_t consumerId = req.consumerId;
2,880✔
264
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
2,880✔
265
  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));
2,866!
266
  atomic_store_32(&pConsumer->hbStatus, 0);
2,866✔
267
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, req.pollFlag, pConsumer->pollStatus);
2,866!
268
  if (req.pollFlag == 1){
2,866✔
269
    atomic_store_32(&pConsumer->pollStatus, 0);
893✔
270
  }
271

272
  storeOffsetRows(pMnode, &req, pConsumer);
2,866✔
273
  rsp.debugFlag = tqClientDebugFlag;
2,866✔
274
  code = buildMqHbRsp(pMsg, &rsp);
2,866✔
275

276
END:
2,880✔
277
  tDestroySMqHbRsp(&rsp);
2,880✔
278
  mndReleaseConsumer(pMnode, pConsumer);
2,880✔
279
  tDestroySMqHbReq(&req);
2,880✔
280
  PRINT_LOG_END(code)
2,880!
281
  return code;
2,880✔
282
}
283

284
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
627,650✔
285
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL){
627,650!
286
    return TSDB_CODE_INVALID_PARA;
×
287
  }
288
  taosRLockLatch(&pConsumer->lock);
627,650✔
289

290
  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
627,650✔
291

292
  rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
627,650✔
293
  if (rsp->topics == NULL) {
627,650!
294
    taosRUnLockLatch(&pConsumer->lock);
×
295
    return terrno;
×
296
  }
297

298
  // handle all topics subscribed by this consumer
299
  for (int32_t i = 0; i < numOfTopics; i++) {
1,254,972✔
300
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
627,322✔
301
    SMqSubscribeObj *pSub = NULL;
627,322✔
302
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
627,322✔
303
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
627,322✔
304
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
627,322✔
305
    if (code != 0) {
627,322!
306
      continue;
×
307
    }
308
    taosRLockLatch(&pSub->lock);
627,322✔
309

310
    SMqSubTopicEp topicEp = {0};
627,322✔
311
    tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);
627,322✔
312

313
    // 2.1 fetch topic schema
314
    SMqTopicObj *pTopic = NULL;
627,322✔
315
    code = mndAcquireTopic(pMnode, topic, &pTopic);
627,322✔
316
    if (code != TDB_CODE_SUCCESS) {
627,322!
317
      taosRUnLockLatch(&pSub->lock);
×
318
      mndReleaseSubscribe(pMnode, pSub);
×
319
      continue;
×
320
    }
321
    taosRLockLatch(&pTopic->lock);
627,322✔
322
    tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
627,322✔
323
    topicEp.schema.nCols = pTopic->schema.nCols;
627,322✔
324
    if (topicEp.schema.nCols) {
627,322✔
325
      topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
372,310!
326
      if (topicEp.schema.pSchema == NULL) {
372,310!
327
        taosRUnLockLatch(&pTopic->lock);
×
328
        taosRUnLockLatch(&pSub->lock);
×
329
        mndReleaseSubscribe(pMnode, pSub);
×
330
        mndReleaseTopic(pMnode, pTopic);
×
331
        return terrno;
×
332
      }
333
      (void)memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
372,310✔
334
    }
335
    taosRUnLockLatch(&pTopic->lock);
627,322✔
336
    mndReleaseTopic(pMnode, pTopic);
627,322✔
337

338
    // 2.2 iterate all vg assigned to the consumer of that topic
339
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
627,322✔
340
    if (pConsumerEp == NULL) {
627,322!
341
      taosRUnLockLatch(&pConsumer->lock);
×
342
      taosRUnLockLatch(&pSub->lock);
×
343
      mndReleaseSubscribe(pMnode, pSub);
×
344
      return TSDB_CODE_OUT_OF_MEMORY;
×
345
    }
346
    int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
627,322✔
347
    topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
627,322✔
348
    if (topicEp.vgs == NULL) {
627,322!
349
      taosRUnLockLatch(&pConsumer->lock);
×
350
      taosRUnLockLatch(&pSub->lock);
×
351
      mndReleaseSubscribe(pMnode, pSub);
×
352
      return terrno;
×
353
    }
354

355
    for (int32_t j = 0; j < vgNum; j++) {
1,255,520✔
356
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
628,198✔
357
      if (pVgEp == NULL) {
628,198!
358
        continue;
×
359
      }
360
      if (epoch == -1) {
628,198✔
361
        SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
626,657✔
362
        if (pVgroup) {
626,657✔
363
          pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
16,548✔
364
          mndReleaseVgroup(pMnode, pVgroup);
16,548✔
365
        }
366
      }
367
      SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
628,198✔
368
      if (taosArrayPush(topicEp.vgs, &vgEp) == NULL) {
1,256,396!
369
        taosArrayDestroy(topicEp.vgs);
×
370
        taosRUnLockLatch(&pConsumer->lock);
×
371
        taosRUnLockLatch(&pSub->lock);
×
372
        mndReleaseSubscribe(pMnode, pSub);
×
373
        return terrno;
×
374
      }
375
    }
376
    if (taosArrayPush(rsp->topics, &topicEp) == NULL) {
1,254,644!
377
      taosArrayDestroy(topicEp.vgs);
×
378
      taosRUnLockLatch(&pConsumer->lock);
×
379
      taosRUnLockLatch(&pSub->lock);
×
380
      mndReleaseSubscribe(pMnode, pSub);
×
381
      return terrno;
×
382
    }
383
    taosRUnLockLatch(&pSub->lock);
627,322✔
384
    mndReleaseSubscribe(pMnode, pSub);
627,322✔
385
  }
386
  taosRUnLockLatch(&pConsumer->lock);
627,650✔
387
  return 0;
627,650✔
388
}
389

390
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
632,894✔
391
  if (pMsg == NULL || rsp == NULL) {
632,894!
392
    return TSDB_CODE_INVALID_PARA;
×
393
  }
394
  int32_t code = 0;
632,894✔
395
  // encode rsp
396
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
632,894✔
397
  void   *buf = rpcMallocCont(tlen);
632,894✔
398
  if (buf == NULL) {
632,894!
399
    return terrno;
×
400
  }
401

402
  SMqRspHead *pHead = buf;
632,894✔
403

404
  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
632,894✔
405
  pHead->epoch = serverEpoch;
632,894✔
406
  pHead->consumerId = consumerId;
632,894✔
407
  pHead->walsver = 0;
632,894✔
408
  pHead->walever = 0;
632,894✔
409

410
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
632,894✔
411
  if (tEncodeSMqAskEpRsp(&abuf, rsp) < 0) {
632,894!
412
    rpcFreeCont(buf);
×
413
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
414
  }
415

416
  // send rsp
417
  pMsg->info.rsp = buf;
632,894✔
418
  pMsg->info.rspLen = tlen;
632,894✔
419
  return code;
632,894✔
420
}
421

422
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
636,165✔
423
  if (pMsg == NULL) {
636,165!
424
    return TSDB_CODE_INVALID_PARA;
×
425
  }
426
  SMnode     *pMnode = pMsg->info.node;
636,165✔
427
  SMqAskEpReq req = {0};
636,165✔
428
  SMqAskEpRsp rsp = {0};
636,165✔
429
  int32_t     code = 0;
636,165✔
430
  SMqConsumerObj *pConsumer = NULL;
636,165✔
431
  PRINT_LOG_START
636,165!
432

433
  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
636,165!
434
  int64_t consumerId = req.consumerId;
636,165✔
435
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
636,165✔
436
  if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
636,137!
437
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
×
438
           pConsumer->cgroup);
439
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
×
440
    goto END;
×
441
  }
442

443
  // 1. check consumer status
444
  int32_t status = atomic_load_32(&pConsumer->status);
636,137✔
445
  if (status != MQ_CONSUMER_STATUS_READY) {
636,138✔
446
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
3,244!
447
    code = TSDB_CODE_MND_CONSUMER_NOT_READY;
3,244✔
448
    goto END;
3,244✔
449
  }
450

451
  int32_t epoch = req.epoch;
632,894✔
452
  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
632,894✔
453

454
  // 2. check epoch, only send ep info when epochs do not match
455
  if (epoch != serverEpoch) {
632,894✔
456
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
627,650!
457
          consumerId, epoch, serverEpoch);
458
    MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, epoch, &rsp));
627,650!
459
  }
460

461
  code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);
632,894✔
462

463
END:
636,165✔
464
  tDeleteSMqAskEpRsp(&rsp);
465
  mndReleaseConsumer(pMnode, pConsumer);
636,165✔
466
  PRINT_LOG_END(code);
636,165!
467
  return code;
636,165✔
468
}
469

470
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
389✔
471
  if (pConsumer == NULL || pTrans == NULL) {
389!
472
    return TSDB_CODE_INVALID_PARA;
×
473
  }
474
  int32_t  code = 0;
389✔
475
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
389✔
476
  MND_TMQ_NULL_CHECK(pCommitRaw);
389!
477
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
389✔
478
  if (code != 0) {
389!
479
    sdbFreeRaw(pCommitRaw);
×
480
    goto END;
×
481
  }
482
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
389!
483
END:
389✔
484
  return code;
389✔
485
}
486

487
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
2,085✔
488
  if (pConsumer == NULL || pTrans == NULL) {
2,085!
489
    return TSDB_CODE_INVALID_PARA;
×
490
  }
491
  int32_t  code = 0;
2,085✔
492
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
2,085✔
493
  MND_TMQ_NULL_CHECK(pCommitRaw);
2,085!
494
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
2,085✔
495
  if (code != 0) {
2,085!
496
    sdbFreeRaw(pCommitRaw);
×
497
    goto END;
×
498
  }
499
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
2,085!
500
END:
2,085✔
501
  return code;
2,085✔
502
}
503

504
static void freeItem(void *param) {
1✔
505
  if (param == NULL) {
1!
506
    return;
×
507
  }
508
  void *pItem = *(void **)param;
1✔
509
  if (pItem != NULL) {
1!
510
    taosMemoryFree(pItem);
1!
511
  }
512
}
513

514
#define ADD_TOPIC_TO_ARRAY(element, array) \
515
char *newTopicCopy = taosStrdup(element); \
516
MND_TMQ_NULL_CHECK(newTopicCopy);\
517
if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL){\
518
  taosMemoryFree(newTopicCopy);\
519
  code = terrno;\
520
  goto END;\
521
}
522

523
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
444✔
524
  if (pExistedConsumer == NULL || pConsumerNew == NULL) {
444!
525
    return TSDB_CODE_INVALID_PARA;
×
526
  }
527
  int32_t code = 0;
444✔
528
  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
444✔
529
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
444!
530
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
444✔
531
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);
444!
532

533
  int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
444✔
534
  int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
444✔
535
  int32_t i = 0, j = 0;
444✔
536
  while (i < oldTopicNum || j < newTopicNum) {
948✔
537
    if (i >= oldTopicNum) {
504✔
538
      void* tmp = taosArrayGetP(pConsumerNew->assignedTopics, j);
5✔
539
      MND_TMQ_NULL_CHECK(tmp);
5!
540
      ADD_TOPIC_TO_ARRAY(tmp, rebNewTopics);
10!
541
      j++;
5✔
542
      continue;
5✔
543
    } else if (j >= newTopicNum) {
499✔
544
      void* tmp = taosArrayGetP(pExistedConsumer->currentTopics, i);
477✔
545
      MND_TMQ_NULL_CHECK(tmp);
477!
546
      ADD_TOPIC_TO_ARRAY(tmp, rebRemovedTopics);
954!
547
      i++;
477✔
548
      continue;
477✔
549
    } else {
550
      char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
22✔
551
      MND_TMQ_NULL_CHECK(oldTopic);
22!
552
      char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j);
22✔
553
      MND_TMQ_NULL_CHECK(newTopic);
22!
554
      int   comp = strcmp(oldTopic, newTopic);
22✔
555
      if (comp == 0) {
22!
556
        i++;
22✔
557
        j++;
22✔
558
        continue;
22✔
559
      } else if (comp < 0) {
×
560
        ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
×
561
        i++;
×
562
        continue;
×
563
      } else {
×
564
        ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
×
565
        j++;
×
566
        continue;
×
567
      }
568
    }
569
  }
570
  // no topics need to be rebalanced
571
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
444✔
572
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
22✔
573
  }
574

575
END:
422✔
576
  return code;
444✔
577
}
578

579
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
972✔
580
  if (pTopicList == NULL || pMnode == NULL) {
972!
581
    return TSDB_CODE_INVALID_PARA;
×
582
  }
583
  taosArraySort(pTopicList, taosArrayCompareString);
972✔
584
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
972✔
585

586
  int32_t newTopicNum = taosArrayGetSize(pTopicList);
972✔
587
  for (int i = 0; i < newTopicNum; i++) {
1,588✔
588
    int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i));
617✔
589
    if (gNum >= MND_MAX_GROUP_PER_TOPIC) {
617✔
590
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
1✔
591
    }
592
  }
593
  return 0;
971✔
594
}
595

596
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer){
968✔
597
  if (pMnode == NULL || subscribe == NULL) {
968!
598
    return TSDB_CODE_INVALID_PARA;
×
599
  }
600
  int64_t         consumerId = subscribe->consumerId;
968✔
601
  char           *cgroup     = subscribe->cgroup;
968✔
602
  SMqConsumerObj *pConsumerNew     = NULL;
968✔
603
  SMqConsumerObj *pExistedConsumer = NULL;
968✔
604
  int32_t code = mndAcquireConsumer(pMnode, consumerId, &pExistedConsumer);
968✔
605
  if (code != 0) {
968✔
606
    mInfo("receive tmq subscribe request from new consumer:0x%" PRIx64
524!
607
              ",cgroup:%s, numOfTopics:%d", consumerId,
608
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));
609

610
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
524!
611
  } else {
612
    int32_t status = atomic_load_32(&pExistedConsumer->status);
444✔
613

614
    mInfo("receive tmq subscribe request from existed consumer:0x%" PRIx64
444!
615
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
616
          consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
617
          (int32_t)taosArrayGetSize(subscribe->topicNames));
618

619
    if (status != MQ_CONSUMER_STATUS_READY) {
444!
620
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
×
621
      goto END;
×
622
    }
623
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
444!
624
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
444✔
625
  }
626
  mndReleaseConsumer(pMnode, pExistedConsumer);
946✔
627
  if (ppConsumer){
946!
628
    *ppConsumer = pConsumerNew;
946✔
629
  }
630
  return code;
946✔
631

632
END:
22✔
633
  mndReleaseConsumer(pMnode, pExistedConsumer);
22✔
634
  tDeleteSMqConsumerObj(pConsumerNew);
22✔
635
  return code;
22✔
636
}
637

638
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
1,324✔
639
  if (pMsg == NULL) {
1,324!
640
    return TSDB_CODE_INVALID_PARA;
×
641
  }
642
  SMnode *pMnode = pMsg->info.node;
1,324✔
643
  char   *msgStr = pMsg->pCont;
1,324✔
644
  int32_t code = 0;
1,324✔
645
  SMqConsumerObj *pConsumerNew = NULL;
1,324✔
646
  STrans         *pTrans = NULL;
1,324✔
647

648
  PRINT_LOG_START
1,324!
649
  SCMSubscribeReq subscribe = {0};
1,324✔
650
  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
2,648!
651
  bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
1,324✔
652
  if(unSubscribe){
1,324✔
653
    SMqConsumerObj *pConsumerTmp = NULL;
769✔
654
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
1,115✔
655
    if (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0){
763✔
656
      mndReleaseConsumer(pMnode, pConsumerTmp);
346✔
657
      goto END;
346✔
658
    }
659
    mndReleaseConsumer(pMnode, pConsumerTmp);
417✔
660
  }
661
  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
972✔
662
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
971✔
663
                          (unSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
664
                          pMsg, "subscribe");
665
  MND_TMQ_NULL_CHECK(pTrans);
971!
666

667
  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user));
971✔
668
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
968✔
669
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
946!
670
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
946!
671
  code = TSDB_CODE_ACTION_IN_PROGRESS;
946✔
672

673
END:
1,324✔
674
  mndTransDrop(pTrans);
1,324✔
675
  tDeleteSMqConsumerObj(pConsumerNew);
1,324✔
676
  taosArrayDestroyP(subscribe.topicNames, NULL);
1,324✔
677
  code = (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code;
1,324✔
678
  PRINT_LOG_END(code);
1,324!
679
  return code;
1,324✔
680
}
681

682
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
2,841✔
683
  if (pConsumer == NULL) {
2,841!
684
    return NULL;
×
685
  }
686
  int32_t code = 0;
2,841✔
687
  int32_t lino = 0;
2,841✔
688
  terrno = TSDB_CODE_OUT_OF_MEMORY;
2,841✔
689

690
  void   *buf = NULL;
2,841✔
691
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
2,841✔
692
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
2,841✔
693

694
  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
2,841✔
695
  if (pRaw == NULL) goto CM_ENCODE_OVER;
2,841!
696

697
  buf = taosMemoryMalloc(tlen);
2,841!
698
  if (buf == NULL) goto CM_ENCODE_OVER;
2,841!
699

700
  void *abuf = buf;
2,841✔
701
  if(tEncodeSMqConsumerObj(&abuf, pConsumer) < 0){
2,841!
702
    goto CM_ENCODE_OVER;
×
703
  }
704

705
  int32_t dataPos = 0;
2,841✔
706
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
2,841!
707
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
2,841!
708
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
2,841!
709
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
2,841!
710

711
  terrno = TSDB_CODE_SUCCESS;
2,841✔
712

713
CM_ENCODE_OVER:
2,841✔
714
  taosMemoryFreeClear(buf);
2,841!
715
  if (terrno != 0) {
2,841!
716
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
×
717
    sdbFreeRaw(pRaw);
×
718
    return NULL;
×
719
  }
720

721
  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
2,841✔
722
  return pRaw;
2,841✔
723
}
724

725
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
2,526✔
726
  if (pRaw == NULL) {
2,526!
727
    return NULL;
×
728
  }
729
  int32_t         code = 0;
2,526✔
730
  int32_t         lino = 0;
2,526✔
731
  SSdbRow        *pRow = NULL;
2,526✔
732
  SMqConsumerObj *pConsumer = NULL;
2,526✔
733
  void           *buf = NULL;
2,526✔
734

735
  terrno = 0;
2,526✔
736
  int8_t sver = 0;
2,526✔
737
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
2,526!
738
    goto CM_DECODE_OVER;
×
739
  }
740

741
  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
2,526!
742
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
743
    goto CM_DECODE_OVER;
×
744
  }
745

746
  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
2,526✔
747
  if (pRow == NULL) {
2,526!
748
    goto CM_DECODE_OVER;
×
749
  }
750

751
  pConsumer = sdbGetRowObj(pRow);
2,526✔
752
  if (pConsumer == NULL) {
2,526!
753
    goto CM_DECODE_OVER;
×
754
  }
755

756
  int32_t dataPos = 0;
2,526✔
757
  int32_t len;
758
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
2,526!
759
  buf = taosMemoryMalloc(len);
2,526!
760
  if (buf == NULL) {
2,526!
761
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
762
    goto CM_DECODE_OVER;
×
763
  }
764

765
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
2,526!
766
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
2,526!
767

768
  if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
2,526!
769
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
×
770
    goto CM_DECODE_OVER;
×
771
  }
772

773
  tmsgUpdateDnodeEpSet(&pConsumer->ep);
2,526✔
774

775
CM_DECODE_OVER:
2,526✔
776
  taosMemoryFreeClear(buf);
2,526!
777
  if (terrno != TSDB_CODE_SUCCESS) {
2,526!
778
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
×
779
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
780
    taosMemoryFreeClear(pRow);
×
781
  }
782

783
  return pRow;
2,526✔
784
}
785

786
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
555✔
787
  if (pConsumer == NULL) {
555!
788
    return TSDB_CODE_INVALID_PARA;
×
789
  }
790
  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
555!
791
        pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);
792
  pConsumer->subscribeTime = pConsumer->createTime;
555✔
793
  return 0;
555✔
794
}
795

796
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
2,526✔
797
  if (pConsumer == NULL) {
2,526!
798
    return TSDB_CODE_INVALID_PARA;
×
799
  }
800
  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
2,526!
801
        mndConsumerStatusName(pConsumer->status));
802
  tClearSMqConsumerObj(pConsumer);
2,526✔
803
  return 0;
2,526✔
804
}
805

806
//static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
807
//  int32_t status = pConsumer->status;
808
//
809
//  if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
810
//    if (status == MQ_CONSUMER_STATUS_REBALANCE) {
811
//      pConsumer->status = MQ_CONSUMER_STATUS_READY;
812
//    } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
813
//      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
814
//    }
815
//  }
816
//}
817

818
// remove from topic list
819
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
1,584✔
820
  if (topicList == NULL || pTopic == NULL) {
1,584!
821
    return;
×
822
  }
823
  int32_t size = taosArrayGetSize(topicList);
1,584✔
824
  for (int32_t i = 0; i < size; i++) {
1,712✔
825
    char *p = taosArrayGetP(topicList, i);
1,701✔
826
    if (strcmp(pTopic, p) == 0) {
1,701✔
827
      taosArrayRemove(topicList, i);
1,573✔
828
      taosMemoryFree(p);
1,573!
829

830
      mInfo("tmq rebalance consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
1,573!
831
            consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
832
      break;
1,573✔
833
    }
834
  }
835
}
836

837
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
598✔
838
  if (pConsumer == NULL || pTopic == NULL) {
598!
839
    return false;
×
840
  }
841
  bool    existing = false;
598✔
842
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
598✔
843
  for (int32_t i = 0; i < size; i++) {
679✔
844
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
81✔
845
    if (topic && strcmp(topic, pTopic) == 0) {
81!
846
      existing = true;
×
847
      break;
×
848
    }
849
  }
850

851
  return existing;
598✔
852
}
853

854
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
1,578✔
855
  if (pOldConsumer == NULL || pNewConsumer == NULL) {
1,578!
856
    return TSDB_CODE_INVALID_PARA;
×
857
  }
858
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
1,578!
859
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);
860

861
  taosWLockLatch(&pOldConsumer->lock);
1,578✔
862

863
  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
1,578✔
864
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
427✔
865
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
427✔
866
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
427✔
867

868
    pOldConsumer->subscribeTime = taosGetTimestampMs();
427✔
869
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
427✔
870
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
427!
871
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
1,151✔
872
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
60✔
873
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
60✔
874
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
60!
875
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
1,091✔
876
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
598✔
877
    if (tmp == NULL){
598!
878
      return TSDB_CODE_TMQ_INVALID_MSG;
×
879
    }
880
    char *pNewTopic = taosStrdup(tmp);
598!
881
    if (pNewTopic == NULL) {
598!
882
      return terrno;
×
883
    }
884
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
598✔
885
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
598✔
886
    if (existing) {
598!
887
      mError("tmq rebalance consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
×
888
      taosMemoryFree(pNewTopic);
×
889
    } else {
890
      if (taosArrayPush(pOldConsumer->currentTopics, &pNewTopic) == NULL) {
1,196!
891
        taosMemoryFree(pNewTopic);
×
892
        return TSDB_CODE_TMQ_INVALID_MSG;
×
893
      }
894
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
598✔
895
    }
896

897
    int32_t status = pOldConsumer->status;
598✔
898
//    updateConsumerStatus(pOldConsumer);
899
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
598!
900
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
536✔
901
    }
902

903
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
598✔
904
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
598✔
905

906
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
598!
907
          ", current topics:%d, newTopics:%d, removeTopics:%d",
908
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
909
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
910
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
911
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
912

913
  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
493!
914
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
493✔
915
    if (topic == NULL){
493!
916
      return TSDB_CODE_TMQ_INVALID_MSG;
×
917
    }
918
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
493✔
919
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
493✔
920

921
    int32_t status = pOldConsumer->status;
493✔
922
//    updateConsumerStatus(pOldConsumer);
923
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
493!
924
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
433✔
925
    }
926
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
493✔
927
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
493✔
928

929
    mInfo("tmq rebalanceconsumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
493!
930
          ", current topics:%d, newTopics:%d, removeTopics:%d",
931
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
932
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
933
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
934
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
935
  }
936

937
  taosWUnLockLatch(&pOldConsumer->lock);
1,578✔
938
  return 0;
1,578✔
939
}
940

941
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
642,018✔
942
  if (pMnode == NULL || pConsumer == NULL) {
642,018!
943
    return TSDB_CODE_INVALID_PARA;
×
944
  }
945
  SSdb           *pSdb = pMnode->pSdb;
642,018✔
946
  *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
642,018✔
947
  if (*pConsumer == NULL) {
642,018✔
948
    return TSDB_CODE_MND_CONSUMER_NOT_EXIST;
571✔
949
  }
950
  return 0;
641,447✔
951
}
952

953
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
645,965✔
954
  if (pMnode == NULL || pConsumer == NULL) {
645,965!
955
    return;
565✔
956
  }
957
  SSdb *pSdb = pMnode->pSdb;
645,400✔
958
  sdbRelease(pSdb, pConsumer);
645,400✔
959
}
960

961
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
93✔
962
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
93!
963
    return TSDB_CODE_INVALID_PARA;
×
964
  }
965
  SMnode         *pMnode = pReq->info.node;
93✔
966
  SSdb           *pSdb = pMnode->pSdb;
93✔
967
  int32_t         numOfRows = 0;
93✔
968
  SMqConsumerObj *pConsumer = NULL;
93✔
969
  int32_t         code = 0;
93✔
970
  char           *parasStr = NULL;
93✔
971
  char           *status = NULL;
93✔
972

973
  while (numOfRows < rowsCapacity) {
275!
974
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
275✔
975
    if (pShow->pIter == NULL) {
275✔
976
      break;
93✔
977
    }
978

979
    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
182✔
980
      mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
4!
981
      sdbRelease(pSdb, pConsumer);
4✔
982
      continue;
4✔
983
    }
984

985
    taosRLockLatch(&pConsumer->lock);
178✔
986
    mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);
178!
987

988
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
178✔
989
    bool    hasTopic = true;
178✔
990
    if (topicSz == 0) {
178!
991
      hasTopic = false;
×
992
      topicSz = 1;
×
993
    }
994

995
    if (numOfRows + topicSz > rowsCapacity) {
178!
996
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + topicSz));
×
997
    }
998

999
    for (int32_t i = 0; i < topicSz; i++) {
372✔
1000
      SColumnInfoData *pColInfo = NULL;
194✔
1001
      int32_t          cols = 0;
194✔
1002

1003
      // consumer id
1004
      char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
194✔
1005
      (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
194✔
1006
      varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
194✔
1007

1008
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1009
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1010
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));
194!
1011

1012
      // consumer group
1013
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
194✔
1014
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);
194✔
1015

1016
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1017
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1018
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));
194!
1019

1020
      // client id
1021
      char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
194✔
1022
      STR_TO_VARSTR(clientId, pConsumer->clientId);
194✔
1023

1024
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1025
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1026
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
194!
1027

1028
      // user
1029
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
194✔
1030
      STR_TO_VARSTR(user, pConsumer->user);
194✔
1031

1032
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1033
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1034
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));
194!
1035

1036
      // fqdn
1037
      char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
194✔
1038
      STR_TO_VARSTR(fqdn, pConsumer->fqdn);
194✔
1039

1040
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1041
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1042
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));
194!
1043

1044
      // status
1045
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
194✔
1046
      status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
194!
1047
      MND_TMQ_NULL_CHECK(status);
194!
1048
      STR_TO_VARSTR(status, pStatusName);
194✔
1049

1050
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1051
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1052
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
194!
1053
      taosMemoryFreeClear(status);
194!
1054

1055
      // one subscribed topic
1056
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1057
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1058
      if (hasTopic) {
194!
1059
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
194✔
1060
        mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i), topic + VARSTR_HEADER_SIZE);
194✔
1061
        *(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
194✔
1062
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
194!
1063
      } else {
1064
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
×
1065
      }
1066

1067
      // up time
1068
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1069
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1070
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));
194!
1071

1072
      // subscribe time
1073
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1074
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1075
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));
194!
1076

1077
      // rebalance time
1078
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1079
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1080
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));
194!
1081

1082
      char         buf[TSDB_OFFSET_LEN] = {0};
194✔
1083
      STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
194✔
1084
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
194✔
1085

1086
      parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
194!
1087
      MND_TMQ_NULL_CHECK(parasStr);
194!
1088
      (void)snprintf(varDataVal(parasStr), pShow->pMeta->pSchemas[cols].bytes - VARSTR_HEADER_SIZE, "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
194✔
1089
              pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
194✔
1090
      varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
194✔
1091

1092
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
194✔
1093
      MND_TMQ_NULL_CHECK(pColInfo);
194!
1094
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
194!
1095
      taosMemoryFreeClear(parasStr);
194!
1096
      numOfRows++;
194✔
1097
    }
1098

1099
    taosRUnLockLatch(&pConsumer->lock);
178✔
1100
    sdbRelease(pSdb, pConsumer);
178✔
1101

1102
    pBlock->info.rows = numOfRows;
178✔
1103
  }
1104

1105
  pShow->numOfRows += numOfRows;
93✔
1106
  return numOfRows;
93✔
1107

1108
END:
×
1109
  taosMemoryFreeClear(status);
×
1110
  taosMemoryFreeClear(parasStr);
×
1111
  return code;
×
1112
}
1113

1114
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
×
1115
  if (pMnode == NULL || pIter == NULL) return;
×
1116
  SSdb *pSdb = pMnode->pSdb;
×
1117
  sdbCancelFetchByType(pSdb, pIter, SDB_CONSUMER);
×
1118
}
1119

1120
const char *mndConsumerStatusName(int status) {
13,484✔
1121
  switch (status) {
13,484!
1122
    case MQ_CONSUMER_STATUS_READY:
5,414✔
1123
      return "ready";
5,414✔
1124
//    case MQ_CONSUMER_STATUS_LOST:
1125
//      return "lost";
1126
    case MQ_CONSUMER_STATUS_REBALANCE:
8,070✔
1127
      return "rebalancing";
8,070✔
1128
    default:
×
1129
      return "unknown";
×
1130
  }
1131
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc