• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

taosdata / TDengine / #4828

29 Oct 2025 11:10AM UTC coverage: 61.071% (-0.3%) from 61.383%
#4828

push

travis-ci

web-flow
Merge pull request #33419 from taosdata/3.0

3.0

155924 of 324872 branches covered (48.0%)

Branch coverage included in aggregate %.

207404 of 270051 relevant lines covered (76.8%)

243478715.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.21
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_CONSUMER_VER_NUMBER   3
29
#define MND_CONSUMER_RESERVE_SIZE 64
30

31
#define MND_MAX_GROUP_PER_TOPIC 100
32

33
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
34
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
35
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
36
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
37
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
38

39
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
40
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
41
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
42
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
43

44
/**
 * Set up the consumer module on an mnode: install the TMQ request handlers
 * (subscribe, heartbeat, ask-ep, lost-consumer-clear), the retrieve and
 * free-iterator handlers for TSDB_MGMT_TABLE_CONSUMERS, and the SDB_CONSUMER
 * table callbacks.
 *
 * @param pMnode  mnode to initialise; must not be NULL.
 * @return TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise the value
 *         returned by sdbSetTable().
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* Callbacks the SDB layer invokes for consumer rows (keyed by int64 id). */
  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };

  /* Request handlers. */
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  /* Handlers backing the consumers show table. */
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  return sdbSetTable(pMnode->pSdb, consumerTable);
}
68

69
/** Cleanup hook for the consumer module; the body performs no work. */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}
1,752,567✔
70

71
/**
 * Build an RPC message whose payload is exactly one int64 (the consumer id)
 * and put it on the mnode WRITE_QUEUE.
 *
 * @param pMnode      mnode whose message callback is used; must not be NULL.
 * @param consumerId  id copied into the message payload.
 * @param msgType     message type stored in the RPC message.
 * @param info        handle info copied by value into the message; must not be NULL.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the
 *         error produced by the allocation check or by tmsgPutToQueue().
 */
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
  if (pMnode == NULL || info == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  void   *msg = rpcMallocCont(sizeof(int64_t));
  MND_TMQ_NULL_CHECK(msg);

  *(int64_t *)msg = consumerId;
  SRpcMsg rpcMsg = {
      .msgType = msgType,
      .pCont = msg,
      .contLen = sizeof(int64_t),
      .info = *info,
  };

  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
  return code;

END:
  /* The payload came from rpcMallocCont(), so it must be released with the
   * matching rpcFreeCont() (as the response builders in this file do), not
   * with the generic taosMemoryFree().
   * NOTE(review): confirm that tmsgPutToQueue() does not already release
   * pCont on failure; if it does, this release must be dropped. */
  if (msg != NULL) {
    rpcFreeCont(msg);
  }
  return code;
}
95

96
/**
 * Validate every topic named in a subscribe request.
 *
 * For each topic: acquire it, check the user's MND_OPER_SUBSCRIBE privilege,
 * check the subscription grant, apply the replay restrictions when
 * subscribe->enableReplay is set, then register the topic's db together with
 * the hex consumer id on the transaction and run the conflict check.
 *
 * @param pTrans     transaction that receives the db name / conflict key.
 * @param subscribe  decoded subscribe request.
 * @param pMnode     mnode instance.
 * @param pUser      name of the requesting user.
 * @return 0 when all topics pass, TSDB_CODE_INVALID_PARA for NULL arguments,
 *         otherwise the first failing check's error code.
 */
static int32_t validateTopics(STrans *pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) {
  if (pTrans == NULL || subscribe == NULL || pMnode == NULL || pUser == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqTopicObj *pTopic = NULL;
  int32_t      code = 0;

  int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames);
  for (int32_t i = 0; i < numOfTopics; i++) {
    char *pOneTopic = taosArrayGetP(subscribe->topicNames, i);
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
    MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic));
    MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));

    if (subscribe->enableReplay) {
      if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
        /* Replay is only accepted for column-type topics. */
        code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
        goto END;
      } else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
        /* For these topics the db must have exactly one vgroup. */
        SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
        if (pDb == NULL) {
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          goto END;
        }
        if (pDb->cfg.numOfVgroups != 1) {
          mndReleaseDb(pMnode, pDb);
          code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
          goto END;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    char key[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%" PRIx64, subscribe->consumerId);
    mndTransSetDbName(pTrans, pTopic->db, key);
    MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

    mndReleaseTopic(pMnode, pTopic);
    /* Drop the stale pointer so the END path can never release this topic a
     * second time if the next acquire fails without overwriting pTopic. */
    pTopic = NULL;
  }
  return 0;

END:
  mndReleaseTopic(pMnode, pTopic);
  return code;
}
140

141
/**
 * Handle a lost-consumer-clear message: look up the consumer named in the
 * payload and prepare a "clear-csm" transaction carrying its drop log.
 *
 * @param pMsg  RPC message whose pCont is read as an SMqConsumerClearMsg.
 * @return TSDB_CODE_INVALID_PARA for a NULL message or payload, otherwise the
 *         first failing step's code or the result of mndTransPrepare().
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t              code = 0;
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  SMqConsumerObj      *pStored = NULL;   /* consumer currently held in sdb */
  SMqConsumerObj      *pDropObj = NULL;  /* object encoded into the drop log */
  STrans              *pTrans = NULL;

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClearMsg->consumerId, &pStored));
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pStored->status));

  MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pStored->consumerId, pStored->cgroup, -1, NULL, NULL, &pDropObj));

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pDropObj));
  code = mndTransPrepare(pMnode, pTrans);

END:
  /* Runs on success and on failure alike. */
  mndReleaseConsumer(pMnode, pStored);
  tDeleteSMqConsumerObj(pDropObj);
  mndTransDrop(pTrans);
  return code;
}
168

169
/**
 * Fill rsp->topicPrivileges with one STopicPrivilege entry for every topic in
 * pConsumer->currentTopics that can be acquired. An entry has noPrivilege = 1
 * when mndCheckTopicPrivilege() fails for `user` or grantCheckExpire() returns
 * a negative value, and 0 otherwise. Topics that cannot be acquired are skipped.
 *
 * @param pMnode     mnode instance.
 * @param pConsumer  consumer whose current topics are inspected.
 * @param rsp        heartbeat response that receives the privilege array.
 * @param user       user name the privilege check is run for.
 * @return TSDB_CODE_INVALID_PARA for NULL arguments, an allocation error when
 *         the array cannot be created or grown, otherwise the value left in
 *         `code` by the last mndAcquireTopic() call (0 when the loop is empty).
 *
 * NOTE(review): because skipped topics leave their error in `code`, a failure
 * to acquire the LAST topic is returned to the caller while earlier failures
 * are silently ignored — confirm whether that asymmetry is intended.
 */
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL || user == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);

  for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
    char        *topic = taosArrayGetP(pConsumer->currentTopics, i);
    SMqTopicObj *pTopic = NULL;
    code = mndAcquireTopic(pMnode, topic, &pTopic);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
    if (data == NULL) {
      /* Release the topic acquired above before bailing out; previously this
       * path jumped straight to END and leaked the reference. terrno is
       * preserved across the release so the null-check sees the same error
       * state it would have seen before. */
      int32_t savedErrno = terrno;
      mndReleaseTopic(pMnode, pTopic);
      terrno = savedErrno;
      MND_TMQ_NULL_CHECK(data);
    }

    tstrncpy(data->topic, topic, TSDB_TOPIC_FNAME_LEN);
    if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
        grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
      data->noPrivilege = 1;
    } else {
      data->noPrivilege = 0;
    }
    mndReleaseTopic(pMnode, pTopic);
  }

END:
  return code;
}
197

198
/**
 * Move the offset rows reported in a heartbeat into the matching subscription.
 *
 * For every entry of req->topics the subscription keyed by
 * "<cgroup><TMQ_SEPARATOR><topicName>" is acquired; if this consumer has an
 * entry in its consumerHash, the entry's offsetRows array is destroyed and
 * replaced by the one from the request. The request entry's pointer is then
 * set to NULL, so the array is no longer referenced from the request.
 *
 * @param pMnode     mnode instance.
 * @param req        decoded heartbeat request.
 * @param pConsumer  consumer that sent the heartbeat.
 */
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer) {
  if (pMnode == NULL || req == NULL || pConsumer == NULL) {
    return;
  }

  for (int idx = 0; idx < taosArrayGetSize(req->topics); idx++) {
    TopicOffsetRows *pReport = taosArrayGet(req->topics, idx);
    if (pReport == NULL) {
      continue;
    }
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, pReport->topicName);

    char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, pReport->topicName);

    SMqSubscribeObj *pSub = NULL;
    int32_t          code = mndAcquireSubscribeByKey(pMnode, subKey, &pSub);
    if (code != 0) {
      mError("failed to acquire subscribe by key:%s, code:%d", subKey, code);
      continue;
    }

    /* The swap below is done under the subscription's write latch. */
    taosWLockLatch(&pSub->lock);
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pConsumerEp != NULL) {
      taosArrayDestroy(pConsumerEp->offsetRows);
      pConsumerEp->offsetRows = pReport->offsetRows;
      pReport->offsetRows = NULL;
    }
    taosWUnLockLatch(&pSub->lock);

    mndReleaseSubscribe(pMnode, pSub);
  }
}
229

230
/**
 * Serialize a heartbeat response into a buffer from rpcMallocCont() and
 * attach it to pMsg->info (rsp / rspLen).
 *
 * @param pMsg  request message that receives the response buffer.
 * @param rsp   response to serialize.
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments;
 *         TSDB_CODE_TMQ_INVALID_MSG when either serialization pass returns a
 *         non-positive length; terrno when the buffer cannot be allocated.
 */
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp) {
  if (pMsg == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* First pass with a NULL buffer only measures the encoded size. */
  int32_t encodedLen = tSerializeSMqHbRsp(NULL, 0, rsp);
  if (encodedLen <= 0) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  void *pRspBuf = rpcMallocCont(encodedLen);
  if (pRspBuf == NULL) {
    return terrno;
  }

  /* Second pass writes into the buffer. */
  if (tSerializeSMqHbRsp(pRspBuf, encodedLen, rsp) <= 0) {
    rpcFreeCont(pRspBuf);
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  pMsg->info.rsp = pRspBuf;
  pMsg->info.rspLen = encodedLen;
  return 0;
}
251

252
/**
 * Handle a TMQ heartbeat request.
 *
 * Decodes the request, acquires the consumer, builds the per-topic privilege
 * list, clears the consumer's hbStatus (and, when pollFlag is 1, its
 * pollStatus while stamping pollTime), stores the reported offset rows and
 * finally serializes the response into pMsg.
 *
 * @param pMsg  incoming RPC message.
 * @return TSDB_CODE_INVALID_PARA for a NULL message, otherwise the first
 *         failing step's code or the result of buildMqHbRsp().
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  PRINT_LOG_START
  int32_t         code = 0;
  SMnode         *pMnode = pMsg->info.node;
  SMqHbReq        hbReq = {0};
  SMqHbRsp        hbRsp = {0};
  SMqConsumerObj *pConsumer = NULL;

  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &hbReq));
  int64_t consumerId = hbReq.consumerId;
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &hbRsp, pMsg->info.conn.user));

  atomic_store_32(&pConsumer->hbStatus, 0);
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, hbReq.pollFlag, pConsumer->pollStatus);
  if (hbReq.pollFlag == 1) {
    atomic_store_32(&pConsumer->pollStatus, 0);
    pConsumer->pollTime = taosGetTimestampMs();
  }

  storeOffsetRows(pMnode, &hbReq, pConsumer);
  hbRsp.debugFlag = tqClientDebugFlag;
  code = buildMqHbRsp(pMsg, &hbRsp);

END:
  /* Shared exit for success and failure. */
  tDestroySMqHbRsp(&hbRsp);
  mndReleaseConsumer(pMnode, pConsumer);
  tDestroySMqHbReq(&hbReq);
  PRINT_LOG_END(code)
  return code;
}
284

285
/**
 * Populate rsp->topics with one SMqSubTopicEp per topic currently subscribed
 * by the consumer: topic name, db name, a copy of the topic schema and the
 * list of vgroups assigned to this consumer.
 *
 * The consumer's read latch is held for the whole call; every exit path
 * releases it. Topics whose subscription or topic object cannot be acquired
 * are skipped.
 *
 * @param pMnode     mnode instance.
 * @param pConsumer  consumer being answered.
 * @param epoch      epoch sent by the client; when it equals -1 each vgroup's
 *                   epSet is refreshed from the vgroup object before it is copied.
 * @param rsp        response whose `topics` array is created and filled here.
 *                   On failure it may be left partially filled; this function
 *                   does not destroy it.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
 *         an error code (terrno captured at the failing call, or
 *         TSDB_CODE_OUT_OF_MEMORY when the consumer has no entry in the
 *         subscription's consumerHash).
 */
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp) {
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  SMqSubscribeObj *pSub = NULL;     /* subscription of the topic being processed */
  SMqSubTopicEp    topicEp = {0};   /* entry under construction for that topic */

  taosRLockLatch(&pConsumer->lock);

  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

  rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
  if (rsp->topics == NULL) {
    code = terrno;
    taosRUnLockLatch(&pConsumer->lock);
    return code;
  }

  // handle all topics subscribed by this consumer
  for (int32_t i = 0; i < numOfTopics; i++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);

    pSub = NULL;
    if (mndAcquireSubscribeByKey(pMnode, key, &pSub) != 0) {
      continue;
    }
    taosRLockLatch(&pSub->lock);

    topicEp = (SMqSubTopicEp){0};
    tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);

    // 2.1 fetch topic schema
    SMqTopicObj *pTopic = NULL;
    if (mndAcquireTopic(pMnode, topic, &pTopic) != TSDB_CODE_SUCCESS) {
      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
      continue;
    }
    taosRLockLatch(&pTopic->lock);
    tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
    topicEp.schema.nCols = pTopic->schema.nCols;
    if (topicEp.schema.nCols) {
      topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
      if (topicEp.schema.pSchema == NULL) {
        code = terrno;
        taosRUnLockLatch(&pTopic->lock);
        mndReleaseTopic(pMnode, pTopic);
        /* FAIL also drops pConsumer->lock, which this path used to leave held. */
        goto FAIL;
      }
      (void)memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
    }
    taosRUnLockLatch(&pTopic->lock);
    mndReleaseTopic(pMnode, pTopic);

    // 2.2 iterate all vg assigned to the consumer of that topic
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pConsumerEp == NULL) {
      /* NOTE(review): the error code does not describe a missing hash entry;
       * kept unchanged for compatibility with existing callers. */
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
    topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
    if (topicEp.vgs == NULL) {
      code = terrno;
      goto FAIL;
    }

    for (int32_t j = 0; j < vgNum; j++) {
      SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, j);
      if (pVgEp == NULL) {
        continue;
      }
      if (epoch == -1) {
        /* NOTE(review): this writes pVgEp->epSet while only the read latch of
         * pSub is held — confirm that this is safe. */
        SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
        if (pVgroup) {
          pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
          mndReleaseVgroup(pMnode, pVgroup);
        }
      }
      SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
      if (taosArrayPush(topicEp.vgs, &vgEp) == NULL) {
        code = terrno;
        goto FAIL;
      }
    }

    if (taosArrayPush(rsp->topics, &topicEp) == NULL) {
      code = terrno;
      goto FAIL;
    }
    taosRUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);
  }

  taosRUnLockLatch(&pConsumer->lock);
  return 0;

FAIL:
  /* Single failure exit: free the half-built entry, then drop the
   * subscription and the consumer latch. Only reached while pSub is held. */
  taosMemoryFreeClear(topicEp.schema.pSchema);
  if (topicEp.vgs != NULL) {
    taosArrayDestroy(topicEp.vgs);
  }
  taosRUnLockLatch(&pSub->lock);
  mndReleaseSubscribe(pMnode, pSub);
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
394

395
/**
 * Encode an ask-ep response (SMqRspHead followed by the encoded SMqAskEpRsp)
 * into a buffer from rpcMallocCont() and attach it to pMsg->info.
 *
 * @param pMsg         request message that receives the response buffer.
 * @param rsp          response body to encode.
 * @param serverEpoch  epoch written into the response head.
 * @param consumerId   consumer id written into the response head.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, terrno when
 *         allocation fails, TSDB_CODE_TSC_INTERNAL_ERROR when encoding fails.
 */
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId) {
  if (pMsg == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  /* Total size = fixed head + body size measured by a NULL-buffer encode. */
  int32_t totalLen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
  void   *pRspBuf = rpcMallocCont(totalLen);
  if (pRspBuf == NULL) {
    return terrno;
  }

  SMqRspHead *pHead = pRspBuf;
  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

  /* The body is written right after the head. */
  void *pBody = POINTER_SHIFT(pRspBuf, sizeof(SMqRspHead));
  if (tEncodeSMqAskEpRsp(&pBody, rsp) < 0) {
    rpcFreeCont(pRspBuf);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  pMsg->info.rsp = pRspBuf;
  pMsg->info.rspLen = totalLen;
  return 0;
}
426

427
/**
 * Handle a TMQ ask-ep request.
 *
 * Steps: decode the request, acquire the consumer, verify that the request's
 * cgroup matches the stored one, require status MQ_CONSUMER_STATUS_READY,
 * and — only when the client epoch differs from the server epoch — collect
 * the endpoint info. The response is then encoded into pMsg.
 *
 * @param pMsg  incoming RPC message.
 * @return TSDB_CODE_INVALID_PARA for a NULL message,
 *         TSDB_CODE_MND_CONSUMER_NOT_EXIST on cgroup mismatch,
 *         TSDB_CODE_MND_CONSUMER_NOT_READY when the consumer is not ready,
 *         otherwise the first failing step's code or buildAskEpRsp()'s result.
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  SMqAskEpReq     askReq = {0};
  SMqAskEpRsp     askRsp = {0};
  int32_t         code = 0;
  SMqConsumerObj *pConsumer = NULL;
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &askReq));
  int64_t consumerId = askReq.consumerId;
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));

  if (strncmp(askReq.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, askReq.cgroup,
           pConsumer->cgroup);
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto END;
  }

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
  if (status != MQ_CONSUMER_STATUS_READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    code = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto END;
  }

  int32_t clientEpoch = askReq.epoch;
  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);

  // 2. check epoch, only send ep info when epochs do not match
  if (clientEpoch != serverEpoch) {
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
          consumerId, clientEpoch, serverEpoch);
    MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, clientEpoch, &askRsp));
  }

  code = buildAskEpRsp(pMsg, &askRsp, serverEpoch, consumerId);

END:
  tDeleteSMqAskEpRsp(&askRsp);
  mndReleaseConsumer(pMnode, pConsumer);
  PRINT_LOG_END(code);
  return code;
}
474

475
/**
 * Encode the consumer, append the raw record to the transaction's commit log
 * and mark the record SDB_STATUS_DROPPED.
 *
 * @param pTrans     transaction that receives the commit log.
 * @param pConsumer  consumer to encode.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
 *         the error from encoding, appending or setting the status.
 */
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    /* The raw record is freed here only when the append failed. */
    sdbFreeRaw(pRaw);
    goto END;
  }

  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

END:
  return code;
}
491

492
/**
 * Encode the consumer, append the raw record to the transaction's commit log
 * and mark the record SDB_STATUS_READY.
 *
 * @param pTrans     transaction that receives the commit log.
 * @param pConsumer  consumer to encode.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
 *         the error from encoding, appending or setting the status.
 */
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pRaw);

  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    /* The raw record is freed here only when the append failed. */
    sdbFreeRaw(pRaw);
    goto END;
  }

  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pRaw, SDB_STATUS_READY));

END:
  return code;
}
508

509
/**
 * Element destructor: `param` points at an array slot holding a pointer;
 * the pointed-to allocation is freed when it is not NULL.
 */
static void freeItem(void *param) {
  if (param != NULL) {
    void *pItem = *(void **)param;
    if (pItem != NULL) {
      taosMemoryFree(pItem);
    }
  }
}
518

519
/*
 * Duplicate the string `element` and push the copy onto pConsumerNew-><array>.
 *
 * Relies on the expansion site providing: a `pConsumerNew` pointer, an
 * `int32_t code` variable and an `END:` label (jumped to on failure, with
 * `code` set). Wrapped in do { } while (0) so the macro behaves as a single
 * statement and its temporary does not leak into the caller's scope.
 */
#define ADD_TOPIC_TO_ARRAY(element, array)                            \
  do {                                                                \
    char *newTopicCopy = taosStrdup(element);                         \
    MND_TMQ_NULL_CHECK(newTopicCopy);                                 \
    if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL) {  \
      taosMemoryFree(newTopicCopy);                                   \
      code = terrno;                                                  \
      goto END;                                                       \
    }                                                                 \
  } while (0)
527

528
/**
 * Compute which topics a re-subscribing consumer gains and loses.
 *
 * Walks pExistedConsumer->currentTopics and pConsumerNew->assignedTopics in
 * lock-step, comparing names with strcmp(): names only in the old list are
 * copied into pConsumerNew->rebRemovedTopics, names only in the new list into
 * pConsumerNew->rebNewTopics.
 * NOTE(review): the merge-style walk presumes both lists are sorted — confirm
 * against the callers.
 *
 * @return 0 when at least one list is non-empty,
 *         TSDB_CODE_TMQ_NO_NEED_REBALANCE when both are empty,
 *         TSDB_CODE_INVALID_PARA for NULL arguments, or an allocation error.
 */
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew) {
  if (pExistedConsumer == NULL || pConsumerNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);

  int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
  int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
  int32_t oldIdx = 0;
  int32_t newIdx = 0;

  while (oldIdx < oldTopicNum || newIdx < newTopicNum) {
    if (oldIdx >= oldTopicNum) {
      /* Old list exhausted: everything left in the new list is an addition. */
      void *pAdded = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
      MND_TMQ_NULL_CHECK(pAdded);
      ADD_TOPIC_TO_ARRAY(pAdded, rebNewTopics);
      newIdx++;
    } else if (newIdx >= newTopicNum) {
      /* New list exhausted: everything left in the old list is a removal. */
      void *pRemoved = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
      MND_TMQ_NULL_CHECK(pRemoved);
      ADD_TOPIC_TO_ARRAY(pRemoved, rebRemovedTopics);
      oldIdx++;
    } else {
      char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
      MND_TMQ_NULL_CHECK(oldTopic);
      char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
      MND_TMQ_NULL_CHECK(newTopic);

      int comp = strcmp(oldTopic, newTopic);
      if (comp == 0) {
        /* Present in both lists: unchanged. */
        oldIdx++;
        newIdx++;
      } else if (comp < 0) {
        ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
        oldIdx++;
      } else {
        ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
        newIdx++;
      }
    }
  }

  // no topics need to be rebalanced
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
  }

END:
  return code;
}
583

584
/**
 * Sort the requested topic names, drop duplicates (freeing the dropped
 * strings through freeItem) and reject the request when any topic already
 * has MND_MAX_GROUP_PER_TOPIC or more groups.
 *
 * @param pMnode      mnode instance.
 * @param pTopicList  array of topic-name pointers; modified in place.
 * @return 0 when every topic is below the limit,
 *         TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE otherwise,
 *         TSDB_CODE_INVALID_PARA for NULL arguments.
 */
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList) {
  if (pTopicList == NULL || pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t topicNum = taosArrayGetSize(pTopicList);
  for (int idx = 0; idx < topicNum; idx++) {
    const char *topicName = (const char *)taosArrayGetP(pTopicList, idx);
    int32_t     groupNum = mndGetGroupNumByTopic(pMnode, topicName);
    if (groupNum >= MND_MAX_GROUP_PER_TOPIC) {
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
    }
  }
  return 0;
}
600

601
/**
 * Build the consumer object that a subscribe transaction will commit.
 *
 * - Consumer id not found in sdb: a CONSUMER_INSERT_SUB object is created.
 * - Consumer found: it must be MQ_CONSUMER_STATUS_READY; a
 *   CONSUMER_UPDATE_SUB object is created and its added/removed topic lists
 *   are computed against the existing consumer.
 *
 * @param pMnode      mnode instance.
 * @param subscribe   decoded subscribe request.
 * @param ppConsumer  receives the new object on success; the caller then owns
 *                    it. May be NULL, in which case the object is destroyed
 *                    here instead of being leaked.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL pMnode/subscribe,
 *         TSDB_CODE_MND_CONSUMER_NOT_READY for a non-ready existing consumer,
 *         otherwise the error from object creation or getTopicAddDelete()
 *         (including TSDB_CODE_TMQ_NO_NEED_REBALANCE).
 */
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj **ppConsumer) {
  if (pMnode == NULL || subscribe == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int64_t         consumerId = subscribe->consumerId;
  char           *cgroup = subscribe->cgroup;
  SMqConsumerObj *pConsumerNew = NULL;
  SMqConsumerObj *pExistedConsumer = NULL;
  int32_t         code = mndAcquireConsumer(pMnode, consumerId, &pExistedConsumer);
  if (code != 0) {
    mInfo("receive tmq subscribe request from new consumer:0x%" PRIx64
              ",cgroup:%s, numOfTopics:%d", consumerId,
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));

    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive tmq subscribe request from existed consumer:0x%" PRIx64
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
          (int32_t)taosArrayGetSize(subscribe->topicNames));

    if (status != MQ_CONSUMER_STATUS_READY) {
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto END;
    }
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
  }

  mndReleaseConsumer(pMnode, pExistedConsumer);
  if (ppConsumer) {
    *ppConsumer = pConsumerNew;
  } else {
    /* Nobody takes ownership: destroy the object rather than leaking it. */
    tDeleteSMqConsumerObj(pConsumerNew);
  }
  return code;

END:
  mndReleaseConsumer(pMnode, pExistedConsumer);
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
642

643
/**
 * Handle a TMQ subscribe / unsubscribe request.
 *
 * An empty topic list means "unsubscribe"; in that case the consumer must
 * exist and the request is a no-op when it has no assigned topics. Otherwise
 * the topic list is sorted and checked, a "subscribe" transaction is created,
 * the topics are validated, the new consumer object is built, and its commit
 * log is appended before the transaction is prepared.
 *
 * @param pMsg  incoming RPC message.
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared;
 *         0 for no-op outcomes (including TSDB_CODE_TMQ_NO_NEED_REBALANCE and
 *         TSDB_CODE_MND_CONSUMER_NOT_EXIST, which are mapped to 0);
 *         TSDB_CODE_INVALID_PARA for a NULL message; otherwise the failing
 *         step's error code.
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  char           *msgStr = pMsg->pCont;
  int32_t         code = 0;
  SMqConsumerObj *pConsumerNew = NULL;
  STrans         *pTrans = NULL;

  PRINT_LOG_START
  SCMSubscribeReq subscribe = {0};
  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));

  bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
  if (unSubscribe) {
    SMqConsumerObj *pExisting = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pExisting));
    bool nothingAssigned = (taosArrayGetSize(pExisting->assignedTopics) == 0);
    mndReleaseConsumer(pMnode, pExisting);
    if (nothingAssigned) {
      /* Nothing to unsubscribe from; code is still 0 here. */
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
                          (unSubscribe ? TRN_CONFLICT_NOTHING : TRN_CONFLICT_DB_INSIDE),
                          pMsg, "subscribe");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user));
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
  code = TSDB_CODE_ACTION_IN_PROGRESS;

END:
  mndTransDrop(pTrans);
  tDeleteSMqConsumerObj(pConsumerNew);
  taosArrayDestroyP(subscribe.topicNames, NULL);

  /* These two outcomes are reported to the client as success. */
  if (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
    code = 0;
  }
  if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
    PRINT_LOG_END(code);
  }
  return code;
}
688

689
/**
 * Encode a consumer object into a newly allocated SDB raw record.
 *
 * Record layout as written below: int32 payload length, the encoded consumer
 * bytes, then MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and only reset to
 * TSDB_CODE_SUCCESS once every step has succeeded, so any early jump to the
 * exit label is reported as a failure.
 *
 * @param pConsumer  consumer to encode.
 * @return the raw record, or NULL on failure (including a NULL argument).
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL) {
    return NULL;
  }
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *buf = NULL;
  void   *abuf = NULL;
  int32_t dataPos = 0;
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
  if (pRaw == NULL) {
    goto CM_ENCODE_OVER;
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    goto CM_ENCODE_OVER;
  }

  /* The encoder advances the cursor it is given; keep `buf` for the copy and free. */
  abuf = buf;
  if (tEncodeSMqConsumerObj(&abuf, pConsumer) < 0) {
    goto CM_ENCODE_OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != 0) {
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}
731

732
/**
 * Decode an SDB raw record into a newly allocated consumer row.
 *
 * Accepts soft versions 1..MND_CONSUMER_VER_NUMBER. Reads the int32 payload
 * length, the payload bytes and the reserved area, decodes the payload into
 * the row's SMqConsumerObj and finally passes the consumer's ep to
 * tmsgUpdateDnodeEpSet().
 *
 * Failure is detected through terrno at the exit label: when it is non-zero
 * the row is freed and NULL is returned.
 *
 * @param pRaw  raw record to decode.
 * @return the decoded row, or NULL on failure (including a NULL argument).
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }
  int32_t         code = 0;
  int32_t         lino = 0;
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;
  int32_t         dataPos = 0;
  int32_t         len = 0;
  int8_t          sver = 0;

  terrno = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }

  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}
792

793
// SDB insert hook for a consumer object.
// Logs the consumer's id, cgroup, status and epoch, then copies createTime into
// subscribeTime. pSdb is not used here.
// Returns 0 on success, TSDB_CODE_INVALID_PARA when pConsumer is NULL.
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
          pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);

    // a newly inserted consumer starts with subscribeTime equal to createTime
    pConsumer->subscribeTime = pConsumer->createTime;
    return 0;
  }

  return TSDB_CODE_INVALID_PARA;
}
802

803
// SDB delete hook for a consumer object.
// Logs the consumer's id and status, then hands the object to tClearSMqConsumerObj().
// pSdb is not used here.
// Returns 0 on success, TSDB_CODE_INVALID_PARA when pConsumer is NULL.
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
          mndConsumerStatusName(pConsumer->status));

    tClearSMqConsumerObj(pConsumer);
    return 0;
  }

  return TSDB_CODE_INVALID_PARA;
}
812

813
//static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
814
//  int32_t status = pConsumer->status;
815
//
816
//  if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
817
//    if (status == MQ_CONSUMER_STATUS_REBALANCE) {
818
//      pConsumer->status = MQ_CONSUMER_STATUS_READY;
819
//    } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
820
//      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
821
//    }
822
//  }
823
//}
824

825
// remove from topic list
826
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
1,002,617✔
827
  if (topicList == NULL || pTopic == NULL) {
1,002,617!
828
    return;
×
829
  }
830
  int32_t size = taosArrayGetSize(topicList);
1,002,617✔
831
  for (int32_t i = 0; i < size; i++) {
1,012,195✔
832
    char *p = taosArrayGetP(topicList, i);
1,001,749✔
833
    if (strcmp(pTopic, p) == 0) {
1,001,749!
834
      taosArrayRemove(topicList, i);
992,171✔
835
      taosMemoryFree(p);
992,171!
836

837
      mInfo("tmq rebalance consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
992,171!
838
            consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
839
      break;
992,171✔
840
    }
841
  }
842
}
843

844
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
398,629✔
845
  if (pConsumer == NULL || pTopic == NULL) {
398,629!
846
    return false;
×
847
  }
848
  bool    existing = false;
398,629✔
849
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
398,629✔
850
  for (int32_t i = 0; i < size; i++) {
406,352✔
851
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
7,723✔
852
    if (topic && strcmp(topic, pTopic) == 0) {
7,723!
853
      existing = true;
×
854
      break;
×
855
    }
856
  }
857

858
  return existing;
398,629✔
859
}
860

861
// SDB update hook: apply the change described by pNewConsumer->updateType to the
// stored consumer pOldConsumer. pSdb is not used here.
//
//   CONSUMER_UPDATE_SUB  - swap in the new rebNewTopics / rebRemovedTopics /
//                          assignedTopics arrays, refresh subscribeTime and set
//                          the status to MQ_CONSUMER_STATUS_REBALANCE
//   CONSUMER_UPDATE_REB  - bump epoch and refresh rebalanceTime only
//   CONSUMER_ADD_REB     - move the first topic of pNewConsumer->rebNewTopics from
//                          the old consumer's rebNewTopics into its currentTopics
//   CONSUMER_REMOVE_REB  - drop the first topic of pNewConsumer->rebRemovedTopics
//                          from the old consumer's rebRemovedTopics and currentTopics
//
// For the two *_REB topic moves the status becomes MQ_CONSUMER_STATUS_READY once
// both rebNewTopics and rebRemovedTopics of the old consumer are empty.
//
// The whole update runs under pOldConsumer->lock (write latch). Every exit after
// the latch is taken goes through END so the latch is always released, including
// on the error paths.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
// TSDB_CODE_TMQ_INVALID_MSG when the expected topic is missing or cannot be
// appended, or terrno when duplicating the topic name fails.
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  if (pOldConsumer == NULL || pNewConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;

  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);

  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

    pOldConsumer->subscribeTime = taosGetTimestampMs();
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
    if (tmp == NULL) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto END;
    }
    // private copy: on success it becomes an element of currentTopics
    char *pNewTopic = taosStrdup(tmp);
    if (pNewTopic == NULL) {
      code = terrno;  // capture before unlocking
      goto END;
    }
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
    if (existing) {
      mError("tmq rebalance consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
      taosMemoryFree(pNewTopic);
    } else {
      if (taosArrayPush(pOldConsumer->currentTopics, &pNewTopic) == NULL) {
        taosMemoryFree(pNewTopic);
        code = TSDB_CODE_TMQ_INVALID_MSG;
        goto END;
      }
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }

    int32_t status = pOldConsumer->status;  // previous status, for the log line
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
    if (topic == NULL) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto END;
    }
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");

    int32_t status = pOldConsumer->status;  // previous status, for the log line
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalanceconsumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

END:
  taosWUnLockLatch(&pOldConsumer->lock);
  return code;
}
947

948
// Look up the consumer with the given id in the mnode's SDB (type SDB_CONSUMER)
// via sdbAcquire() and store the result in *pConsumer.
// Returns 0 when a consumer was found, TSDB_CODE_MND_CONSUMER_NOT_EXIST when
// sdbAcquire() returned NULL (in which case *pConsumer is NULL), and
// TSDB_CODE_INVALID_PARA when pMnode or pConsumer is NULL.
// A successfully acquired consumer is handed back with mndReleaseConsumer().
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
  if (pMnode == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pConsumer = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (*pConsumer != NULL) {
    return 0;
  }

  return TSDB_CODE_MND_CONSUMER_NOT_EXIST;
}
959

960
// Hand a consumer object back to the mnode's SDB through sdbRelease().
// Silently does nothing when pMnode or pConsumer is NULL.
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  if (pMnode != NULL && pConsumer != NULL) {
    sdbRelease(pMnode->pSdb, pConsumer);
  }
}
967

968
// Fill pBlock with one row per (consumer, assigned topic) pair for the "show
// consumers" style retrieval.
//   pReq         - request; pReq->info.node supplies the mnode
//   pShow        - show context; pIter carries the SDB iterator between calls and
//                  numOfRows is increased by the rows produced here
//   pBlock       - output data block; its columns are filled in schema order
//   rowsCapacity - iteration stops once this many rows have been produced
// Consumers with no assigned topics are skipped. Each consumer is read under its
// read latch.
// Returns the number of rows written on success, TSDB_CODE_INVALID_PARA for NULL
// arguments, or the error code recorded by a failed check.
//
// NOTE(review): MND_TMQ_RETURN_CHECK / MND_TMQ_NULL_CHECK are defined outside this
// block; they are expected to record the error in `code` and jump to END - confirm.
// Every such check sits between taosRLockLatch() and the matching unlock, so END
// must drop the latch and the SDB reference of the consumer being processed.
// pConsumer is reset to NULL whenever it has been released so END can tell.
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;
  int32_t         code = 0;
  char           *parasStr = NULL;
  char           *status = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      sdbRelease(pSdb, pConsumer);
      pConsumer = NULL;
      continue;
    }

    taosRLockLatch(&pConsumer->lock);
    mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // Re-read under the latch: the size checked above was read without it.
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

    if (numOfRows + topicSz > rowsCapacity) {
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + topicSz));
    }

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo = NULL;
      int32_t          cols = 0;

      // consumer id
      char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
      (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
      varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));

      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));

      // client id
      char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(clientId, pConsumer->clientId);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));

      // user
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(user, pConsumer->user);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));

      // fqdn
      char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(fqdn, pConsumer->fqdn);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));

      // status (scratch buffer sized from the column schema)
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
      status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
      MND_TMQ_NULL_CHECK(status);
      STR_TO_VARSTR(status, pStatusName);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
      taosMemoryFreeClear(status);

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i), topic + VARSTR_HEADER_SIZE);
        *(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
      } else {
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
      }

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));

      // rebalance time (NULL cell while it is still 0)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));

      // consumer parameters, rendered as one string
      char         buf[TSDB_OFFSET_LEN] = {0};
      STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);

      parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
      MND_TMQ_NULL_CHECK(parasStr);
      (void)snprintf(varDataVal(parasStr), pShow->pMeta->pSchemas[cols].bytes - VARSTR_HEADER_SIZE,
                     "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit,
                     pConsumer->autoCommitInterval, buf);
      varDataSetLen(parasStr, strlen(varDataVal(parasStr)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
      taosMemoryFreeClear(parasStr);

      // poll time (NULL cell while it is still 0)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->pollTime, pConsumer->pollTime == 0));
      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
    pConsumer = NULL;

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

END:
  // A non-NULL pConsumer here is the one whose read latch is still held.
  if (pConsumer != NULL) {
    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
  }
  taosMemoryFreeClear(status);
  taosMemoryFreeClear(parasStr);
  return code;
}
1125

1126
// Cancel an in-progress SDB_CONSUMER iteration by passing pIter to
// sdbCancelFetchByType(). Does nothing when pMnode or pIter is NULL.
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL && pIter != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_CONSUMER);
  }
}
1131

1132
const char *mndConsumerStatusName(int status) {
10,861,117✔
1133
  switch (status) {
10,861,117!
1134
    case MQ_CONSUMER_STATUS_READY:
4,689,668✔
1135
      return "ready";
4,689,668✔
1136
//    case MQ_CONSUMER_STATUS_LOST:
1137
//      return "lost";
1138
    case MQ_CONSUMER_STATUS_REBALANCE:
6,171,449✔
1139
      return "rebalancing";
6,171,449✔
1140
    default:
×
1141
      return "unknown";
×
1142
  }
1143
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc