• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3525

10 Nov 2024 03:50AM UTC coverage: 60.818% (-0.08%) from 60.898%
#3525

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

118634 of 249004 branches covered (47.64%)

Branch coverage included in aggregate %.

136 of 169 new or added lines in 23 files covered. (80.47%)

542 existing lines in 129 files now uncovered.

199071 of 273386 relevant lines covered (72.82%)

15691647.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.99
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_CONSUMER_VER_NUMBER   3
29
#define MND_CONSUMER_RESERVE_SIZE 64
30

31
#define MND_MAX_GROUP_PER_TOPIC 100
32

33
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
34
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
35
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
36
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
37
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
38

39
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
40
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
41
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
42
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
43

44
int32_t mndInitConsumer(SMnode *pMnode) {
1,974✔
45
  SSdbTable table = {
1,974✔
46
      .sdbType = SDB_CONSUMER,
47
      .keyType = SDB_KEY_INT64,
48
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
49
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
50
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
51
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
52
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
53
  };
54

55
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
1,974✔
56
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
1,974✔
57
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
1,974✔
58
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
1,974✔
59

60
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
1,974✔
61
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);
1,974✔
62

63
  return sdbSetTable(pMnode->pSdb, table);
1,974✔
64
}
65

66
void mndCleanupConsumer(SMnode *pMnode) {}
1,973✔
67

68
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
417✔
69
  int32_t code = 0;
417✔
70
  void   *msg  = rpcMallocCont(sizeof(int64_t));
417✔
71
  MND_TMQ_NULL_CHECK(msg);
417!
72

73
  *(int64_t*)msg = consumerId;
417✔
74
  SRpcMsg rpcMsg = {
417✔
75
      .msgType = msgType,
76
      .pCont = msg,
77
      .contLen = sizeof(int64_t),
78
      .info = *info,
79
  };
80

81
  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
417!
82
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
417!
83
  return code;
417✔
84

85
END:
×
86
  taosMemoryFree(msg);
×
87
  return code;
×
88
}
89

90
static int32_t validateTopics(STrans* pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) {
1,017✔
91
  SMqTopicObj *pTopic = NULL;
1,017✔
92
  int32_t      code = 0;
1,017✔
93

94
  int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames);
1,017✔
95
  for (int32_t i = 0; i < numOfTopics; i++) {
1,643✔
96
    char *pOneTopic = taosArrayGetP(subscribe->topicNames, i);
630✔
97
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
634!
98
    MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic));
630✔
99
    MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));
629!
100

101
    if (subscribe->enableReplay) {
629✔
102
      if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
7✔
103
        code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
2✔
104
        goto END;
2✔
105
      } else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
5!
106
        SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
4✔
107
        if (pDb == NULL) {
4!
108
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
109
          goto END;
×
110
        }
111
        if (pDb->cfg.numOfVgroups != 1) {
4✔
112
          mndReleaseDb(pMnode, pDb);
1✔
113
          code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
1✔
114
          goto END;
1✔
115
        }
116
        mndReleaseDb(pMnode, pDb);
3✔
117
      }
118
    }
119
    char  key[TSDB_CONSUMER_ID_LEN] = {0};
626✔
120
    (void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%"PRIx64, subscribe->consumerId);
626✔
121
    mndTransSetDbName(pTrans, pTopic->db, key);
626✔
122
    MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
626!
123
    mndReleaseTopic(pMnode, pTopic);
626✔
124
  }
125
  return 0;
1,013✔
126

127
END:
4✔
128
  mndReleaseTopic(pMnode, pTopic);
4✔
129
  return code;
4✔
130
}
131

132
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
417✔
133
  int32_t              code = 0;
417✔
134
  SMnode              *pMnode = pMsg->info.node;
417✔
135
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
417✔
136
  SMqConsumerObj      *pConsumerNew = NULL;
417✔
137
  STrans              *pTrans = NULL;
417✔
138
  SMqConsumerObj      *pConsumer = NULL;
417✔
139

140
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClearMsg->consumerId, &pConsumer));
417✔
141
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
411!
142
        mndConsumerStatusName(pConsumer->status));
143

144
  MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
411!
145
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
411✔
146
  MND_TMQ_NULL_CHECK(pTrans);
411!
147
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
411!
148
  code = mndTransPrepare(pMnode, pTrans);
411✔
149

150
END:
417✔
151
  mndReleaseConsumer(pMnode, pConsumer);
417✔
152
  tDeleteSMqConsumerObj(pConsumerNew);
417✔
153
  mndTransDrop(pTrans);
417✔
154
  return code;
417✔
155
}
156

157
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
3,012✔
158
  int32_t code = 0;
3,012✔
159
  rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
3,012✔
160
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);
3,012!
161
  for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
5,788✔
162
    char        *topic = taosArrayGetP(pConsumer->currentTopics, i);
2,776✔
163
    SMqTopicObj *pTopic = NULL;
2,776✔
164
    code = mndAcquireTopic(pMnode, topic, &pTopic);
2,776✔
165
    if (code != TDB_CODE_SUCCESS) {
2,776!
166
      continue;
×
167
    }
168
    STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
2,776✔
169
    MND_TMQ_NULL_CHECK(data);
2,776!
170
    tstrncpy(data->topic, topic, TSDB_TOPIC_FNAME_LEN);
2,776✔
171
    if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
5,549!
172
        grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
2,773✔
173
      data->noPrivilege = 1;
3✔
174
    } else {
175
      data->noPrivilege = 0;
2,773✔
176
    }
177
    mndReleaseTopic(pMnode, pTopic);
2,776✔
178
  }
179
END:
3,012✔
180
  return code;
3,012✔
181
}
182

183
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
3,012✔
184
  for (int i = 0; i < taosArrayGetSize(req->topics); i++) {
5,792✔
185
    TopicOffsetRows *data = taosArrayGet(req->topics, i);
2,780✔
186
    if (data == NULL){
2,780!
187
      continue;
×
188
    }
189
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
2,780!
190

191
    SMqSubscribeObj *pSub = NULL;
2,780✔
192
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
2,780✔
193
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, data->topicName);
2,780✔
194
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
2,780✔
195
    if (code != 0) {
2,780!
196
      mError("failed to acquire subscribe by key:%s, code:%d", key, code);
×
197
      continue;
×
198
    }
199
    taosWLockLatch(&pSub->lock);
2,780✔
200
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
2,780✔
201
    if (pConsumerEp) {
2,780✔
202
      taosArrayDestroy(pConsumerEp->offsetRows);
2,721✔
203
      pConsumerEp->offsetRows = data->offsetRows;
2,721✔
204
      data->offsetRows = NULL;
2,721✔
205
    }
206
    taosWUnLockLatch(&pSub->lock);
2,780✔
207

208
    mndReleaseSubscribe(pMnode, pSub);
2,780✔
209
  }
210
}
3,012✔
211

212
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
3,012✔
213
  int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
3,012✔
214
  if (tlen <= 0){
3,012!
215
    return TSDB_CODE_TMQ_INVALID_MSG;
×
216
  }
217
  void   *buf = rpcMallocCont(tlen);
3,012✔
218
  if (buf == NULL) {
3,012!
219
    return terrno;
×
220
  }
221

222
  if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){
3,012!
223
    rpcFreeCont(buf);
×
224
    return TSDB_CODE_TMQ_INVALID_MSG;
×
225
  }
226
  pMsg->info.rsp = buf;
3,012✔
227
  pMsg->info.rspLen = tlen;
3,012✔
228
  return 0;
3,012✔
229
}
230

231
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
3,045✔
232
  int32_t         code = 0;
3,045✔
233
  SMnode         *pMnode = pMsg->info.node;
3,045✔
234
  SMqHbReq        req = {0};
3,045✔
235
  SMqHbRsp        rsp = {0};
3,045✔
236
  SMqConsumerObj *pConsumer = NULL;
3,045✔
237
  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));
3,045!
238
  int64_t consumerId = req.consumerId;
3,045✔
239
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
3,045✔
240
  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));
3,012!
241
  atomic_store_32(&pConsumer->hbStatus, 0);
3,012✔
242
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d %d", consumerId, req.pollFlag, pConsumer->pollStatus);
3,012!
243
  if (req.pollFlag == 1){
3,012✔
244
    atomic_store_32(&pConsumer->pollStatus, 0);
1,098✔
245
  }
246

247
  storeOffsetRows(pMnode, &req, pConsumer);
3,012✔
248
  rsp.debugFlag = tqClientDebugFlag;
3,012✔
249
  code = buildMqHbRsp(pMsg, &rsp);
3,012✔
250

251
END:
3,045✔
252
  tDestroySMqHbRsp(&rsp);
3,045✔
253
  mndReleaseConsumer(pMnode, pConsumer);
3,045✔
254
  tDestroySMqHbReq(&req);
3,045✔
255
  return code;
3,045✔
256
}
257

258
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
3,880✔
259
  taosRLockLatch(&pConsumer->lock);
3,880✔
260

261
  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
3,880✔
262

263
  rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
3,880✔
264
  if (rsp->topics == NULL) {
3,880!
265
    taosRUnLockLatch(&pConsumer->lock);
×
266
    return terrno;
×
267
  }
268

269
  // handle all topics subscribed by this consumer
270
  for (int32_t i = 0; i < numOfTopics; i++) {
7,409✔
271
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
3,529✔
272
    SMqSubscribeObj *pSub = NULL;
3,529✔
273
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
3,529✔
274
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
3,529✔
275
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
3,529✔
276
    if (code != 0) {
3,529!
277
      continue;
×
278
    }
279
    taosRLockLatch(&pSub->lock);
3,529✔
280

281
    SMqSubTopicEp topicEp = {0};
3,529✔
282
    tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);
3,529✔
283

284
    // 2.1 fetch topic schema
285
    SMqTopicObj *pTopic = NULL;
3,529✔
286
    code = mndAcquireTopic(pMnode, topic, &pTopic);
3,529✔
287
    if (code != TDB_CODE_SUCCESS) {
3,529!
288
      taosRUnLockLatch(&pSub->lock);
×
289
      mndReleaseSubscribe(pMnode, pSub);
×
290
      continue;
×
291
    }
292
    taosRLockLatch(&pTopic->lock);
3,529✔
293
    tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
3,529✔
294
    topicEp.schema.nCols = pTopic->schema.nCols;
3,529✔
295
    if (topicEp.schema.nCols) {
3,529✔
296
      topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
2,353✔
297
      if (topicEp.schema.pSchema == NULL) {
2,353!
298
        taosRUnLockLatch(&pTopic->lock);
×
299
        taosRUnLockLatch(&pSub->lock);
×
300
        mndReleaseSubscribe(pMnode, pSub);
×
301
        mndReleaseTopic(pMnode, pTopic);
×
302
        return terrno;
×
303
      }
304
      (void)memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
2,353✔
305
    }
306
    taosRUnLockLatch(&pTopic->lock);
3,529✔
307
    mndReleaseTopic(pMnode, pTopic);
3,529✔
308

309
    // 2.2 iterate all vg assigned to the consumer of that topic
310
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
3,529✔
311
    if (pConsumerEp == NULL) {
3,529!
312
      taosRUnLockLatch(&pConsumer->lock);
×
313
      taosRUnLockLatch(&pSub->lock);
×
314
      mndReleaseSubscribe(pMnode, pSub);
×
315
      return TSDB_CODE_OUT_OF_MEMORY;
×
316
    }
317
    int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
3,529✔
318
    topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
3,529✔
319
    if (topicEp.vgs == NULL) {
3,529!
320
      taosRUnLockLatch(&pConsumer->lock);
×
321
      taosRUnLockLatch(&pSub->lock);
×
322
      mndReleaseSubscribe(pMnode, pSub);
×
323
      return terrno;
×
324
    }
325

326
    for (int32_t j = 0; j < vgNum; j++) {
8,126✔
327
      SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
4,597✔
328
      if (pVgEp == NULL) {
4,597!
329
        continue;
×
330
      }
331
      if (epoch == -1) {
4,597✔
332
        SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
2,779✔
333
        if (pVgroup) {
2,779✔
334
          pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
173✔
335
          mndReleaseVgroup(pMnode, pVgroup);
173✔
336
        }
337
      }
338
      SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
4,597✔
339
      if (taosArrayPush(topicEp.vgs, &vgEp) == NULL) {
9,194!
340
        taosArrayDestroy(topicEp.vgs);
×
341
        taosRUnLockLatch(&pConsumer->lock);
×
342
        taosRUnLockLatch(&pSub->lock);
×
343
        mndReleaseSubscribe(pMnode, pSub);
×
344
        return terrno;
×
345
      }
346
    }
347
    if (taosArrayPush(rsp->topics, &topicEp) == NULL) {
7,058!
348
      taosArrayDestroy(topicEp.vgs);
×
349
      taosRUnLockLatch(&pConsumer->lock);
×
350
      taosRUnLockLatch(&pSub->lock);
×
351
      mndReleaseSubscribe(pMnode, pSub);
×
352
      return terrno;
×
353
    }
354
    taosRUnLockLatch(&pSub->lock);
3,529✔
355
    mndReleaseSubscribe(pMnode, pSub);
3,529✔
356
  }
357
  taosRUnLockLatch(&pConsumer->lock);
3,880✔
358
  return 0;
3,880✔
359
}
360

361
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
9,655✔
362
  int32_t code = 0;
9,655✔
363
  // encode rsp
364
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
9,655✔
365
  void   *buf = rpcMallocCont(tlen);
9,655✔
366
  if (buf == NULL) {
9,656!
367
    return terrno;
×
368
  }
369

370
  SMqRspHead *pHead = buf;
9,656✔
371

372
  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
9,656✔
373
  pHead->epoch = serverEpoch;
9,656✔
374
  pHead->consumerId = consumerId;
9,656✔
375
  pHead->walsver = 0;
9,656✔
376
  pHead->walever = 0;
9,656✔
377

378
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
9,656✔
379
  if (tEncodeSMqAskEpRsp(&abuf, rsp) < 0) {
9,656!
380
    rpcFreeCont(buf);
×
381
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
382
  }
383

384
  // send rsp
385
  pMsg->info.rsp = buf;
9,656✔
386
  pMsg->info.rspLen = tlen;
9,656✔
387
  return code;
9,656✔
388
}
389

390
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
12,734✔
391
  SMnode     *pMnode = pMsg->info.node;
12,734✔
392
  SMqAskEpReq req = {0};
12,734✔
393
  SMqAskEpRsp rsp = {0};
12,734✔
394
  int32_t     code = 0;
12,734✔
395
  SMqConsumerObj *pConsumer = NULL;
12,734✔
396

397
  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
12,734!
398
  int64_t consumerId = req.consumerId;
12,733✔
399
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
12,733✔
400
  if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
12,601!
401
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
×
402
           pConsumer->cgroup);
403
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
×
404
    goto END;
×
405
  }
406

407
  // 1. check consumer status
408
  int32_t status = atomic_load_32(&pConsumer->status);
12,601✔
409
  if (status != MQ_CONSUMER_STATUS_READY) {
12,600✔
410
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
2,945!
411
    code = TSDB_CODE_MND_CONSUMER_NOT_READY;
2,945✔
412
    goto END;
2,945✔
413
  }
414

415
  int32_t epoch = req.epoch;
9,655✔
416
  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
9,655✔
417

418
  // 2. check epoch, only send ep info when epochs do not match
419
  if (epoch != serverEpoch) {
9,655✔
420
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
3,880!
421
          consumerId, epoch, serverEpoch);
422
    MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, epoch, &rsp));
3,880!
423
  }
424

425
  code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);
9,655✔
426

427
END:
12,734✔
428
  tDeleteSMqAskEpRsp(&rsp);
429
  mndReleaseConsumer(pMnode, pConsumer);
12,734✔
430
  return code;
12,734✔
431
}
432

433
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
411✔
434
  int32_t  code = 0;
411✔
435
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
411✔
436
  MND_TMQ_NULL_CHECK(pCommitRaw);
411!
437
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
411✔
438
  if (code != 0) {
411!
439
    sdbFreeRaw(pCommitRaw);
×
440
    goto END;
×
441
  }
442
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
411!
443
END:
411✔
444
  return code;
411✔
445
}
446

447
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
2,202✔
448
  int32_t  code = 0;
2,202✔
449
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
2,202✔
450
  MND_TMQ_NULL_CHECK(pCommitRaw);
2,202!
451
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
2,202✔
452
  if (code != 0) {
2,202!
453
    sdbFreeRaw(pCommitRaw);
×
454
    goto END;
×
455
  }
456
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
2,202!
457
END:
2,202✔
458
  return code;
2,202✔
459
}
460

461
static void freeItem(void *param) {
1✔
462
  void *pItem = *(void **)param;
1✔
463
  if (pItem != NULL) {
1!
464
    taosMemoryFree(pItem);
1✔
465
  }
466
}
1✔
467

468
#define ADD_TOPIC_TO_ARRAY(element, array) \
469
char *newTopicCopy = taosStrdup(element); \
470
MND_TMQ_NULL_CHECK(newTopicCopy);\
471
if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL){\
472
  taosMemoryFree(newTopicCopy);\
473
  code = terrno;\
474
  goto END;\
475
}
476

477
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
457✔
478
  int32_t code = 0;
457✔
479
  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
457✔
480
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
457!
481
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
457✔
482
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);
457!
483

484
  int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
457✔
485
  int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
457✔
486
  int32_t i = 0, j = 0;
457✔
487
  while (i < oldTopicNum || j < newTopicNum) {
972!
488
    if (i >= oldTopicNum) {
515!
UNCOV
489
      void* tmp = taosArrayGetP(pConsumerNew->assignedTopics, j);
×
UNCOV
490
      MND_TMQ_NULL_CHECK(tmp);
×
UNCOV
491
      ADD_TOPIC_TO_ARRAY(tmp, rebNewTopics);
×
UNCOV
492
      j++;
×
UNCOV
493
      continue;
×
494
    } else if (j >= newTopicNum) {
515✔
495
      void* tmp = taosArrayGetP(pExistedConsumer->currentTopics, i);
505✔
496
      MND_TMQ_NULL_CHECK(tmp);
505!
497
      ADD_TOPIC_TO_ARRAY(tmp, rebRemovedTopics);
1,010!
498
      i++;
505✔
499
      continue;
505✔
500
    } else {
501
      char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
10✔
502
      MND_TMQ_NULL_CHECK(oldTopic);
10!
503
      char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j);
10✔
504
      MND_TMQ_NULL_CHECK(newTopic);
10!
505
      int   comp = strcmp(oldTopic, newTopic);
10✔
506
      if (comp == 0) {
10!
507
        i++;
10✔
508
        j++;
10✔
509
        continue;
10✔
510
      } else if (comp < 0) {
×
511
        ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
×
512
        i++;
×
513
        continue;
×
514
      } else {
×
515
        ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
×
516
        j++;
×
517
        continue;
×
518
      }
519
    }
520
  }
521
  // no topics need to be rebalanced
522
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
457!
523
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
10✔
524
  }
525

526
END:
447✔
527
  return code;
457✔
528
}
529

530
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
1,018✔
531
  taosArraySort(pTopicList, taosArrayCompareString);
1,018✔
532
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
1,018✔
533

534
  int32_t newTopicNum = taosArrayGetSize(pTopicList);
1,018✔
535
  for (int i = 0; i < newTopicNum; i++) {
1,648✔
536
    int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i));
631✔
537
    if (gNum >= MND_MAX_GROUP_PER_TOPIC) {
631✔
538
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
1✔
539
    }
540
  }
541
  return 0;
1,017✔
542
}
543

544
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer){
1,013✔
545
  int64_t         consumerId = subscribe->consumerId;
1,013✔
546
  char           *cgroup     = subscribe->cgroup;
1,013✔
547
  SMqConsumerObj *pConsumerNew     = NULL;
1,013✔
548
  SMqConsumerObj *pExistedConsumer = NULL;
1,013✔
549
  int32_t code = mndAcquireConsumer(pMnode, consumerId, &pExistedConsumer);
1,013✔
550
  if (code != 0) {
1,013✔
551
    mInfo("receive subscribe request from new consumer:0x%" PRIx64
556!
552
              ",cgroup:%s, numOfTopics:%d", consumerId,
553
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));
554

555
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
556!
556
  } else {
557
    int32_t status = atomic_load_32(&pExistedConsumer->status);
457✔
558

559
    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
457!
560
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
561
          consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
562
          (int32_t)taosArrayGetSize(subscribe->topicNames));
563

564
    if (status != MQ_CONSUMER_STATUS_READY) {
457!
565
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
×
566
      goto END;
×
567
    }
568
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
457!
569
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
457✔
570
  }
571
  mndReleaseConsumer(pMnode, pExistedConsumer);
1,003✔
572
  if (ppConsumer){
1,003!
573
    *ppConsumer = pConsumerNew;
1,003✔
574
  }
575
  return code;
1,003✔
576

577
END:
10✔
578
  mndReleaseConsumer(pMnode, pExistedConsumer);
10✔
579
  tDeleteSMqConsumerObj(pConsumerNew);
10✔
580
  return code;
10✔
581
}
582

583
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
1,400✔
584
  SMnode *pMnode = pMsg->info.node;
1,400✔
585
  char   *msgStr = pMsg->pCont;
1,400✔
586
  int32_t code = 0;
1,400✔
587
  SMqConsumerObj *pConsumerNew = NULL;
1,400✔
588
  STrans         *pTrans = NULL;
1,400✔
589

590
  SCMSubscribeReq subscribe = {0};
1,400✔
591
  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
2,800!
592
  bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
1,400✔
593
  if(unSubscribe){
1,400✔
594
    SMqConsumerObj *pConsumerTmp = NULL;
829✔
595
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
1,204✔
596
    if (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0){
822✔
597
      mndReleaseConsumer(pMnode, pConsumerTmp);
375✔
598
      goto END;
375✔
599
    }
600
    mndReleaseConsumer(pMnode, pConsumerTmp);
447✔
601
  }
602
  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
1,018✔
603
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
1,017✔
604
                          (unSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
605
                          pMsg, "subscribe");
606
  MND_TMQ_NULL_CHECK(pTrans);
1,017!
607

608
  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user));
1,017✔
609
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
1,013✔
610
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
1,003!
611
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
1,003!
612
  code = TSDB_CODE_ACTION_IN_PROGRESS;
1,003✔
613

614
END:
1,400✔
615
  mndTransDrop(pTrans);
1,400✔
616
  tDeleteSMqConsumerObj(pConsumerNew);
1,400✔
617
  taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
1,400✔
618
  return (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code;
1,400✔
619
}
620

621
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
2,984✔
622
  int32_t code = 0;
2,984✔
623
  int32_t lino = 0;
2,984✔
624
  terrno = TSDB_CODE_OUT_OF_MEMORY;
2,984✔
625

626
  void   *buf = NULL;
2,984✔
627
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
2,984✔
628
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
2,984✔
629

630
  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
2,984✔
631
  if (pRaw == NULL) goto CM_ENCODE_OVER;
2,984!
632

633
  buf = taosMemoryMalloc(tlen);
2,984✔
634
  if (buf == NULL) goto CM_ENCODE_OVER;
2,984!
635

636
  void *abuf = buf;
2,984✔
637
  if(tEncodeSMqConsumerObj(&abuf, pConsumer) < 0){
2,984!
638
    goto CM_ENCODE_OVER;
×
639
  }
640

641
  int32_t dataPos = 0;
2,984✔
642
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
2,984!
643
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
2,984!
644
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
2,984!
645
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
2,984!
646

647
  terrno = TSDB_CODE_SUCCESS;
2,984✔
648

649
CM_ENCODE_OVER:
2,984✔
650
  taosMemoryFreeClear(buf);
2,984!
651
  if (terrno != 0) {
2,984!
652
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
×
653
    sdbFreeRaw(pRaw);
×
654
    return NULL;
×
655
  }
656

657
  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
2,984✔
658
  return pRaw;
2,984✔
659
}
660

661
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
2,657✔
662
  int32_t         code = 0;
2,657✔
663
  int32_t         lino = 0;
2,657✔
664
  SSdbRow        *pRow = NULL;
2,657✔
665
  SMqConsumerObj *pConsumer = NULL;
2,657✔
666
  void           *buf = NULL;
2,657✔
667

668
  terrno = 0;
2,657✔
669
  int8_t sver = 0;
2,657✔
670
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
2,657!
671
    goto CM_DECODE_OVER;
×
672
  }
673

674
  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
2,657!
675
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
676
    goto CM_DECODE_OVER;
×
677
  }
678

679
  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
2,657✔
680
  if (pRow == NULL) {
2,657!
681
    goto CM_DECODE_OVER;
×
682
  }
683

684
  pConsumer = sdbGetRowObj(pRow);
2,657✔
685
  if (pConsumer == NULL) {
2,657!
686
    goto CM_DECODE_OVER;
×
687
  }
688

689
  int32_t dataPos = 0;
2,657✔
690
  int32_t len;
691
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
2,657!
692
  buf = taosMemoryMalloc(len);
2,657✔
693
  if (buf == NULL) {
2,657!
694
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
695
    goto CM_DECODE_OVER;
×
696
  }
697

698
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
2,657!
699
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
2,657!
700

701
  if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
2,657!
702
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
×
703
    goto CM_DECODE_OVER;
×
704
  }
705

706
  tmsgUpdateDnodeEpSet(&pConsumer->ep);
2,657✔
707

708
CM_DECODE_OVER:
2,657✔
709
  taosMemoryFreeClear(buf);
2,657!
710
  if (terrno != TSDB_CODE_SUCCESS) {
2,657!
711
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
×
712
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
713
    taosMemoryFreeClear(pRow);
×
714
  }
715

716
  return pRow;
2,657✔
717
}
718

719
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
582✔
720
  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
582!
721
        pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);
722
  pConsumer->subscribeTime = pConsumer->createTime;
582✔
723
  return 0;
582✔
724
}
725

726
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
2,657✔
727
  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
2,657!
728
        mndConsumerStatusName(pConsumer->status));
729
  tClearSMqConsumerObj(pConsumer);
2,657✔
730
  return 0;
2,657✔
731
}
732

733
//static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
734
//  int32_t status = pConsumer->status;
735
//
736
//  if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
737
//    if (status == MQ_CONSUMER_STATUS_REBALANCE) {
738
//      pConsumer->status = MQ_CONSUMER_STATUS_READY;
739
//    } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
740
//      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
741
//    }
742
//  }
743
//}
744

745
// remove from topic list
746
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
1,660✔
747
  int32_t size = taosArrayGetSize(topicList);
1,660✔
748
  for (int32_t i = 0; i < size; i++) {
1,790✔
749
    char *p = taosArrayGetP(topicList, i);
1,780✔
750
    if (strcmp(pTopic, p) == 0) {
1,780✔
751
      taosArrayRemove(topicList, i);
1,650✔
752
      taosMemoryFree(p);
1,650✔
753

754
      mInfo("[rebalance] consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
1,650!
755
            consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
756
      break;
1,650✔
757
    }
758
  }
759
}
1,660✔
760

761
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
622✔
762
  bool    existing = false;
622✔
763
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
622✔
764
  for (int32_t i = 0; i < size; i++) {
700✔
765
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
78✔
766
    if (topic && strcmp(topic, pTopic) == 0) {
78!
767
      existing = true;
×
768
      break;
×
769
    }
770
  }
771

772
  return existing;
622✔
773
}
774

775
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
1,660✔
776
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
1,660!
777
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);
778

779
  taosWLockLatch(&pOldConsumer->lock);
1,660✔
780

781
  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
1,660✔
782
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
451✔
783
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
451✔
784
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
451✔
785

786
    pOldConsumer->subscribeTime = taosGetTimestampMs();
451✔
787
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
451✔
788
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
451!
789
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
1,209✔
790
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
68✔
791
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
68✔
792
    mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
68!
793
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
1,141✔
794
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
622✔
795
    if (tmp == NULL){
622!
796
      return TSDB_CODE_TMQ_INVALID_MSG;
×
797
    }
798
    char *pNewTopic = taosStrdup(tmp);
622✔
799
    if (pNewTopic == NULL) {
622!
800
      return terrno;
×
801
    }
802
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
622✔
803
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
622✔
804
    if (existing) {
622!
805
      mError("[rebalance] consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
×
806
      taosMemoryFree(pNewTopic);
×
807
    } else {
808
      if (taosArrayPush(pOldConsumer->currentTopics, &pNewTopic) == NULL) {
1,244!
809
        taosMemoryFree(pNewTopic);
×
810
        return TSDB_CODE_TMQ_INVALID_MSG;
×
811
      }
812
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
622✔
813
    }
814

815
    int32_t status = pOldConsumer->status;
622✔
816
//    updateConsumerStatus(pOldConsumer);
817
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
622!
818
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
562✔
819
    }
820

821
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
622✔
822
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
622✔
823

824
    mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
622!
825
          ", current topics:%d, newTopics:%d, removeTopics:%d",
826
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
827
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
828
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
829
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
830

831
  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
519!
832
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
519✔
833
    if (topic == NULL){
519!
834
      return TSDB_CODE_TMQ_INVALID_MSG;
×
835
    }
836
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
519✔
837
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
519✔
838

839
    int32_t status = pOldConsumer->status;
519✔
840
//    updateConsumerStatus(pOldConsumer);
841
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
519!
842
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
461✔
843
    }
844
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
519✔
845
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
519✔
846

847
    mInfo("[rebalance]consumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
519!
848
          ", current topics:%d, newTopics:%d, removeTopics:%d",
849
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
850
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
851
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
852
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
853
  }
854

855
  taosWUnLockLatch(&pOldConsumer->lock);
1,660✔
856
  return 0;
1,660✔
857
}
858

859
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
18,878✔
860
  SSdb           *pSdb = pMnode->pSdb;
18,878✔
861
  *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
18,878✔
862
  if (*pConsumer == NULL) {
18,879✔
863
    return TSDB_CODE_MND_CONSUMER_NOT_EXIST;
735✔
864
  }
865
  return 0;
18,144✔
866
}
867

868
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
22,958✔
869
  SSdb *pSdb = pMnode->pSdb;
22,958✔
870
  sdbRelease(pSdb, pConsumer);
22,958✔
871
}
22,958✔
872

873
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
87✔
874
  SMnode         *pMnode = pReq->info.node;
87✔
875
  SSdb           *pSdb = pMnode->pSdb;
87✔
876
  int32_t         numOfRows = 0;
87✔
877
  SMqConsumerObj *pConsumer = NULL;
87✔
878
  int32_t         code = 0;
87✔
879
  char           *parasStr = NULL;
87✔
880
  char           *status = NULL;
87✔
881

882
  while (numOfRows < rowsCapacity) {
264!
883
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
264✔
884
    if (pShow->pIter == NULL) {
264✔
885
      break;
87✔
886
    }
887

888
    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
177✔
889
      mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
4!
890
      sdbRelease(pSdb, pConsumer);
4✔
891
      continue;
4✔
892
    }
893

894
    taosRLockLatch(&pConsumer->lock);
173✔
895
    mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);
173!
896

897
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
173✔
898
    bool    hasTopic = true;
173✔
899
    if (topicSz == 0) {
173!
900
      hasTopic = false;
×
901
      topicSz = 1;
×
902
    }
903

904
    if (numOfRows + topicSz > rowsCapacity) {
173!
905
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + topicSz));
×
906
    }
907

908
    for (int32_t i = 0; i < topicSz; i++) {
360✔
909
      SColumnInfoData *pColInfo = NULL;
187✔
910
      int32_t          cols = 0;
187✔
911

912
      // consumer id
913
      char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
187✔
914
      (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
187✔
915
      varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
187✔
916

917
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
918
      MND_TMQ_NULL_CHECK(pColInfo);
187!
919
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));
187!
920

921
      // consumer group
922
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
187✔
923
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);
187✔
924

925
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
926
      MND_TMQ_NULL_CHECK(pColInfo);
187!
927
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));
187!
928

929
      // client id
930
      char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
187✔
931
      STR_TO_VARSTR(clientId, pConsumer->clientId);
187✔
932

933
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
934
      MND_TMQ_NULL_CHECK(pColInfo);
187!
935
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
187!
936

937
      // user
938
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
187✔
939
      STR_TO_VARSTR(user, pConsumer->user);
187✔
940

941
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
942
      MND_TMQ_NULL_CHECK(pColInfo);
187!
943
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));
187!
944

945
      // fqdn
946
      char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
187✔
947
      STR_TO_VARSTR(fqdn, pConsumer->fqdn);
187✔
948

949
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
950
      MND_TMQ_NULL_CHECK(pColInfo);
187!
951
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));
187!
952

953
      // status
954
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
187✔
955
      status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
187✔
956
      MND_TMQ_NULL_CHECK(status);
187!
957
      STR_TO_VARSTR(status, pStatusName);
187✔
958

959
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
960
      MND_TMQ_NULL_CHECK(pColInfo);
187!
961
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
187!
962
      taosMemoryFreeClear(status);
187!
963

964
      // one subscribed topic
965
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
966
      MND_TMQ_NULL_CHECK(pColInfo);
187!
967
      if (hasTopic) {
187!
968
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
187✔
969
        mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i), topic + VARSTR_HEADER_SIZE);
187✔
970
        *(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
187✔
971
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
187!
972
      } else {
973
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
×
974
      }
975

976
      // up time
977
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
978
      MND_TMQ_NULL_CHECK(pColInfo);
187!
979
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));
187!
980

981
      // subscribe time
982
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
983
      MND_TMQ_NULL_CHECK(pColInfo);
187!
984
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));
187!
985

986
      // rebalance time
987
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
988
      MND_TMQ_NULL_CHECK(pColInfo);
187!
989
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));
187!
990

991
      char         buf[TSDB_OFFSET_LEN] = {0};
187✔
992
      STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
187✔
993
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
187✔
994

995
      parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
187✔
996
      MND_TMQ_NULL_CHECK(parasStr);
187!
997
      (void)snprintf(varDataVal(parasStr), pShow->pMeta->pSchemas[cols].bytes - VARSTR_HEADER_SIZE, "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
187✔
998
              pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
187✔
999
      varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
187✔
1000

1001
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
187✔
1002
      MND_TMQ_NULL_CHECK(pColInfo);
187!
1003
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
187!
1004
      taosMemoryFreeClear(parasStr);
187!
1005
      numOfRows++;
187✔
1006
    }
1007

1008
    taosRUnLockLatch(&pConsumer->lock);
173✔
1009
    sdbRelease(pSdb, pConsumer);
173✔
1010

1011
    pBlock->info.rows = numOfRows;
173✔
1012
  }
1013

1014
  pShow->numOfRows += numOfRows;
87✔
1015
  return numOfRows;
87✔
1016

1017
END:
×
1018
  taosMemoryFreeClear(status);
×
1019
  taosMemoryFreeClear(parasStr);
×
1020
  return code;
×
1021
}
1022

1023
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
×
1024
  SSdb *pSdb = pMnode->pSdb;
×
1025
  sdbCancelFetchByType(pSdb, pIter, SDB_CONSUMER);
×
1026
}
×
1027

1028
const char *mndConsumerStatusName(int status) {
13,607✔
1029
  switch (status) {
13,607!
1030
    case MQ_CONSUMER_STATUS_READY:
5,652✔
1031
      return "ready";
5,652✔
1032
//    case MQ_CONSUMER_STATUS_LOST:
1033
//      return "lost";
1034
    case MQ_CONSUMER_STATUS_REBALANCE:
7,955✔
1035
      return "rebalancing";
7,955✔
1036
    default:
×
1037
      return "unknown";
×
1038
  }
1039
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc