• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4922

09 Jan 2026 08:13AM UTC coverage: 65.161% (-0.4%) from 65.541%
#4922

push

travis-ci

web-flow
merge: from main to 3.0 branch #34232

33 of 56 new or added lines in 8 files covered. (58.93%)

2171 existing lines in 120 files now uncovered.

197632 of 303297 relevant lines covered (65.16%)

117870313.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.56
/source/dnode/mnode/impl/src/mndConsumer.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndConsumer.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndSubscribe.h"
22
#include "mndTopic.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tcompare.h"
26
#include "tname.h"
27

28
#define MND_CONSUMER_VER_NUMBER   3
29
#define MND_CONSUMER_RESERVE_SIZE 64
30

31
#define MND_MAX_GROUP_PER_TOPIC 100
32

33
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
34
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
35
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
36
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
37
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
38

39
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
40
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
41
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
42
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
43

44
int32_t mndInitConsumer(SMnode *pMnode) {
406,166✔
45
  SSdbTable table = {
406,166✔
46
      .sdbType = SDB_CONSUMER,
47
      .keyType = SDB_KEY_INT64,
48
      .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
49
      .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
50
      .insertFp = (SdbInsertFp)mndConsumerActionInsert,
51
      .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
52
      .deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
53
  };
54

55
  if (pMnode == NULL){
406,166✔
56
    return TSDB_CODE_INVALID_PARA;
×
57
  }
58
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
406,166✔
59
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
406,166✔
60
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
406,166✔
61
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
406,166✔
62

63
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
406,166✔
64
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);
406,166✔
65

66
  return sdbSetTable(pMnode->pSdb, table);
406,166✔
67
}
68

69
void mndCleanupConsumer(SMnode *pMnode) {}
406,105✔
70

71
int32_t mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
104,129✔
72
  if (pMnode == NULL || info == NULL) {
104,129✔
73
    return TSDB_CODE_INVALID_PARA;
×
74
  }
75
  int32_t code = 0;
104,129✔
76
  int32_t lino = 0;
104,129✔
77
  PRINT_LOG_START
104,129✔
78
  void   *msg  = rpcMallocCont(sizeof(int64_t));
104,129✔
79
  MND_TMQ_NULL_CHECK(msg);
104,129✔
80

81
  *(int64_t*)msg = consumerId;
104,129✔
82
  SRpcMsg rpcMsg = {
104,129✔
83
      .msgType = msgType,
84
      .pCont = msg,
85
      .contLen = sizeof(int64_t),
86
      .info = *info,
87
  };
88

89
  mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
104,129✔
90
  MND_TMQ_RETURN_CHECK(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
104,129✔
91

92
END:
104,129✔
93
  PRINT_LOG_END
104,129✔
94
  return code;
104,129✔
95
}
96

97
static int32_t validateOneTopic(STrans* pTrans,char *pOneTopic, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *user, const char* token) {
156,685✔
98
  int32_t      code = 0;
156,685✔
99
  int32_t lino = 0;
156,685✔
100
  SMqTopicObj *pTopic = NULL;
156,685✔
101

102
  PRINT_LOG_START
156,685✔
103
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic));
156,685✔
104
  taosRLockLatch(&pTopic->lock);
155,273✔
105

106
  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, user, token, MND_OPER_SUBSCRIBE, pTopic));
155,273✔
107
  MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION));
155,112✔
108

109
  if (subscribe->enableReplay) {
155,112✔
110
    if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
1,897✔
111
      code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
542✔
112
      goto END;
542✔
113
    } 
114
    if (pTopic->stbName[0] != 0) {
1,355✔
115
      SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
1,084✔
116
      if (pDb == NULL) {
1,084✔
117
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
118
        goto END;
×
119
      }
120
      if (pDb->cfg.numOfVgroups != 1) {
1,084✔
121
        mndReleaseDb(pMnode, pDb);
271✔
122
        code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
271✔
123
        goto END;
271✔
124
      }
125
      mndReleaseDb(pMnode, pDb);
813✔
126
    }
127
  }
128
  char  key[TSDB_CONSUMER_ID_LEN] = {0};
154,299✔
129
  (void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%"PRIx64, subscribe->consumerId);
154,299✔
130
  mndTransSetDbName(pTrans, pTopic->db, key);
154,299✔
131
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
154,299✔
132

133
END:
156,685✔
134
  PRINT_LOG_END
156,685✔
135
  if (pTopic != NULL) {
156,685✔
136
    taosRUnLockLatch(&pTopic->lock);
155,273✔
137
  }
138
  mndReleaseTopic(pMnode, pTopic);
156,685✔
139
  return code;
156,685✔
140
}
141

142
static int32_t validateTopics(STrans* pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *user, const char* token) {
261,720✔
143
  if (pTrans == NULL || subscribe == NULL || pMnode == NULL || user == NULL) {
261,720✔
144
    return TSDB_CODE_INVALID_PARA;
×
145
  }
146
  int32_t      code = 0;
261,720✔
147
  int32_t lino = 0;
261,720✔
148

149
  PRINT_LOG_START
261,720✔
150
  int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames);
261,720✔
151
  for (int32_t i = 0; i < numOfTopics; i++) {
416,019✔
152
    char *pOneTopic = taosArrayGetP(subscribe->topicNames, i);
156,685✔
153
    MND_TMQ_RETURN_CHECK(validateOneTopic(pTrans, pOneTopic, subscribe, pMnode, user, token));
156,685✔
154
  }
155

156
END:
259,334✔
157
  PRINT_LOG_END
261,720✔
158
  return code;
261,720✔
159
}
160

161
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
104,129✔
162
  if (pMsg == NULL || pMsg->pCont == NULL) {
104,129✔
163
    return TSDB_CODE_INVALID_PARA;
×
164
  }
165
  int32_t              code = 0;
104,129✔
166
  int32_t              lino = 0;
104,129✔
167
  SMnode              *pMnode = pMsg->info.node;
104,129✔
168
  SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
104,129✔
169
  SMqConsumerObj      *pConsumerNew = NULL;
104,129✔
170
  STrans              *pTrans = NULL;
104,129✔
171
  SMqConsumerObj      *pConsumer = NULL;
104,129✔
172

173
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, pClearMsg->consumerId, &pConsumer));
104,129✔
174
  taosRLockLatch(&pConsumer->lock);
102,311✔
175
  mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
102,311✔
176
        mndConsumerStatusName(pConsumer->status));
177

178
  MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
102,311✔
179

180
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
102,311✔
181
  MND_TMQ_NULL_CHECK(pTrans);
102,311✔
182
  MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
102,311✔
183
  code = mndTransPrepare(pMnode, pTrans);
102,311✔
184

185
END:
104,129✔
186
  PRINT_LOG_END
104,129✔
187
  if (pConsumer != NULL) {
104,129✔
188
    taosRUnLockLatch(&pConsumer->lock);
102,311✔
189
  }
190
  mndReleaseConsumer(pMnode, pConsumer);
104,129✔
191
  tDeleteSMqConsumerObj(pConsumerNew);
104,129✔
192
  mndTransDrop(pTrans);
104,129✔
193
  return code;
104,129✔
194
}
195

196
static void checkOnePrivilege(const char* topic, SMnode *pMnode, SMqHbRsp *rsp, const char *user, const char* token) {
922,171✔
197
  int32_t code = 0;
922,171✔
198
  int32_t lino = 0;
922,171✔
199
  SMqTopicObj *pTopic = NULL;
922,171✔
200
  PRINT_LOG_START
922,171✔
201
  MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, topic, &pTopic));
922,171✔
202
  taosRLockLatch(&pTopic->lock);
922,171✔
203
  STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
922,171✔
204
  MND_TMQ_NULL_CHECK(data);
922,171✔
205
  tstrncpy(data->topic, topic, TSDB_TOPIC_FNAME_LEN);
922,171✔
206
  if (mndCheckTopicPrivilege(pMnode, user, token, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
1,843,859✔
207
      grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
921,688✔
208
    data->noPrivilege = 1;
483✔
209
  } else {
210
    data->noPrivilege = 0;
921,688✔
211
  }
212

213
END:
922,171✔
214
  PRINT_LOG_END
922,171✔
215
  if (pTopic != NULL) {
922,171✔
216
    taosRUnLockLatch(&pTopic->lock);
922,171✔
217
  }
218
  mndReleaseTopic(pMnode, pTopic);
922,171✔
219
}
922,171✔
220

221
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, const char *user, const char* token) {
1,006,806✔
222
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL || user == NULL) {
1,006,806✔
223
    return TSDB_CODE_INVALID_PARA;
×
224
  }
225
  int32_t code = 0;
1,006,806✔
226
  int32_t lino = 0;
1,006,806✔
227
  PRINT_LOG_START
1,006,806✔
228

229
  rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
1,006,806✔
230
  MND_TMQ_NULL_CHECK(rsp->topicPrivileges);
1,006,806✔
231
  for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
1,928,977✔
232
    char        *topic = taosArrayGetP(pConsumer->currentTopics, i);
922,171✔
233
    checkOnePrivilege(topic, pMnode, rsp, user, token);
922,171✔
234
  }
235

236
END:
1,006,806✔
237
  PRINT_LOG_END
1,006,806✔
238
  return code;
1,006,806✔
239
}
240

241
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
1,006,806✔
242
  if (pMnode == NULL || req == NULL || pConsumer == NULL){
1,006,806✔
243
    return;
×
244
  }
245
  for (int i = 0; i < taosArrayGetSize(req->topics); i++) {
1,919,360✔
246
    TopicOffsetRows *data = taosArrayGet(req->topics, i);
912,554✔
247
    if (data == NULL){
912,554✔
248
      continue;
×
249
    }
250
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
912,554✔
251

252
    SMqSubscribeObj *pSub = NULL;
912,554✔
253
    char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
912,554✔
254
    (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, data->topicName);
912,554✔
255
    int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub);
912,554✔
256
    if (code != 0) {
912,554✔
257
      mError("failed to acquire subscribe by key:%s, code:%d", key, code);
×
258
      continue;
×
259
    }
260
    taosWLockLatch(&pSub->lock);
912,554✔
261
    SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
912,554✔
262
    if (pConsumerEp) {
912,554✔
263
      taosArrayDestroy(pConsumerEp->offsetRows);
900,357✔
264
      pConsumerEp->offsetRows = data->offsetRows;
900,357✔
265
      data->offsetRows = NULL;
900,357✔
266
    }
267
    taosWUnLockLatch(&pSub->lock);
912,554✔
268

269
    mndReleaseSubscribe(pMnode, pSub);
912,554✔
270
  }
271
}
272

273
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
1,006,806✔
274
  if (pMsg == NULL || rsp == NULL){
1,006,806✔
275
    return TSDB_CODE_INVALID_PARA;
×
276
  }
277
  int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
1,006,806✔
278
  if (tlen <= 0){
1,006,806✔
279
    return TSDB_CODE_TMQ_INVALID_MSG;
×
280
  }
281
  void   *buf = rpcMallocCont(tlen);
1,006,806✔
282
  if (buf == NULL) {
1,006,806✔
283
    return terrno;
×
284
  }
285

286
  if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){
1,006,806✔
287
    rpcFreeCont(buf);
×
288
    return TSDB_CODE_TMQ_INVALID_MSG;
×
289
  }
290
  pMsg->info.rsp = buf;
1,006,806✔
291
  pMsg->info.rspLen = tlen;
1,006,806✔
292
  return 0;
1,006,806✔
293
}
294

295
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
1,012,598✔
296
  if (pMsg == NULL) {
1,012,598✔
297
    return TSDB_CODE_INVALID_PARA;
×
298
  }
299
  int32_t         code = 0;
1,012,598✔
300
  int32_t         lino = 0;
1,012,598✔
301
  SMnode         *pMnode = pMsg->info.node;
1,012,598✔
302
  SMqHbReq        req = {0};
1,012,598✔
303
  SMqHbRsp        rsp = {0};
1,012,598✔
304
  SMqConsumerObj *pConsumer = NULL;
1,012,598✔
305
  PRINT_LOG_START
1,012,598✔
306

307
  MND_TMQ_RETURN_CHECK(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req));
1,012,598✔
308
  int64_t consumerId = req.consumerId;
1,012,598✔
309
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
1,012,598✔
310
  taosWLockLatch(&pConsumer->lock);
1,006,806✔
311
  MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, RPC_MSG_USER(pMsg), RPC_MSG_TOKEN(pMsg)));
1,006,806✔
312
  atomic_store_32(&pConsumer->hbStatus, 0);
1,006,806✔
313
  mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, req.pollFlag, pConsumer->pollStatus);
1,006,806✔
314
  if (req.pollFlag == 1){
1,006,806✔
315
    atomic_store_32(&pConsumer->pollStatus, 0);
483,642✔
316
    pConsumer->pollTime = taosGetTimestampMs();
967,284✔
317
  }
318

319
  storeOffsetRows(pMnode, &req, pConsumer);
1,006,806✔
320
  rsp.debugFlag = tqClientDebugFlag;
1,006,806✔
321
  code = buildMqHbRsp(pMsg, &rsp);
1,006,806✔
322

323
END:
1,012,598✔
324
  if (pConsumer != NULL) {
1,012,598✔
325
    taosWUnLockLatch(&pConsumer->lock);
1,006,806✔
326
  }
327
  tDestroySMqHbRsp(&rsp);
1,012,598✔
328
  mndReleaseConsumer(pMnode, pConsumer);
1,012,598✔
329
  tDestroySMqHbReq(&req);
1,012,598✔
330
  PRINT_LOG_END
1,012,598✔
331
  return code;
1,012,598✔
332
}
333

334
static int32_t processEachTopicEp(SMnode *pMnode, SMqConsumerObj *pConsumer, char *topic, SMqAskEpRsp *rsp, int32_t epoch) {
1,008,666✔
335
  int32_t         code = 0;
1,008,666✔
336
  int32_t         lino = 0;
1,008,666✔
337
  SMqSubscribeObj *pSub = NULL;
1,008,666✔
338
  SMqSubTopicEp topicEp = {0};
1,008,666✔
339
  char  key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
1,008,666✔
340
  PRINT_LOG_START
1,008,666✔
341
  (void)snprintf(key, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", pConsumer->cgroup, TMQ_SEPARATOR, topic);
1,008,666✔
342
  if(mndAcquireSubscribeByKey(pMnode, key, &pSub) != 0) {
1,008,666✔
343
    mWarn("%s failed to acquire subscribe by key:%s", __func__, key);
×
344
    goto END;
×
345
  }
346
  tstrncpy(topicEp.topic, topic, TSDB_TOPIC_FNAME_LEN);
1,008,666✔
347

348
  taosWLockLatch(&pSub->lock);
1,008,666✔
349
  // 2.2 iterate all vg assigned to the consumer of that topic
350
  SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
1,008,666✔
351
  MND_TMQ_NULL_CHECK(pConsumerEp);
1,008,666✔
352
  int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
1,008,666✔
353
  topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
1,008,666✔
354
  MND_TMQ_NULL_CHECK(topicEp.vgs);
1,008,666✔
355

356
  tstrncpy(topicEp.db, pSub->dbName, TSDB_DB_FNAME_LEN);
1,008,666✔
357
  for (int32_t j = 0; j < vgNum; j++) {
2,297,559✔
358
    SMqVgEp *pVgEp = taosArrayGet(pConsumerEp->vgs, j);
1,288,893✔
359
    if (pVgEp == NULL) {
1,288,893✔
360
      continue;
×
361
    }
362
    if (epoch == -1) {
1,288,893✔
363
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
852,460✔
364
      if (pVgroup) {
852,460✔
365
        pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
39,504✔
366
        mndReleaseVgroup(pMnode, pVgroup);
39,504✔
367
      }
368
    }
369
    SMqSubVgEp vgEp = {.epSet = pVgEp->epSet, .vgId = pVgEp->vgId, .offset = -1};
1,288,893✔
370
    MND_TMQ_NULL_CHECK(taosArrayPush(topicEp.vgs, &vgEp));
2,577,786✔
371
  }
372
  MND_TMQ_NULL_CHECK(taosArrayPush(rsp->topics, &topicEp));
2,017,332✔
373
  topicEp.vgs = NULL;
1,008,666✔
374

375
END:
1,008,666✔
376
  if (pSub != NULL) {
1,008,666✔
377
    taosWUnLockLatch(&pSub->lock);
1,008,666✔
378
  }
379
  taosArrayDestroy(topicEp.vgs);
1,008,666✔
380
  mndReleaseSubscribe(pMnode, pSub);
1,008,666✔
381
  PRINT_LOG_END
1,008,666✔
382
  return code;
1,008,666✔
383
}
384

385
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
1,113,701✔
386
  if (pMnode == NULL || pConsumer == NULL || rsp == NULL){
1,113,701✔
387
    return TSDB_CODE_INVALID_PARA;
×
388
  }
389
  int32_t code = 0;
1,113,701✔
390
  int32_t lino = 0;
1,113,701✔
391
  PRINT_LOG_START
1,113,701✔
392

393
  int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
1,113,701✔
394
  rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
1,113,701✔
395
  MND_TMQ_NULL_CHECK(rsp->topics);
1,113,701✔
396

397
  // handle all topics subscribed by this consumer
398
  for (int32_t i = 0; i < numOfTopics; i++) {
2,122,367✔
399
    char            *topic = taosArrayGetP(pConsumer->currentTopics, i);
1,008,666✔
400
    MND_TMQ_RETURN_CHECK(processEachTopicEp(pMnode, pConsumer, topic, rsp, epoch));
1,008,666✔
401
  }
402

403
END:
1,113,701✔
404
  PRINT_LOG_END
1,113,701✔
405
  return code;
1,113,701✔
406
}
407

408
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
4,144,597✔
409
  if (pMsg == NULL || rsp == NULL) {
4,144,597✔
UNCOV
410
    return TSDB_CODE_INVALID_PARA;
×
411
  }
412
  int32_t code = 0;
4,144,930✔
413
  // encode rsp
414
  int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
4,143,305✔
415
  void   *buf = rpcMallocCont(tlen);
4,143,305✔
416
  if (buf == NULL) {
4,143,594✔
417
    return terrno;
×
418
  }
419

420
  SMqRspHead *pHead = buf;
4,143,594✔
421

422
  pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
4,143,594✔
423
  pHead->epoch = serverEpoch;
4,143,594✔
424
  pHead->consumerId = consumerId;
4,144,260✔
425
  pHead->walsver = 0;
4,144,597✔
426
  pHead->walever = 0;
4,144,316✔
427

428
  void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
4,143,646✔
429
  if (tEncodeSMqAskEpRsp(&abuf, rsp) < 0) {
4,144,260✔
430
    rpcFreeCont(buf);
×
431
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
432
  }
433

434
  // send rsp
435
  pMsg->info.rsp = buf;
4,144,260✔
436
  pMsg->info.rspLen = tlen;
4,143,979✔
437
  return code;
4,143,317✔
438
}
439

440
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
4,151,570✔
441
  if (pMsg == NULL) {
4,151,570✔
442
    return TSDB_CODE_INVALID_PARA;
×
443
  }
444
  SMnode     *pMnode = pMsg->info.node;
4,151,570✔
445
  SMqAskEpReq req = {0};
4,151,570✔
446
  SMqAskEpRsp rsp = {0};
4,151,237✔
447
  int32_t     code = 0;
4,151,237✔
448
  int32_t     lino = 0;
4,151,237✔
449
  SMqConsumerObj *pConsumer = NULL;
4,151,237✔
450
  PRINT_LOG_START
4,151,570✔
451

452
  MND_TMQ_RETURN_CHECK(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
4,151,570✔
453
  int64_t consumerId = req.consumerId;
4,151,570✔
454
  MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
4,151,570✔
455
  taosRLockLatch(&pConsumer->lock);
4,144,930✔
456
  if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
4,144,930✔
457
    mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
×
458
           pConsumer->cgroup);
459
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
×
460
    goto END;
×
461
  }
462

463
  // 1. check consumer status
464
  int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
4,144,260✔
465
  int32_t status = atomic_load_32(&pConsumer->status);
4,144,930✔
466
  if (status != MQ_CONSUMER_STATUS_READY) {
4,144,597✔
467
    mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
871,799✔
468
    rsp.code = TSDB_CODE_MND_CONSUMER_NOT_READY;
871,799✔
469
  } else {
470
    int32_t epoch = req.epoch;
3,272,798✔
471

472
    // 2. check epoch, only send ep info when epochs do not match
473
    if (epoch != serverEpoch) {
3,272,798✔
474
      mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
1,113,701✔
475
            consumerId, epoch, serverEpoch);
476
      MND_TMQ_RETURN_CHECK(addEpSetInfo(pMnode, pConsumer, epoch, &rsp));
1,113,701✔
477
    }
478
  }
479
  code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);
4,144,597✔
480

481
END:
4,150,286✔
482
  if (pConsumer != NULL) {
4,150,567✔
483
    taosRUnLockLatch(&pConsumer->lock);
4,144,264✔
484
  }
485
  tDeleteSMqAskEpRsp(&rsp);
486
  mndReleaseConsumer(pMnode, pConsumer);
4,150,286✔
487
  PRINT_LOG_END
4,151,570✔
488
  return code;
4,151,570✔
489
}
490

491
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
103,306✔
492
  if (pConsumer == NULL || pTrans == NULL) {
103,306✔
493
    return TSDB_CODE_INVALID_PARA;
×
494
  }
495
  int32_t  code = 0;
103,306✔
496
  int32_t lino = 0;
103,306✔
497
  PRINT_LOG_START
103,306✔
498
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
103,306✔
499
  MND_TMQ_NULL_CHECK(pCommitRaw);
103,306✔
500
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
103,306✔
501
  if (code != 0) {
103,306✔
502
    sdbFreeRaw(pCommitRaw);
×
503
    goto END;
×
504
  }
505
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
103,306✔
506

507
END:
103,306✔
508
  PRINT_LOG_END
103,306✔
509
  return code;
103,306✔
510
}
511

512
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
526,974✔
513
  if (pConsumer == NULL || pTrans == NULL) {
526,974✔
514
    return TSDB_CODE_INVALID_PARA;
×
515
  }
516
  int32_t  code = 0;
526,974✔
517
  int32_t lino = 0;
526,974✔
518
  PRINT_LOG_START
526,974✔
519
  SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
526,974✔
520
  MND_TMQ_NULL_CHECK(pCommitRaw);
526,974✔
521
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
526,974✔
522
  if (code != 0) {
526,974✔
523
    sdbFreeRaw(pCommitRaw);
×
524
    goto END;
×
525
  }
526
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
526,974✔
527
END:
526,974✔
528

529
  return code;
526,974✔
530
}
531

532
static void freeItem(void *param) {
309✔
533
  if (param == NULL) {
309✔
534
    return;
×
535
  }
536
  void *pItem = *(void **)param;
309✔
537
  if (pItem != NULL) {
309✔
538
    taosMemoryFree(pItem);
309✔
539
  }
540
}
541

542
#define ADD_TOPIC_TO_ARRAY(element, array) \
543
char *newTopicCopy = taosStrdup(element); \
544
MND_TMQ_NULL_CHECK(newTopicCopy);\
545
if (taosArrayPush(pConsumerNew->array, &newTopicCopy) == NULL){\
546
  taosMemoryFree(newTopicCopy);\
547
  code = terrno;\
548
  goto END;\
549
}
550

551
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
113,629✔
552
  if (pExistedConsumer == NULL || pConsumerNew == NULL) {
113,629✔
553
    return TSDB_CODE_INVALID_PARA;
×
554
  }
555
  int32_t code = 0;
113,629✔
556
  int32_t lino = 0;
113,629✔
557
  PRINT_LOG_START
113,629✔
558
  taosRLockLatch(&pExistedConsumer->lock);
113,629✔
559

560
  pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
113,629✔
561
  MND_TMQ_NULL_CHECK(pConsumerNew->rebNewTopics);
113,629✔
562
  pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
113,629✔
563
  MND_TMQ_NULL_CHECK(pConsumerNew->rebRemovedTopics);
113,629✔
564

565
  int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
113,629✔
566
  int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
113,629✔
567
  int32_t i = 0, j = 0;
113,629✔
568
  while (i < oldTopicNum || j < newTopicNum) {
229,797✔
569
    if (i >= oldTopicNum) {
116,168✔
570
      void* tmp = taosArrayGetP(pConsumerNew->assignedTopics, j);
632✔
571
      MND_TMQ_NULL_CHECK(tmp);
632✔
572
      ADD_TOPIC_TO_ARRAY(tmp, rebNewTopics);
1,264✔
573
      j++;
632✔
574
      continue;
632✔
575
    } else if (j >= newTopicNum) {
115,536✔
576
      void* tmp = taosArrayGetP(pExistedConsumer->currentTopics, i);
110,113✔
577
      MND_TMQ_NULL_CHECK(tmp);
110,113✔
578
      ADD_TOPIC_TO_ARRAY(tmp, rebRemovedTopics);
220,226✔
579
      i++;
110,113✔
580
      continue;
110,113✔
581
    } else {
582
      char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
5,423✔
583
      MND_TMQ_NULL_CHECK(oldTopic);
5,423✔
584
      char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j);
5,423✔
585
      MND_TMQ_NULL_CHECK(newTopic);
5,423✔
586
      int   comp = strcmp(oldTopic, newTopic);
5,423✔
587
      if (comp == 0) {
5,423✔
588
        i++;
5,423✔
589
        j++;
5,423✔
590
        continue;
5,423✔
591
      } else if (comp < 0) {
×
592
        ADD_TOPIC_TO_ARRAY(oldTopic, rebRemovedTopics);
×
593
        i++;
×
594
        continue;
×
595
      } else {
×
596
        ADD_TOPIC_TO_ARRAY(newTopic, rebNewTopics);
×
597
        j++;
×
598
        continue;
×
599
      }
600
    }
601
  }
602
  // no topics need to be rebalanced
603
  if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
113,629✔
604
    code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
5,423✔
605
  }
606

607
END:
108,206✔
608
  taosRUnLockLatch(&pExistedConsumer->lock);
113,629✔
609
  PRINT_LOG_END
113,629✔
610
  return code;
113,629✔
611
}
612

613
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
262,057✔
614
  if (pTopicList == NULL || pMnode == NULL) {
262,057✔
615
    return TSDB_CODE_INVALID_PARA;
×
616
  }
617
  taosArraySort(pTopicList, taosArrayCompareString);
262,057✔
618
  taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
262,057✔
619

620
  int32_t newTopicNum = taosArrayGetSize(pTopicList);
262,057✔
621
  for (int i = 0; i < newTopicNum; i++) {
418,742✔
622
    int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i));
157,022✔
623
    if (gNum >= MND_MAX_GROUP_PER_TOPIC) {
157,022✔
624
      return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
337✔
625
    }
626
  }
627
  return 0;
261,720✔
628
}
629

630
static int32_t buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe, SMqConsumerObj** ppConsumer){
259,334✔
631
  if (pMnode == NULL || subscribe == NULL) {
259,334✔
632
    return TSDB_CODE_INVALID_PARA;
×
633
  }
634
  int64_t         consumerId = subscribe->consumerId;
259,334✔
635
  char           *cgroup     = subscribe->cgroup;
259,334✔
636
  SMqConsumerObj *pConsumerNew     = NULL;
259,334✔
637
  SMqConsumerObj *pExistedConsumer = NULL;
259,334✔
638
  int32_t lino = 0;
259,334✔
639
  PRINT_LOG_START
259,334✔
640
  int32_t code = mndAcquireConsumer(pMnode, consumerId, &pExistedConsumer);
259,334✔
641
  if (code != 0) {
259,334✔
642
    mInfo("receive tmq subscribe request from new consumer:0x%" PRIx64
145,705✔
643
              ",cgroup:%s, numOfTopics:%d", consumerId,
644
          subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));
645

646
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe, &pConsumerNew));
145,705✔
647
  } else {
648
    int32_t status = atomic_load_32(&pExistedConsumer->status);
113,629✔
649

650
    mInfo("receive tmq subscribe request from existed consumer:0x%" PRIx64
113,629✔
651
              ",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
652
          consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
653
          (int32_t)taosArrayGetSize(subscribe->topicNames));
654

655
    if (status != MQ_CONSUMER_STATUS_READY) {
113,629✔
656
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
×
657
      goto END;
×
658
    }
659
    MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe, &pConsumerNew));
113,629✔
660
    MND_TMQ_RETURN_CHECK(getTopicAddDelete(pExistedConsumer, pConsumerNew));
113,629✔
661
  }
662
  if (ppConsumer){
253,911✔
663
    *ppConsumer = pConsumerNew;
253,911✔
664
    pConsumerNew = NULL;
253,911✔
665
  }
666

667
END:
259,016✔
668
  PRINT_LOG_END
259,334✔
669
  mndReleaseConsumer(pMnode, pExistedConsumer);
259,334✔
670
  tDeleteSMqConsumerObj(pConsumerNew);
259,334✔
671
  return code;
259,334✔
672
}
673

674
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
350,173✔
675
  if (pMsg == NULL) {
350,173✔
676
    return TSDB_CODE_INVALID_PARA;
×
677
  }
678
  SMnode *pMnode = pMsg->info.node;
350,173✔
679
  char   *msgStr = pMsg->pCont;
350,173✔
680
  int32_t code = 0;
350,173✔
681
  int32_t lino = 0;
350,173✔
682
  SMqConsumerObj *pConsumerNew = NULL;
350,173✔
683
  STrans         *pTrans = NULL;
350,173✔
684

685
  PRINT_LOG_START
350,173✔
686
  SCMSubscribeReq subscribe = {0};
350,173✔
687
  MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
700,346✔
688
  bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
350,173✔
689
  if(unSubscribe){
350,173✔
690
    SMqConsumerObj *pConsumerTmp = NULL;
195,690✔
691
    MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
195,690✔
692
    taosRLockLatch(&pConsumerTmp->lock);
194,617✔
693
    size_t topicNum = taosArrayGetSize(pConsumerTmp->assignedTopics);
194,617✔
694
    taosRUnLockLatch(&pConsumerTmp->lock);
194,617✔
695
    mndReleaseConsumer(pMnode, pConsumerTmp);
194,617✔
696
    if (topicNum == 0){
194,617✔
697
      goto END;
87,043✔
698
    }
699
  }
700
  MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
262,057✔
701
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
261,720✔
702
                          (unSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
703
                          pMsg, "subscribe");
704
  MND_TMQ_NULL_CHECK(pTrans);
261,720✔
705

706
  MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, RPC_MSG_USER(pMsg), RPC_MSG_TOKEN(pMsg)));
261,720✔
707
  MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew));
259,334✔
708
  MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew));
253,911✔
709
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
253,911✔
710
  code = TSDB_CODE_ACTION_IN_PROGRESS;
253,911✔
711

712
END:
350,173✔
713
  mndTransDrop(pTrans);
350,173✔
714
  tDeleteSMqConsumerObj(pConsumerNew);
350,173✔
715
  taosArrayDestroyP(subscribe.topicNames, NULL);
350,173✔
716
  code = (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code;
350,173✔
717
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS){
350,173✔
718
    mError("tmq subscribe request from consumer:0x%" PRIx64 " failed, code:%d", subscribe.consumerId, code);
2,723✔
719
  } else {
720
    mInfo("tmq subscribe request from consumer:0x%" PRIx64 " processed, code:%d", subscribe.consumerId, code);
347,450✔
721
  }
722
  return code;
350,173✔
723
}
724

725
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
735,845✔
726
  if (pConsumer == NULL) {
735,845✔
727
    return NULL;
×
728
  }
729
  int32_t code = 0;
735,845✔
730
  int32_t lino = 0;
735,845✔
731
  terrno = TSDB_CODE_OUT_OF_MEMORY;
735,845✔
732

733
  void   *buf = NULL;
735,845✔
734
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
735,845✔
735
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
735,845✔
736

737
  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
735,845✔
738
  if (pRaw == NULL) goto CM_ENCODE_OVER;
735,845✔
739

740
  buf = taosMemoryMalloc(tlen);
735,845✔
741
  if (buf == NULL) goto CM_ENCODE_OVER;
735,845✔
742

743
  void *abuf = buf;
735,845✔
744
  if(tEncodeSMqConsumerObj(&abuf, pConsumer) < 0){
735,845✔
745
    goto CM_ENCODE_OVER;
×
746
  }
747

748
  int32_t dataPos = 0;
735,845✔
749
  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
735,845✔
750
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
735,845✔
751
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
735,845✔
752
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
735,845✔
753

754
  terrno = TSDB_CODE_SUCCESS;
735,845✔
755

756
CM_ENCODE_OVER:
735,845✔
757
  taosMemoryFreeClear(buf);
735,845✔
758
  if (terrno != 0) {
735,845✔
759
    mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
×
760
    sdbFreeRaw(pRaw);
×
761
    return NULL;
×
762
  }
763

764
  mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
735,845✔
765
  return pRaw;
735,845✔
766
}
767

768
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
642,729✔
769
  if (pRaw == NULL) {
642,729✔
770
    return NULL;
×
771
  }
772
  int32_t         code = 0;
642,729✔
773
  int32_t         lino = 0;
642,729✔
774
  SSdbRow        *pRow = NULL;
642,729✔
775
  SMqConsumerObj *pConsumer = NULL;
642,729✔
776
  void           *buf = NULL;
642,729✔
777

778
  terrno = 0;
642,729✔
779
  int8_t sver = 0;
642,729✔
780
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
642,729✔
781
    goto CM_DECODE_OVER;
×
782
  }
783

784
  if (sver < 1 || sver > MND_CONSUMER_VER_NUMBER) {
642,729✔
785
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
786
    goto CM_DECODE_OVER;
×
787
  }
788

789
  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
642,729✔
790
  if (pRow == NULL) {
642,729✔
791
    goto CM_DECODE_OVER;
×
792
  }
793

794
  pConsumer = sdbGetRowObj(pRow);
642,729✔
795
  if (pConsumer == NULL) {
642,729✔
796
    goto CM_DECODE_OVER;
×
797
  }
798

799
  int32_t dataPos = 0;
642,729✔
800
  int32_t len;
641,987✔
801
  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
642,729✔
802
  buf = taosMemoryMalloc(len);
642,729✔
803
  if (buf == NULL) {
642,729✔
804
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
805
    goto CM_DECODE_OVER;
×
806
  }
807

808
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
642,729✔
809
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
642,729✔
810

811
  if (tDecodeSMqConsumerObj(buf, pConsumer, sver) == NULL) {
642,729✔
812
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // TODO set correct error code
×
813
    goto CM_DECODE_OVER;
×
814
  }
815

816
  tmsgUpdateDnodeEpSet(&pConsumer->ep);
642,729✔
817

818
CM_DECODE_OVER:
642,729✔
819
  taosMemoryFreeClear(buf);
642,729✔
820
  if (terrno != TSDB_CODE_SUCCESS) {
642,729✔
821
    mError("consumer:0x%" PRIx64 " failed to decode from raw:%p since %s",
×
822
           pConsumer == NULL ? 0 : pConsumer->consumerId, pRaw, terrstr());
823
    taosMemoryFreeClear(pRow);
×
824
  }
825

826
  return pRow;
642,729✔
827
}
828

829
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
153,612✔
830
  if (pConsumer == NULL) {
153,612✔
831
    return TSDB_CODE_INVALID_PARA;
×
832
  }
833
  mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
153,612✔
834
        pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);
835
  pConsumer->subscribeTime = pConsumer->createTime;
153,612✔
836
  return 0;
153,612✔
837
}
838

839
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
642,729✔
840
  if (pConsumer == NULL) {
642,729✔
841
    return TSDB_CODE_INVALID_PARA;
×
842
  }
843
  mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
642,729✔
844
        mndConsumerStatusName(pConsumer->status));
845
  tClearSMqConsumerObj(pConsumer);
642,729✔
846
  return 0;
642,729✔
847
}
848

849
// remove from topic list
850
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
379,680✔
851
  if (topicList == NULL || pTopic == NULL) {
379,680✔
852
    return;
×
853
  }
854
  int32_t size = taosArrayGetSize(topicList);
379,680✔
855
  for (int32_t i = 0; i < size; i++) {
383,634✔
856
    char *p = taosArrayGetP(topicList, i);
380,073✔
857
    if (strcmp(pTopic, p) == 0) {
380,073✔
858
      taosArrayRemove(topicList, i);
376,119✔
859
      taosMemoryFree(p);
376,119✔
860

861
      mInfo("tmq rebalance consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
376,119✔
862
            consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
863
      break;
376,119✔
864
    }
865
  }
866
}
867

868
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
150,180✔
869
  if (pConsumer == NULL || pTopic == NULL) {
150,180✔
870
    return false;
×
871
  }
872
  bool    existing = false;
150,180✔
873
  int32_t size = taosArrayGetSize(pConsumer->currentTopics);
150,180✔
874
  for (int32_t i = 0; i < size; i++) {
152,988✔
875
    char *topic = taosArrayGetP(pConsumer->currentTopics, i);
2,808✔
876
    if (topic && strcmp(topic, pTopic) == 0) {
2,808✔
877
      existing = true;
×
878
      break;
×
879
    }
880
  }
881

882
  return existing;
150,180✔
883
}
884

885
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
385,044✔
886
  if (pOldConsumer == NULL || pNewConsumer == NULL) {
385,044✔
887
    return TSDB_CODE_INVALID_PARA;
×
888
  }
889
  int32_t lino = 0;
385,044✔
890
  int32_t code = 0;
385,044✔
891
  char *pNewTopic = NULL;
385,044✔
892
  PRINT_LOG_START
385,044✔
893
  taosWLockLatch(&pOldConsumer->lock);
385,044✔
894
  mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
385,044✔
895
        pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);
896

897
  if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
385,044✔
898
    TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
109,282✔
899
    TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
109,282✔
900
    TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
109,282✔
901

902
    pOldConsumer->subscribeTime = taosGetTimestampMs();
109,282✔
903
    pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
109,282✔
904
    mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
109,282✔
905
  } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
275,762✔
906
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
10,832✔
907
    pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
10,832✔
908
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
10,832✔
909
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
10,832✔
910
  } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
264,930✔
911
    void *tmp = taosArrayGetP(pNewConsumer->rebNewTopics, 0);
150,180✔
912
    MND_TMQ_NULL_CHECK(tmp);
150,180✔
913
    char *pNewTopic = taosStrdup(tmp);
150,180✔
914
    MND_TMQ_NULL_CHECK(pNewTopic);
150,180✔
915
    removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
150,180✔
916
    bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
150,180✔
917
    if (existing) {
150,180✔
918
      mError("tmq rebalance consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
×
919
    } else {
920
      MND_TMQ_NULL_CHECK(taosArrayPush(pOldConsumer->currentTopics, &pNewTopic));
300,360✔
921
      pNewTopic = NULL;
150,180✔
922
      taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
150,180✔
923
    }
924

925
    int32_t status = pOldConsumer->status;
150,180✔
926
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
150,180✔
927
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
147,641✔
928
    }
929

930
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
150,180✔
931
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
150,180✔
932

933
    mInfo("tmq rebalance consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
150,180✔
934
          ", current topics:%d, newTopics:%d, removeTopics:%d",
935
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
936
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
937
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
938
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
939

940
  } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
114,750✔
941
    char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
114,750✔
942
    MND_TMQ_NULL_CHECK(topic);
114,750✔
943
    removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
114,750✔
944
    removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
114,750✔
945

946
    int32_t status = pOldConsumer->status;
114,750✔
947
    if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
114,750✔
948
      pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
112,211✔
949
    }
950
    pOldConsumer->rebalanceTime = taosGetTimestampMs();
114,750✔
951
    (void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
114,750✔
952

953
    mInfo("tmq rebalanceconsumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
114,750✔
954
          ", current topics:%d, newTopics:%d, removeTopics:%d",
955
          pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
956
          mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
957
          (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
958
          (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
959
  }
960

961
END:
×
962
  taosMemoryFree(pNewTopic);
385,044✔
963
  taosWUnLockLatch(&pOldConsumer->lock);
385,044✔
964

965
  PRINT_LOG_END
385,044✔
966

967
  return code;
385,044✔
968
}
969

970
int32_t mndAcquireConsumer(SMnode *pMnode, int64_t consumerId, SMqConsumerObj** pConsumer) {
5,723,321✔
971
  if (pMnode == NULL || pConsumer == NULL) {
5,723,321✔
972
    return TSDB_CODE_INVALID_PARA;
×
973
  }
974
  SSdb           *pSdb = pMnode->pSdb;
5,723,321✔
975
  *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
5,723,321✔
976
  if (*pConsumer == NULL) {
5,722,988✔
977
    return TSDB_CODE_MND_CONSUMER_NOT_EXIST;
161,028✔
978
  }
979
  return 0;
5,561,960✔
980
}
981

982
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
18,474,975✔
983
  if (pMnode == NULL || pConsumer == NULL) {
18,474,975✔
984
    return;
11,498,490✔
985
  }
986
  SSdb *pSdb = pMnode->pSdb;
6,976,485✔
987
  sdbRelease(pSdb, pConsumer);
6,976,485✔
988
}
989

990
static int32_t buildResult(SMqConsumerObj *pConsumer, SShowObj *pShow, SSDataBlock *pBlock, int32_t numOfRows, const char* showTopic, bool hasTopic) {
65,374✔
991
  int32_t         code = 0;
65,374✔
992
  int32_t         lino = 0;
65,374✔
993
  SColumnInfoData *pColInfo = NULL;
65,374✔
994
  int32_t          cols = 0;
65,374✔
995
  char * parasStr = NULL;
65,374✔
996
  char           *status = NULL;
65,374✔
997

998
  PRINT_LOG_START
65,374✔
999
  // consumer id
1000
  char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
65,374✔
1001
  (void)snprintf(varDataVal(consumerIdHex), TSDB_CONSUMER_ID_LEN, "0x%" PRIx64, pConsumer->consumerId);
65,374✔
1002
  varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
65,374✔
1003

1004
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1005
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1006
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false));
65,374✔
1007

1008
  // consumer group
1009
  char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
65,374✔
1010
  STR_TO_VARSTR(cgroup, pConsumer->cgroup);
65,374✔
1011

1012
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1013
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1014
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));
65,374✔
1015

1016
  // client id
1017
  char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
65,374✔
1018
  STR_TO_VARSTR(clientId, pConsumer->clientId);
65,374✔
1019

1020
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1021
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1022
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
65,374✔
1023

1024
  // user
1025
  char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
65,374✔
1026
  STR_TO_VARSTR(user, pConsumer->user);
65,374✔
1027

1028
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1029
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1030
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));
65,374✔
1031

1032
  // fqdn
1033
  char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
65,374✔
1034
  STR_TO_VARSTR(fqdn, pConsumer->fqdn);
65,374✔
1035

1036
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1037
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1038
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));
65,374✔
1039

1040
  // status
1041
  const char *pStatusName = mndConsumerStatusName(pConsumer->status);
65,374✔
1042
  status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
65,374✔
1043
  MND_TMQ_NULL_CHECK(status);
65,374✔
1044
  STR_TO_VARSTR(status, pStatusName);
65,374✔
1045

1046
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1047
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1048
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
65,374✔
1049

1050
  // one subscribed topic
1051
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1052
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1053
  if (hasTopic) {
65,374✔
1054
    char        topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
65,374✔
1055
    mndTopicGetShowName(showTopic, topic + VARSTR_HEADER_SIZE);
65,374✔
1056
    *(VarDataLenT *)(topic) = strlen(topic + VARSTR_HEADER_SIZE);
65,374✔
1057
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)topic, false));
65,374✔
1058
  } else {
1059
    MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, NULL, true));
×
1060
  }
1061

1062
  // up time
1063
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1064
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1065
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false));
65,374✔
1066

1067
  // subscribe time
1068
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1069
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1070
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false));
65,374✔
1071

1072
  // rebalance time
1073
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1074
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1075
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0));
65,374✔
1076

1077
  char         buf[TSDB_OFFSET_LEN] = {0};
65,374✔
1078
  STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
65,374✔
1079
  tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
65,374✔
1080

1081
  parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
65,374✔
1082
  MND_TMQ_NULL_CHECK(parasStr);
65,374✔
1083
  (void)snprintf(varDataVal(parasStr), pShow->pMeta->pSchemas[cols].bytes - VARSTR_HEADER_SIZE, "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
130,748✔
1084
          pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
65,374✔
1085
  varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
65,374✔
1086

1087
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1088
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1089
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
65,374✔
1090

1091
  // rebalance time
1092
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
65,374✔
1093
  MND_TMQ_NULL_CHECK(pColInfo);
65,374✔
1094
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->pollTime, pConsumer->pollTime == 0));
65,374✔
1095

1096
END:
65,374✔
1097
  PRINT_LOG_END
65,374✔
1098
  taosMemoryFreeClear(status);
65,374✔
1099
  taosMemoryFreeClear(parasStr);
65,374✔
1100
  return code;
65,374✔
1101
}
1102

1103
static int32_t retrieveOneConsumer(SMqConsumerObj *pConsumer, int32_t* numOfRows, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
65,951✔
1104
  int32_t         code = 0;
65,951✔
1105
  int32_t         lino = 0;
65,951✔
1106
  PRINT_LOG_START
65,951✔
1107
  taosRLockLatch(&pConsumer->lock);
65,951✔
1108
  mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);
65,951✔
1109
  if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
65,951✔
1110
    mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
577✔
1111
    goto END;
577✔
1112
  }
1113

1114

1115
  int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
65,374✔
1116
  bool    hasTopic = true;
65,374✔
1117
  if (topicSz == 0) {
65,374✔
1118
    hasTopic = false;
×
1119
    topicSz = 1;
×
1120
  }
1121

1122
  if (*numOfRows + topicSz > rowsCapacity) {
65,374✔
1123
    MND_TMQ_RETURN_CHECK(blockDataEnsureCapacity(pBlock, *numOfRows + topicSz));
×
1124
  }
1125

1126
  for (int32_t i = 0; i < topicSz; i++) {
130,748✔
1127
    MND_TMQ_RETURN_CHECK(buildResult(pConsumer, pShow, pBlock, *numOfRows, taosArrayGetP(pConsumer->assignedTopics, i), hasTopic));
65,374✔
1128
    (*numOfRows)++;
65,374✔
1129
  }
1130

1131
END:
65,374✔
1132
  taosRUnLockLatch(&pConsumer->lock);
65,951✔
1133
  PRINT_LOG_END
65,951✔
1134
  return code;
65,951✔
1135
}
1136

1137
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
15,470✔
1138
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
15,470✔
1139
    return TSDB_CODE_INVALID_PARA;
×
1140
  }
1141
  SMnode         *pMnode = pReq->info.node;
15,470✔
1142
  SSdb           *pSdb = pMnode->pSdb;
15,470✔
1143
  int32_t         numOfRows = 0;
15,470✔
1144
  SMqConsumerObj *pConsumer = NULL;
15,470✔
1145
  int32_t         code = 0;
15,470✔
1146
  int32_t         lino = 0;
15,470✔
1147
  PRINT_LOG_START
15,470✔
1148

1149
  while (numOfRows < rowsCapacity) {
81,421✔
1150
    pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
81,421✔
1151
    if (pShow->pIter == NULL) {
81,421✔
1152
      break;
15,470✔
1153
    }
1154
    MND_TMQ_RETURN_CHECK(retrieveOneConsumer(pConsumer, &numOfRows, pShow, pBlock, rowsCapacity));
65,951✔
1155
    
1156
    pBlock->info.rows = numOfRows;
65,951✔
1157
    sdbRelease(pSdb, pConsumer);
65,951✔
1158
    pConsumer = NULL;
65,951✔
1159
  }
1160

1161
  pShow->numOfRows += numOfRows;
15,470✔
1162

1163
END:
15,470✔
1164
  sdbRelease(pSdb, pConsumer);
15,470✔
1165
  sdbCancelFetch(pSdb, pShow->pIter);
15,470✔
1166
  if (code != 0) {
15,470✔
1167
    mError("show consumer failed, code:%d", code);
×
1168
    return code;
×
1169
  } else {
1170
    mDebug("show consumer processed, numOfRows:%d", numOfRows);
15,470✔
1171
    return numOfRows;
15,470✔
1172
  }
1173
}
1174

1175
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
×
1176
  if (pMnode == NULL || pIter == NULL) return;
×
1177
  SSdb *pSdb = pMnode->pSdb;
×
1178
  sdbCancelFetchByType(pSdb, pIter, SDB_CONSUMER);
×
1179
}
1180

1181
const char *mndConsumerStatusName(int status) {
3,897,737✔
1182
  switch (status) {
3,897,737✔
1183
    case MQ_CONSUMER_STATUS_READY:
1,826,588✔
1184
      return "ready";
1,826,588✔
1185
    case MQ_CONSUMER_STATUS_REBALANCE:
2,071,149✔
1186
      return "rebalancing";
2,071,149✔
1187
    default:
×
1188
      return "unknown";
×
1189
  }
1190
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc