• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: separate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.84
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_CONSUMER_VER_NUMBER   3
29
#define MND_CONSUMER_RESERVE_SIZE 64
30

31
#define MND_MAX_GROUP_PER_TOPIC 100
32

33
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
34
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
35
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
36
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
37
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
38

39
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
40
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
41
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
42
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
43

44
// Registers the TMQ request handlers, the "show consumers" callbacks and the
// consumer SDB table on the given mnode.
// Returns TSDB_CODE_INVALID_PARA when pMnode is NULL, otherwise whatever
// sdbSetTable() returns.
int32_t mndInitConsumer(SMnode *pMnode) {
  if (pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // request handlers
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);

  // show-table callbacks
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);

  // sdb table descriptor: consumers are keyed by a 64-bit id
  SSdbTable consumerTable = {
      .sdbType = SDB_CONSUMER,
      .keyType = SDB_KEY_INT64,
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
  };
  return sdbSetTable(pMnode->pSdb, consumerTable);
}
68

69
// Counterpart of mndInitConsumer(); the body is empty, so nothing is torn down here.
void mndCleanupConsumer(SMnode *pMnode) {}
13✔
70

UNCOV
71
// Builds an RPC message whose payload is just `consumerId` and puts it on the
// mnode write queue with the given message type.
//   pMnode     - mnode whose msgCb is used to enqueue the message
//   consumerId - id copied into the message body
//   msgType    - message type stored in the SRpcMsg
//   info       - handle info copied by value into the SRpcMsg
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// error produced by the allocation / enqueue step.
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
  if (pMnode == NULL || info == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  void   *msg  = rpcMallocCont(sizeof(int64_t));
  MND_TMQ_NULL_CHECK(msg);

  *(int64_t*)msg = consumerId;
  SRpcMsg rpcMsg = {
      .msgType = msgType,
      .pCont = msg,
      .contLen = sizeof(int64_t),
      .info = *info,
  };

  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
  return code;

END:
  // msg was obtained from rpcMallocCont(), so it is released with rpcFreeCont()
  // like the other rpcMallocCont() buffers in this file, not with the generic
  // allocator's free.
  // NOTE(review): confirm tmsgPutToQueue() does not already release pCont on failure.
  rpcFreeCont(msg);
  return code;
}
95

UNCOV
96
// Checks every topic named in a subscribe request before the transaction runs.
// For each topic: acquire it, check the user's subscribe privilege, check the
// subscription grant, apply the replay restrictions when enableReplay is set,
// then tag the transaction with the topic's db and check for conflicts.
//   pTrans    - transaction being prepared for this subscribe request
//   subscribe - request; topicNames, enableReplay and consumerId are read
//   pMnode    - mnode used for all lookups
//   pUser     - user name the privilege check is done for
// Returns 0 when every topic passes, TSDB_CODE_INVALID_PARA for NULL
// arguments, otherwise the first error encountered.
static int32_t validateTopics(STrans* pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) {
  if (pTrans == NULL || subscribe == NULL || pMnode == NULL || pUser == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqTopicObj *pTopic = NULL;
  int32_t      code = 0;

  // the transaction key depends only on the consumer id, so format it once
  char  key[TSDB_CONSUMER_ID_LEN] = {0};
  (void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%"PRIx64, subscribe->consumerId);

  int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames);
  for (int32_t i = 0; i < numOfTopics; i++) {
    char *pOneTopic = taosArrayGetP(subscribe->topicNames, i);
    MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
    MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic));
    MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));

    if (subscribe->enableReplay) {
      if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
        code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
        goto END;
      } else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
        // replay on this kind of topic requires the db to have exactly one vgroup
        SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
        if (pDb == NULL) {
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          goto END;
        }
        int32_t numOfVgroups = pDb->cfg.numOfVgroups;
        mndReleaseDb(pMnode, pDb);
        if (numOfVgroups != 1) {
          code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
          goto END;
        }
      }
    }
    mndTransSetDbName(pTrans, pTopic->db, key);
    MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
    mndReleaseTopic(pMnode, pTopic);
    // Clear the pointer so that a failure on the next iteration cannot release
    // this already-released topic a second time at END.
    pTopic = NULL;
  }
  return 0;

END:
  mndReleaseTopic(pMnode, pTopic);
  return code;
}
140

UNCOV
141
// Handler for TDMT_MND_TMQ_LOST_CONSUMER_CLEAR.
// Looks up the consumer named in the message body, builds a fresh consumer
// object with the same id and cgroup, and prepares a "clear-csm" transaction
// whose commit log marks that object as dropped.
// Returns TSDB_CODE_INVALID_PARA for a NULL message or body, otherwise the
// first failing step's code or the result of mndTransPrepare().
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMnode              *pMnode = pMsg->info.node;
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
  SMqConsumerObj      *pConsumer = NULL;     // existing object, acquired from sdb
  SMqConsumerObj      *pConsumerNew = NULL;  // temporary object written to the drop log
  STrans              *pTrans = NULL;
  int32_t              code = 0;

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClearMsg->consumerId, &pConsumer));
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
        mndConsumerStatusName(pConsumer->status));

  MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
  code = mndTransPrepare(pMnode, pTrans);

END:
  // reached on success and on failure alike
  mndReleaseConsumer(pMnode, pConsumer);
  tDeleteSMqConsumerObj(pConsumerNew);
  mndTransDrop(pTrans);
  return code;
}
168

UNCOV
169
// Fills rsp->topicPrivileges with one STopicPrivilege entry per topic in
// pConsumer->currentTopics that can be acquired, recording whether `user`
// currently lacks the subscribe privilege (or the subscription grant expired).
// Topics that cannot be acquired are skipped.
//   pMnode    - mnode used for topic lookup and privilege checks
//   pConsumer - consumer whose currentTopics are inspected
//   rsp       - heartbeat response receiving the new topicPrivileges array
//   user      - user name to check
// Returns TSDB_CODE_INVALID_PARA for NULL arguments; otherwise `code` as left
// by the last step.
// NOTE(review): a failed mndAcquireTopic() on the last topic leaves its error
// in `code`, which is then returned - confirm that is intended.
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL || user == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

  rsp->topicPrivileges = taosArrayInit(numOfTopics, sizeof(STopicPrivilege));
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);

  for (int32_t i = 0; i < numOfTopics; i++) {
    char        *topic = taosArrayGetP(pConsumer->currentTopics, i);
    SMqTopicObj *pTopic = NULL;
    code = mndAcquireTopic(pMnode, topic, &pTopic);
    if (code != TDB_CODE_SUCCESS) {
      continue;
    }

    STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
    if (data == NULL) {
      // drop the topic reference before the NULL check below jumps to END
      mndReleaseTopic(pMnode, pTopic);
    }
    MND_TMQ_NULL_CHECK(data);

    tstrncpy(data->topic, topic, TSDB_TOPIC_FNAME_LEN);
    if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
        grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
      data->noPrivilege = 1;
    } else {
      data->noPrivilege = 0;
    }
    mndReleaseTopic(pMnode, pTopic);
  }

END:
  return code;
}
197

UNCOV
198
// Moves the offset-row arrays reported in a heartbeat into the matching
// subscription objects.
// For every entry in req->topics the subscription keyed by
// "<cgroup><TMQ_SEPARATOR><topicName>" is looked up; if this consumer appears
// in its consumerHash, the old offsetRows array is destroyed and replaced by
// the one from the request, whose pointer in the request is then set to NULL.
// Topics whose subscription cannot be acquired are logged and skipped.
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
  if (pMnode == NULL || req == NULL || pConsumer == NULL){
    return;
  }

  for (int idx = 0; idx < taosArrayGetSize(req->topics); idx++) {
    TopicOffsetRows *pReport = taosArrayGet(req->topics, idx);
    if (pReport == NULL){
      continue;
    }
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, pReport->topicName);

    char  subKey[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, pReport->topicName);

    SMqSubscribeObj *pSub = NULL;
    int32_t          code = mndAcquireSubscribeByKey(pMnode, subKey, &pSub);
    if (code != 0) {
      mError("failed to acquire subscribe by key:%s, code:%d", subKey, code);
      continue;
    }

    // swap in the reported rows under the subscription's write lock
    taosWLockLatch(&pSub->lock);
    SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
    if (pEp != NULL) {
      taosArrayDestroy(pEp->offsetRows);
      pEp->offsetRows = pReport->offsetRows;
      pReport->offsetRows = NULL;
    }
    taosWUnLockLatch(&pSub->lock);

    mndReleaseSubscribe(pMnode, pSub);
  }
}
229

UNCOV
230
// Serializes `rsp` into a buffer from rpcMallocCont() and attaches it to
// pMsg->info as the response (rsp / rspLen).
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
// TSDB_CODE_TMQ_INVALID_MSG when serialization reports a non-positive length,
// or terrno when the buffer cannot be allocated.
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
  if (pMsg == NULL || rsp == NULL){
    return TSDB_CODE_INVALID_PARA;
  }

  // first pass with a NULL buffer only computes the encoded length
  int32_t encodedLen = tSerializeSMqHbRsp(NULL, 0, rsp);
  if (encodedLen <= 0){
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  void *rspBuf = rpcMallocCont(encodedLen);
  if (rspBuf == NULL) {
    return terrno;
  }

  if (tSerializeSMqHbRsp(rspBuf, encodedLen, rsp) > 0) {
    pMsg->info.rsp = rspBuf;
    pMsg->info.rspLen = encodedLen;
    return 0;
  }

  rpcFreeCont(rspBuf);
  return TSDB_CODE_TMQ_INVALID_MSG;
}
251

UNCOV
252
// Handler for TDMT_MND_TMQ_HB (consumer heartbeat).
// Steps, in order: decode the request, acquire the consumer, collect the
// per-topic privilege flags into the response, reset hbStatus (and pollStatus
// when the request carries pollFlag == 1), store the reported offset rows,
// then serialize the response onto pMsg.
// Returns TSDB_CODE_INVALID_PARA for a NULL message, otherwise the first
// failing step's code or the result of buildMqHbRsp().
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMnode         *pMnode = pMsg->info.node;
  SMqConsumerObj *pConsumer = NULL;
  SMqHbReq        req = {0};
  SMqHbRsp        rsp = {0};
  int32_t         code = 0;

  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));
  int64_t consumerId = req.consumerId;

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));

  atomic_store_32(&pConsumer->hbStatus, 0);
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d %d", consumerId, req.pollFlag, pConsumer->pollStatus);
  if (req.pollFlag == 1){
    atomic_store_32(&pConsumer->pollStatus, 0);
  }

  storeOffsetRows(pMnode, &req, pConsumer);

  rsp.debugFlag = tqClientDebugFlag;
  code = buildMqHbRsp(pMsg, &rsp);

END:
  // shared exit: release everything acquired or decoded above
  tDestroySMqHbRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  tDestroySMqHbReq(&req);
  return code;
}
281

UNCOV
282
// Builds rsp->topics: one SMqSubTopicEp per topic in pConsumer->currentTopics,
// holding the topic name, db name, a copy of the topic schema and the list of
// vgroups (id + epSet) assigned to this consumer in the topic's subscription.
//   pMnode    - mnode used for subscription / topic / vgroup lookups
//   pConsumer - consumer being answered; its lock is read-held for the whole call
//   epoch     - client epoch; when it is -1 each vgroup's epSet is refreshed
//               from the vgroup object before being copied
//   rsp       - response receiving the new `topics` array
// Topics whose subscription or topic object cannot be acquired are skipped.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// error of the first failing step. On failure rsp->topics keeps the entries
// built so far; the visible caller releases it with tDeleteSMqAskEpRsp().
//
// Every exit after the initial argument check goes through END so that
// pConsumer->lock is always released, and a topicEp that was not pushed into
// rsp->topics has its schema copy and vgroup array freed.
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL){
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  taosRLockLatch(&pConsumer->lock);

  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);

  rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
  if (rsp->topics == NULL) {
    code = terrno;
    goto END;
  }

  // handle all topics subscribed by this consumer
  for (int32_t i = 0; i < numOfTopics; i++) {
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
    SMqSubscribeObj *pSub = NULL;
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
    if (mndAcquireSubscribeByKey(pMnode, key, &pSub) != 0) {
      continue;
    }
    taosRLockLatch(&pSub->lock);

    SMqSubTopicEp topicEp = {0};
    bool          failed = false;  // set together with `code` on any error in this iteration
    tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);

    // 2.1 fetch topic schema
    SMqTopicObj *pTopic = NULL;
    if (mndAcquireTopic(pMnode, topic, &pTopic) != TDB_CODE_SUCCESS) {
      taosRUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
      continue;
    }
    taosRLockLatch(&pTopic->lock);
    tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
    topicEp.schema.nCols = pTopic->schema.nCols;
    if (topicEp.schema.nCols) {
      topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
      if (topicEp.schema.pSchema == NULL) {
        code = terrno;
        failed = true;
      } else {
        (void)memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
      }
    }
    taosRUnLockLatch(&pTopic->lock);
    mndReleaseTopic(pMnode, pTopic);

    // 2.2 iterate all vg assigned to the consumer of that topic
    if (!failed) {
      SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
      if (pConsumerEp == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        failed = true;
      } else {
        int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
        topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
        if (topicEp.vgs == NULL) {
          code = terrno;
          failed = true;
        }

        for (int32_t j = 0; !failed && j < vgNum; j++) {
          SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
          if (pVgEp == NULL) {
            continue;
          }
          if (epoch == -1) {
            SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
            if (pVgroup) {
              pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
              mndReleaseVgroup(pMnode, pVgroup);
            }
          }
          SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
          if (taosArrayPush(topicEp.vgs, &vgEp) == NULL) {
            code = terrno;
            failed = true;
          }
        }
      }
    }

    // 2.3 hand the finished entry over to the response
    if (!failed && taosArrayPush(rsp->topics, &topicEp) == NULL) {
      code = terrno;
      failed = true;
    }

    taosRUnLockLatch(&pSub->lock);
    mndReleaseSubscribe(pMnode, pSub);

    if (failed) {
      // topicEp was never stored in rsp->topics, so its buffers are still ours to free
      if (topicEp.vgs != NULL) {
        taosArrayDestroy(topicEp.vgs);
      }
      taosMemoryFreeClear(topicEp.schema.pSchema);
      goto END;
    }
  }

END:
  taosRUnLockLatch(&pConsumer->lock);
  return code;
}
387

UNCOV
388
// Encodes an ask-ep response (SMqRspHead followed by the encoded SMqAskEpRsp)
// into a buffer from rpcMallocCont() and attaches it to pMsg->info.
//   serverEpoch / consumerId are written into the response head.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, terrno
// when allocation fails, or TSDB_CODE_TSC_INTERNAL_ERROR when encoding fails.
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
  if (pMsg == NULL || rsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // total size = fixed head + encoded body (length obtained with a NULL buffer)
  int32_t totalLen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
  void   *rspBuf = rpcMallocCont(totalLen);
  if (rspBuf == NULL) {
    return terrno;
  }

  // head
  SMqRspHead *pHead = rspBuf;
  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
  pHead->epoch = serverEpoch;
  pHead->consumerId = consumerId;
  pHead->walsver = 0;
  pHead->walever = 0;

  // body, written right after the head
  void *bodyPos = POINTER_SHIFT(rspBuf, sizeof(SMqRspHead));
  if (tEncodeSMqAskEpRsp(&bodyPos, rsp) < 0) {
    rpcFreeCont(rspBuf);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // send rsp
  pMsg->info.rsp = rspBuf;
  pMsg->info.rspLen = totalLen;
  return 0;
}
419

UNCOV
420
// Handler for TDMT_MND_TMQ_ASK_EP.
// Decodes the request, checks that the consumer exists, that the cgroup in
// the request matches the stored one and that the consumer is READY, then
// answers with the server epoch. Endpoint information is only added to the
// response when the client epoch differs from the server epoch.
// Returns TSDB_CODE_INVALID_PARA for a NULL message,
// TSDB_CODE_MND_CONSUMER_NOT_EXIST on a cgroup mismatch,
// TSDB_CODE_MND_CONSUMER_NOT_READY when the consumer is not READY, otherwise
// the first failing step's code or the result of buildAskEpRsp().
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMnode         *pMnode = pMsg->info.node;
  SMqConsumerObj *pConsumer = NULL;
  SMqAskEpReq     req = {0};
  SMqAskEpRsp     rsp = {0};
  int32_t         code = 0;

  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
  int64_t consumerId = req.consumerId;

  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
  if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
           pConsumer->cgroup);
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto END;
  }

  // 1. check consumer status
  int32_t status = atomic_load_32(&pConsumer->status);
  if (status != MQ_CONSUMER_STATUS_READY) {
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
    code = TSDB_CODE_MND_CONSUMER_NOT_READY;
    goto END;
  }

  // 2. check epoch, only send ep info when epochs do not match
  int32_t epoch = req.epoch;
  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
  if (epoch != serverEpoch) {
    mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
          consumerId, epoch, serverEpoch);
    MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, epoch, &rsp));
  }

  // 3. serialize the response onto the message
  code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);

END:
  tDeleteSMqAskEpRsp(&rsp);
  mndReleaseConsumer(pMnode, pConsumer);
  return code;
}
465

UNCOV
466
// Encodes pConsumer, appends the raw record to the transaction's commit log
// and marks that record SDB_STATUS_DROPPED.
// The raw record is freed here only when appending it fails.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
// the failing step's code.
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pDropRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pDropRaw);

  code = mndTransAppendCommitlog(pTrans, pDropRaw);
  if (code != 0) {
    sdbFreeRaw(pDropRaw);
    goto END;
  }

  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pDropRaw, SDB_STATUS_DROPPED));

END:
  return code;
}
482

UNCOV
483
// Encodes pConsumer, appends the raw record to the transaction's commit log
// and marks that record SDB_STATUS_READY.
// The raw record is freed here only when appending it fails.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
// the failing step's code.
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL || pTrans == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  code = 0;
  SSdbRaw *pReadyRaw = mndConsumerActionEncode(pConsumer);
  MND_TMQ_NULL_CHECK(pReadyRaw);

  code = mndTransAppendCommitlog(pTrans, pReadyRaw);
  if (code != 0) {
    sdbFreeRaw(pReadyRaw);
    goto END;
  }

  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pReadyRaw, SDB_STATUS_READY));

END:
  return code;
}
499

UNCOV
500
// Element destructor for arrays of heap pointers: `param` points at the
// array slot, and the pointer stored in that slot is freed when non-NULL.
static void freeItem(void *param) {
  if (param != NULL) {
    void *pStored = *(void **)param;
    if (pStored != NULL) {
      taosMemoryFree(pStored);
    }
  }
}
509

510
// Duplicates the topic name `element` and pushes the copy onto
// pConsumerNew->array. On failure the copy is freed, `code` is set and control
// jumps to END, so the enclosing function must provide `pConsumerNew`, `code`
// and an END label.
// Wrapped in do { } while (0) so the expansion is a single statement and the
// temporary pointer stays scoped to the macro.
#define ADD_TOPIC_TO_ARRAY(element, array)                           \
  do {                                                               \
    char *newTopicCopy = taosStrdup(element);                        \
    MND_TMQ_NULL_CHECK(newTopicCopy);                                \
    if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL) { \
      taosMemoryFree(newTopicCopy);                                  \
      code = terrno;                                                 \
      goto END;                                                      \
    }                                                                \
  } while (0)
518

UNCOV
519
// Computes which topics a re-subscribing consumer gains and loses.
// Walks pExistedConsumer->currentTopics and pConsumerNew->assignedTopics in
// step, comparing names with strcmp(): names only in the new list are copied
// into pConsumerNew->rebNewTopics, names only in the old list into
// pConsumerNew->rebRemovedTopics.
// NOTE(review): the in-step walk presumably relies on both lists being sorted
// in strcmp() order - confirm against the callers.
// Returns TSDB_CODE_TMQ_NO_NEED_REBALANCE when both result lists end up
// empty, 0 when there is a difference, TSDB_CODE_INVALID_PARA for NULL
// arguments, or an allocation error.
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
  if (pExistedConsumer == NULL || pConsumerNew == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;

  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);

  int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
  int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
  int32_t oldIdx = 0;
  int32_t newIdx = 0;

  while (oldIdx < oldTopicNum || newIdx < newTopicNum) {
    if (oldIdx >= oldTopicNum) {
      // old list exhausted: everything left in the new list is an addition
      void* added = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
      MND_TMQ_NULL_CHECK(added);
      ADD_TOPIC_TO_ARRAY(added, rebNewTopics);
      newIdx++;
    } else if (newIdx >= newTopicNum) {
      // new list exhausted: everything left in the old list is a removal
      void* removed = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
      MND_TMQ_NULL_CHECK(removed);
      ADD_TOPIC_TO_ARRAY(removed, rebRemovedTopics);
      oldIdx++;
    } else {
      char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, oldIdx);
      MND_TMQ_NULL_CHECK(oldTopic);
      char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, newIdx);
      MND_TMQ_NULL_CHECK(newTopic);

      int comp = strcmp(oldTopic, newTopic);
      if (comp < 0) {
        // old name sorts first and has no partner in the new list
        ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
        oldIdx++;
      } else if (comp > 0) {
        // new name sorts first and has no partner in the old list
        ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
        newIdx++;
      } else {
        // present in both lists: nothing to record
        oldIdx++;
        newIdx++;
      }
    }
  }

  // no topics need to be rebalanced
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
  }

END:
  return code;
}
574

UNCOV
575
// Sorts the requested topic names, drops duplicates (freeing the removed
// strings via freeItem) and checks the per-topic group count.
// Returns TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE as soon as a topic already has
// MND_MAX_GROUP_PER_TOPIC or more groups, TSDB_CODE_INVALID_PARA for NULL
// arguments, 0 otherwise.
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
  if (pTopicList == NULL || pMnode == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  taosArraySort(pTopicList, taosArrayCompareString);
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);

  int32_t topicCount = taosArrayGetSize(pTopicList);
  for (int idx = 0; idx < topicCount; idx++) {
    const char *topicName = (const char *)taosArrayGetP(pTopicList, idx);
    int32_t     groupCount = mndGetGroupNumByTopic(pMnode, topicName);
    if (groupCount >= MND_MAX_GROUP_PER_TOPIC) {
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
    }
  }
  return 0;
}
591

UNCOV
592
// Builds the consumer object that a subscribe request will write to sdb.
//   - Unknown consumer id: a new object is created with CONSUMER_INSERT_SUB.
//   - Known consumer id: it must be READY; a new object is created with
//     CONSUMER_UPDATE_SUB and its rebNewTopics / rebRemovedTopics are filled
//     by comparing against the existing consumer.
//   pMnode     - mnode used to look up the existing consumer
//   subscribe  - decoded subscribe request
//   ppConsumer - receives the new object on success; the caller then owns it.
//                When NULL the object is destroyed here instead of leaking.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL pMnode/subscribe,
// TSDB_CODE_MND_CONSUMER_NOT_READY when the existing consumer is not READY,
// or the code of the failing step (including TSDB_CODE_TMQ_NO_NEED_REBALANCE
// from getTopicAddDelete). Nothing is stored in *ppConsumer on failure.
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer){
  if (pMnode == NULL || subscribe == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int64_t         consumerId = subscribe->consumerId;
  char           *cgroup     = subscribe->cgroup;
  SMqConsumerObj *pConsumerNew     = NULL;
  SMqConsumerObj *pExistedConsumer = NULL;
  int32_t code = mndAcquireConsumer(pMnode, consumerId, &pExistedConsumer);
  if (code != 0) {
    mInfo("receive subscribe request from new consumer:0x%" PRIx64
              ",cgroup:%s, numOfTopics:%d", consumerId,
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));

    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
  } else {
    int32_t status = atomic_load_32(&pExistedConsumer->status);

    mInfo("receive subscribe request from existed consumer:0x%" PRIx64
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
          consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
          (int32_t)taosArrayGetSize(subscribe->topicNames));

    if (status != MQ_CONSUMER_STATUS_READY) {
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto END;
    }
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
  }
  mndReleaseConsumer(pMnode, pExistedConsumer);
  if (ppConsumer){
    *ppConsumer = pConsumerNew;
  } else {
    // nobody takes ownership of the new object, so release it here
    tDeleteSMqConsumerObj(pConsumerNew);
  }
  return code;

END:
  mndReleaseConsumer(pMnode, pExistedConsumer);
  tDeleteSMqConsumerObj(pConsumerNew);
  return code;
}
633

UNCOV
634
// Handler for TDMT_MND_TMQ_SUBSCRIBE.
// An empty topic list is treated as an unsubscribe: when the consumer has no
// assigned topics nothing is done, otherwise the request goes through the
// same path with TRN_CONFLICT_NOTHING. For a normal subscribe the topic list
// is sorted/deduplicated and validated, a new consumer object is built, and a
// "subscribe" transaction carrying it is prepared.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared.
// TSDB_CODE_TMQ_NO_NEED_REBALANCE and TSDB_CODE_MND_CONSUMER_NOT_EXIST are
// mapped to 0; any other error is returned unchanged.
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMnode         *pMnode = pMsg->info.node;
  char           *msgStr = pMsg->pCont;
  SMqConsumerObj *pConsumerNew = NULL;
  STrans         *pTrans = NULL;
  SCMSubscribeReq subscribe = {0};
  int32_t         code = 0;

  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));

  bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
  if(unSubscribe){
    SMqConsumerObj *pConsumerTmp = NULL;
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
    bool nothingAssigned = (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0);
    mndReleaseConsumer(pMnode, pConsumerTmp);
    if (nothingAssigned){
      // nothing is assigned, so there is nothing to unsubscribe from
      goto END;
    }
  }

  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
                          (unSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
                          pMsg, "subscribe");
  MND_TMQ_NULL_CHECK(pTrans);

  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user));
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
  code = TSDB_CODE_ACTION_IN_PROGRESS;

END:
  mndTransDrop(pTrans);
  tDeleteSMqConsumerObj(pConsumerNew);
  taosArrayDestroyP(subscribe.topicNames, NULL);

  if (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
    return 0;
  }
  return code;
}
674

UNCOV
675
// Serializes a consumer object into a newly allocated SDB raw record.
// Record layout as written below: int32 payload length, the encoded payload,
// then MND_CONSUMER_RESERVE_SIZE reserved bytes.
// terrno starts as TSDB_CODE_OUT_OF_MEMORY and is cleared only after every
// step succeeded, so any early jump to CM_ENCODE_OVER reports a failure.
// Returns the raw record, or NULL on failure (with terrno set).
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  if (pConsumer == NULL) {
    return NULL;
  }
  // `code` and `lino` are kept for the SDB_SET_* macros
  // (NOTE(review): they are not referenced directly in this function).
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *payload = NULL;
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);  // NULL buffer: length only
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
  if (pRaw == NULL) {
    goto CM_ENCODE_OVER;
  }

  payload = taosMemoryMalloc(tlen);
  if (payload == NULL) {
    goto CM_ENCODE_OVER;
  }

  void *cursor = payload;
  if(tEncodeSMqConsumerObj(&cursor, pConsumer) < 0){
    goto CM_ENCODE_OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, payload, tlen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  taosMemoryFreeClear(payload);
  if (terrno != 0) {
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}
717

UNCOV
718
// Rebuilds a consumer row from an SDB raw record produced by
// mndConsumerActionEncode().
// Accepts soft versions 1..MND_CONSUMER_VER_NUMBER. Reads the int32 payload
// length, the payload and the reserved bytes, decodes the payload into the
// row's SMqConsumerObj and then calls tmsgUpdateDnodeEpSet() on its ep.
// Returns the row on success. On failure terrno is non-zero, the row is
// freed and NULL is returned.
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  if (pRaw == NULL) {
    return NULL;
  }
  // `code` and `lino` are kept for the SDB_GET_* macros
  // (NOTE(review): they are not referenced directly in this function).
  int32_t         code = 0;
  int32_t         lino = 0;
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *payload = NULL;

  terrno = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto CM_DECODE_OVER;
  }
  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) {
    goto CM_DECODE_OVER;
  }
  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) {
    goto CM_DECODE_OVER;
  }

  int32_t dataPos = 0;
  int32_t len;
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);

  payload = taosMemoryMalloc(len);
  if (payload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto CM_DECODE_OVER;
  }

  SDB_GET_BINARY(pRaw, dataPos, payload, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(payload, pConsumer, sver) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
    goto CM_DECODE_OVER;
  }

  tmsgUpdateDnodeEpSet(&pConsumer->ep);

CM_DECODE_OVER:
  taosMemoryFreeClear(payload);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
  }

  return pRow;
}
778

UNCOV
779
/*
 * sdb insert hook for consumer objects: logs the consumer and initialises its
 * subscribe time from its create time.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when pConsumer is NULL.
 * pSdb is not used.
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
          pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);
    pConsumer->subscribeTime = pConsumer->createTime;
    return 0;
  }

  return TSDB_CODE_INVALID_PARA;
}
788

UNCOV
789
/*
 * sdb delete hook for consumer objects: logs the consumer and clears it via
 * tClearSMqConsumerObj().
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when pConsumer is NULL.
 * pSdb is not used.
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
          mndConsumerStatusName(pConsumer->status));
    tClearSMqConsumerObj(pConsumer);
    return 0;
  }

  return TSDB_CODE_INVALID_PARA;
}
798

799
//static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
800
//  int32_t status = pConsumer->status;
801
//
802
//  if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
803
//    if (status == MQ_CONSUMER_STATUS_REBALANCE) {
804
//      pConsumer->status = MQ_CONSUMER_STATUS_READY;
805
//    } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
806
//      pConsumer->status = MQ_CONSUMER_STATUS_LOST;
807
//    }
808
//  }
809
//}
810

811
// remove from topic list
UNCOV
812
/*
 * Remove the first entry equal to pTopic from topicList and free that entry.
 *
 * topicList  array of heap-allocated topic name pointers; NULL is a no-op
 * pTopic     topic name to remove; NULL is a no-op
 * consumerId only used in the log line
 * type       label of the list being edited, only used in the log line
 *
 * At most one entry is removed. NULL entries are skipped instead of being
 * handed to strcmp(), matching the guard in existInCurrentTopicList().
 */
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
  if (topicList == NULL || pTopic == NULL) {
    return;
  }

  int32_t size = taosArrayGetSize(topicList);
  for (int32_t i = 0; i < size; i++) {
    char *p = taosArrayGetP(topicList, i);
    if (p == NULL) {
      continue;  // passing NULL to strcmp() is undefined behavior
    }
    if (strcmp(pTopic, p) != 0) {
      continue;
    }

    // Detach the slot first, then release the string it owned.
    taosArrayRemove(topicList, i);
    taosMemoryFree(p);

    mInfo("[rebalance] consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
          consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
    break;
  }
}
829

UNCOV
830
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
×
UNCOV
831
  if (pConsumer == NULL || pTopic == NULL) {
×
832
    return false;
×
833
  }
UNCOV
834
  bool    existing = false;
×
UNCOV
835
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
×
UNCOV
836
  for (int32_t i = 0; i < size; i++) {
×
UNCOV
837
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
×
UNCOV
838
    if (topic && strcmp(topic, pTopic) == 0) {
×
839
      existing = true;
×
840
      break;
×
841
    }
842
  }
843

UNCOV
844
  return existing;
×
845
}
846

UNCOV
847
/*
 * sdb update hook for consumer objects. Applies pNewConsumer->updateType to
 * pOldConsumer while holding pOldConsumer's write latch:
 *
 *   CONSUMER_UPDATE_SUB  swap in the new rebNewTopics / rebRemovedTopics /
 *                        assignedTopics, stamp subscribeTime and switch the
 *                        status to MQ_CONSUMER_STATUS_REBALANCE
 *   CONSUMER_UPDATE_REB  bump the epoch and stamp rebalanceTime
 *   CONSUMER_ADD_REB     move the first topic of pNewConsumer->rebNewTopics
 *                        from the old consumer's pending-new list into its
 *                        currentTopics (kept sorted)
 *   CONSUMER_REMOVE_REB  drop the first topic of pNewConsumer->rebRemovedTopics
 *                        from the old consumer's pending-remove and current
 *                        lists
 *
 * For the last two, the status becomes MQ_CONSUMER_STATUS_READY once both
 * pending lists are empty, and the epoch / rebalanceTime are updated.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_TMQ_INVALID_MSG when the expected topic is missing or cannot be
 * appended, or terrno when duplicating the topic name fails. Every exit taken
 * after the latch is acquired goes through END so the latch is always
 * released.
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  if (pOldConsumer == NULL || pNewConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);

  int32_t code = 0;

  taosWLockLatch(&pOldConsumer->lock);

  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);

    pOldConsumer->subscribeTime = taosGetTimestampMs();
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
    if (tmp == NULL) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto END;
    }
    // The old consumer takes ownership of its own copy of the topic name.
    char *pNewTopic = taosStrdup(tmp);
    if (pNewTopic == NULL) {
      code = terrno;
      goto END;
    }
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
    if (existing) {
      mError("[rebalance] consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
      taosMemoryFree(pNewTopic);
    } else {
      if (taosArrayPush(pOldConsumer->currentTopics, &pNewTopic) == NULL) {
        taosMemoryFree(pNewTopic);
        code = TSDB_CODE_TMQ_INVALID_MSG;
        goto END;
      }
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
    }

    int32_t status = pOldConsumer->status;
    // No pending additions or removals left: the rebalance is complete.
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }

    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));

  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
    if (topic == NULL) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto END;
    }
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");

    int32_t status = pOldConsumer->status;
    // No pending additions or removals left: the rebalance is complete.
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
    }
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);

    mInfo("[rebalance]consumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
          ", current topics:%d, newTopics:%d, removeTopics:%d",
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
  }

END:
  taosWUnLockLatch(&pOldConsumer->lock);
  return code;
}
933

UNCOV
934
/*
 * Look up the consumer with the given id in the mnode's sdb and store the
 * result of sdbAcquire() in *pConsumer.
 *
 * Returns 0 when a consumer was found, TSDB_CODE_MND_CONSUMER_NOT_EXIST when
 * the lookup yields NULL, and TSDB_CODE_INVALID_PARA when pMnode or pConsumer
 * is NULL.
 */
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
  if (pMnode == NULL || pConsumer == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pConsumer = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  return (*pConsumer != NULL) ? 0 : TSDB_CODE_MND_CONSUMER_NOT_EXIST;
}
945

UNCOV
946
/*
 * Hand a consumer back to the mnode's sdb via sdbRelease().
 * Does nothing when pMnode or pConsumer is NULL.
 */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  if (pMnode != NULL && pConsumer != NULL) {
    sdbRelease(pMnode->pSdb, pConsumer);
  }
}
953

UNCOV
954
/*
 * Fill pBlock with rows describing consumers, one row per assigned topic of
 * each consumer, continuing from the iterator stored in pShow->pIter.
 *
 * Consumers with no assigned topics are skipped. Columns are written in this
 * order: consumer id, consumer group, client id, user, fqdn, status, topic,
 * up time, subscribe time, rebalance time, parameters.
 *
 * Returns the number of rows written by this call (also added to
 * pShow->numOfRows) on success. Returns TSDB_CODE_INVALID_PARA for NULL
 * arguments, or the failure code left by a MND_TMQ_* check.
 *
 * NOTE(review): MND_TMQ_RETURN_CHECK / MND_TMQ_NULL_CHECK are presumed to set
 * `code` and jump to END on failure. All of them are used while the
 * consumer's read latch is held, so END releases the latch and the sdb
 * reference of the consumer that was being processed.
 */
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  int32_t         numOfRows = 0;
  SMqConsumerObj *pConsumer = NULL;
  SMqConsumerObj *pLocked = NULL;  // consumer whose read latch is currently held, if any
  int32_t         code = 0;
  char           *parasStr = NULL;
  char           *status = NULL;

  while (numOfRows < rowsCapacity) {
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
    if (pShow->pIter == NULL) {
      break;
    }

    if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
      mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
      sdbRelease(pSdb, pConsumer);
      continue;
    }

    taosRLockLatch(&pConsumer->lock);
    pLocked = pConsumer;
    mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);

    // Re-read the size under the latch; the earlier check ran without it.
    int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
    bool    hasTopic = true;
    if (topicSz == 0) {
      hasTopic = false;
      topicSz = 1;
    }

    if (numOfRows + topicSz > rowsCapacity) {
      MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, numOfRows + topicSz));
    }

    for (int32_t i = 0; i < topicSz; i++) {
      SColumnInfoData *pColInfo = NULL;
      int32_t          cols = 0;

      // consumer id
      char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
      (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
      varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));

      // consumer group
      char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(cgroup, pConsumer->cgroup);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));

      // client id
      char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(clientId, pConsumer->clientId);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));

      // user
      char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(user, pConsumer->user);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));

      // fqdn
      char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
      STR_TO_VARSTR(fqdn, pConsumer->fqdn);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));

      // status
      const char *pStatusName = mndConsumerStatusName(pConsumer->status);
      status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
      MND_TMQ_NULL_CHECK(status);
      STR_TO_VARSTR(status, pStatusName);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
      taosMemoryFreeClear(status);

      // one subscribed topic
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      if (hasTopic) {
        char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        mndTopicGetShowName(taosArrayGetP(pConsumer->assignedTopics, i), topic + VARSTR_HEADER_SIZE);
        *(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
      } else {
        MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
      }

      // up time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));

      // subscribe time
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));

      // rebalance time (written as NULL while it is still 0)
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));

      // consumer parameters
      char         buf[TSDB_OFFSET_LEN] = {0};
      STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);

      parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
      MND_TMQ_NULL_CHECK(parasStr);
      (void)snprintf(varDataVal(parasStr), pShow->pMeta->pSchemas[cols].bytes - VARSTR_HEADER_SIZE, "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
              pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
      varDataSetLen(parasStr, strlen(varDataVal(parasStr)));

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      MND_TMQ_NULL_CHECK(pColInfo);
      MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
      taosMemoryFreeClear(parasStr);
      numOfRows++;
    }

    taosRUnLockLatch(&pConsumer->lock);
    pLocked = NULL;
    sdbRelease(pSdb, pConsumer);

    pBlock->info.rows = numOfRows;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;

END:
  taosMemoryFreeClear(status);
  taosMemoryFreeClear(parasStr);
  // A failure inside the loop leaves the current consumer latched and
  // acquired; undo both before reporting the error.
  if (pLocked != NULL) {
    taosRUnLockLatch(&pLocked->lock);
    sdbRelease(pSdb, pLocked);
  }
  return code;
}
1106

1107
/*
 * Cancel an in-progress consumer iteration by passing pIter to
 * sdbCancelFetchByType() for SDB_CONSUMER.
 * Does nothing when pMnode or pIter is NULL.
 */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  if (pMnode != NULL && pIter != NULL) {
    sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_CONSUMER);
  }
}
1112

UNCOV
1113
const char *mndConsumerStatusName(int status) {
×
UNCOV
1114
  switch (status) {
×
UNCOV
1115
    case MQ_CONSUMER_STATUS_READY:
×
UNCOV
1116
      return "ready";
×
1117
//    case MQ_CONSUMER_STATUS_LOST:
1118
//      return "lost";
UNCOV
1119
    case MQ_CONSUMER_STATUS_REBALANCE:
×
UNCOV
1120
      return "rebalancing";
×
1121
    default:
×
1122
      return "unknown";
×
1123
  }
1124
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc