• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.8
/source/client/src/clientTmq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "clientTmq.h"
17
#include "cJSON.h"
18
#include "parser.h"
19
#include "taos.h"
20
#include "tdatablock.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23

24
// ============================================================
25
// global variables
26
// ============================================================
27
// Once-control object for TMQ initialization (its use is not visible in this part of the file).
TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
28
// Result of tmqMgmtInit(): reset to 0 on entry, set to terrno when any init step fails.
volatile int32_t tmqInitRes = 0;               // initialize rsp code
29
// Module-wide TMQ state; tmqMgmtInit() fills in its lock, timer handle and reference-set id (rsetId).
SMqMgmt        tmqMgmt = {0};
30

31
/**
 * Find a subscribed topic by name in tmq->clientTopics.
 *
 * @param tmq        consumer whose clientTopics array is searched
 * @param pTopicName name compared (strcmp) against each entry's topicName
 * @param topic      out: receives the matching entry when one is found
 * @return 0 when found, TSDB_CODE_INVALID_PARA when any argument is NULL,
 *         TSDB_CODE_TMQ_INVALID_TOPIC when no entry matches.
 */
int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic** topic) {
  if (!tmq || !pTopicName || !topic) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t total = taosArrayGetSize(tmq->clientTopics);
  int32_t idx = 0;
  while (idx < total) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    ++idx;
    if (pCandidate != NULL && strcmp(pCandidate->topicName, pTopicName) == 0) {
      *topic = pCandidate;
      return 0;
    }
  }

  tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, total, pTopicName);
  return TSDB_CODE_TMQ_INVALID_TOPIC;
}
48

49
/**
 * Find the client-side vgroup entry with the given vgId inside a subscribed topic.
 *
 * @param tmq        consumer handle
 * @param pTopicName topic to search (resolved through getTopicByName)
 * @param vgId       vgroup id to look for
 * @param pVg        out: set to the matching entry, or to NULL when nothing matches
 * @return TSDB_CODE_SUCCESS when found, TSDB_CODE_INVALID_PARA on NULL arguments,
 *         the getTopicByName error when the topic is unknown,
 *         TSDB_CODE_TMQ_INVALID_VGID when the topic has no such vgroup.
 */
int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg) {
  if (tmq == NULL || pTopicName == NULL || pVg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Always start from a defined value: the result must not depend on whatever
  // the caller happened to leave in *pVg before the call.
  *pVg = NULL;

  SMqClientTopic* pTopic = NULL;
  int32_t         code = getTopicByName(tmq, pTopicName, &pTopic);
  if (code != 0) {
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return code;
  }

  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg && pClientVg->vgId == vgId) {
      *pVg = pClientVg;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_TMQ_INVALID_VGID;
}
71

72
static void generateTimedTask(int64_t refId, int32_t type) {
6,803,319✔
73
  tmq_t*  tmq = NULL;
6,803,319✔
74
  int8_t* pTaskType = NULL;
6,803,319✔
75
  int32_t code = 0;
6,803,319✔
76

77
  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
6,803,319✔
78
  if (tmq == NULL) return;
6,803,319✔
79

80
  code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType);
6,802,563✔
81
  if (code == TSDB_CODE_SUCCESS) {
6,802,563✔
82
    *pTaskType = type;
6,802,563✔
83
    if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
6,802,563✔
84
      if (tsem2_post(&tmq->rspSem) != 0){
6,802,563✔
85
        tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
×
86
      }
87
    }else{
88
      taosFreeQitem(pTaskType);
×
89
    }
90
  }
91

92
  code = taosReleaseRef(tmqMgmt.rsetId, refId);
6,802,563✔
93
  if (code != 0){
6,802,563✔
94
    tqErrorC("failed to release ref:%"PRId64 ", type:%d, code:%d", refId, type, code);
×
95
  }
96
}
97

98
void tmqAssignAskEpTask(void* param, void* tmrId) {
2,488,528✔
99
  int64_t refId = (int64_t)param;
2,488,528✔
100
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
2,488,528✔
101
}
2,488,528✔
102

103
void tmqReplayTask(void* param, void* tmrId) {
3,952✔
104
  int64_t refId = (int64_t)param;
3,952✔
105
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
3,952✔
106
  if (tmq == NULL) return;
3,952✔
107

108
  if (tsem2_post(&tmq->rspSem) != 0){
3,952✔
109
    tqErrorC("consumer:0x%" PRIx64 " failed to post sem, replay", tmq->consumerId);
×
110
  }
111
  int32_t code = taosReleaseRef(tmqMgmt.rsetId, refId);
3,952✔
112
  if (code != 0){
3,952✔
113
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, code);
×
114
  }
115
}
116

117
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
4,314,791✔
118
  int64_t refId = (int64_t)param;
4,314,791✔
119
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
4,314,791✔
120
}
4,314,791✔
121

122
/**
 * Response callback for the TMQ heartbeat request.
 *
 * Records the response code in tmq->tokenCode, and on success applies the
 * per-topic privilege flags from the response to tmq->clientTopics and
 * updates tqClientDebugFlag from the response.
 *
 * The message buffers (pMsg->pData, pMsg->pEpSet) are freed on every path
 * that receives a non-NULL pMsg.
 *
 * @param param consumer reference id cast to a pointer
 * @param pMsg  response buffer
 * @param code  transport / server result code
 * @return TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the incoming
 *         code or the deserialization error.
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int64_t  refId = (int64_t)param;
  tmq_t*   tmq = NULL;
  SMqHbRsp rsp = {0};

  if (param == NULL) {
    // still go through END so that the message buffers are not leaked
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    goto END;
  }

  atomic_store_32(&tmq->tokenCode, code);
  if (code != 0) {
    goto END;
  }

  code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
  if (code != 0) {
    // rsp may be partially filled; it is destroyed at END
    goto END;
  }

  tmqWlock(tmq);
  for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
    STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
    if (privilege == NULL) {
      continue;
    }
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0 && pTopicCur->noPrivilege != privilege->noPrivilege) {
        tqInfoC("consumer:0x%" PRIx64 ", update privilege:%s, topic:%s", tmq->consumerId, privilege->noPrivilege ? "false" : "true", privilege->topic);
        pTopicCur->noPrivilege = privilege->noPrivilege;
      }
    }
  }
  tmqWUnlock(tmq);

  tqClientDebugFlag = rsp.debugFlag;

END:
  // rsp is zero-initialized, so destroying it is done unconditionally
  tDestroySMqHbRsp(&rsp);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  if (tmq != NULL) {
    int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
    if (ret != 0){
      tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
    }
  }
  if (code != 0){
    tqErrorC("failed to process heartbeat, refId:%"PRId64 ", code:%d", refId, code);
  }
  return code;
}
180

181
/**
 * Fill req->topics with one TopicOffsetRows entry per subscribed topic, each
 * carrying one OffsetRows entry (vgId, rows, end offset, wal end version) per
 * vgroup. The consumer's read lock is held while clientTopics is walked.
 *
 * Entries whose allocation fails are skipped.
 *
 * @param tmq consumer handle
 * @param req heartbeat request; req->topics must already be initialized
 */
static void buildVgHeartbeatData(tmq_t* tmq, SMqHbReq* req) {
  if (!tmq || !req) return;

  tmqRlock(tmq);
  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); ++topicIdx) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);
    if (!pTopic) {
      continue;
    }

    int32_t          vgCount = taosArrayGetSize(pTopic->vgs);
    TopicOffsetRows* pEntry = taosArrayReserve(req->topics, 1);
    if (!pEntry) {
      continue;
    }

    tstrncpy(pEntry->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
    pEntry->offsetRows = taosArrayInit(vgCount, sizeof(OffsetRows));
    if (!pEntry->offsetRows) {
      continue;
    }

    for (int vgIdx = 0; vgIdx < vgCount; ++vgIdx) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);
      if (!pVg) {
        continue;
      }

      OffsetRows* pRow = taosArrayReserve(pEntry->offsetRows, 1);
      if (!pRow) {
        continue;
      }

      pRow->vgId = pVg->vgId;
      pRow->rows = pVg->numOfRows;
      pRow->offset = pVg->offsetInfo.endOffset;
      // a wal end version of -1 is reported as 0
      if (pVg->offsetInfo.walVerEnd == -1) {
        pRow->ever = 0;
      } else {
        pRow->ever = pVg->offsetInfo.walVerEnd;
      }

      char offsetText[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetText, TSDB_OFFSET_LEN, &pRow->offset);
      tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
               tmq->consumerId, tmq->groupId, pRow->vgId, offsetText, pRow->ever, pRow->rows);
    }
  }
  tmqRUnlock(tmq);
}
220

221
void tmqSendHbReq(void* param, void* tmrId) {
1,159,194✔
222
  int64_t refId = (int64_t)param;
1,159,194✔
223

224
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
1,159,194✔
225
  if (tmq == NULL) {
1,159,194✔
226
    return;
2,915✔
227
  }
228

229
  SMqHbReq req = {0};
1,156,279✔
230
  req.consumerId = tmq->consumerId;
1,156,279✔
231
  req.epoch = atomic_load_32(&tmq->epoch);
1,156,279✔
232
  req.pollFlag = atomic_load_8(&tmq->pollFlag);
1,156,279✔
233
  tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
1,156,279✔
234
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
1,156,279✔
235
  if (req.topics == NULL) {
1,156,279✔
236
    goto END;
×
237
  }
238
  buildVgHeartbeatData(tmq, &req);
1,156,279✔
239

240
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
1,156,279✔
241
  if (tlen < 0) {
1,156,279✔
242
    tqErrorC("tSerializeSMqHbReq failed, size:%d", tlen);
×
243
    goto END;
×
244
  }
245

246
  void* pReq = taosMemoryCalloc(1, tlen);
1,156,279✔
247
  if (pReq == NULL) {
1,156,279✔
248
    tqErrorC("failed to malloc MqHbReq msg, code:%d", terrno);
×
249
    goto END;
×
250
  }
251

252
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
1,156,279✔
253
    tqErrorC("tSerializeSMqHbReq %d failed", tlen);
×
254
    taosMemoryFree(pReq);
×
255
    goto END;
×
256
  }
257

258
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1,156,279✔
259
  if (sendInfo == NULL) {
1,156,279✔
260
    taosMemoryFree(pReq);
×
261
    goto END;
×
262
  }
263

264
  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
1,156,279✔
265

266
  sendInfo->requestId = generateRequestId();
1,156,279✔
267
  sendInfo->requestObjRefId = 0;
1,156,279✔
268
  sendInfo->param = (void*)refId;
1,156,279✔
269
  sendInfo->fp = tmqHbCb;
1,156,279✔
270
  sendInfo->msgType = TDMT_MND_TMQ_HB;
1,156,279✔
271

272
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
1,156,279✔
273

274
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
1,156,279✔
275
  if (code != 0) {
1,156,279✔
276
    tqErrorC("tmqSendHbReq asyncSendMsgToServer failed");
2,880✔
277
  }
278
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0);
1,156,279✔
279

280
  END:
1,156,279✔
281
  tDestroySMqHbReq(&req);
1,156,279✔
282
  if (tmrId != NULL) {
1,156,279✔
283
    bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
922,941✔
284
    tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag);
922,941✔
285
  }
286
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
1,156,279✔
287
  if (ret != 0){
1,156,279✔
288
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
×
289
  }
290
}
291

292
/**
 * Commit callback used when no user callback is set: logs a failed commit.
 *
 * @param pTmq  consumer handle (nothing is logged when NULL)
 * @param code  commit result; 0 means success and produces no output
 * @param param user parameter (unused)
 */
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0 || pTmq == NULL) {
    return;
  }
  tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}
421,935✔
297

298
/**
 * Release the payload held by a response wrapper, dispatching on its
 * tmqRspType. The wrapper itself is not freed here. Unknown types are ignored.
 *
 * @param rspWrapper wrapper to clean; NULL is a no-op
 */
void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  if (rspWrapper == NULL) {
    return;
  }

  // Each case is braced because DELETE_POLL_RSP is used as a full statement
  // sequence (invoked without a trailing semicolon in this file).
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_META_RSP: {
      DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_BATCH_META_RSP: {
      DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_RAW_DATA_RSP: {
      DELETE_POLL_RSP(tDeleteMqRawDataRsp, &pRsp->dataRsp)
      break;
    }
    default:
      break;
  }
}
316

317
void freeClientVg(void* param) {
978,601✔
318
  if (param == NULL) {
978,601✔
319
    return;
×
320
  }
321
  SMqClientVg* pVg = param;
978,601✔
322
  tqTraceC("freeClientVg vgId:%d", pVg->vgId);
978,601✔
323
  tOffsetDestroy(&pVg->offsetInfo.endOffset);
978,601✔
324
  tOffsetDestroy(&pVg->offsetInfo.beginOffset);
978,601✔
325
  tOffsetDestroy(&pVg->offsetInfo.committedOffset);
978,601✔
326
}
327
void freeClientTopic(void* param) {
694,893✔
328
  if (param == NULL) {
694,893✔
329
    return;
×
330
  }
331
  SMqClientTopic* pTopic = param;
694,893✔
332
  tqTraceC("freeClientTopic topic:%s, vgs:%d", pTopic->topicName, (int)taosArrayGetSize(pTopic->vgs));
694,893✔
333
  taosArrayDestroyEx(pTopic->vgs, freeClientVg);
694,893✔
334
}
335

336
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
697,081✔
337
                                   tmq_t* tmq) {
338
  if (pTopic == NULL || pTopicEp == NULL || pVgOffsetHashMap == NULL || tmq == NULL) {
697,081✔
339
    return;
×
340
  }
341

342
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
697,081✔
343
  int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
697,081✔
344

345
  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
697,081✔
346
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
697,081✔
347

348
  tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
697,081✔
349
  pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
697,081✔
350
  if (pTopic->vgs == NULL) {
697,081✔
351
    tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName);
×
352
    return;
×
353
  }
354
  for (int32_t j = 0; j < vgNumGet; j++) {
1,689,786✔
355
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
992,705✔
356
    if (pVgEp == NULL) {
992,705✔
357
      continue;
×
358
    }
359
    (void)snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopic->topicName, pVgEp->vgId);
992,705✔
360
    SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
992,705✔
361

362
    STqOffsetVal offsetNew = {0};
992,705✔
363
    offsetNew.type = tmq->resetOffsetCfg;
992,705✔
364

365
    tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
992,705✔
366
            pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
367

368
    SMqClientVg clientVg = {
2,008,188✔
369
        .pollCnt = 0,
370
        .vgId = pVgEp->vgId,
992,705✔
371
        .epSet = pVgEp->epSet,
372
        .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
992,705✔
373
        .vgSkipCnt = 0,
374
        .emptyBlockReceiveTs = 0,
375
        .blockReceiveTs = 0,
376
        .blockSleepForReplay = 0,
377
        .numOfRows = pInfo ? pInfo->numOfRows : 0,
992,705✔
378
    };
379

380
    clientVg.offsetInfo.walVerBegin = -1;
992,705✔
381
    clientVg.offsetInfo.walVerEnd = -1;
992,705✔
382
    clientVg.seekUpdated = false;
992,705✔
383
    if (pInfo) {
992,705✔
384
      tOffsetCopy(&clientVg.offsetInfo.endOffset, &pInfo->currentOffset);
550,439✔
385
      tOffsetCopy(&clientVg.offsetInfo.committedOffset, &pInfo->commitOffset);
550,439✔
386
      tOffsetCopy(&clientVg.offsetInfo.beginOffset, &pInfo->seekOffset);
550,439✔
387
      clientVg.offsetInfo.walVerBegin = pInfo->walVerBegin;
550,439✔
388
      clientVg.offsetInfo.walVerEnd = pInfo->walVerEnd;
550,439✔
389
    } else {
390
      clientVg.offsetInfo.endOffset = offsetNew;
442,266✔
391
      clientVg.offsetInfo.committedOffset = offsetNew;
442,266✔
392
      clientVg.offsetInfo.beginOffset = offsetNew;
442,266✔
393
    }
394
    if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) {
1,985,410✔
395
      tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId,
×
396
               pTopic->topicName);
397
      freeClientVg(&clientVg);
×
398
    }
399
  }
400
}
401

402
/**
 * Build the new topic list from an ask-ep response, carrying over the
 * per-vgroup state of the consumer's current topics.
 *
 * Step 1 snapshots every current vgroup (offsets, rows, status, wal versions)
 * into a temporary hash keyed by "<topicName>:<vgId>".
 * Step 2 creates one SMqClientTopic per response topic via
 * initClientTopicFromRsp and appends it to newTopics.
 *
 * @param tmq       consumer handle (current topics are read from tmq->clientTopics)
 * @param newTopics out: array receiving the new SMqClientTopic entries
 * @param pRsp      decoded ask-ep response
 */
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
  if (!tmq || !newTopics || !pRsp) {
    return;
  }

  SHashObj* pSavedVgs = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (!pSavedVgs) {
    tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
    return;
  }

  // step 1: remember the state of every vgroup we currently know about
  int32_t curTopicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < curTopicCnt; t++) {
    SMqClientTopic* pOld = taosArrayGet(tmq->clientTopics, t);
    if (pOld == NULL || pOld->vgs == NULL) {
      continue;
    }

    int32_t curVgCnt = taosArrayGetSize(pOld->vgs);
    tqInfoC("consumer:0x%" PRIx64 ", current vg num:%d", tmq->consumerId, curVgCnt);

    for (int32_t v = 0; v < curVgCnt; v++) {
      SMqClientVg* pOldVg = taosArrayGet(pOld->vgs, v);
      if (pOldVg == NULL) {
        continue;
      }

      char key[TSDB_TOPIC_FNAME_LEN + 22] = {0};
      (void)snprintf(key, sizeof(key), "%s:%d", pOld->topicName, pOldVg->vgId);

      char offsetText[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetText, TSDB_OFFSET_LEN, &pOldVg->offsetInfo.endOffset);
      tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pOldVg->vgId, key, offsetText);

      SVgroupSaveInfo saved = {
          .currentOffset = pOldVg->offsetInfo.endOffset,
          .seekOffset = pOldVg->offsetInfo.beginOffset,
          .commitOffset = pOldVg->offsetInfo.committedOffset,
          .numOfRows = pOldVg->numOfRows,
          .vgStatus = pOldVg->vgStatus,
          .walVerBegin = pOldVg->offsetInfo.walVerBegin,
          .walVerEnd = pOldVg->offsetInfo.walVerEnd,
      };
      if (taosHashPut(pSavedVgs, key, strlen(key), &saved, sizeof(SVgroupSaveInfo)) != 0) {
        tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pOldVg->vgId);
      }
    }
  }

  // step 2: create the new topics, inheriting saved vgroup state where available
  for (int32_t t = 0; t < taosArrayGetSize(pRsp->topics); t++) {
    SMqSubTopicEp* pEp = taosArrayGet(pRsp->topics, t);
    if (pEp == NULL) {
      continue;
    }

    SMqClientTopic fresh = {0};
    initClientTopicFromRsp(&fresh, pEp, pSavedVgs, tmq);
    if (taosArrayPush(newTopics, &fresh) == NULL) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, fresh.topicName);
      freeClientTopic(&fresh);
    }
  }

  taosHashCleanup(pSavedVgs);
}
461

462
/**
 * Replace the consumer's topic list with the one described by an ask-ep
 * response and move the consumer to the READY state at the new epoch.
 *
 * Nothing is done when the response epoch is older than the consumer's, or
 * equal to it with an empty topic list. The swap of tmq->clientTopics happens
 * under the consumer's write lock.
 *
 * @param tmq   consumer handle
 * @param epoch epoch carried by the response
 * @param pRsp  decoded ask-ep response
 */
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  if (!tmq || !pRsp) {
    return;
  }

  int32_t incoming = taosArrayGetSize(pRsp->topics);

  // vnode transform (epoch == tmq->epoch && incoming != 0)
  // ask ep rsp (epoch == tmq->epoch && incoming == 0)
  if (epoch < atomic_load_32(&tmq->epoch) || (epoch == atomic_load_32(&tmq->epoch) && incoming == 0)) {
    tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
             tmq->epoch, epoch, incoming);
    return;
  }

  SArray* pFresh = taosArrayInit(incoming, sizeof(SMqClientTopic));
  if (!pFresh) {
    tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno);
    return;
  }
  tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
          tmq->consumerId, tmq->epoch, epoch, incoming, (int)taosArrayGetSize(tmq->clientTopics));

  tmqWlock(tmq);
  if (incoming > 0){
    buildNewTopicList(tmq, pFresh, pRsp);
  }
  // the previous list is only destroyed after its state was carried over
  if (tmq->clientTopics != NULL) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
  }
  tmq->clientTopics = pFresh;
  tmqWUnlock(tmq);

  atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
  atomic_store_32(&tmq->epoch, epoch);

  tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
}
499

500
/**
 * Response callback for the ask-ep request.
 *
 * On a successful response the decoded endpoint info is either applied
 * immediately (sync request) or wrapped and queued on tmq->mqueue (async
 * request). For sync requests the waiter is always woken through
 * pInfo->sem with the final code. The message buffers are freed on every
 * path.
 *
 * @param param SMqAskEpCbParam describing the request (refId, sync flag, waiter)
 * @param pMsg  response buffer; pData starts with an SMqRspHead
 * @param code  transport / server result code
 * @return the final result code
 */
static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpRsp rsp = {0};

  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  if (pParam == NULL) {
    goto _ERR;
  }

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto _ERR;
  }

  if (code != TSDB_CODE_SUCCESS) {
    tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST){
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
    }
    goto END;
  }

  if (pMsg == NULL) {
    goto END;
  }

  // The payload is read as SMqRspHead followed by the encoded response; reject
  // a missing or too-short buffer before touching it.
  if (pMsg->pData == NULL || (int64_t)pMsg->len < (int64_t)sizeof(SMqRspHead)) {
    code = TSDB_CODE_TMQ_INVALID_MSG;
    tqErrorC("consumer:0x%" PRIx64 ", decode ep rsp failed", tmq->consumerId);
    goto END;
  }

  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);

  if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) == NULL) {
    code = TSDB_CODE_TMQ_INVALID_MSG;
    tqErrorC("consumer:0x%" PRIx64 ", decode ep rsp failed", tmq->consumerId);
    goto END;
  }

  if (rsp.code != TSDB_CODE_SUCCESS) {
    code = rsp.code;
    goto END;
  }

  if (pParam->sync) {
    doUpdateLocalEp(tmq, head->epoch, &rsp);
  } else {
    SMqRspWrapper* pWrapper = NULL;
    code = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
    if (code) {
      goto END;
    }

    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    // hand the decoded response over to the wrapper; rsp now holds whatever the wrapper held before the swap
    TSWAP(pWrapper->epRsp, rsp);
    code = taosWriteQitem(tmq->mqueue, pWrapper);
    if (code != 0) {
      tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
      taosFreeQitem(pWrapper);
      tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
    }
  }

END:
  tDeleteSMqAskEpRsp(&rsp);
  int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
  if (ret != 0){
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret);
  }

_ERR:
  // a synchronous caller is blocked on pInfo->sem and must always be woken
  if (pParam && pParam->sync) {
    SAskEpInfo* pInfo = pParam->pParam;
    if (pInfo) {
      pInfo->code = code;
      if (tsem2_post(&pInfo->sem) != 0){
        tqErrorC("failed to post rsp sem askep cb");
      }
    }
  }

  if (pMsg) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
  }

  return code;
}
585

586
/**
 * Send an ask-ep request for this consumer to the mnode; the response is
 * handled by askEpCb.
 *
 * @param pTmq        consumer handle
 * @param param       opaque pointer stored in the callback parameter (pParam)
 * @param sync        stored in the callback parameter; selects askEpCb's sync path
 * @param updateEpSet when true the request carries epoch -1 instead of the
 *                    consumer's current epoch
 * @return TSDB_CODE_INVALID_PARA for a NULL consumer or a serialization
 *         failure, terrno on allocation failure, otherwise the result of
 *         asyncSendMsgToServer.
 */
int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
  if (pTmq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  void*            pBuf = NULL;
  SMqAskEpCbParam* pCbParam = NULL;
  SMsgSendInfo*    pSend = NULL;

  SMqAskEpReq req = {0};
  req.consumerId = pTmq->consumerId;
  if (updateEpSet) {
    req.epoch = -1;
  } else {
    req.epoch = atomic_load_32(&pTmq->epoch);
  }
  tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);

  // size probe first, then serialize into an exactly-sized buffer
  int32_t bufLen = tSerializeSMqAskEpReq(NULL, 0, &req);
  TSDB_CHECK_CONDITION(bufLen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
  pBuf = taosMemoryCalloc(1, bufLen);
  TSDB_CHECK_NULL(pBuf, code, lino, END, terrno);

  code = tSerializeSMqAskEpReq(pBuf, bufLen, &req);
  TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);

  pCbParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  TSDB_CHECK_NULL(pCbParam, code, lino, END, terrno);

  pCbParam->refId = pTmq->refId;
  pCbParam->sync = sync;
  pCbParam->pParam = param;

  pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  TSDB_CHECK_NULL(pSend, code, lino, END, terrno);

  pSend->msgInfo = (SDataBuf){.pData = pBuf, .len = bufLen, .handle = NULL};
  pSend->requestId = generateRequestId();
  pSend->requestObjRefId = 0;
  pSend->param = pCbParam;
  pSend->paramFreeFp = taosAutoMemoryFree;
  pSend->fp = askEpCb;
  pSend->msgType = TDMT_MND_TMQ_ASK_EP;

  // both pointers are now referenced from pSend; clear the locals so the
  // cleanup at END does not free them
  pBuf = NULL;
  pCbParam = NULL;

  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode, QID:0x%" PRIx64, pTmq->consumerId, pSend->requestId);
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, pSend);

END:
  if (code != 0) {
    tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(pBuf);
  taosMemoryFree(pCbParam);
  return code;
}
640

641
/**
 * Drain the consumer's delayedTask queue and run each task:
 *  - ASK_EP: send an async ask-ep request and re-arm the ask-ep timer;
 *  - COMMIT: commit all offsets asynchronously and re-arm the commit timer.
 * Unknown task types are logged. Every queue item is freed after handling.
 *
 * @param pTmq consumer handle (not checked for NULL)
 * @return always 0
 */
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask));

  for (;;) {
    int8_t* pTask = NULL;
    taosReadQitem(pTmq->delayedTask, (void**)&pTask);
    if (pTask == NULL) {
      break;
    }

    switch (*pTask) {
      case TMQ_DELAYED_TASK__ASK_EP: {
        tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
        int32_t rc = askEp(pTmq, NULL, false, false);
        if (rc != 0) {
          tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(rc));
        }
        tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        bool rearmed = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
                                    &pTmq->epTimer);
        tqDebugC("reset timer for tmq ask ep:%d", rearmed);
        break;
      }
      case TMQ_DELAYED_TASK__COMMIT: {
        // fall back to the logging-only callback when the user registered none
        tmq_commit_cb* pCommitCb = pTmq->commitCb;
        if (pCommitCb == NULL) {
          pCommitCb = defaultCommitCbFn;
        }
        asyncCommitAllOffsets(pTmq, pCommitCb, pTmq->commitCbUserParam);
        tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        bool rearmed = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
                                    &pTmq->commitTimer);
        tqDebugC("reset timer for commit:%d", rearmed);
        break;
      }
      default: {
        tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTask);
        break;
      }
    }

    taosFreeQitem(pTask);
  }

  return 0;
}
674

675
/**
 * Discard every response still waiting in tmq->mqueue, releasing both the
 * payload and the queue item.
 *
 * @param tmq consumer handle; NULL is a no-op
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  if (!tmq) {
    return;
  }

  for (;;) {
    SMqRspWrapper* pPending = NULL;
    taosReadQitem(tmq->mqueue, (void**)&pPending);
    if (!pPending) {
      return;
    }
    tmqFreeRspWrapper(pPending);
    taosFreeQitem(pPending);
  }
}
685

686
/**
 * Response callback for the subscribe request: frees the message buffers,
 * stores the result code in the callback parameter and posts its semaphore.
 *
 * @param param SMqSubscribeCbParam of the waiting caller
 * @param pMsg  response buffer (may be NULL)
 * @param code  result code of the request
 * @return code when param is NULL, otherwise 0
 */
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFreeClear(pMsg->pEpSet);
    taosMemoryFreeClear(pMsg->pData);
  }

  SMqSubscribeCbParam* pWaiter = (SMqSubscribeCbParam*)param;
  if (pWaiter == NULL) {
    return code;
  }

  pWaiter->rspErr = code;
  if (tsem2_post(&pWaiter->rspSem) != 0) {
    tqErrorC("failed to post sem, subscribe cb");
  }
  return 0;
}
704

705
/**
 * Report the topics the consumer is currently subscribed to.
 *
 * Each stored topic name has the form "<prefix>.<name>"; only the part after
 * the first '.' is appended to the list. When *topics is NULL a new list is
 * created and returned through topics. Entries that cannot be processed are
 * logged and skipped.
 *
 * @param tmq    consumer handle
 * @param topics in/out: address of the list pointer to append to
 * @return 0 on success, TSDB_CODE_INVALID_PARA when tmq or topics is NULL,
 *         terrno when a new list cannot be allocated.
 */
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  // topics is dereferenced below, so it must be validated together with tmq
  if (tmq == NULL || topics == NULL) return TSDB_CODE_INVALID_PARA;
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return terrno;
    }
  }
  tmqRlock(tmq);
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    if (topic == NULL) {
      tqErrorC("topic is null");
      continue;
    }
    // skip everything up to and including the first '.'
    char* tmp = strchr(topic->topicName, '.');
    if (tmp == NULL) {
      tqErrorC("topic name is invalid:%s", topic->topicName);
      continue;
    }
    if (tmq_list_append(*topics, tmp + 1) != 0) {
      tqErrorC("failed to append topic:%s", tmp + 1);
      continue;
    }
  }
  tmqRUnlock(tmq);
  return 0;
}
733

734
void tmqFreeImpl(void* handle) {
137,297✔
735
  if (handle == NULL) return;
137,297✔
736
  tmq_t*  tmq = (tmq_t*)handle;
137,297✔
737
  int64_t id = tmq->consumerId;
137,297✔
738

739
  if (tmq->mqueue) {
137,297✔
740
    tmqClearUnhandleMsg(tmq);
137,297✔
741
    taosCloseQueue(tmq->mqueue);
137,297✔
742
  }
743

744
  if (tmq->delayedTask) {
137,297✔
745
    taosCloseQueue(tmq->delayedTask);
137,297✔
746
  }
747

748
  if(tsem2_destroy(&tmq->rspSem) != 0) {
137,297✔
749
    tqErrorC("failed to destroy sem in free tmq");
×
750
  }
751

752
  taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
137,297✔
753
  taos_close_internal(tmq->pTscObj);
137,297✔
754

755
  if (tmq->commitTimer) {
137,297✔
756
    if (!taosTmrStopA(&tmq->commitTimer)) {
83,062✔
757
      tqErrorC("failed to stop commit timer");
43,591✔
758
    }
759
  }
760
  if (tmq->epTimer) {
137,297✔
761
    if (!taosTmrStopA(&tmq->epTimer)) {
134,753✔
762
      tqErrorC("failed to stop ep timer");
122,179✔
763
    }
764
  }
765
  if (tmq->hbLiveTimer) {
137,297✔
766
    if (!taosTmrStopA(&tmq->hbLiveTimer)) {
136,949✔
767
      tqErrorC("failed to stop hb timer");
×
768
    }
769
  }
770
  taosMemoryFree(tmq);
137,297✔
771

772
  tqInfoC("consumer:0x%" PRIx64 " closed", id);
137,297✔
773
}
774

775
void tmqMgmtInit(void) {
95,641✔
776
  tmqInitRes = 0;
95,641✔
777

778
  if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){
95,641✔
779
    goto END;
×
780
  }
781

782
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
95,641✔
783

784
  if (tmqMgmt.timer == NULL) {
95,641✔
785
    goto END;
×
786
  }
787

788
  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
95,641✔
789
  if (tmqMgmt.rsetId < 0) {
95,641✔
790
    goto END;
×
791
  }
792

793
  return;
95,641✔
794
END:
×
795
  tmqInitRes = terrno;
×
796
}
797

798
void tmqMgmtClose(void) {
1,559,343✔
799
  if (tmqMgmt.timer) {
1,559,343✔
800
    taosTmrCleanUp(tmqMgmt.timer);
95,641✔
801
    tmqMgmt.timer = NULL;
95,641✔
802
  }
803

804
  if (tmqMgmt.rsetId > 0) {
1,559,343✔
805
    (void) taosThreadMutexLock(&tmqMgmt.lock);
95,641✔
806
    tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
95,641✔
807
    int64_t  refId = 0;
95,641✔
808

809
    while (tmq) {
99,276✔
810
      refId = tmq->refId;
3,635✔
811
      if (refId == 0) {
3,635✔
812
        break;
×
813
      }
814
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
3,635✔
815
      tmq = taosIterateRef(tmqMgmt.rsetId, refId);
3,635✔
816
    }
817
    taosCloseRef(tmqMgmt.rsetId);
95,641✔
818
    tmqMgmt.rsetId = -1;
95,641✔
819
    (void)taosThreadMutexUnlock(&tmqMgmt.lock);
95,641✔
820
  }
821
  (void)taosThreadMutexDestroy(&tmqMgmt.lock);
1,559,343✔
822
}
1,559,343✔
823

824
// Create a consumer from `conf`.
//   conf:      consumer configuration; must not be NULL and must carry a non-empty groupId
//   errstr:    buffer handed to SET_ERROR_MSG_TMQ for the failure text
//   errstrLen: size of errstr
// Steps: run the one-time tmqMgmt initialisation, allocate the consumer, create its topic
// array and two queues, copy the configuration, connect (by token when conf->token is set,
// otherwise with user/password), register the consumer in the tmqMgmt ref set and start the
// heartbeat timer.
// Returns the new consumer, or NULL on any failure.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int32_t code = 0;

  if (conf == NULL) {
    SET_ERROR_MSG_TMQ("configure is null")
    return NULL;
  }
  code = taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (code != 0) {
    tqErrorC("failed to tmqInit, code:%s", tstrerror(code));
    SET_ERROR_MSG_TMQ("tmq init error")
    return NULL;
  }
  if (tmqInitRes != 0) {
    SET_ERROR_MSG_TMQ("tmqInitRes init error")
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    tqErrorC("failed to create consumer, code:%s", terrstr());
    SET_ERROR_MSG_TMQ("malloc tmq failed")
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    tqErrorC("failed to init topic array, since:%s", terrstr());
    SET_ERROR_MSG_TMQ("malloc client topics failed")
    goto _failed;
  }
  code = taosOpenQueue(&pTmq->mqueue);
  if (code) {
    tqErrorC("open mqueue failed since %s", tstrerror(code));
    SET_ERROR_MSG_TMQ("open mqueue failed")
    goto _failed;
  }

  code = taosOpenQueue(&pTmq->delayedTask);
  if (code) {
    tqErrorC("open delayed task queue failed since %s", tstrerror(code));
    SET_ERROR_MSG_TMQ("open delayed task queue failed")
    goto _failed;
  }

  if (conf->groupId[0] == 0) {
    SET_ERROR_MSG_TMQ("group is empty")
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->pollFlag = 0;

  // set conf
  tstrncpy(pTmq->clientId, conf->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->enableWalMarker = conf->enableWalMarker;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
  pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
  pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  pTmq->replayEnable = conf->replayEnable;
  pTmq->sourceExcluded = conf->sourceExcluded;
  pTmq->rawData = conf->rawData;
  pTmq->maxPollWaitTime = conf->maxPollWaitTime;
  pTmq->minPollRows = conf->minPollRows;
  pTmq->enableBatchMeta = conf->enableBatchMeta;
  tstrncpy(pTmq->user, user, TSDB_USER_LEN);
  if (taosGetFqdn(pTmq->fqdn) != 0) {
    // fall back to a fixed name when the fqdn cannot be determined
    tstrncpy(pTmq->fqdn, "localhost", TSDB_FQDN_LEN);
  }
  if (conf->replayEnable) {
    // auto commit is always switched off in replay mode
    pTmq->autoCommit = false;
  }
  taosInitRWLatch(&pTmq->lock);

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) {
    tqErrorC("consumer:0x%" PRIx64 " init semaphore failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
    SET_ERROR_MSG_TMQ("init t_sem failed")
    goto _failed;
  }

  if (conf->token != NULL) {
    code = taos_connect_by_auth(conf->ip, NULL, conf->token, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " connect by token failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      SET_ERROR_MSG_TMQ(terrstr())
      goto _failed;
    }
  } else {
    // init connection
    code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      SET_ERROR_MSG_TMQ(terrstr())
      goto _failed;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    SET_ERROR_MSG_TMQ("add tscObj ref failed")
    goto _failed;
  }

  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
  if (pTmq->hbLiveTimer == NULL) {
    SET_ERROR_MSG_TMQ("start heartbeat timer failed")
    // The consumer is registered in the ref set from here on, so it must not be freed
    // directly: the ref set would keep a dangling pointer that tmqMgmtClose() dereferences.
    // Drop the reference instead.
    // NOTE(review): relies on taosRemoveRef invoking the free callback registered with
    // taosOpenRef (tmqFreeImpl) once the last reference is gone - confirm against tref.
    if (taosRemoveRef(tmqMgmt.rsetId, pTmq->refId) != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to remove ref:%" PRId64, pTmq->consumerId, pTmq->refId);
    }
    return NULL;
  }
  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
              ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, maxPollIntervalMs:%dms, sessionTimeoutMs:%dms",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf, pTmq->maxPollIntervalMs, pTmq->sessionTimeoutMs);

  return pTmq;

_failed:
  // only reached while the consumer is not (or could not be) registered in the ref set
  tmqFreeImpl(pTmq);
  return NULL;
}
964

965
// Issue an askEp request and block until its result is available.
// Returns TSDB_CODE_INVALID_PARA for a NULL consumer, terrno when the request context
// cannot be allocated, TSDB_CODE_TSC_INTERNAL_ERROR when its semaphore cannot be
// initialised, the askEp() code when the request could not be issued, and otherwise the
// code stored in the request context after the wait.
int32_t syncAskEp(tmq_t* pTmq) {
  if (pTmq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SAskEpInfo* askInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (askInfo == NULL) {
    return terrno;
  }
  if (tsem2_init(&askInfo->sem, 0, 0) != 0) {
    taosMemoryFree(askInfo);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t rc = askEp(pTmq, askInfo, true, false);
  if (rc == 0) {
    // request issued: wait on the semaphore, then pick up the stored result
    if (tsem2_wait(&askInfo->sem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
    }
    rc = askInfo->code;
  }

  if (tsem2_destroy(&askInfo->sem) != 0) {
    tqErrorC("failed to destroy sem sync ask ep");
  }
  taosMemoryFree(askInfo);
  return rc;
}
988

989
// Subscribe the consumer to the topics in `topic_list`.
//   tmq:        consumer handle
//   topic_list: list of topic names
// Builds a SCMSubscribeReq with the full name of every topic, sends it as
// TDMT_MND_TMQ_SUBSCRIBE and waits for tmqSubscribeCb to post the result. On success it
// retries syncAskEp() until it succeeds, the retry budget is used up, or the server
// answers TSDB_CODE_MND_CONSUMER_NOT_EXIST (which is reported as success), and finally
// starts the ask-ep timer and, with auto commit enabled, the commit timer if they are not
// running yet.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of
// the failing step.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
  tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);

  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) {
    code = terrno;
    goto END;
  }

  req.withTbName = tmq->withTbName;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.sessionTimeoutMs = tmq->sessionTimeoutMs;
  req.maxPollIntervalMs = tmq->maxPollIntervalMs;
  req.resetOffsetCfg = tmq->resetOffsetCfg;
  req.enableReplay = tmq->replayEnable;
  req.enableBatchMeta = tmq->enableBatchMeta;

  // translate every topic into its full name; the strings are owned by req.topicNames
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
    if (topic == NULL) {
      code = terrno;
      goto END;
    }
    SName name = {0};
    code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      goto END;
    }
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = terrno;
      goto END;
    }

    code = tNameExtractFullName(&name, topicFName);
    if (code) {
      tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId,
               code);
      taosMemoryFree(topicFName);
      goto END;
    }

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      code = terrno;
      taosMemoryFree(topicFName);
      goto END;
    }
    tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
  }

  // first call only computes the serialized length, second call fills the buffer
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = terrno;
    goto END;
  }

  void* abuf = buf;
  tlen = tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = terrno;
    taosMemoryFree(buf);
    goto END;
  }

  SMqSubscribeCbParam param = {.rspErr = 0};
  if (tsem2_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    taosMemoryFree(buf);
    taosMemoryFree(sendInfo);
    goto END;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
  if (code != 0) {
    // Nobody will wait on the semaphore on this path, so release it before leaving.
    // `param` lives on this stack frame, so a callback arriving after this return would be
    // invalid regardless of whether the semaphore still exists.
    // NOTE(review): presumably the callback is not invoked when the send fails - confirm
    // against asyncSendMsgToServer.
    if (tsem2_destroy(&param.rspSem) != 0) {
      tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
    }
    goto END;
  }

  if (tsem2_wait(&param.rspSem) != 0){
    tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
  }
  if(tsem2_destroy(&param.rspSem) != 0) {
    tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
  }

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto END;
  }

  int32_t retryCnt = 0;
  while ((code = syncAskEp(tmq)) != 0) {
    if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
      tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes or code:%s",
               tmq->consumerId, tstrerror(code));
      if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
        // reported to the caller as success
        code = 0;
      }
      goto END;
    }

    tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(SUBSCRIBE_RETRY_INTERVAL);
  }

  if (tmq->epTimer == NULL){
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
    if (tmq->epTimer == NULL) {
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }
  if (tmq->autoCommit && tmq->commitTimer == NULL){
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
    if (tmq->commitTimer == NULL) {
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }

END:
  taosArrayDestroyP(req.topicNames, NULL);
  return code;
}
1141

1142
// Look up the vgroup entry `vgId` of topic `topicName` in tmq->clientTopics.
// On a match *pVg is set to the entry; when nothing matches (or an argument is NULL)
// *pVg is left untouched.
void getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId, SMqClientVg** pVg) {
  if (tmq == NULL || topicName == NULL || pVg == NULL) {
    return;
  }

  int32_t nTopics = taosArrayGetSize(tmq->clientTopics);
  for (int t = 0; t < nTopics; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (pTopic == NULL || strcmp(pTopic->topicName, topicName) != 0) {
      continue;
    }

    int32_t nVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < nVgs; v++) {
      SMqClientVg* pCandidate = taosArrayGet(pTopic->vgs, v);
      if (pCandidate == NULL || pCandidate->vgId != vgId) {
        continue;
      }
      *pVg = pCandidate;
      tqTraceC("consumer:0x%" PRIx64 " getVgInfo found topic:%s vgId:%d", tmq->consumerId, topicName, vgId);
      return;
    }
  }

  tqTraceC("consumer:0x%" PRIx64 " getVgInfo not found topic:%s vgId:%d, total topics:%d", tmq->consumerId, topicName, vgId, nTopics);
}
1163

1164
// Find the entry of tmq->clientTopics whose topicName equals `topicName`.
// Returns the entry, or NULL when an argument is NULL or no topic matches.
SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
  if (tmq == NULL || topicName == NULL) {
    return NULL;
  }
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int i = 0; i < topicNumCur; i++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    // same NULL guard as getVgInfo(): never hand a NULL element to strcmp
    if (pTopicCur != NULL && strcmp(pTopicCur->topicName, topicName) == 0) {
      tqTraceC("consumer:0x%" PRIx64 " getTopicInfo found topic:%s", tmq->consumerId, topicName);
      return pTopicCur;
    }
  }
  tqTraceC("consumer:0x%" PRIx64 " getTopicInfo not found topic:%s, total:%d", tmq->consumerId, topicName, topicNumCur);
  return NULL;
}
1179

1180
// Response callback for a poll request sent by doTmqPollImpl().
//   param: SMqPollCbParam* describing the request (refId, vgId, requestId, topicName)
//   pMsg:  response buffer
//   code:  result code of the request
// A queue item is allocated for every response (also for failed ones) and written to
// tmq->mqueue with the resulting code, vgId and topic name; for a successful response
// with a matching epoch it additionally takes over pMsg->pData and pMsg->pEpSet.
// Whatever is still attached to pMsg at the end is freed.
// Returns TSDB_CODE_TSC_INTERNAL_ERROR for a NULL pMsg or param,
// TSDB_CODE_TMQ_CONSUMER_CLOSED when the consumer ref cannot be acquired, the result of
// taosWriteQitem when an item was queued, otherwise the allocation error.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pCbParam = (SMqPollCbParam*)param;
  SMqRspWrapper*  pWrapper = NULL;
  tmq_t*          tmq = NULL;
  int64_t         refId = 0;
  uint64_t        requestId = 0;
  int32_t         vgId = 0;
  int32_t         ret = 0;
  int8_t          rspType = 0;

  if (pMsg == NULL) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  if (pCbParam == NULL) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto EXIT;
  }

  refId = pCbParam->refId;
  vgId = pCbParam->vgId;
  requestId = pCbParam->requestId;

  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto EXIT;
  }

  ret = taosAllocateQitem(sizeof(SMqRspWrapper), DEF_QITEM, 0, (void**)&pWrapper);
  if (ret) {
    code = ret;
    tqWarnC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
    goto END;
  }

  // a failed request is still queued so that the code reaches the consumer of the queue
  if (code != 0) {
    goto END;
  }

  if (pMsg->pData == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto END;
  }

  SMqRspHead* pHead = (SMqRspHead*)pMsg->pData;
  int32_t     msgEpoch = pHead->epoch;
  int32_t     clientEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch != clientEpoch) {
    tqWarnC("consumer:0x%" PRIx64" msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    goto END;
  }

  rspType = pHead->mqMsgType;
  tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s), QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);

  pWrapper->tmqRspType = rspType;
  pWrapper->pollRsp.reqId = requestId;
  pWrapper->pollRsp.pEpset = pMsg->pEpSet;
  pWrapper->pollRsp.data = pMsg->pData;
  pWrapper->pollRsp.len = pMsg->len;
  // ownership moved to the wrapper: detach so the cleanup at EXIT does not free them
  pMsg->pData = NULL;
  pMsg->pEpSet = NULL;

END:
  if (pWrapper != NULL) {
    pWrapper->code = code;
    pWrapper->pollRsp.vgId = vgId;
    tstrncpy(pWrapper->pollRsp.topicName, pCbParam->topicName, TSDB_TOPIC_FNAME_LEN);
    code = taosWriteQitem(tmq->mqueue, pWrapper);
    if (code == 0) {
      tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d, QID:0x%" PRIx64,
               tmq->consumerId, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
    } else {
      tmqFreeRspWrapper(pWrapper);
      taosFreeQitem(pWrapper);
      tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
    }
  }

  if (tsem2_post(&tmq->rspSem) != 0) {
    tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
  }
  ret = taosReleaseRef(tmqMgmt.rsetId, refId);
  if (ret != 0) {
    tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
  }

EXIT:
  taosMemoryFreeClear(pMsg->pData);
  taosMemoryFreeClear(pMsg->pEpSet);
  return code;
}
1269

1270
// Fill a poll request for one vgroup of one topic from the consumer settings.
// The subscribe key is "<groupId><TMQ_SEPARATOR><topicName>", the requested offset is the
// end offset recorded for the vgroup, and a fresh request id is generated.
// Does nothing when any argument is NULL.
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  if (pReq == NULL || tmq == NULL || pTopic == NULL || pVg == NULL) {
    return;
  }

  // identity of the subscription and of this request
  (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = atomic_load_32(&tmq->epoch);
  pReq->reqId = generateRequestId();

  // target vgroup and the position to continue from
  pReq->head.vgId = pVg->vgId;
  pReq->reqOffset = pVg->offsetInfo.endOffset;

  // consumer options copied through unchanged
  pReq->withTbName = tmq->withTbName;
  pReq->timeout = tmq->maxPollWaitTime;
  pReq->minPollRows = tmq->minPollRows;
  pReq->useSnapshot = tmq->useSnapshot;
  pReq->enableReplay = tmq->replayEnable;
  pReq->sourceExcluded = tmq->sourceExcluded;
  pReq->rawData = tmq->rawData;
  pReq->enableBatchMeta = tmq->enableBatchMeta;
}
1289

1290
void changeByteEndian(char* pData) {
88,651,420✔
1291
  if (pData == NULL) {
88,651,420✔
1292
    return;
×
1293
  }
1294
  char* p = pData;
88,651,420✔
1295

1296
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
1297
  // length | version:
1298
  int32_t blockVersion = *(int32_t*)p;
88,651,420✔
1299
  if (blockVersion != BLOCK_VERSION_1) {
88,651,795✔
1300
    tqErrorC("invalid block version:%d", blockVersion);
×
1301
    return;
×
1302
  }
1303
  *(int32_t*)p = BLOCK_VERSION_2;
88,651,795✔
1304

1305
  p += sizeof(int32_t);
88,651,045✔
1306
  p += sizeof(int32_t);
88,650,670✔
1307
  p += sizeof(int32_t);
88,651,045✔
1308
  int32_t cols = *(int32_t*)p;
88,651,045✔
1309
  p += sizeof(int32_t);
88,651,045✔
1310
  p += sizeof(int32_t);
88,650,670✔
1311
  p += sizeof(uint64_t);
88,651,045✔
1312
  // check fields
1313
  p += cols * (sizeof(int8_t) + sizeof(int32_t));
88,651,045✔
1314

1315
  int32_t* colLength = (int32_t*)p;
88,650,670✔
1316
  for (int32_t i = 0; i < cols; ++i) {
555,336,130✔
1317
    colLength[i] = htonl(colLength[i]);
466,688,835✔
1318
  }
1319
}
1320

1321
static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, int64_t* rows, int32_t* precision) {
88,651,795✔
1322
  if (pRetrieve == NULL || rawData == NULL || rows == NULL) {
88,651,795✔
1323
    return;
×
1324
  }
1325
  if (*(int64_t*)pRetrieve == 0) {
88,652,170✔
1326
    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1327
    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
1328
    if (precision != NULL) {
×
1329
      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
1330
    }
1331
  } else if (*(int64_t*)pRetrieve == 1) {
88,651,795✔
1332
    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
88,652,545✔
1333
    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
88,652,545✔
1334
    if (precision != NULL) {
88,652,170✔
1335
      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
×
1336
    }
1337
  }
1338
}
1339

1340
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
55,099,078✔
1341
                                        SMqRspObj* pRspObj) {
1342
  if (pWrapper == NULL || pVg == NULL || numOfRows == NULL || pRspObj == NULL) {
55,099,078✔
1343
    return;
×
1344
  }
1345
  pRspObj->resIter = -1;
55,099,078✔
1346
  pRspObj->resInfo.totalRows = 0;
55,099,453✔
1347
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
55,099,078✔
1348

1349
  SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
55,099,453✔
1350
  // extract the rows in this data packet
1351
  for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
143,749,748✔
1352
    void*   pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
88,652,170✔
1353
    void*   rawData = NULL;
88,652,545✔
1354
    int64_t rows = 0;
88,652,545✔
1355
    // deal with compatibility
1356
    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
88,652,545✔
1357

1358
    pVg->numOfRows += rows;
88,652,170✔
1359
    (*numOfRows) += rows;
88,652,170✔
1360
    changeByteEndian(rawData);
88,652,170✔
1361
  }
1362
}
1363

1364
// Send one poll request for vgroup pVg of topic pTopic.
// Serializes a SMqPollReq, attaches a SMqPollCbParam (refId, topic name, vgId, request id)
// and sends it as TDMT_VND_TMQ_CONSUME with tmqPollCb as the callback. On success the poll
// counters of the vgroup and of the consumer are incremented and pVg->seekUpdated is
// cleared. Returns 0 on success, otherwise the code of the failing step.
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  int             code = 0;
  int             lino = 0;
  char*           reqBuf = NULL;
  SMqPollCbParam* cbParam = NULL;
  SMsgSendInfo*   pSend = NULL;
  SMqPollReq      req = {0};

  tmqBuildConsumeReqImpl(&req, pTmq, pTopic, pVg);

  // first pass computes the size, second pass fills the buffer
  int32_t reqLen = tSerializeSMqPollReq(NULL, 0, &req);
  TSDB_CHECK_CONDITION(reqLen >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);

  reqBuf = taosMemoryCalloc(1, reqLen);
  TSDB_CHECK_NULL(reqBuf, code, lino, END, terrno);

  TSDB_CHECK_CONDITION(tSerializeSMqPollReq(reqBuf, reqLen, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);

  cbParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  TSDB_CHECK_NULL(cbParam, code, lino, END, terrno);

  cbParam->refId = pTmq->refId;
  tstrncpy(cbParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
  cbParam->vgId = pVg->vgId;
  cbParam->requestId = req.reqId;

  pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  TSDB_CHECK_NULL(pSend, code, lino, END, terrno);

  pSend->msgInfo = (SDataBuf){.pData = reqBuf, .len = reqLen, .handle = NULL};
  pSend->requestId = req.reqId;
  pSend->requestObjRefId = 0;
  pSend->param = cbParam;
  pSend->paramFreeFp = taosAutoMemoryFree;
  pSend->fp = tmqPollCb;
  pSend->msgType = TDMT_VND_TMQ_CONSUME;

  // both buffers now hang off pSend; clear the locals so the cleanup at END skips them
  reqBuf = NULL;
  cbParam = NULL;

  char offsetText[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetText, tListLen(offsetText), &pVg->offsetInfo.endOffset);
  code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, pSend);
  tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, QID:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetText, req.reqId);
  TSDB_CHECK_CODE(code, lino, END);

  pVg->pollCnt++;
  pVg->seekUpdated = false;  // reset this flag.
  pTmq->pollCnt++;

END:
  if (code != 0) {
    tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code));
  }
  taosMemoryFreeClear(cbParam);
  taosMemoryFreeClear(reqBuf);
  return code;
}
1422

1423
// Send poll requests to every vgroup of every subscribed topic that is ready for one.
// Runs under the consumer write lock. A vgroup is skipped while
//   - the topic is flagged noPrivilege,
//   - less than EMPTY_BLOCK_POLL_IDLE_DURATION ms passed since emptyBlockReceiveTs,
//   - replay is enabled and less than blockSleepForReplay ms passed since blockReceiveTs,
//   - its status is already TMQ_VG_STATUS__WAIT (a poll is outstanding).
// Returns TSDB_CODE_INVALID_MSG for a NULL consumer, the stored token code when it is one
// of the token-not-exist/disabled/expired codes, TSDB_CODE_MND_CONSUMER_NOT_EXIST when the
// consumer is closed, the first doTmqPollImpl() failure, otherwise 0.
static int32_t tmqPollImpl(tmq_t* tmq) {
  if (tmq == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }
  tmqWlock(tmq);

  int32_t code = atomic_load_32(&tmq->tokenCode);
  if (code == TSDB_CODE_MND_TOKEN_NOT_EXIST || code == TSDB_CODE_MND_TOKEN_DISABLED ||
      code == TSDB_CODE_MND_TOKEN_EXPIRED) {
    goto end;
  }
  // any other stored token code is ignored
  code = 0;

  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED) {
    code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
    goto end;
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);

  for (int topicIdx = 0; topicIdx < numOfTopics; topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);
    if (pTopic == NULL) {
      continue;
    }
    int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
    if (pTopic->noPrivilege) {
      tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
      continue;
    }

    for (int vgIdx = 0; vgIdx < numOfVg; vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);
      if (pVg == NULL) {
        continue;
      }

      int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
      if (elapsed >= 0 && elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION) {  // less than EMPTY_BLOCK_POLL_IDLE_DURATION
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll",
                 tmq->consumerId, tmq->epoch, pVg->vgId, elapsed);
        continue;
      }

      elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
      if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
        tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
                 tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
        continue;
      }

      // claim the vgroup: IDLE -> WAIT; seeing WAIT as the previous value means a poll is
      // already outstanding for it
      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus == TMQ_VG_STATUS__WAIT) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        if (skipped % 10000 == 0) {
          tqInfoC("consumer:0x%" PRIx64 " epoch %d, vgId:%d has skipped poll %d times in a row", tmq->consumerId,
                  tmq->epoch, pVg->vgId, skipped);
        }
        tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
                 pVg->vgId, skipped);
        continue;
      }

      atomic_store_32(&pVg->vgSkipCnt, 0);
      code = doTmqPollImpl(tmq, pTopic, pVg);
      if (code != TSDB_CODE_SUCCESS) {
        // NOTE(review): vgStatus is left at WAIT on this path - confirm that something
        // resets it, otherwise this vgroup keeps being skipped.
        goto end;
      }
    }
  }

end:
  tmqWUnlock(tmq);
  tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
  return code;
}
1499

1500
// Record the outcome of a poll response on the local vgroup handle.
//
//   pVg        vgroup handle to update; nothing happens if it is NULL
//   reqOffset  offset the request was issued with (copied to beginOffset when hasData)
//   rspOffset  offset carried by the response (copied to endOffset)
//   sver/ever  WAL version range reported by the response; the stored end is ever + 1
//   consumerId only used for logging
//   hasData    whether beginOffset should be refreshed from reqOffset
//
// Offsets are left untouched while pVg->seekUpdated is set. The vgroup status is
// always put back to TMQ_VG_STATUS__IDLE and the WAL range is always refreshed.
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
                         int64_t consumerId, bool hasData) {
  if (!pVg || !reqOffset || !rspOffset) {
    return;
  }

  if (pVg->seekUpdated) {
    // a seek is pending: keep the locally stored offsets as they are
    tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId);
  } else {
    tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
    if (hasData) {
      tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset);
    }
    tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset);
  }

  // the response has been consumed, so the vgroup may be polled again
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

  // remember the valid wal version range: [sver, ever + 1)
  pVg->offsetInfo.walVerBegin = sver;
  pVg->offsetInfo.walVerEnd = ever + 1;
}
1522

1523
// Allocate a SMqRspObj and move the decoded payload of a poll response into it.
//
// The payload is copied bytewise starting at the dataRsp member, using the size
// of the largest of the three response layouts, and the same region of the
// wrapper is zeroed afterwards, so only the returned object still holds the
// payload's contents.
//
// Returns NULL if the allocation fails.
// NOTE(review): pollRspWrapper->topicHandle is dereferenced without a NULL
// check; callers are expected to have set it - confirm.
static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
  // only used to obtain the size of the biggest payload variant
  typedef union {
    SMqDataRsp      dataRsp;
    SMqMetaRsp      metaRsp;
    SMqBatchMetaRsp batchMetaRsp;
  } RspPayload;

  SMqRspObj* pObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (!pObj) {
    tqErrorC("buildRsp:failed to allocate memory");
    return NULL;
  }

  // take over the payload
  (void)memcpy(&pObj->dataRsp, &pollRspWrapper->dataRsp, sizeof(RspPayload));

  // identify where the data came from
  tstrncpy(pObj->topic, pollRspWrapper->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pObj->db, pollRspWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pObj->vgId = pollRspWrapper->vgId;

  // wipe the wrapper's copy of the payload
  (void)memset(&pollRspWrapper->dataRsp, 0, sizeof(RspPayload));
  return pObj;
}
1542

1543
// Handle a queued response that carries a non-zero error code.
//
// - TSDB_CODE_VND_INVALID_VGROUP_ID / TSDB_CODE_SYN_NOT_LEADER: ask for the
//   endpoint list again via askEp(); the original error code is still returned.
// - TSDB_CODE_TMQ_CONSUMER_MISMATCH: run syncAskEp() and return its result.
// - TSDB_CODE_TMQ_NO_TABLE_QUALIFIED: treated as success (returns 0).
// - anything else: the error code is returned unchanged.
//
// In every case the matching vgroup (if it is found) gets its
// emptyBlockReceiveTs set to the current time and its status reset to idle.
static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
  SMqPollRspWrapper* pPollRsp = &pRspWrapper->pollRsp;
  int32_t            code = pRspWrapper->code;

  tqErrorC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pPollRsp->vgId,
    tstrerror(pRspWrapper->code));

  const bool vnodeMoved = (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) ||  // for vnode transform
                          (pRspWrapper->code == TSDB_CODE_SYN_NOT_LEADER);           // for vnode split
  if (vnodeMoved) {
    int32_t ret = askEp(tmq, NULL, false, true);
    if (ret != 0) {
      tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when vnode transform, ret:%s", tmq->consumerId, tstrerror(ret));
    }
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
    code = syncAskEp(tmq);
    if (code != 0) {
      tqWarnC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code));
    }
  } else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
    code = 0;
  }

  // release the vgroup so that it can be polled again
  SMqClientVg* pVgroup = NULL;
  tmqWlock(tmq);
  getVgInfo(tmq, pPollRsp->topicName, pPollRsp->vgId, &pVgroup);
  if (pVgroup != NULL) {
    pVgroup->emptyBlockReceiveTs = taosGetTimestampMs();
    atomic_store_32(&pVgroup->vgStatus, TMQ_VG_STATUS__IDLE);
  }
  tmqWUnlock(tmq);

  return code;
}
1575

1576
// Decode the raw buffer of a poll response into the typed member that matches
// pRspWrapper->tmqRspType.
//
// For the data-carrying types the raw buffer pointer is handed over to the
// decoded response (dataRsp.data, or dataRsp.rawData for raw data) and the
// wrapper's own pointer is cleared.
//
// Returns 0 on success and TSDB_CODE_TSC_INTERNAL_ERROR for an unknown type.
// NOTE(review): PROCESS_POLL_RSP is defined outside this chunk; it presumably
// sets `code` and jumps to END when decoding fails - confirm. The names `code`,
// `pRspWrapper` and the END label are kept for that reason.
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
  int32_t code = 0;

  switch (pRspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
      pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
      pRspWrapper->pollRsp.data = NULL;
      break;
    }
    case TMQ_MSG_TYPE__POLL_META_RSP: {
      PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_DATA_META_RSP: {
      PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
      pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
      pRspWrapper->pollRsp.data = NULL;
      break;
    }
    case TMQ_MSG_TYPE__POLL_BATCH_META_RSP: {
      PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
      break;
    }
    case TMQ_MSG_TYPE__POLL_RAW_DATA_RSP: {
      PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
      // the raw payload starts right behind the response head
      pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
      pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
      pRspWrapper->pollRsp.data = NULL;
      break;
    }
    default: {
      tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto END;
    }
  }

  END:
  return code;
}
1603

1604
// Process one successfully received response taken from the consumer queue.
//
//   tmq          consumer handle
//   pRspWrapper  queued response; its code field is expected to be 0
//   pRspObj      out: set to a newly built response object when the response
//                carries something to hand to the user, otherwise left untouched
//
// EP responses only refresh the local endpoint information. Poll responses are
// decoded first; then, under the consumer write lock, the vgroup bookkeeping is
// updated (offsets, idle status, WAL range, optional new epSet) and, unless the
// response is an empty data block, a response object is built.
//
// Returns 0 on success, otherwise:
//   - the decode error from processWrapperData(),
//   - TSDB_CODE_TMQ_INVALID_VGID if the vgroup is not known locally,
//   - TSDB_CODE_TMQ_FETCH_TIMEOUT if the data response is flagged as timed out,
//   - terrno if the response object could not be built.
static int32_t processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper, SMqRspObj** pRspObj){
  int32_t    code = 0;

  if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
    tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
    SMqAskEpRsp*        rspMsg = &pRspWrapper->epRsp;
    doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg);
    return code;
  }

  code = processWrapperData(pRspWrapper);
  if (code != 0) {
    return code;
  }

  const bool isRawData = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP);
  const bool isData = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP);
  const bool isDataMeta = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP);
  const bool isMeta = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP);
  const bool isBatchMeta = (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP);

  SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
  tmqWlock(tmq);
  SMqClientVg* pVg = NULL;
  getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
  if(pVg == NULL) {
    tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
             pollRspWrapper->topicName, pollRspWrapper->vgId);
    code = TSDB_CODE_TMQ_INVALID_VGID;
    goto END;
  }
  pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
  if (pollRspWrapper->pEpset != NULL) {
    // the response carries a new endpoint set for this vgroup
    pVg->epSet = *pollRspWrapper->pEpset;
  }

  if (isData || isDataMeta || isRawData) {
    // beginOffset is only refreshed when the response actually contains blocks
    updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever,
                 tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0);

    if (pollRspWrapper->dataRsp.timeout) {
      tqInfoC("consumer:0x%" PRIx64 " poll data timeout, vgId:%d", tmq->consumerId, pVg->vgId);
      code = TSDB_CODE_TMQ_FETCH_TIMEOUT;
      goto END;
    }
    char buf[TSDB_OFFSET_LEN] = {0};
    tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
    if (pollRspWrapper->dataRsp.blockNum == 0) {
      // nothing to return; remember when the empty block arrived
      pVg->emptyBlockReceiveTs = taosGetTimestampMs();
      tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   ", total:%" PRId64 ", QID:0x%" PRIx64,
               tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
    } else {
      *pRspObj = buildRsp(pollRspWrapper);
      if (*pRspObj == NULL) {
        // this branch builds a data response (the message used to say "meta rsp")
        tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for data rsp", tmq->consumerId);
        code = terrno;
        goto END;
      }
      if (isRawData) {
        (*pRspObj)->resType = RES_TYPE__TMQ_RAWDATA;
      } else {
        (*pRspObj)->resType = isData ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA;
      }

      int64_t numOfRows = 0;
      if (!isRawData){
        tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, *pRspObj);
        tmq->totalRows += numOfRows;
      }
      if (tmq->replayEnable && !isRawData) {
        // replay mode: note when the block arrived and how long to hold back the next poll
        pVg->blockReceiveTs = taosGetTimestampMs();
        pVg->blockSleepForReplay = (*pRspObj)->dataRsp.sleepTime;
        if (pVg->blockSleepForReplay > 0) {
          if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
            tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
                     tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay);
          }
        }
      }
      pVg->emptyBlockReceiveTs = 0;
      tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   ", vg total:%" PRId64 ", total:%" PRId64 ", QID:0x%" PRIx64,
               tmq->consumerId, pVg->vgId, buf, (*pRspObj)->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
               pollRspWrapper->reqId);
    }
  } else if (isMeta || isBatchMeta) {
    // meta responses use the response offset as both begin and end offset
    updateVgInfo(pVg, &pollRspWrapper->rspOffset, &pollRspWrapper->rspOffset,
                 pollRspWrapper->head.walsver, pollRspWrapper->head.walever, tmq->consumerId, true);

    *pRspObj = buildRsp(pollRspWrapper);
    if (*pRspObj == NULL) {
      tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
      code = terrno;
      goto END;
    }
    (*pRspObj)->resType = isMeta ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
  }

END:
  tmqWUnlock(tmq);
  return code;
}
1698

1699
// Drain the consumer's response queue until it is empty, a response object has
// been produced, or processing of an item returns an error.
//
//   rspObj  in/out: filled by processMqRsp() when a response is ready for the user
//
// Every item taken from the queue is released here, whatever the outcome.
// Returns 0 or the error code of the item that stopped the loop.
static int32_t tmqHandleAllRsp(tmq_t* tmq, SMqRspObj** rspObj) {
  tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue));

  int32_t code = 0;
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosReadQitem(tmq->mqueue, (void**)&pItem);
    if (pItem == NULL) {
      break;  // queue is empty
    }

    tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pItem->tmqRspType]);
    if (pItem->code == 0) {
      code = processMqRsp(tmq, pItem, rspObj);
    } else {
      code = processMqRspError(tmq, pItem);
    }

    tmqFreeRspWrapper(pItem);
    taosFreeQitem(pItem);

    if (code != 0 || *rspObj != NULL) {
      break;
    }
  }

  return code;
}
1725

1726
// Trace-log every row of a TMQ result.
//
// The rows are fetched through a shallow, heap-allocated copy of the response
// object rather than through `res` itself; only the copy is released at the end.
// NOTE(review): presumably the copy exists so that iterating here does not
// advance the caller's result iterator - confirm.
static void printResult(TAOS_RES* res) {
  // only used to obtain the size of the biggest payload variant
  typedef union {
    SMqDataRsp      dataRsp;
    SMqMetaRsp      metaRsp;
    SMqBatchMetaRsp batchMetaRsp;
  } RspPayload;

  SMqRspObj* pSrc = (SMqRspObj*)res;
  SMqRspObj* pCopy = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (!pCopy) {
    tqErrorC("%s:failed to allocate memory", __func__);
    return;
  }

  (void)memcpy(&pCopy->dataRsp, &pSrc->dataRsp, sizeof(RspPayload));
  tstrncpy(pCopy->topic, pSrc->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pCopy->db, pSrc->db, TSDB_DB_FNAME_LEN);
  pCopy->vgId = pSrc->vgId;

  const char* topicName = tmq_get_topic_name(pCopy);
  const char* dbName = tmq_get_db_name(pCopy);
  int32_t     vgroupId = tmq_get_vgroup_id(pCopy);
  char        line[1024] = {0};

  for (TAOS_ROW row = taos_fetch_row(pCopy); row != NULL; row = taos_fetch_row(pCopy)) {
    // looked up again for every row, exactly as many times as rows are fetched
    TAOS_FIELD* fields = taos_fetch_fields(pCopy);
    int32_t     numOfFields = taos_field_count(pCopy);
    int32_t     precision = taos_result_precision(pCopy);

    taos_print_row_with_size(line, sizeof(line), row, fields, numOfFields);
    tqTraceC("topic: %s, db: %s, vgId: %d, precision: %d, row content: %s", topicName, dbName, vgroupId, precision, line);
  }

  taosMemoryFree(pCopy);
}
1761

1762
// Difference between the current taosGetTimestampMs() value and startTime.
// The result is negative if startTime lies ahead of the current timestamp.
static int64_t getElapsedTime(int64_t startTime) {
  const int64_t now = taosGetTimestampMs();
  return now - startTime;
}
1767

1768
// Poll the consumer for the next result.
//
//   tmq      consumer handle; NULL yields TSDB_CODE_INVALID_PARA
//   timeout  when >= 0, the loop gives up once the elapsed time (taken from
//            taosGetTimestampMs) is no longer inside [0, timeout); when
//            negative, there is no time limit
//
// Each round handles delayed tasks, drains the response queue, issues new poll
// requests and then waits on the response semaphore (at most
// EMPTY_BLOCK_POLL_IDLE_DURATION, or 1000 when there is no time limit).
//
// Returns the response object as soon as one is available. Returns NULL when
// the time limit is reached (code 0) or on error, in which case terrno is set
// to the error code. A consumer still in TMQ_CONSUMER_STATUS__INIT is rejected
// with TSDB_CODE_TMQ_INVALID_STATUS.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int32_t code = 0;
  int32_t lino = 0;
  terrno = 0;
  TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA);

  int64_t startTime = taosGetTimestampMs();

  tqDebugC("%s consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, __func__, tmq->consumerId, startTime, timeout);
  TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS);

  // flag that a poll has been issued (0 -> 1, no-op if already set)
  (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);

  for (;;) {
    code = tmqHandleAllDelayedTask(tmq);
    TSDB_CHECK_CODE(code, lino, END);

    SMqRspObj*   rspObj = NULL;
    code = tmqHandleAllRsp(tmq, &rspObj);
    if (rspObj != NULL) {
      tqDebugC("%s consumer:0x%" PRIx64 " end to poll, return rsp:%p", __func__, tmq->consumerId, rspObj);
      // rows are only dumped for data results and only when trace logging is on
      if ((rspObj->resType == RES_TYPE__TMQ || rspObj->resType == RES_TYPE__TMQ_METADATA) && (tqClientDebugFlag & DEBUG_TRACE)) {
        printResult((TAOS_RES*)rspObj);
      }
      return (TAOS_RES*)rspObj;
    }
    TSDB_CHECK_CODE(code, lino, END);

    code = tmqPollImpl(tmq);
    TSDB_CHECK_CODE(code, lino, END);

    if (timeout < 0) {
      // no time limit: just wait for the next response
      (void)tsem2_timewait(&tmq->rspSem, 1000);
    } else {
      int64_t elapsedTime = getElapsedTime(startTime);
      // leaves with code 0 once the time budget is used up (or the elapsed time is negative)
      TSDB_CHECK_CONDITION(elapsedTime < timeout && elapsedTime >= 0, code, lino, END, 0);

      // never wait longer than what is left of the budget
      int64_t waitMs = timeout - elapsedTime;
      if (waitMs >= EMPTY_BLOCK_POLL_IDLE_DURATION) {
        waitMs = EMPTY_BLOCK_POLL_IDLE_DURATION;
      }
      (void)tsem2_timewait(&tmq->rspSem, waitMs);
    }
  }

END:
  if (code == 0) {
    tqDebugC("%s consumer:0x%" PRIx64 " poll end with timeout, msg:%s", __func__, tmq != NULL ? tmq->consumerId : 0, tstrerror(terrno));
  } else {
    terrno = code;
    tqErrorC("%s consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", __func__, tmq != NULL ? tmq->consumerId : 0, lino, tstrerror(code));
  }
  return NULL;
}
1819

1820
// Log (info level) the consumer's counters: poll count, total rows, number of
// topics, epoch, and the row count of every vgroup of every topic.
// The topic list is read under the consumer read lock. Does nothing for NULL.
static void displayConsumeStatistics(tmq_t* pTmq) {
  if (!pTmq) {
    return;
  }

  tmqRlock(pTmq);
  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);
  tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
          pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);

  tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
  for (int32_t topicIdx = 0; topicIdx < topicCnt; ++topicIdx) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, topicIdx);
    if (pTopic != NULL) {
      tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, topicIdx);

      int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
      for (int32_t vgIdx = 0; vgIdx < vgCnt; ++vgIdx) {
        SMqClientVg* pVgroup = taosArrayGet(pTopic->vgs, vgIdx);
        if (pVgroup != NULL) {
          tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, vgIdx, pVgroup->vgId, pVgroup->numOfRows);
        }
      }
    }
  }
  tmqRUnlock(pTmq);

  tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
1842

1843
// Unsubscribe the consumer from all topics.
//
// Statistics are logged first. If the consumer is not in
// TMQ_CONSUMER_STATUS__READY nothing else is done and 0 is returned. Otherwise:
// offsets are committed synchronously when auto commit is on (a failure aborts
// the unsubscribe), a heartbeat is sent, an empty topic list is subscribed,
// unhandled messages are cleared and the epoch is reset to 0.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, terrno if the topic list
// cannot be created, or the result of the commit / subscribe call.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  if (!tmq) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int8_t  status = atomic_load_8(&tmq->status);
  tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);

  displayConsumeStatistics(tmq);
  if (status != TMQ_CONSUMER_STATUS__READY) {
    tqInfoC("consumer:0x%" PRIx64 " status:%d, not in ready state, no need unsubscribe", tmq->consumerId, status);
    goto END;
  }

  if (tmq->autoCommit) {
    code = tmq_commit_sync(tmq, NULL);
    if (code != 0) {
      goto END;
    }
  }
  tmqSendHbReq((void*)(tmq->refId), NULL);

  // subscribing to an empty list is how the unsubscribe is carried out
  tmq_list_t* emptyTopics = tmq_list_new();
  if (emptyTopics == NULL) {
    code = terrno;
    goto END;
  }
  code = tmq_subscribe(tmq, emptyTopics);
  tmq_list_destroy(emptyTopics);

  // local cleanup runs whether or not the subscribe call succeeded
  tmqClearUnhandleMsg(tmq);
  atomic_store_32(&tmq->epoch, 0);

  END:
  return code;
}
1878

1879
// Close the consumer: unsubscribe, mark it closed and drop its reference.
//
// The whole operation runs under tmqMgmt.lock. A consumer that is already in
// TMQ_CONSUMER_STATUS__CLOSED is left alone and 0 is returned. If
// tmq_unsubscribe() fails, its error is returned and the consumer is neither
// marked closed nor removed from the reference set.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL handle, otherwise the result of
// tmq_unsubscribe() or, after a successful unsubscribe, of taosRemoveRef().
int32_t tmq_consumer_close(tmq_t* tmq) {
  if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
  int32_t code = 0;
  (void) taosThreadMutexLock(&tmqMgmt.lock);

  // take one atomic snapshot of the status and use it for both the check and
  // the log line; the field is accessed through atomics everywhere else
  int8_t status = atomic_load_8(&tmq->status);
  if (status == TMQ_CONSUMER_STATUS__CLOSED){
    goto end;
  }
  tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, status);
  code = tmq_unsubscribe(tmq);
  if (code == 0) {
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
    code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
    if (code != 0){
      tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
    }
  }

end:
  (void)taosThreadMutexUnlock(&tmqMgmt.lock);
  return code;
}
1900

1901
const char* tmq_err2str(int32_t err) {
106,699✔
1902
  if (err == 0) {
106,699✔
1903
    return "success";
101,371✔
1904
  } else if (err == -1) {
5,328✔
1905
    return "fail";
×
1906
  } else {
1907
    if (*(taosGetErrMsg()) == 0) {
5,328✔
1908
      return tstrerror(err);
2,162✔
1909
    } else {
1910
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
3,166✔
1911
      return (const char*)taosGetErrMsgReturn();
3,166✔
1912
    }
1913
  }
1914
}
1915

1916
// Map a result handle to its public tmq_res_t kind.
// Both single and batch meta results are reported as TMQ_RES_TABLE_META.
// NULL and unrecognised handles yield TMQ_RES_INVALID.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (!res) {
    return TMQ_RES_INVALID;
  }

  if (TD_RES_TMQ(res)) return TMQ_RES_DATA;
  if (TD_RES_TMQ_META(res)) return TMQ_RES_TABLE_META;
  if (TD_RES_TMQ_METADATA(res)) return TMQ_RES_METADATA;
  if (TD_RES_TMQ_BATCH_META(res)) return TMQ_RES_TABLE_META;
  if (TD_RES_TMQ_RAW(res)) return TMQ_RES_RAWDATA;

  return TMQ_RES_INVALID;
}
1934

1935
// Return the topic name of a TMQ result: the part of the stored topic string
// that follows its first '.'.
// Returns NULL for a NULL handle, for a non-TMQ result, or when the stored
// string contains no '.'. The returned pointer refers into the result object.
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (!res) {
    return NULL;
  }

  const bool isTmqRes = TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* dot = strchr(((SMqRspObj*)res)->topic, '.');
  return (dot != NULL) ? dot + 1 : NULL;
}
1950

1951
// Return the database name of a TMQ result: the part of the stored db string
// that follows its first '.'.
// Returns NULL for a NULL handle, for a non-TMQ result, or when the stored
// string contains no '.'. The returned pointer refers into the result object.
const char* tmq_get_db_name(TAOS_RES* res) {
  if (!res) {
    return NULL;
  }

  const bool isTmqRes = TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  if (!isTmqRes) {
    return NULL;
  }

  char* dot = strchr(((SMqRspObj*)res)->db, '.');
  return (dot != NULL) ? dot + 1 : NULL;
}
1967

1968
// Return the vgroup id stored in a TMQ result.
// Returns TSDB_CODE_INVALID_PARA for a NULL handle or a non-TMQ result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (!res) {
    return TSDB_CODE_INVALID_PARA;
  }

  const bool isTmqRes = TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
                        TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res);
  return isTmqRes ? ((SMqRspObj*)res)->vgId : TSDB_CODE_INVALID_PARA;
}
1979

1980
// Return the WAL version a TMQ result is positioned at.
//
// Data results (raw, data, metadata) report the version of their request
// offset; meta and batch-meta results report the version of their response
// offset. A version is only available when the offset type is TMQ_OFFSET__LOG.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL handle and
// TSDB_CODE_TMQ_SNAPSHOT_ERROR when no log offset is available (including
// unrecognised result types).
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (!res) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqRspObj* pObj = (SMqRspObj*)res;
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    STqOffsetVal* pReqOffset = &pObj->dataRsp.reqOffset;
    if (pReqOffset->type == TMQ_OFFSET__LOG) {
      return pReqOffset->version;
    }
    tqErrorC("invalid offset type:%d", pReqOffset->type);
  } else if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    if (pObj->rspOffset.type == TMQ_OFFSET__LOG) {
      return pObj->rspOffset.version;
    }
  } else {
    tqErrorC("invalid tmq type:%d", *(int8_t*)res);
  }

  // data from tsdb, no valid offset info
  return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
}
2004

2005
// Return the table name belonging to the block the result iterator currently
// points at.
// Returns NULL for a NULL handle, for results other than data / metadata, when
// the response carries no table names, or when the iterator is outside
// [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!res) {
    return NULL;
  }
  if (!(TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res))) {
    return NULL;
  }

  SMqRspObj*  pObj = (SMqRspObj*)res;
  SMqDataRsp* pData = &pObj->dataRsp;

  const bool hasName = pData->withTbName && pData->blockTbName != NULL && pObj->resIter >= 0 &&
                       pObj->resIter < pData->blockNum;
  if (!hasName) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pData->blockTbName, pObj->resIter);
}
2020

2021
// Return the connection handle of the consumer, which is the address of the
// id field of its pTscObj. Returns NULL if tmq or tmq->pTscObj is NULL.
TAOS* tmq_get_connect(tmq_t* tmq) {
  if (tmq == NULL || tmq->pTscObj == NULL) {
    return NULL;
  }
  return (TAOS*)(&(tmq->pTscObj->id));
}
2027

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc