• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4106

19 May 2025 07:15AM UTC coverage: 62.857% (-0.2%) from 63.042%
#4106

push

travis-ci

GitHub
Merge pull request #31115 from taosdata/merge/mainto3.0

156749 of 318088 branches covered (49.28%)

Branch coverage included in aggregate %.

242535 of 317143 relevant lines covered (76.47%)

18746393.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.98
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
// Encoding version of a consumer row: handed to sdbAllocRaw() when encoding and
// used as the highest accepted soft version when decoding.
#define MND_CONSUMER_VER_NUMBER   3
// Number of reserved bytes written after the encoded consumer in each raw row.
#define MND_CONSUMER_RESERVE_SIZE 64

// checkAndSortTopic() rejects a subscription when a topic already has this many groups.
#define MND_MAX_GROUP_PER_TOPIC 100

// SDB table callbacks and "show consumers" helpers registered in mndInitConsumer().
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);

// Message handlers registered in mndInitConsumer().
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
43

44
/**
 * Registers the consumer module with the mnode: the TMQ message handlers, the
 * "show consumers" callbacks and the SDB table description.
 *
 * @param pMnode  mnode instance; must not be NULL.
 * @return TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise the result of
 *         sdbSetTable().
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Request handlers.
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // Callbacks for the consumers system table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  // SDB description of the consumer table, keyed by the 64-bit consumer id.
  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, consumerTable);
}
68

69
// Counterpart of mndInitConsumer(); there is currently nothing to tear down.
void mndCleanupConsumer(SMnode *pMnode) { (void)pMnode; }
2,195✔
70

71
/**
 * Posts a message whose payload is a single consumer id onto the mnode write
 * queue.
 *
 * @param pMnode      mnode instance; must not be NULL.
 * @param consumerId  consumer id copied into the message body.
 * @param msgType     message type stored in the RPC message.
 * @param info        RPC handle info copied into the message; must not be NULL.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
 *         the code produced by the failing allocation or tmsgPutToQueue().
 */
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
  if (pMnode == NULL || info == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  // Zero-initialised up front so END can inspect rpcMsg.pCont on every path.
  SRpcMsg rpcMsg = {0};
  void   *msg = rpcMallocCont(sizeof(int64_t));
  MND_TMQ_NULL_CHECK(msg);

  *(int64_t *)msg = consumerId;
  rpcMsg.msgType = msgType;
  rpcMsg.pCont = msg;
  rpcMsg.contLen = sizeof(int64_t);
  rpcMsg.info = *info;

  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
  return code;

END:
  // The buffer came from rpcMallocCont(), so it must go back through
  // rpcFreeCont() rather than taosMemoryFree().
  // NOTE(review): freeing via rpcMsg.pCont (not msg) avoids a double free if
  // tmsgPutToQueue() releases the content and clears pCont on failure —
  // confirm its ownership contract.
  if (rpcMsg.pCont != NULL) {
    rpcFreeCont(rpcMsg.pCont);
  }
  return code;
}
95

96
/**
 * Checks every topic named in a subscribe request before the transaction is
 * prepared: the topic must be acquirable, the user must pass the subscribe
 * privilege check, the subscription grant must not be expired, and the
 * transaction must not conflict. With enableReplay set, the topic must be a
 * column subscription and, when it targets neither a normal table nor a child
 * table's super table (ntbUid == 0 && ctbStbUid == 0), its database must have
 * exactly one vgroup.
 *
 * @param pTrans     transaction whose db name/key is set per topic.
 * @param subscribe  decoded subscribe request.
 * @param pMnode     mnode instance.
 * @param pUser      user issuing the request.
 * @return 0 when all topics pass, TSDB_CODE_INVALID_PARA for NULL arguments,
 *         otherwise the code of the first failing check.
 */
static int32_t validateTopics(STrans* pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) {
  if (pTrans == NULL || subscribe == NULL || pMnode == NULL || pUser == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqTopicObj *pTopic = NULL;
  int32_t      code = 0;

  int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames);
  for (int32_t i = 0; i < numOfTopics; i++) {
    char *pOneTopic = taosArrayGetP(subscribe->topicNames, i);
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
    MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic));
    MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));

    if (subscribe->enableReplay) {
      if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
        code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
        goto END;
      } else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
        SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
        if (pDb == NULL) {
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          goto END;
        }
        int32_t numOfVgroups = pDb->cfg.numOfVgroups;
        mndReleaseDb(pMnode, pDb);
        if (numOfVgroups != 1) {
          code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
          goto END;
        }
      }
    }

    // The transaction key is the consumer id rendered in hex.
    char key[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%"PRIx64, subscribe->consumerId);
    mndTransSetDbName(pTrans, pTopic->db, key);
    MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

    mndReleaseTopic(pMnode, pTopic);
    // Forget the released topic so that a failure in the next iteration
    // (before a new topic is acquired) cannot release it a second time at END.
    pTopic = NULL;
  }
  return 0;

END:
  if (pTopic != NULL) {
    mndReleaseTopic(pMnode, pTopic);
  }
  return code;
}
140

141
/**
 * Handles a "clear lost consumer" message: looks up the stored consumer,
 * builds a fresh object with the same id and group, and prepares a
 * "clear-csm" transaction carrying its drop log.
 *
 * @param pMsg  RPC message whose content is an SMqConsumerClearMsg.
 * @return TSDB_CODE_INVALID_PARA for a NULL message/content, otherwise the
 *         code of the first failing step or of mndTransPrepare().
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pReq = pMsg->pCont;
  SMqConsumerObj      *pStored = NULL;    // consumer currently held in sdb
  SMqConsumerObj      *pDropCopy = NULL;  // object encoded into the drop log
  STrans              *pTrans = NULL;
  int32_t              code = 0;

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pReq->consumerId, &pStored));
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pReq->consumerId,
        mndConsumerStatusName(pStored->status));

  MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pStored->consumerId, pStored->cgroup, -1, NULL, NULL, &pDropCopy));

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pDropCopy));
  code = mndTransPrepare(pMnode, pTrans);

END:
  // Cleanup runs on success and failure alike.
  mndReleaseConsumer(pMnode, pStored);
  tDeleteSMqConsumerObj(pDropCopy);
  mndTransDrop(pTrans);
  return code;
}
168

169
/**
 * Fills rsp->topicPrivileges with one entry per currently subscribed topic
 * that can still be acquired. An entry's noPrivilege flag is 1 when
 * mndCheckTopicPrivilege() returns non-zero or grantCheckExpire() is negative,
 * and 0 otherwise. Topics that cannot be acquired are skipped.
 *
 * @param pMnode     mnode instance.
 * @param pConsumer  consumer whose currentTopics are inspected.
 * @param rsp        heartbeat response receiving the privilege array.
 * @param user       user name the privilege is checked for.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
 *         code set by MND_TMQ_NULL_CHECK when an allocation fails.
 */
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL || user == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t      code = 0;
  SMqTopicObj *pTopic = NULL;  // function scope so END can release it on failure

  rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);

  for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);

    pTopic = NULL;
    code = mndAcquireTopic(pMnode, topic, &pTopic);
    if (code != TDB_CODE_SUCCESS) {
      // Skipping is deliberate; clear the code so that a skipped last topic
      // does not turn into the function's return value.
      code = 0;
      pTopic = NULL;
      continue;
    }

    STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
    MND_TMQ_NULL_CHECK(data);  // pTopic is released at END on this path

    tstrncpy(data->topic, topic, TSDB_TOPIC_FNAME_LEN);
    if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
        grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
      data->noPrivilege = 1;
    } else {
      data->noPrivilege = 0;
    }

    mndReleaseTopic(pMnode, pTopic);
    pTopic = NULL;
  }

END:
  // Non-NULL only when a failure jumped here while a topic was still held.
  if (pTopic != NULL) {
    mndReleaseTopic(pMnode, pTopic);
  }
  return code;
}
197

198
/**
 * Moves the offset rows reported in a heartbeat into the matching consumer
 * entry of each subscription. For every reported topic the subscription keyed
 * by "<cgroup><TMQ_SEPARATOR><topic>" is acquired; if it holds an entry for
 * this consumer, the entry's previous offsetRows array is destroyed and the
 * reported array takes its place (the request's pointer is cleared so the
 * array is not owned twice). Topics whose subscription cannot be acquired are
 * logged and skipped.
 */
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
  if (pMnode == NULL || req == NULL || pConsumer == NULL){
    return;
  }
  for (int idx = 0; idx < taosArrayGetSize(req->topics); idx++) {
    TopicOffsetRows *pReport = taosArrayGet(req->topics, idx);
    if (pReport == NULL){
      continue;
    }
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, pReport->topicName);

    char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, pReport->topicName);

    SMqSubscribeObj *pSub = NULL;
    int32_t          code = mndAcquireSubscribeByKey(pMnode, subKey, &pSub);
    if (code != 0) {
      mError("failed to acquire subscribe by key:%s, code:%d", subKey, code);
      continue;
    }

    taosWLockLatch(&pSub->lock);
    SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pEp != NULL) {
      // Swap in the freshly reported rows and take ownership of them.
      taosArrayDestroy(pEp->offsetRows);
      pEp->offsetRows = pReport->offsetRows;
      pReport->offsetRows = NULL;
    }
    taosWUnLockLatch(&pSub->lock);

    mndReleaseSubscribe(pMnode, pSub);
  }
}
229

230
/**
 * Serializes a heartbeat response into a buffer obtained from rpcMallocCont()
 * and attaches it to pMsg->info.rsp / rspLen.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 *         TSDB_CODE_TMQ_INVALID_MSG when serialization yields a non-positive
 *         length, or terrno when the buffer cannot be allocated.
 */
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
  if (pMsg == NULL || rsp == NULL){
    return TSDB_CODE_INVALID_PARA;
  }

  // First pass with a NULL buffer only computes the encoded size.
  int32_t rspLen = tSerializeSMqHbRsp(NULL, 0, rsp);
  if (rspLen <= 0){
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  void *rspBuf = rpcMallocCont(rspLen);
  if (rspBuf == NULL) {
    return terrno;
  }

  if (tSerializeSMqHbRsp(rspBuf, rspLen, rsp) <= 0){
    rpcFreeCont(rspBuf);
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  pMsg->info.rspLen = rspLen;
  pMsg->info.rsp = rspBuf;
  return 0;
}
251

252
/**
 * Handles a consumer heartbeat: decodes the request, acquires the consumer,
 * fills the per-topic privilege list, resets the heartbeat counter (and, when
 * pollFlag is 1, the poll counter and poll time), stores the reported offset
 * rows and builds the response.
 *
 * @return TSDB_CODE_INVALID_PARA for a NULL message, otherwise the code of the
 *         first failing step or of buildMqHbRsp().
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  PRINT_LOG_START
  int32_t         code = 0;
  SMnode         *pMnode = pMsg->info.node;
  SMqHbReq        req = {0};
  SMqHbRsp        rsp = {0};
  SMqConsumerObj *pConsumer = NULL;

  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));

  int64_t consumerId = req.consumerId;
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));

  // A heartbeat arrived: restart the missed-heartbeat count.
  atomic_store_32(&pConsumer->hbStatus, 0);
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, req.pollFlag, pConsumer->pollStatus);
  if (req.pollFlag == 1){
    atomic_store_32(&pConsumer->pollStatus, 0);
    pConsumer->pollTime = taosGetTimestampMs();
  }

  storeOffsetRows(pMnode, &req, pConsumer);

  rsp.debugFlag = tqClientDebugFlag;
  code = buildMqHbRsp(pMsg, &rsp);

END:
  tDestroySMqHbRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  tDestroySMqHbReq(&req);
  PRINT_LOG_END(code)
  return code;
}
284

285
/**
 * Builds rsp->topics: one SMqSubTopicEp per topic the consumer currently
 * subscribes to, holding the topic name, its database, a copy of the topic
 * schema and the vgroups assigned to this consumer in the subscription.
 * Topics whose subscription or topic object cannot be acquired are skipped.
 * When epoch is -1 the stored epSet of each vgroup is refreshed from the
 * vgroup object first.
 *
 * The consumer read lock is held for the whole call and released on every
 * return path. On failure, entries already pushed stay in rsp->topics and the
 * partially built entry is freed here.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 *         TSDB_CODE_OUT_OF_MEMORY when the subscription has no entry for this
 *         consumer, or terrno captured at the failing allocation.
 */
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL){
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  taosRLockLatch(&pConsumer->lock);

  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

  rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
  if (rsp->topics == NULL) {
    code = terrno;  // capture before any other call can overwrite it
    taosRUnLockLatch(&pConsumer->lock);
    return code;
  }

  // handle all topics subscribed by this consumer
  for (int32_t i = 0; i < numOfTopics; i++) {
    // All per-topic state is declared before the first goto in this scope.
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
    SMqSubscribeObj *pSub = NULL;
    SMqTopicObj     *pTopic = NULL;
    SMqConsumerEp   *pConsumerEp = NULL;
    SMqSubTopicEp    topicEp = {0};
    int32_t          vgNum = 0;
    bool             schemaAllocFailed = false;
    char             key[TSDB_SUBSCRIBE_KEY_LEN] = {0};

    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
    if (mndAcquireSubscribeByKey(pMnode, key, &pSub) != 0) {
      continue;
    }
    taosRLockLatch(&pSub->lock);

    tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);

    // 2.1 fetch topic schema
    if (mndAcquireTopic(pMnode, topic, &pTopic) != TDB_CODE_SUCCESS) {
      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
      continue;
    }
    taosRLockLatch(&pTopic->lock);
    tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
    topicEp.schema.nCols = pTopic->schema.nCols;
    if (topicEp.schema.nCols) {
      topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
      if (topicEp.schema.pSchema == NULL) {
        code = terrno;
        schemaAllocFailed = true;
      } else {
        (void)memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
    }
    taosRUnLockLatch(&pTopic->lock);
    mndReleaseTopic(pMnode, pTopic);
    if (schemaAllocFailed) {
      goto TOPIC_FAILED;
    }

    // 2.2 iterate all vg assigned to the consumer of that topic
    pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pConsumerEp == NULL) {
      // NOTE(review): the error code is kept from the original; a missing
      // hash entry is not really an out-of-memory condition.
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TOPIC_FAILED;
    }
    vgNum = taosArrayGetSize(pConsumerEp->vgs);
    topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
    if (topicEp.vgs == NULL) {
      code = terrno;
      goto TOPIC_FAILED;
    }

    for (int32_t j = 0; j < vgNum; j++) {
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
      if (pVgEp == NULL) {
        continue;
      }
      if (epoch == -1) {
        SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
        if (pVgroup) {
          // NOTE(review): this writes pVgEp->epSet while only read locks are
          // held — confirm that is safe against concurrent readers.
          pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
          mndReleaseVgroup(pMnode, pVgroup);
        }
      }
      SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
      if (taosArrayPush(topicEp.vgs, &vgEp) == NULL) {
        code = terrno;
        goto TOPIC_FAILED;
      }
    }

    // On success the array element takes over topicEp.vgs and the schema copy.
    if (taosArrayPush(rsp->topics, &topicEp) == NULL) {
      code = terrno;
      goto TOPIC_FAILED;
    }
    taosRUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
    continue;

  TOPIC_FAILED:
    // topicEp was never handed to rsp->topics, so everything it owns is
    // released here, together with the subscription and the consumer lock.
    if (topicEp.vgs != NULL) {
      taosArrayDestroy(topicEp.vgs);
    }
    taosMemoryFreeClear(topicEp.schema.pSchema);
    taosRUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
    taosRUnLockLatch(&pConsumer->lock);
    return code;
  }

  taosRUnLockLatch(&pConsumer->lock);
  return 0;
}
390

391
/**
 * Encodes an ask-ep response (an SMqRspHead followed by the encoded
 * SMqAskEpRsp) into a buffer from rpcMallocCont() and attaches it to
 * pMsg->info.rsp / rspLen.
 *
 * @param serverEpoch  epoch written into the response head.
 * @param consumerId   consumer id written into the response head.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, terrno when
 *         the buffer cannot be allocated, or TSDB_CODE_TSC_INTERNAL_ERROR when
 *         encoding fails.
 */
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
  if (pMsg == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Size pass: a NULL buffer makes the encoder only report the body length.
  int32_t totalLen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
  void   *rspBuf = rpcMallocCont(totalLen);
  if (rspBuf == NULL) {
    return terrno;
  }

  // Fixed-size head first.
  SMqRspHead *pHead = rspBuf;
  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

  // Body directly after the head.
  void *pBody = POINTER_SHIFT(rspBuf, sizeof(SMqRspHead));
  if (tEncodeSMqAskEpRsp(&pBody, rsp) < 0) {
    rpcFreeCont(rspBuf);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // send rsp
  pMsg->info.rsp = rspBuf;
  pMsg->info.rspLen = totalLen;
  return 0;
}
422

423
/**
 * Handles an ask-ep request. The consumer must exist, the request's group
 * must match the stored group, and the consumer must be in READY state.
 * Endpoint information is added to the response only when the client epoch
 * differs from the server epoch.
 *
 * @return TSDB_CODE_INVALID_PARA for a NULL message,
 *         TSDB_CODE_MND_CONSUMER_NOT_EXIST on a group mismatch,
 *         TSDB_CODE_MND_CONSUMER_NOT_READY when the consumer is not ready,
 *         otherwise the code of the first failing step or of buildAskEpRsp().
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  SMqAskEpReq     req = {0};
  SMqAskEpRsp     rsp = {0};
  int32_t         code = 0;
  SMqConsumerObj *pConsumer = NULL;
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
  int64_t consumerId = req.consumerId;
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));

  bool sameGroup = (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) == 0);
  if (!sameGroup) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto END;
  }

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
  if (status != MQ_CONSUMER_STATUS_READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    code = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto END;
  }

  // 2. check epoch, only send ep info when epochs do not match
  int32_t clientEpoch = req.epoch;
  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
  if (clientEpoch != serverEpoch) {
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
          consumerId, clientEpoch, serverEpoch);
    MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, clientEpoch, &rsp));
  }

  code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);

END:
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  PRINT_LOG_END(code);
  return code;
}
470

471
/**
 * Encodes the consumer and appends it to the transaction as a commit log
 * marked SDB_STATUS_DROPPED.
 *
 * The raw row is freed here only when appending fails; after a successful
 * append it is not freed by this function.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
 *         the code of the failing step.
 */
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t  code = 0;
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
  } else {
    sdbFreeRaw(pRaw);
  }

END:
  return code;
}
487

488
/**
 * Encodes the consumer and appends it to the transaction as a commit log
 * marked SDB_STATUS_READY.
 *
 * The raw row is freed here only when appending fails; after a successful
 * append it is not freed by this function.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
 *         the code of the failing step.
 */
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t  code = 0;
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code == 0) {
    MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  } else {
    sdbFreeRaw(pRaw);
  }

END:
  return code;
}
504

505
// Element destructor for arrays of heap pointers: param is the address of an
// array slot, and the pointer stored in that slot is freed when non-NULL.
static void freeItem(void *param) {
  void **ppSlot = (void **)param;
  if (ppSlot != NULL && *ppSlot != NULL) {
    taosMemoryFree(*ppSlot);
  }
}
514

515
// Duplicates the string `element` and appends the copy to pConsumerNew->array.
// Relies on the calling function providing `pConsumerNew`, an int32_t `code`
// and an `END` label: on failure the copy is freed, `code` is set and control
// jumps to END. Wrapped in do { } while (0) so it acts as a single statement
// and keeps its temporary out of the caller's scope; invoke it with a
// trailing semicolon.
#define ADD_TOPIC_TO_ARRAY(element, array)                          \
  do {                                                              \
    char *newTopicCopy = taosStrdup((element));                     \
    MND_TMQ_NULL_CHECK(newTopicCopy);                               \
    if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL) { \
      taosMemoryFree(newTopicCopy);                                 \
      code = terrno;                                                \
      goto END;                                                     \
    }                                                               \
  } while (0)
523

524
/**
 * Computes the difference between the topics an existing consumer currently
 * has (pExistedConsumer->currentTopics) and the topics newly requested
 * (pConsumerNew->assignedTopics). Both lists are walked in step, relying on
 * each being in strcmp() order. Topics found only in the old list are copied
 * into pConsumerNew->rebRemovedTopics; topics found only in the new list are
 * copied into pConsumerNew->rebNewTopics.
 *
 * @return TSDB_CODE_TMQ_NO_NEED_REBALANCE when both result lists are empty,
 *         0 when there is a difference, TSDB_CODE_INVALID_PARA for NULL
 *         arguments, or the code of a failing allocation.
 */
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
  if (pExistedConsumer == NULL || pConsumerNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;

  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);

  int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
  int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
  int32_t oldIdx = 0;
  int32_t newIdx = 0;

  while (oldIdx < oldTopicNum || newIdx < newTopicNum) {
    if (oldIdx >= oldTopicNum) {
      // Old list exhausted: everything left in the new list is an addition.
      void *pAdded = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
      MND_TMQ_NULL_CHECK(pAdded);
      ADD_TOPIC_TO_ARRAY(pAdded, rebNewTopics);
      newIdx++;
    } else if (newIdx >= newTopicNum) {
      // New list exhausted: everything left in the old list is a removal.
      void *pRemoved = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
      MND_TMQ_NULL_CHECK(pRemoved);
      ADD_TOPIC_TO_ARRAY(pRemoved, rebRemovedTopics);
      oldIdx++;
    } else {
      char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
      MND_TMQ_NULL_CHECK(oldTopic);
      char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
      MND_TMQ_NULL_CHECK(newTopic);

      int order = strcmp(oldTopic, newTopic);
      if (order < 0) {
        // oldTopic sorts first and has no match in the new list: removed.
        ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
        oldIdx++;
      } else if (order > 0) {
        // newTopic sorts first and has no match in the old list: added.
        ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
        newIdx++;
      } else {
        // Present in both lists: unchanged.
        oldIdx++;
        newIdx++;
      }
    }
  }

  // no topics need to be rebalanced
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
  }

END:
  return code;
}
579

580
/**
 * Sorts the requested topic names, drops duplicates (freeing the dropped
 * strings through freeItem) and rejects the request when any topic already
 * has MND_MAX_GROUP_PER_TOPIC or more groups.
 *
 * @return 0 when every topic is below the limit, TSDB_CODE_INVALID_PARA for
 *         NULL arguments, TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE otherwise.
 */
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
  if (pTopicList == NULL || pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t topicCount = taosArrayGetSize(pTopicList);
  for (int idx = 0; idx < topicCount; idx++) {
    const char *topicName = (const char *)taosArrayGetP(pTopicList, idx);
    int32_t     groupCount = mndGetGroupNumByTopic(pMnode, topicName);
    if (groupCount >= MND_MAX_GROUP_PER_TOPIC) {
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
    }
  }
  return 0;
}
596

597
/**
 * Builds the consumer object that a subscribe request is turned into.
 *
 * If no consumer with the request's id can be acquired, a new object is
 * created with CONSUMER_INSERT_SUB. Otherwise the existing consumer must be
 * in READY state; a CONSUMER_UPDATE_SUB object is created and its
 * added/removed topic lists are computed against the existing consumer.
 *
 * @param ppConsumer  receives the new object on success; the caller then owns
 *                    it. May be NULL, in which case the object is freed here
 *                    instead of being leaked.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL pMnode/subscribe,
 *         TSDB_CODE_MND_CONSUMER_NOT_READY when the existing consumer is not
 *         ready, otherwise the code of the failing step (including
 *         TSDB_CODE_TMQ_NO_NEED_REBALANCE from getTopicAddDelete()).
 */
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer){
  if (pMnode == NULL || subscribe == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int64_t         consumerId = subscribe->consumerId;
  char           *cgroup     = subscribe->cgroup;
  SMqConsumerObj *pConsumerNew     = NULL;
  SMqConsumerObj *pExistedConsumer = NULL;
  int32_t code = mndAcquireConsumer(pMnode, consumerId, &pExistedConsumer);
  if (code != 0) {
    mInfo("receive tmq subscribe request from new consumer:0x%" PRIx64
              ",cgroup:%s, numOfTopics:%d", consumerId,
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));

    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive tmq subscribe request from existed consumer:0x%" PRIx64
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
          (int32_t)taosArrayGetSize(subscribe->topicNames));

    if (status != MQ_CONSUMER_STATUS_READY) {
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto END;
    }
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
  }
  mndReleaseConsumer(pMnode, pExistedConsumer);
  if (ppConsumer != NULL) {
    *ppConsumer = pConsumerNew;
  } else {
    // Nobody takes ownership of the new object, so release it here.
    tDeleteSMqConsumerObj(pConsumerNew);
  }
  return code;

END:
  mndReleaseConsumer(pMnode, pExistedConsumer);
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
638

639
/**
 * Handles a subscribe / unsubscribe request. An empty topic list means
 * unsubscribe; in that case nothing is done when the consumer has no assigned
 * topics. Otherwise the topic list is sorted and checked, a "subscribe"
 * transaction is created, the topics are validated, the new consumer object
 * is built and written into the transaction, and the transaction is prepared.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared;
 *         0 when nothing had to be done (this includes the
 *         TSDB_CODE_TMQ_NO_NEED_REBALANCE and TSDB_CODE_MND_CONSUMER_NOT_EXIST
 *         outcomes, which are mapped to 0); TSDB_CODE_INVALID_PARA for a NULL
 *         message; otherwise the code of the failing step.
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  char           *msgStr = pMsg->pCont;
  int32_t         code = 0;
  SMqConsumerObj *pConsumerNew = NULL;
  STrans         *pTrans = NULL;

  PRINT_LOG_START
  SCMSubscribeReq subscribe = {0};
  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));

  bool isUnsubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
  if (isUnsubscribe) {
    SMqConsumerObj *pCurrent = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pCurrent));
    bool nothingAssigned = (taosArrayGetSize(pCurrent->assignedTopics) == 0);
    mndReleaseConsumer(pMnode, pCurrent);
    if (nothingAssigned) {
      // Already unsubscribed from everything: no transaction needed.
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
                          (isUnsubscribe ? TRN_CONFLICT_NOTHING : TRN_CONFLICT_DB_INSIDE),
                          pMsg, "subscribe");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user));
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
  code = TSDB_CODE_ACTION_IN_PROGRESS;

END:
  mndTransDrop(pTrans);
  tDeleteSMqConsumerObj(pConsumerNew);
  taosArrayDestroyP(subscribe.topicNames, NULL);
  // These two outcomes are reported to the client as success.
  if (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
    code = 0;
  }
  PRINT_LOG_END(code);
  return code;
}
682

683
/**
 * Encodes a consumer object into a newly allocated SDB raw row laid out as
 * [int32 length][encoded consumer][MND_CONSUMER_RESERVE_SIZE reserved bytes].
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and reset to TSDB_CODE_SUCCESS
 * only after every field has been written, so any early exit reports failure.
 *
 * @return the raw row on success; NULL when pConsumer is NULL or on failure
 *         (the partially built row is freed).
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL) {
    return NULL;
  }
  // NOTE(review): code and lino are not used directly here; presumably the
  // SDB_SET_* macros reference them — confirm before removing.
  int32_t code = 0;
  int32_t lino = 0;
  void   *buf = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  // Size pass: a NULL buffer makes the encoder only report the length.
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
  if (pRaw == NULL) {
    goto CM_ENCODE_OVER;
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    goto CM_ENCODE_OVER;
  }

  // The encoder is handed a cursor copy so buf keeps pointing at the start.
  void *abuf = buf;
  if (tEncodeSMqConsumerObj(&abuf, pConsumer) < 0) {
    goto CM_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != 0) {
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}
725

726
/**
 * Decodes an SDB raw row into a newly allocated row holding an
 * SMqConsumerObj. The raw soft version must lie in
 * [1, MND_CONSUMER_VER_NUMBER]. After decoding, the consumer's endpoint set
 * is passed through tmsgUpdateDnodeEpSet().
 *
 * @return the decoded row, or NULL when pRaw is NULL or decoding fails. A
 *         non-zero terrno at the exit label causes the row to be freed.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }
  // NOTE(review): code and lino are not used directly here; presumably the
  // SDB_GET_* macros reference them — confirm before removing.
  int32_t         code = 0;
  int32_t         lino = 0;
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;

  terrno = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }
  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }
  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  // Layout: [int32 length][encoded consumer][reserved bytes].
  int32_t dataPos = 0;
  int32_t len;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}
786

787
/*
 * sdb insert hook for a consumer: logs the consumer and sets its
 * subscribeTime to its createTime.
 *
 * Returns 0, or TSDB_CODE_INVALID_PARA when pConsumer is NULL.
 * pSdb is not used.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
          pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);
    pConsumer->subscribeTime = pConsumer->createTime;
    return 0;
  }

  return TSDB_CODE_INVALID_PARA;
}
796

797
/*
 * sdb delete hook for a consumer: logs the consumer and clears it with
 * tClearSMqConsumerObj().
 *
 * Returns 0, or TSDB_CODE_INVALID_PARA when pConsumer is NULL.
 * pSdb is not used.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
          mndConsumerStatusName(pConsumer->status));
    tClearSMqConsumerObj(pConsumer);
    return 0;
  }

  return TSDB_CODE_INVALID_PARA;
}
806

807
//static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
808
//  int32_t status = pConsumer->status;
809
//
810
//  if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
811
//    if (status == MQ_CONSUMER_STATUS_REBALANCE) {
812
//      pConsumer->status = MQ_CONSUMER_STATUS_READY;
813
//    } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
814
//      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
815
//    }
816
//  }
817
//}
818

819
// remove from topic list
820
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
1,578✔
821
  if (topicList == NULL || pTopic == NULL) {
1,578!
822
    return;
×
823
  }
824
  int32_t size = taosArrayGetSize(topicList);
1,578✔
825
  for (int32_t i = 0; i < size; i++) {
1,706✔
826
    char *p = taosArrayGetP(topicList, i);
1,695✔
827
    if (strcmp(pTopic, p) == 0) {
1,695✔
828
      taosArrayRemove(topicList, i);
1,567✔
829
      taosMemoryFree(p);
1,567!
830

831
      mInfo("tmq rebalance consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
1,567!
832
            consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
833
      break;
1,567✔
834
    }
835
  }
836
}
837

838
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
596✔
839
  if (pConsumer == NULL || pTopic == NULL) {
596!
840
    return false;
×
841
  }
842
  bool    existing = false;
596✔
843
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
596✔
844
  for (int32_t i = 0; i < size; i++) {
677✔
845
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
81✔
846
    if (topic && strcmp(topic, pTopic) == 0) {
81!
847
      existing = true;
×
848
      break;
×
849
    }
850
  }
851

852
  return existing;
596✔
853
}
854

855
/*
 * sdb update hook: apply the change described by pNewConsumer->updateType to
 * the stored consumer pOldConsumer.
 *
 *   CONSUMER_UPDATE_SUB  swap in the new rebNewTopics / rebRemovedTopics /
 *                        assignedTopics, stamp subscribeTime and switch the
 *                        status to MQ_CONSUMER_STATUS_REBALANCE.
 *   CONSUMER_UPDATE_REB  bump the epoch and stamp rebalanceTime.
 *   CONSUMER_ADD_REB     move the first topic of pNewConsumer->rebNewTopics
 *                        from the old consumer's rebNewTopics into its
 *                        currentTopics (kept sorted).
 *   CONSUMER_REMOVE_REB  drop the first topic of
 *                        pNewConsumer->rebRemovedTopics from the old
 *                        consumer's rebRemovedTopics and currentTopics.
 *
 * For the ADD/REMOVE cases the status becomes MQ_CONSUMER_STATUS_READY once
 * both pending lists are empty, the epoch is bumped and rebalanceTime is
 * stamped.
 *
 * The whole update runs under pOldConsumer->lock (write). Every exit taken
 * after the lock is acquired goes through END so the latch is always
 * released; returning directly would leave the consumer write-locked.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL consumers,
 * TSDB_CODE_TMQ_INVALID_MSG when the expected topic is missing or cannot be
 * stored, or terrno when duplicating the topic name fails.
 * pSdb is not used.
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  if (pOldConsumer == NULL || pNewConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;

  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);

  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

    pOldConsumer->subscribeTime = taosGetTimestampMs();
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
    if (tmp == NULL) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto END;
    }
    // Private copy: on success it is handed over to pOldConsumer->currentTopics.
    char *pNewTopic = taosStrdup(tmp);
    if (pNewTopic == NULL) {
      code = terrno;
      goto END;
    }
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
    if (existing) {
      mError("tmq rebalance consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
      taosMemoryFree(pNewTopic);
    } else {
      if (taosArrayPush(pOldConsumer->currentTopics, &pNewTopic) == NULL) {
        taosMemoryFree(pNewTopic);
        code = TSDB_CODE_TMQ_INVALID_MSG;
        goto END;
      }
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }

    int32_t status = pOldConsumer->status;
    // No pending additions or removals left: the consumer is ready again.
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
    if (topic == NULL) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto END;
    }
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");

    int32_t status = pOldConsumer->status;
    // No pending additions or removals left: the consumer is ready again.
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalanceconsumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

END:
  taosWUnLockLatch(&pOldConsumer->lock);
  return code;
}
941

942
/*
 * Look up a consumer by id via sdbAcquire() and store it in *pConsumer.
 *
 * Returns 0 when found, TSDB_CODE_MND_CONSUMER_NOT_EXIST when the lookup
 * yields NULL (in which case *pConsumer is NULL), or TSDB_CODE_INVALID_PARA
 * when pMnode or pConsumer is NULL.
 * A consumer obtained here is handed back with mndReleaseConsumer().
 */
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
  if (pMnode == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  *pConsumer = pFound;
  return (pFound != NULL) ? 0 : TSDB_CODE_MND_CONSUMER_NOT_EXIST;
}
953

954
/*
 * Hand a consumer back to the sdb via sdbRelease().
 * A NULL pMnode or pConsumer is ignored.
 */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  if (pMnode != NULL && pConsumer != NULL) {
    sdbRelease(pMnode->pSdb, pConsumer);
  }
}
961

962
/*
 * Fill pBlock with one row per (consumer, assigned topic) pair, continuing
 * from the iterator stored in pShow->pIter.
 *
 * Columns written per row, in order: consumer id (hex), consumer group,
 * client id, user, fqdn, status name, topic, create time, subscribe time,
 * rebalance time, parameters string, poll time.
 *
 * Consumers without assigned topics are skipped. Each remaining consumer is
 * read under its read latch. If its topics would not fit in rowsCapacity,
 * the block capacity is grown so a consumer's rows are never split.
 *
 * Returns the number of rows written (also added to pShow->numOfRows), or an
 * error code when a MND_TMQ_* check fails.
 *
 * NOTE(review): MND_TMQ_NULL_CHECK / MND_TMQ_RETURN_CHECK are defined outside
 * this chunk; they evidently jump to END on failure (the label is otherwise
 * unreachable), presumably with `code` set — confirm. Every such check sits
 * between taking the consumer's read latch and releasing the consumer, so END
 * must undo both; `consumerHeld` tracks that window.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;
  bool            consumerHeld = false;  // true while pConsumer is read-locked and not yet released
  int32_t         code = 0;
  char           *parasStr = NULL;
  char           *status = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    taosRLockLatch(&pConsumer->lock);
    consumerHeld = true;
    mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // Re-read the size under the latch; an empty list still produces one row with a NULL topic.
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

    if (numOfRows + topicSz > rowsCapacity) {
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + topicSz));
    }

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo = NULL;
      int32_t          cols = 0;

      // consumer id
      char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
      (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
      varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));

      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));

      // client id
      char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(clientId, pConsumer->clientId);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));

      // user
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(user, pConsumer->user);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));

      // fqdn
      char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(fqdn, pConsumer->fqdn);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));

      // status (buffer sized from the column schema; freed at END if a check fails)
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
      status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
      MND_TMQ_NULL_CHECK(status);
      STR_TO_VARSTR(status, pStatusName);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
      taosMemoryFreeClear(status);

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i), topic + VARSTR_HEADER_SIZE);
        *(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
      } else {
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
      }

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));

      // rebalance time (NULL while it is still 0)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));

      // parameters (buffer sized from the column schema; freed at END if a check fails)
      char         buf[TSDB_OFFSET_LEN] = {0};
      STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);

      parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
      MND_TMQ_NULL_CHECK(parasStr);
      (void)snprintf(varDataVal(parasStr), pShow->pMeta->pSchemas[cols].bytes - VARSTR_HEADER_SIZE, "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
              pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
      varDataSetLen(parasStr, strlen(varDataVal(parasStr)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
      taosMemoryFreeClear(parasStr);

      // poll time (NULL while it is still 0)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->pollTime, pConsumer->pollTime == 0));
      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
    consumerHeld = false;

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

END:
  // A failed check left the current consumer read-locked and referenced; undo both before returning.
  if (consumerHeld) {
    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
  }
  taosMemoryFreeClear(status);
  taosMemoryFreeClear(parasStr);
  return code;
}
1119

1120
/*
 * Cancel an in-progress consumer iteration by passing pIter to
 * sdbCancelFetchByType() for SDB_CONSUMER.
 * A NULL pMnode or pIter is ignored.
 */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL && pIter != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_CONSUMER);
  }
}
1125

1126
const char *mndConsumerStatusName(int status) {
13,005✔
1127
  switch (status) {
13,005!
1128
    case MQ_CONSUMER_STATUS_READY:
4,988✔
1129
      return "ready";
4,988✔
1130
//    case MQ_CONSUMER_STATUS_LOST:
1131
//      return "lost";
1132
    case MQ_CONSUMER_STATUS_REBALANCE:
8,017✔
1133
      return "rebalancing";
8,017✔
1134
    default:
×
1135
      return "unknown";
×
1136
  }
1137
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc