• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4851

14 Nov 2025 08:06AM UTC coverage: 63.754% (+0.03%) from 63.728%
#4851

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

354 of 675 new or added lines in 18 files covered. (52.44%)

3145 existing lines in 113 files now uncovered.

149128 of 233910 relevant lines covered (63.75%)

117183401.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.5
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_CONSUMER_VER_NUMBER   3
29
#define MND_CONSUMER_RESERVE_SIZE 64
30

31
#define MND_MAX_GROUP_PER_TOPIC 100
32

33
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
34
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
35
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
36
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
37
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
38

39
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
40
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
41
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
42
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
43

44
/*
 * Wire the consumer module into the mnode: register the TMQ message handlers,
 * the show-table callbacks and finally the consumer sdb table.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise whatever
 * sdbSetTable() returns.
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // request handlers
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // system-table ("show") callbacks
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  // sdb table description: consumers are keyed by their 64-bit id
  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, consumerTable);
}
68

69
// Intentionally empty: this function performs no work.
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}
489,662✔
70

71
/*
 * Put a message of type `msgType` whose payload is just `consumerId` onto the
 * mnode write queue.
 *
 * pMnode     - mnode whose message callbacks are used; must not be NULL
 * consumerId - id copied into the 8-byte message body
 * msgType    - message type stored in the rpc message
 * info       - handle info copied into the rpc message; must not be NULL
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the error
 * produced by the allocation check / tmsgPutToQueue().
 */
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
  if (info == NULL || pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  int64_t *pPayload = rpcMallocCont(sizeof(int64_t));
  MND_TMQ_NULL_CHECK(pPayload);
  *pPayload = consumerId;

  SRpcMsg outMsg = {
      .msgType = msgType,
      .pCont = pPayload,
      .contLen = sizeof(int64_t),
      .info = *info,
  };

  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &outMsg));

END:
  return code;
}
93

94
/*
 * Validate every topic named in a subscribe request and bind the transaction
 * to each topic's database for conflict checking.
 *
 * For each topic this: acquires the topic, checks the user's subscribe
 * privilege and the subscription grant, enforces the replay restrictions
 * (replay requires a column-type topic; when the topic has neither ntbUid nor
 * ctbStbUid set, its database must have exactly one vgroup), then sets the
 * transaction db name / key and checks for transaction conflicts.
 *
 * pTrans    - transaction being prepared for the subscribe
 * subscribe - decoded subscribe request
 * pMnode    - mnode instance
 * pUser     - name of the requesting user
 *
 * Returns 0 when all topics pass, otherwise the first error encountered.
 */
static int32_t validateTopics(STrans *pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) {
  if (pTrans == NULL || subscribe == NULL || pMnode == NULL || pUser == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqTopicObj *pTopic = NULL;
  int32_t      code = 0;

  int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames);
  for (int32_t i = 0; i < numOfTopics; i++) {
    char *pOneTopic = taosArrayGetP(subscribe->topicNames, i);
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
    MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic));
    MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));

    if (subscribe->enableReplay) {
      if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
        code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
        goto END;
      } else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
        SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
        if (pDb == NULL) {
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          goto END;
        }
        int32_t numOfVgroups = pDb->cfg.numOfVgroups;
        mndReleaseDb(pMnode, pDb);
        if (numOfVgroups != 1) {
          code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
          goto END;
        }
      }
    }

    // the consumer id (hex) is used as the second part of the conflict key
    char key[TSDB_CONSUMER_ID_LEN] = {0};
    (void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%" PRIx64, subscribe->consumerId);
    mndTransSetDbName(pTrans, pTopic->db, key);
    MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));

    mndReleaseTopic(pMnode, pTopic);
    // Drop the reference so that a failure in a later iteration cannot make
    // the END path release this already-released topic a second time.
    pTopic = NULL;
  }
  return 0;

END:
  mndReleaseTopic(pMnode, pTopic);
  return code;
}
138

139
/*
 * Handler for TDMT_MND_TMQ_LOST_CONSUMER_CLEAR: build and prepare a
 * "clear-csm" transaction whose commit log marks the consumer named in the
 * message as dropped.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL message/body, the first failing
 * step's error code, or the result of mndTransPrepare().
 */
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t              code = 0;
  STrans              *pTrans = NULL;
  SMqConsumerObj      *pConsumer = NULL;     // object currently stored in sdb
  SMqConsumerObj      *pConsumerNew = NULL;  // shadow object written to the commit log
  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClearMsg->consumerId, &pConsumer));
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
  code = mndTransPrepare(pMnode, pTrans);

END:
  // all three are released/dropped on success and failure alike
  mndReleaseConsumer(pMnode, pConsumer);
  tDeleteSMqConsumerObj(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}
166

167
/*
 * Fill rsp->topicPrivileges with one STopicPrivilege entry per topic the
 * consumer currently subscribes to, flagging topics the user may no longer
 * consume (missing subscribe privilege or expired subscription grant).
 *
 * Topics that cannot be acquired are skipped and do not affect the return
 * value.
 *
 * pMnode    - mnode instance
 * pConsumer - consumer whose currentTopics list is inspected
 * rsp       - heartbeat response; topicPrivileges is (re)created here
 * user      - name of the user the heartbeat connection belongs to
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the
 * error set by MND_TMQ_NULL_CHECK when an array allocation fails.
 */
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL || user == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);

  for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
    char        *topic = taosArrayGetP(pConsumer->currentTopics, i);
    SMqTopicObj *pTopic = NULL;

    // A topic that cannot be acquired is skipped. The acquire result is
    // deliberately not stored in `code`, so a failure on the last topic
    // cannot turn into the function's return value.
    if (mndAcquireTopic(pMnode, topic, &pTopic) != TSDB_CODE_SUCCESS) {
      continue;
    }

    STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
    if (data == NULL) {
      // release the topic before the NULL check below jumps to END
      mndReleaseTopic(pMnode, pTopic);
    }
    MND_TMQ_NULL_CHECK(data);

    tstrncpy(data->topic, topic, TSDB_TOPIC_FNAME_LEN);
    if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
        grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
      data->noPrivilege = 1;
    } else {
      data->noPrivilege = 0;
    }
    mndReleaseTopic(pMnode, pTopic);
  }

END:
  return code;
}
195

196
/*
 * Move the per-topic offset rows reported in a heartbeat request into the
 * matching subscription objects.
 *
 * For every topic entry in req->topics the subscription "<cgroup><sep><topic>"
 * is looked up; if this consumer has an entry in its consumerHash, the entry's
 * offsetRows array is destroyed and replaced by the one from the request
 * (the request's pointer is then cleared so it is not handed out twice).
 * Entries whose subscription cannot be acquired are logged and skipped.
 */
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer) {
  if (pMnode == NULL || req == NULL || pConsumer == NULL) {
    return;
  }

  for (int idx = 0; idx < taosArrayGetSize(req->topics); idx++) {
    TopicOffsetRows *pReport = taosArrayGet(req->topics, idx);
    if (pReport == NULL) {
      continue;
    }
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, pReport->topicName);

    char subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, pReport->topicName);

    SMqSubscribeObj *pSub = NULL;
    int32_t          code = mndAcquireSubscribeByKey(pMnode, subKey, &pSub);
    if (code != 0) {
      mError("failed to acquire subscribe by key:%s, code:%d", subKey, code);
      continue;
    }

    // the swap is done under the subscription's write latch
    taosWLockLatch(&pSub->lock);
    SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pEp != NULL) {
      taosArrayDestroy(pEp->offsetRows);
      pEp->offsetRows = pReport->offsetRows;
      pReport->offsetRows = NULL;
    }
    taosWUnLockLatch(&pSub->lock);

    mndReleaseSubscribe(pMnode, pSub);
  }
}
227

228
/*
 * Serialize `rsp` into a freshly allocated rpc buffer and attach it to
 * pMsg->info as the response body.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments,
 * TSDB_CODE_TMQ_INVALID_MSG when serialization reports a non-positive length,
 * or terrno when the buffer cannot be allocated.
 */
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp) {
  if (pMsg == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // first pass only measures the encoded size
  int32_t rspLen = tSerializeSMqHbRsp(NULL, 0, rsp);
  if (rspLen <= 0) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  void *pRspBuf = rpcMallocCont(rspLen);
  if (pRspBuf == NULL) {
    return terrno;
  }

  if (tSerializeSMqHbRsp(pRspBuf, rspLen, rsp) > 0) {
    pMsg->info.rsp = pRspBuf;
    pMsg->info.rspLen = rspLen;
    return 0;
  }

  rpcFreeCont(pRspBuf);
  return TSDB_CODE_TMQ_INVALID_MSG;
}
249

250
/*
 * Handler for TDMT_MND_TMQ_HB (consumer heartbeat).
 *
 * Decodes the request, looks up the consumer, computes per-topic privilege
 * flags for the response, resets the consumer's hbStatus (and, when the
 * request carries pollFlag == 1, its pollStatus and pollTime), stores the
 * reported offset rows and finally builds the response.
 *
 * Returns 0 on success or the error code of the first failing step.
 */
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  PRINT_LOG_START
  int32_t         code = 0;
  SMqConsumerObj *pConsumer = NULL;
  SMqHbRsp        rsp = {0};
  SMqHbReq        req = {0};
  SMnode         *pMnode = pMsg->info.node;

  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, req.consumerId, &pConsumer));
  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));

  atomic_store_32(&pConsumer->hbStatus, 0);
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", req.consumerId, req.pollFlag,
         pConsumer->pollStatus);

  if (req.pollFlag == 1) {
    atomic_store_32(&pConsumer->pollStatus, 0);
    pConsumer->pollTime = taosGetTimestampMs();
  }

  storeOffsetRows(pMnode, &req, pConsumer);
  rsp.debugFlag = tqClientDebugFlag;
  code = buildMqHbRsp(pMsg, &rsp);

END:
  tDestroySMqHbRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  tDestroySMqHbReq(&req);
  PRINT_LOG_END(code)
  return code;
}
282

283
/*
 * Build the endpoint description of one topic for `pConsumer` and append it
 * to rsp->topics.
 *
 * The caller holds pConsumer->lock (read). The subscription's read latch is
 * held for the whole function; the topic's read latch only while its db name
 * and schema are copied.
 *
 * A topic whose subscription or topic object cannot be acquired is skipped
 * (0 is returned and nothing is appended).
 *
 * Returns 0 on success/skip, otherwise an error code; on error nothing is
 * appended to rsp->topics and every temporary is freed.
 */
static int32_t addTopicEpInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, const char *topic,
                              SMqAskEpRsp *rsp) {
  int32_t          code = 0;
  SMqSubscribeObj *pSub = NULL;
  SMqTopicObj     *pTopic = NULL;
  SMqConsumerEp   *pConsumerEp = NULL;
  SMqSubTopicEp    topicEp = {0};

  char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
  if (mndAcquireSubscribeByKey(pMnode, key, &pSub) != 0) {
    return 0;  // no subscription for this key: skip the topic
  }
  taosRLockLatch(&pSub->lock);

  tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);

  // 2.1 fetch topic schema
  if (mndAcquireTopic(pMnode, topic, &pTopic) != TSDB_CODE_SUCCESS) {
    goto END;  // topic cannot be acquired: skip it (code stays 0)
  }
  taosRLockLatch(&pTopic->lock);
  tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
  topicEp.schema.nCols = pTopic->schema.nCols;
  if (topicEp.schema.nCols) {
    topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
    if (topicEp.schema.pSchema == NULL) {
      code = terrno;  // capture before any cleanup call can touch terrno
    } else {
      (void)memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
    }
  }
  taosRUnLockLatch(&pTopic->lock);
  mndReleaseTopic(pMnode, pTopic);
  if (code != 0) {
    goto END;
  }

  // 2.2 iterate all vg assigned to the consumer of that topic
  pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
  if (pConsumerEp == NULL) {
    // NOTE(review): the consumer is simply missing from the subscription here;
    // the historical error code is kept so callers see no behavior change.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
  topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
  if (topicEp.vgs == NULL) {
    code = terrno;
    goto END;
  }

  for (int32_t j = 0; j < vgNum; j++) {
    SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, j);
    if (pVgEp == NULL) {
      continue;
    }
    if (epoch == -1) {
      // client epoch -1: refresh the stored epset from the vgroup object when it exists
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
      if (pVgroup) {
        pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
        mndReleaseVgroup(pMnode, pVgroup);
      }
    }
    SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
    if (taosArrayPush(topicEp.vgs, &vgEp) == NULL) {
      code = terrno;
      goto END;
    }
  }

  if (taosArrayPush(rsp->topics, &topicEp) == NULL) {
    code = terrno;
    goto END;
  }

END:
  if (code != 0) {
    // topicEp was not handed over to rsp->topics, so its buffers are still ours
    taosMemoryFreeClear(topicEp.schema.pSchema);
    taosArrayDestroy(topicEp.vgs);
  }
  taosRUnLockLatch(&pSub->lock);
  mndReleaseSubscribe(pMnode, pSub);
  return code;
}

/*
 * Fill rsp->topics with the endpoint information (db, schema, assigned
 * vgroups) of every topic in pConsumer->currentTopics.
 *
 * pMnode    - mnode instance
 * pConsumer - consumer being answered; its read latch is held for the
 *             duration of the call and released on every return path
 * epoch     - epoch sent by the client; -1 additionally refreshes the vgroup
 *             epsets stored in the subscription
 * rsp       - response whose `topics` array is created and filled
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the
 * first error hit while building a topic entry. Entries already appended to
 * rsp->topics are left in place for the caller to release.
 */
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp) {
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  taosRLockLatch(&pConsumer->lock);

  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

  rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
  if (rsp->topics == NULL) {
    code = terrno;
    goto END;
  }

  // handle all topics subscribed by this consumer
  for (int32_t i = 0; i < numOfTopics; i++) {
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
    code = addTopicEpInfo(pMnode, pConsumer, epoch, topic, rsp);
    if (code != 0) {
      break;
    }
  }

END:
  // single unlock point: every path, including allocation failures, releases the consumer latch
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
392

393
/*
 * Encode an ask-ep response (SMqRspHead followed by the encoded `rsp`) into a
 * new rpc buffer and attach it to pMsg->info.
 *
 * serverEpoch / consumerId are written into the response head.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, terrno when
 * the buffer cannot be allocated, or TSDB_CODE_TSC_INTERNAL_ERROR when the
 * body fails to encode.
 */
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId) {
  if (pMsg == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // total size = fixed head + encoded body (first encode pass only measures)
  int32_t totalLen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
  void   *pRspBuf = rpcMallocCont(totalLen);
  if (pRspBuf == NULL) {
    return terrno;
  }

  SMqRspHead *pHead = pRspBuf;
  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

  void *pBody = POINTER_SHIFT(pRspBuf, sizeof(SMqRspHead));
  if (tEncodeSMqAskEpRsp(&pBody, rsp) < 0) {
    rpcFreeCont(pRspBuf);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // hand the buffer over as the response
  pMsg->info.rsp = pRspBuf;
  pMsg->info.rspLen = totalLen;
  return 0;
}
424

425
/*
 * Handler for TDMT_MND_TMQ_ASK_EP.
 *
 * Decodes the request, checks that the consumer exists, that its group
 * matches the stored one and that it is in READY state. Endpoint details are
 * only added to the response when the client's epoch differs from the
 * consumer's stored epoch; the response head is built in either case.
 *
 * Returns 0 on success, TSDB_CODE_MND_CONSUMER_NOT_EXIST on a group mismatch,
 * TSDB_CODE_MND_CONSUMER_NOT_READY when the consumer is not ready, or the
 * error code of the failing step.
 */
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  SMqAskEpReq     req = {0};
  SMqAskEpRsp     rsp = {0};
  int32_t         code = 0;
  SMqConsumerObj *pConsumer = NULL;
  PRINT_LOG_START

  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
  int64_t consumerId = req.consumerId;
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));

  // the group named by the client has to match the one stored for this consumer id
  if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto END;
  }

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
  if (status != MQ_CONSUMER_STATUS_READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    code = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto END;
  }

  // 2. check epoch, only send ep info when epochs do not match
  int32_t clientEpoch = req.epoch;
  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
  if (clientEpoch != serverEpoch) {
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
          consumerId, clientEpoch, serverEpoch);
    MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, clientEpoch, &rsp));
  }

  code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);

END:
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  PRINT_LOG_END(code);
  return code;
}
472

473
/*
 * Encode `pConsumer`, append the raw record to the transaction's commit log
 * and mark the record SDB_STATUS_DROPPED.
 *
 * The raw record is freed here only when appending it to the transaction
 * fails.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the
 * error of the failing step.
 */
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pTrans == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pDropRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pDropRaw);

  code = mndTransAppendCommitlog(pTrans, pDropRaw);
  if (code == 0) {
    MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pDropRaw, SDB_STATUS_DROPPED));
  } else {
    sdbFreeRaw(pDropRaw);
  }

END:
  return code;
}
489

490
/*
 * Encode `pConsumer`, append the raw record to the transaction's commit log
 * and mark the record SDB_STATUS_READY.
 *
 * The raw record is freed here only when appending it to the transaction
 * fails.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the
 * error of the failing step.
 */
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pTrans == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pReadyRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pReadyRaw);

  code = mndTransAppendCommitlog(pTrans, pReadyRaw);
  if (code == 0) {
    MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pReadyRaw, SDB_STATUS_READY));
  } else {
    sdbFreeRaw(pReadyRaw);
  }

END:
  return code;
}
506

507
/*
 * Element destructor for arrays of heap pointers: `param` is the address of
 * an array slot, and the pointer stored in that slot is freed when non-NULL.
 */
static void freeItem(void *param) {
  if (param == NULL) {
    return;
  }

  void **ppSlot = param;
  if (*ppSlot == NULL) {
    return;
  }
  taosMemoryFree(*ppSlot);
}
516

517
/*
 * Append a heap copy of the string `element` to pConsumerNew-><array>.
 *
 * Relies on the enclosing function providing `pConsumerNew`, an int32_t
 * `code` and an `END:` label: on allocation or push failure `code` is set
 * and control jumps to END (a copy that could not be pushed is freed first).
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement and
 * its temporary does not leak into the caller's scope; invoke it with a
 * trailing semicolon.
 */
#define ADD_TOPIC_TO_ARRAY(element, array)                           \
  do {                                                               \
    char *newTopicCopy = taosStrdup(element);                        \
    MND_TMQ_NULL_CHECK(newTopicCopy);                                \
    if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL) { \
      code = terrno; /* read terrno before the free below */         \
      taosMemoryFree(newTopicCopy);                                  \
      goto END;                                                      \
    }                                                                \
  } while (0)
525

526
/*
 * Compute which topics a re-subscribe adds and removes.
 *
 * Walks pExistedConsumer->currentTopics and pConsumerNew->assignedTopics in
 * parallel, comparing names with strcmp() (the walk assumes both lists are in
 * ascending name order), and fills
 *   pConsumerNew->rebNewTopics     - names only present in assignedTopics
 *   pConsumerNew->rebRemovedTopics - names only present in currentTopics
 * with heap copies of the names.
 *
 * Returns 0 when there is something to rebalance,
 * TSDB_CODE_TMQ_NO_NEED_REBALANCE when both result lists are empty,
 * TSDB_CODE_INVALID_PARA on NULL arguments, or an allocation error.
 */
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew) {
  if (pExistedConsumer == NULL || pConsumerNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);

  int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
  int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
  int32_t oldIdx = 0;
  int32_t newIdx = 0;

  while (oldIdx < oldTopicNum || newIdx < newTopicNum) {
    if (oldIdx >= oldTopicNum) {
      // old list exhausted: everything left in the new list is an addition
      char *added = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
      MND_TMQ_NULL_CHECK(added);
      ADD_TOPIC_TO_ARRAY(added, rebNewTopics);
      newIdx++;
      continue;
    }

    if (newIdx >= newTopicNum) {
      // new list exhausted: everything left in the old list is a removal
      char *removed = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
      MND_TMQ_NULL_CHECK(removed);
      ADD_TOPIC_TO_ARRAY(removed, rebRemovedTopics);
      oldIdx++;
      continue;
    }

    char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
    MND_TMQ_NULL_CHECK(oldTopic);
    char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
    MND_TMQ_NULL_CHECK(newTopic);

    int order = strcmp(oldTopic, newTopic);
    if (order < 0) {
      ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
      oldIdx++;
    } else if (order > 0) {
      ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
      newIdx++;
    } else {
      // same topic on both sides: unchanged
      oldIdx++;
      newIdx++;
    }
  }

  // no topics need to be rebalanced
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
  }

END:
  return code;
}
581

582
/*
 * Normalize the requested topic list in place (sort by name, drop duplicate
 * names and free them) and enforce the per-topic group limit.
 *
 * Returns 0 when the list is acceptable, TSDB_CODE_INVALID_PARA on NULL
 * arguments, or TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE as soon as one topic already
 * has MND_MAX_GROUP_PER_TOPIC or more groups.
 */
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList) {
  if (pMnode == NULL || pTopicList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t topicCount = taosArrayGetSize(pTopicList);
  for (int t = 0; t < topicCount; t++) {
    const char *topicName = (const char *)taosArrayGetP(pTopicList, t);
    int32_t     groupCount = mndGetGroupNumByTopic(pMnode, topicName);
    if (groupCount >= MND_MAX_GROUP_PER_TOPIC) {
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
    }
  }
  return 0;
}
598

599
/*
 * Build the consumer object that a subscribe request should write to sdb.
 *
 * - Consumer id not found in sdb: a CONSUMER_INSERT_SUB object is created.
 * - Consumer exists: it must be in READY state; a CONSUMER_UPDATE_SUB object
 *   is created and its added/removed topic lists are computed against the
 *   existing consumer.
 *
 * pMnode     - mnode instance
 * subscribe  - decoded subscribe request
 * ppConsumer - receives the new object on success (caller then owns it and
 *              must release it with tDeleteSMqConsumerObj); may be NULL, in
 *              which case the object is destroyed here instead of leaking
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL pMnode/subscribe,
 * TSDB_CODE_MND_CONSUMER_NOT_READY, TSDB_CODE_TMQ_NO_NEED_REBALANCE (from the
 * topic diff), or an allocation error. *ppConsumer is not written on failure.
 */
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj **ppConsumer) {
  if (pMnode == NULL || subscribe == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int64_t         consumerId = subscribe->consumerId;
  char           *cgroup = subscribe->cgroup;
  SMqConsumerObj *pConsumerNew = NULL;
  SMqConsumerObj *pExistedConsumer = NULL;
  int32_t         code = mndAcquireConsumer(pMnode, consumerId, &pExistedConsumer);
  if (code != 0) {
    mInfo("receive tmq subscribe request from new consumer:0x%" PRIx64
              ",cgroup:%s, numOfTopics:%d", consumerId,
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));

    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive tmq subscribe request from existed consumer:0x%" PRIx64
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
          (int32_t)taosArrayGetSize(subscribe->topicNames));

    if (status != MQ_CONSUMER_STATUS_READY) {
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto END;
    }
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
  }

  // success: transfer ownership to the caller when it asked for the object
  if (ppConsumer != NULL) {
    *ppConsumer = pConsumerNew;
    pConsumerNew = NULL;
  }

END:
  mndReleaseConsumer(pMnode, pExistedConsumer);
  // Still owned here on failure, or on success when no output pointer was given.
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
640

641
/*
 * Handler for TDMT_MND_TMQ_SUBSCRIBE (an empty topic list means unsubscribe).
 *
 * Flow: decode the request; for an unsubscribe, stop early when the consumer
 * has no assigned topics; sort/de-duplicate the topic list and enforce the
 * group limit; create the "subscribe" transaction; validate the topics; build
 * the new consumer object; append it to the commit log and prepare the
 * transaction.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared, 0
 * when nothing had to be done (including TSDB_CODE_TMQ_NO_NEED_REBALANCE and
 * TSDB_CODE_MND_CONSUMER_NOT_EXIST, which are mapped to 0), or an error code.
 */
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pMsg->info.node;
  char           *msgStr = pMsg->pCont;
  int32_t         code = 0;
  SMqConsumerObj *pConsumerNew = NULL;
  STrans         *pTrans = NULL;

  PRINT_LOG_START
  SCMSubscribeReq subscribe = {0};
  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));

  bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
  if (unSubscribe) {
    SMqConsumerObj *pConsumerTmp = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
    bool nothingAssigned = (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0);
    mndReleaseConsumer(pMnode, pConsumerTmp);
    if (nothingAssigned) {
      // nothing is assigned, so there is nothing to unsubscribe from
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
                          (unSubscribe ? TRN_CONFLICT_NOTHING : TRN_CONFLICT_DB_INSIDE),
                          pMsg, "subscribe");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user));
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
  code = TSDB_CODE_ACTION_IN_PROGRESS;

END:
  mndTransDrop(pTrans);
  tDeleteSMqConsumerObj(pConsumerNew);
  taosArrayDestroyP(subscribe.topicNames, NULL);

  // "no rebalance needed" and "consumer does not exist" are reported as success
  if (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
    code = 0;
  }
  if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
    PRINT_LOG_END(code);
  }
  return code;
}
686

687
/*
 * Serialize a consumer object into a newly allocated sdb raw record.
 *
 * Record layout: int32 length of the encoded object, the encoded object
 * itself, then MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * Returns the raw record, or NULL on failure (terrno is left non-zero).
 * terrno is preset to TSDB_CODE_OUT_OF_MEMORY and only cleared once every
 * step has succeeded.
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL) {
    return NULL;
  }
  // `code` and `lino` are kept for the SDB_SET_* macros — presumably they assign them; verify against the macro definitions
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *pEncoded = NULL;
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);  // measuring pass
  int32_t rawSize = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    goto CM_ENCODE_OVER;
  }

  pEncoded = taosMemoryMalloc(tlen);
  if (pEncoded == NULL) {
    goto CM_ENCODE_OVER;
  }

  void *pCursor = pEncoded;
  if (tEncodeSMqConsumerObj(&pCursor, pConsumer) < 0) {
    goto CM_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, pEncoded, tlen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(pEncoded);
  if (terrno != 0) {
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}
729

730
/*
 * Rebuild a consumer row from an sdb raw record produced by
 * mndConsumerActionEncode().
 *
 * Only soft versions 1..MND_CONSUMER_VER_NUMBER are accepted. After decoding,
 * the consumer's endpoint is passed through tmsgUpdateDnodeEpSet().
 *
 * Returns the new row, or NULL on failure. Failure is signalled through
 * terrno, which is reset to 0 on entry.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }
  // `code` and `lino` are kept for the SDB_GET_* macros — presumably they assign them; verify against the macro definitions
  int32_t         code = 0;
  int32_t         lino = 0;
  void           *buf = NULL;
  SMqConsumerObj *pConsumer = NULL;
  SSdbRow        *pRow = NULL;

  terrno = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }
  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }
  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  // record layout: int32 length | encoded object | reserved bytes
  int32_t dataPos = 0;
  int32_t len;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);

  buf = taosMemoryMalloc(len);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    // frees the row and sets pRow to NULL so the caller sees the failure
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}
790

791
/*
 * Insert action for a consumer object: logs the consumer and initializes its
 * subscribe time from its create time.
 *
 * pSdb is not used by this function.
 * Returns TSDB_CODE_INVALID_PARA when pConsumer is NULL, 0 otherwise.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (!pConsumer) return TSDB_CODE_INVALID_PARA;

  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
        pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);

  // the subscribe time starts out equal to the creation time
  pConsumer->subscribeTime = pConsumer->createTime;

  return 0;
}
800

801
/*
 * Delete action for a consumer object: logs the consumer's status and then
 * clears the object through tClearSMqConsumerObj().
 *
 * pSdb is not used by this function.
 * Returns TSDB_CODE_INVALID_PARA when pConsumer is NULL, 0 otherwise.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (!pConsumer) return TSDB_CODE_INVALID_PARA;

  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
        mndConsumerStatusName(pConsumer->status));

  tClearSMqConsumerObj(pConsumer);

  return 0;
}
810

811
//static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
812
//  int32_t status = pConsumer->status;
813
//
814
//  if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
815
//    if (status == MQ_CONSUMER_STATUS_REBALANCE) {
816
//      pConsumer->status = MQ_CONSUMER_STATUS_READY;
817
//    } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
818
//      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
819
//    }
820
//  }
821
//}
822

823
// remove from topic list
824
/*
 * Remove the first entry of topicList whose string equals pTopic, and free
 * that entry's memory.
 *
 * topicList   array of heap-allocated topic name strings; no-op when NULL
 * pTopic      topic name to look for; no-op when NULL
 * consumerId  used only for the log message
 * type        label of the list being edited, used only for the log message
 *
 * NULL entries in the list are skipped instead of being handed to strcmp(),
 * which would be undefined behavior.
 */
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
  if (topicList == NULL || pTopic == NULL) {
    return;
  }
  int32_t size = taosArrayGetSize(topicList);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(topicList, i);
    if (p == NULL || strcmp(pTopic, p) != 0) {
      continue;
    }

    // detach the slot first, then release the string it owned
    taosArrayRemove(topicList, i);
    taosMemoryFree(p);

    mInfo("tmq rebalance consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
          consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
    break;  // only the first match is removed
  }
}
841

842
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
53,236✔
843
  if (pConsumer == NULL || pTopic == NULL) {
53,236✔
UNCOV
844
    return false;
×
845
  }
846
  bool    existing = false;
53,236✔
847
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
53,236✔
848
  for (int32_t i = 0; i < size; i++) {
53,846✔
849
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
610✔
850
    if (topic && strcmp(topic, pTopic) == 0) {
610✔
UNCOV
851
      existing = true;
×
UNCOV
852
      break;
×
853
    }
854
  }
855

856
  return existing;
53,236✔
857
}
858

859
/*
 * Update action for a consumer object: applies the change described by
 * pNewConsumer->updateType to pOldConsumer while holding pOldConsumer's write
 * latch.
 *
 *   CONSUMER_UPDATE_SUB  swap the rebNewTopics / rebRemovedTopics /
 *                        assignedTopics arrays with the new object, refresh
 *                        subscribeTime and mark the consumer as rebalancing.
 *   CONSUMER_UPDATE_REB  bump the epoch and refresh rebalanceTime only.
 *   CONSUMER_ADD_REB     move the first topic of pNewConsumer->rebNewTopics
 *                        out of the old consumer's rebNewTopics into its
 *                        (sorted) currentTopics.
 *   CONSUMER_REMOVE_REB  drop the first topic of pNewConsumer->rebRemovedTopics
 *                        from the old consumer's rebRemovedTopics and
 *                        currentTopics.
 *
 * For the two *_REB add/remove cases the status becomes READY once both
 * pending lists of the old consumer are empty.
 *
 * pSdb is not used by this function.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL objects,
 * TSDB_CODE_TMQ_INVALID_MSG when the expected topic is missing or cannot be
 * appended, or terrno when duplicating the topic name fails.
 *
 * Every exit taken after the latch is acquired goes through END so that the
 * latch is always released, including on the error paths.
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  if (pOldConsumer == NULL || pNewConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);

  int32_t code = 0;

  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

    pOldConsumer->subscribeTime = taosGetTimestampMs();
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
    if (tmp == NULL) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto END;
    }
    // private copy: on success it ends up owned by pOldConsumer->currentTopics
    char *pNewTopic = taosStrdup(tmp);
    if (pNewTopic == NULL) {
      code = terrno;
      goto END;
    }
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
    if (existing) {
      mError("tmq rebalance consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
      taosMemoryFree(pNewTopic);
    } else {
      if (taosArrayPush(pOldConsumer->currentTopics, &pNewTopic) == NULL) {
        taosMemoryFree(pNewTopic);
        code = TSDB_CODE_TMQ_INVALID_MSG;
        goto END;
      }
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }

    int32_t status = pOldConsumer->status;
    // nothing left to add or remove: the rebalance for this consumer is done
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
    if (topic == NULL) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto END;
    }
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");

    int32_t status = pOldConsumer->status;
    // nothing left to add or remove: the rebalance for this consumer is done
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

END:
  taosWUnLockLatch(&pOldConsumer->lock);
  return code;
}
945

946
/*
 * Look up the consumer with the given id in the mnode's sdb and store the
 * acquired object in *pConsumer.
 *
 * Returns TSDB_CODE_INVALID_PARA when pMnode or pConsumer is NULL,
 * TSDB_CODE_MND_CONSUMER_NOT_EXIST when sdbAcquire() yields NULL (in which
 * case *pConsumer is NULL), and 0 on success. A successfully acquired object
 * is handed back through mndReleaseConsumer().
 */
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
  if (!pMnode || !pConsumer) return TSDB_CODE_INVALID_PARA;

  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  *pConsumer = pFound;

  return (pFound != NULL) ? 0 : TSDB_CODE_MND_CONSUMER_NOT_EXIST;
}
957

958
/*
 * Release a consumer object back to the mnode's sdb via sdbRelease().
 * Does nothing when either pMnode or pConsumer is NULL.
 */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  if (pMnode != NULL && pConsumer != NULL) {
    sdbRelease(pMnode->pSdb, pConsumer);
  }
}
965

966
/*
 * Fill pBlock with one row per (consumer, assigned topic) pair, iterating the
 * SDB_CONSUMER table from pShow->pIter until rowsCapacity rows were produced
 * or the iterator is exhausted.
 *
 * Columns written per row, in order: consumer id (hex), consumer group,
 * client id, user, fqdn, status name, topic, create time, subscribe time,
 * rebalance time, parameter summary string, poll time.
 *
 * Consumers without assigned topics are skipped. Each consumer is read under
 * its read latch.
 *
 * Returns the number of rows written on success (also added to
 * pShow->numOfRows), TSDB_CODE_INVALID_PARA for NULL arguments, or the error
 * code set by a failing MND_TMQ_* check.
 *
 * The MND_TMQ_RETURN_CHECK / MND_TMQ_NULL_CHECK macros are expected to set
 * `code` and jump to END; END therefore also drops the read latch and the sdb
 * reference of the consumer being processed, which would otherwise stay held
 * after a failure.
 * NOTE(review): pShow->pIter is left untouched on the error path — confirm the
 * show framework cancels it.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;
  int32_t         code = 0;
  char           *parasStr = NULL;
  char           *status = NULL;
  bool            locked = false;  // true while pConsumer's read latch (and sdb reference) is held

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    taosRLockLatch(&pConsumer->lock);
    locked = true;
    mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // re-read the size under the latch; an empty list still yields one row with a NULL topic
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

    if (numOfRows + topicSz > rowsCapacity) {
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + topicSz));
    }

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo = NULL;
      int32_t          cols = 0;

      // consumer id
      char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
      (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
      varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));

      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));

      // client id
      char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(clientId, pConsumer->clientId);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));

      // user
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(user, pConsumer->user);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));

      // fqdn
      char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(fqdn, pConsumer->fqdn);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));

      // status
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
      status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
      MND_TMQ_NULL_CHECK(status);
      STR_TO_VARSTR(status, pStatusName);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
      taosMemoryFreeClear(status);

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i), topic + VARSTR_HEADER_SIZE);
        *(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
      } else {
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
      }

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));

      // rebalance time (NULL while no rebalance has been recorded)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));

      // parameter summary: tbname / auto commit / commit interval / reset offset
      char         buf[TSDB_OFFSET_LEN] = {0};
      STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);

      parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
      MND_TMQ_NULL_CHECK(parasStr);
      (void)snprintf(varDataVal(parasStr), pShow->pMeta->pSchemas[cols].bytes - VARSTR_HEADER_SIZE, "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
              pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
      varDataSetLen(parasStr, strlen(varDataVal(parasStr)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
      taosMemoryFreeClear(parasStr);

      // poll time (NULL while the consumer has not polled yet)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->pollTime, pConsumer->pollTime == 0));
      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    locked = false;
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

END:
  // error path: undo whatever the interrupted iteration still holds
  if (locked) {
    taosRUnLockLatch(&pConsumer->lock);
    sdbRelease(pSdb, pConsumer);
  }
  taosMemoryFreeClear(status);
  taosMemoryFreeClear(parasStr);
  return code;
}
1123

1124
/*
 * Cancel an in-progress SDB_CONSUMER iteration identified by pIter.
 * Does nothing when either pMnode or pIter is NULL.
 */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL && pIter != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_CONSUMER);
  }
}
1129

1130
const char *mndConsumerStatusName(int status) {
1,357,887✔
1131
  switch (status) {
1,357,887✔
1132
    case MQ_CONSUMER_STATUS_READY:
558,192✔
1133
      return "ready";
558,192✔
1134
//    case MQ_CONSUMER_STATUS_LOST:
1135
//      return "lost";
1136
    case MQ_CONSUMER_STATUS_REBALANCE:
799,695✔
1137
      return "rebalancing";
799,695✔
UNCOV
1138
    default:
×
UNCOV
1139
      return "unknown";
×
1140
  }
1141
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc