• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5044

06 May 2026 02:35AM UTC coverage: 73.169% (+0.06%) from 73.107%
#5044

push

travis-ci

web-flow
feat: [6659794715] cpu limit (#35153)

244 of 275 new or added lines in 23 files covered. (88.73%)

526 existing lines in 141 files now uncovered.

277745 of 379596 relevant lines covered (73.17%)

133740972.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.57
/source/client/src/clientTmqCommit.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "clientTmq.h"
17
#include "taos.h"
18
#include "tmsg.h"
19

20
// ============================================================
21
// commit done / count down
22
// ============================================================
23
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
4,443,164✔
24
  if (pParamSet == NULL) {
4,443,164✔
25
    return TSDB_CODE_INVALID_PARA;
×
26
  }
27
  int64_t refId = pParamSet->refId;
4,443,164✔
28
  int32_t code = 0;
4,443,164✔
29
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
4,443,164✔
30
  if (tmq == NULL) {
4,443,164✔
31
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
32
  }
33

34
  // if no more waiting rsp
35
  if (pParamSet->callbackFn != NULL) {
4,443,164✔
36
    pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
4,436,806✔
37
  }
38

39
  taosMemoryFree(pParamSet);
4,443,164✔
40
  if (tmq != NULL) {
4,443,164✔
41
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
4,443,164✔
42
  }
43

44
  return code;
4,443,164✔
45
}
46

47
int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
10,143,507✔
48
  if (pParamSet == NULL) {
10,143,507✔
49
    return TSDB_CODE_INVALID_PARA;
×
50
  }
51
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
10,143,507✔
52
  if (waitingRspNum == 0) {
10,143,502✔
53
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
4,443,164✔
54
             vgId);
55
    return tmqCommitDone(pParamSet);
4,443,164✔
56
  } else {
57
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
5,700,338✔
58
             waitingRspNum);
59
  }
60
  return 0;
5,700,342✔
61
}
62

63
// ============================================================
64
// commit callback
65
// ============================================================
66
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
5,711,128✔
67
  if (pBuf){
5,711,128✔
68
    taosMemoryFreeClear(pBuf->pData);
5,711,128✔
69
    taosMemoryFreeClear(pBuf->pEpSet);
5,711,128✔
70
  }
71
  if(param == NULL){
5,711,128✔
72
    return TSDB_CODE_INVALID_PARA;
×
73
  }
74
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
5,711,128✔
75
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
5,711,128✔
76

77
  return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
5,711,128✔
78
}
79

80
// ============================================================
81
// send commit msg
82
// ============================================================
83
int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
5,711,129✔
84
                        SMqCommitCbParamSet* pParamSet) {
85
  if (tmq == NULL || epSet == NULL || offset == NULL || pTopicName == NULL || pParamSet == NULL) {
5,711,129✔
86
    return TSDB_CODE_INVALID_PARA;
×
87
  }
88
  SMqVgOffset pOffset = {0};
5,711,129✔
89

90
  pOffset.consumerId = tmq->consumerId;
5,711,129✔
91
  pOffset.offset.val = *offset;
5,711,129✔
92
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
5,711,129✔
93
  int32_t len = 0;
5,711,129✔
94
  int32_t code = 0;
5,711,129✔
95
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
5,711,129✔
96
  if (code < 0) {
5,711,129✔
97
    return TSDB_CODE_INVALID_PARA;
×
98
  }
99

100
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
5,711,129✔
101
  if (buf == NULL) {
5,711,129✔
102
    return terrno;
×
103
  }
104

105
  ((SMsgHead*)buf)->vgId = htonl(vgId);
5,711,129✔
106

107
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
5,711,129✔
108

109
  SEncoder encoder = {0};
5,711,129✔
110
  tEncoderInit(&encoder, abuf, len);
5,711,129✔
111
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
5,711,129✔
112
    tEncoderClear(&encoder);
×
113
    taosMemoryFree(buf);
×
114
    return TSDB_CODE_INVALID_PARA;
×
115
  }
116
  tEncoderClear(&encoder);
5,711,129✔
117

118
  // build param
119
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
5,711,129✔
120
  if (pParam == NULL) {
5,711,129✔
121
    taosMemoryFree(buf);
×
122
    return terrno;
×
123
  }
124

125
  pParam->params = pParamSet;
5,711,129✔
126
  pParam->vgId = vgId;
5,711,129✔
127
  pParam->consumerId = tmq->consumerId;
5,711,129✔
128

129
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
5,711,129✔
130

131
  // build send info
132
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
5,711,129✔
133
  if (pMsgSendInfo == NULL) {
5,711,129✔
134
    taosMemoryFree(buf);
×
135
    taosMemoryFree(pParam);
×
136
    return terrno;
×
137
  }
138

139
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
5,711,129✔
140

141
  pMsgSendInfo->requestId = generateRequestId();
5,711,129✔
142
  pMsgSendInfo->requestObjRefId = 0;
5,711,129✔
143
  pMsgSendInfo->param = pParam;
5,711,129✔
144
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
5,711,129✔
145
  pMsgSendInfo->fp = tmqCommitCb;
5,711,129✔
146
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
5,711,129✔
147

148
  // int64_t transporterId = 0;
149
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
5,711,129✔
150
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
5,711,129✔
151
  if (code != 0) {
5,711,129✔
152
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
153
  }
154
  return code;
5,711,129✔
155
}
156

157
// ============================================================
158
// prepare commit param set
159
// ============================================================
160
int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
4,444,500✔
161
                                SMqCommitCbParamSet** ppParamSet) {
162
  if (tmq == NULL || ppParamSet == NULL) {
4,444,500✔
163
    return TSDB_CODE_INVALID_PARA;
×
164
  }
165
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
4,444,500✔
166
  if (pParamSet == NULL) {
4,444,500✔
167
    return terrno;
×
168
  }
169

170
  pParamSet->refId = tmq->refId;
4,444,500✔
171
  pParamSet->epoch = atomic_load_32(&tmq->epoch);
4,444,500✔
172
  pParamSet->callbackFn = pCommitFp;
4,444,500✔
173
  pParamSet->userParam = userParam;
4,444,500✔
174
  pParamSet->waitingRspNum = rspNum;
4,444,500✔
175
  *ppParamSet = pParamSet;
4,444,500✔
176
  return 0;
4,444,500✔
177
}
178

179
// ============================================================
180
// wal marker
181
// ============================================================
182
static int32_t sendWalMarkMsgToMnodeCb(void* param, SDataBuf* pMsg, int32_t code) {
×
183
  if (pMsg) {
×
184
    taosMemoryFreeClear(pMsg->pEpSet);
×
185
    taosMemoryFreeClear(pMsg->pData);
×
186
  }
187
  tqDebugC("sendWalMarkMsgToMnodeCb code:%d", code);
×
188
  return 0;
×
189
}
190

191
void asyncSendWalMarkMsgToMnode(tmq_t* tmq, int32_t vgId, int64_t keepVersion) {
×
192
  if (tmq == NULL) return ;
×
193
  void*           buf = NULL;
×
194
  SMsgSendInfo*   sendInfo = NULL;
×
195
  SMndSetVgroupKeepVersionReq req = {0};
×
196

197
  tqDebugC("consumer:0x%" PRIx64 " send vgId:%d keepVersion:%"PRId64, tmq->consumerId, vgId, keepVersion);
×
198
  req.vgId = vgId;
×
199
  req.keepVersion = keepVersion;
×
200

201
  int32_t tlen = tSerializeSMndSetVgroupKeepVersionReq(NULL, 0, &req);
×
202
  buf = taosMemoryMalloc(tlen);
×
203
  if (buf == NULL) {
×
204
    return;
×
205
  }
206
  tlen = tSerializeSMndSetVgroupKeepVersionReq(buf, tlen, &req);
×
207

208
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
×
209
  if (sendInfo == NULL) {
×
210
    taosMemoryFree(buf);
×
211
    return;
×
212
  }
213

214
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
×
215
  sendInfo->requestId = generateRequestId();
×
216
  sendInfo->fp = sendWalMarkMsgToMnodeCb;
×
217
  sendInfo->msgType = TDMT_MND_SET_VGROUP_KEEP_VERSION;
×
218

219
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
×
220

221
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
×
222
  if (code != 0) {
×
223
    tqErrorC("consumer:0x%" PRIx64 " send wal mark msg to mnode failed, code:%s", tmq->consumerId,
×
224
             tstrerror(terrno));
225
  }
226
}
227

228
// ============================================================
229
// inner commit
230
// ============================================================
231
int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
7,325,849✔
232
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL || pVg == NULL || pParamSet == NULL) {
7,325,849✔
233
    return TSDB_CODE_INVALID_PARA;
×
234
  }
235
  int32_t code = 0;
7,325,849✔
236
  if (offsetVal->type <= 0) {
7,325,849✔
237
    code = TSDB_CODE_TMQ_INVALID_MSG;
199,146✔
238
    return code;
199,146✔
239
  }
240
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
7,126,703✔
241
    code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
1,415,574✔
242
    return code;
1,415,574✔
243
  }
244
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
5,711,129✔
245
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
5,711,129✔
246

247
  char commitBuf[TSDB_OFFSET_LEN] = {0};
5,711,129✔
248
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
5,711,129✔
249

250
  code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
5,711,129✔
251
  if (code != TSDB_CODE_SUCCESS) {
5,711,129✔
252
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
×
253
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
254
    return code;
×
255
  }
256

257
  if (tmq->enableWalMarker && offsetVal->type == TMQ_OFFSET__LOG) {
5,711,129✔
258
    asyncSendWalMarkMsgToMnode(tmq, pVg->vgId, offsetVal->version);
×
259
  }
260
  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
5,711,129✔
261
           tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
262
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
5,711,129✔
263
  return code;
5,711,129✔
264
}
265

266
// ============================================================
267
// async commit offset (single)
268
// ============================================================
269
int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
12,121✔
270
                          tmq_commit_cb* pCommitFp, void* userParam) {
271
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL) {
12,121✔
272
    return TSDB_CODE_INVALID_PARA;
×
273
  }
274
  tqDebugC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
12,121✔
275
  SMqCommitCbParamSet* pParamSet = NULL;
12,121✔
276
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
12,121✔
277
  if (code != 0){
12,121✔
278
    return code;
×
279
  }
280

281
  tmqRlock(tmq);
12,121✔
282
  SMqClientVg* pVg = NULL;
12,121✔
283
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
12,121✔
284
  if (code == 0) {
12,121✔
285
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
12,121✔
286
  }
287
  tmqRUnlock(tmq);
12,121✔
288

289
  if (code != 0){
12,121✔
290
    taosMemoryFree(pParamSet);
1,336✔
291
  }
292
  return code;
12,121✔
293
}
294

295
// ============================================================
296
// async commit from result
297
// ============================================================
298
void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
9,437✔
299
  char*        pTopicName = NULL;
9,437✔
300
  int32_t      vgId = 0;
9,437✔
301
  STqOffsetVal offsetVal = {0};
9,437✔
302
  int32_t      code = 0;
9,437✔
303

304
  if (pRes == NULL || tmq == NULL) {
9,437✔
305
    code = TSDB_CODE_INVALID_PARA;
×
306
    goto end;
×
307
  }
308

309
  if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) ||
9,437✔
310
      TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
×
311
    SMqRspObj* pRspObj = (SMqRspObj*)pRes;
9,437✔
312
    pTopicName = pRspObj->topic;
9,437✔
313
    vgId = pRspObj->vgId;
9,437✔
314
    offsetVal = pRspObj->rspOffset;
9,437✔
315
  } else {
316
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
317
    goto end;
×
318
  }
319

320
  code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
9,437✔
321

322
  end:
9,437✔
323
  if (code != TSDB_CODE_SUCCESS && pCommitFp != NULL) {
9,437✔
324
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
325
    pCommitFp(tmq, code, userParam);
×
326
  }
327
}
9,437✔
328

329
// ============================================================
330
// inner commit all
331
// ============================================================
332
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
4,432,379✔
333
  if (tmq == NULL || pParamSet == NULL) {
4,432,379✔
334
    return TSDB_CODE_INVALID_PARA;
×
335
  }
336
  int32_t code = 0;
4,432,379✔
337
  tmqRlock(tmq);
4,432,379✔
338
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
4,432,379✔
339
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
4,432,379✔
340

341
  for (int32_t i = 0; i < numOfTopics; i++) {
8,792,518✔
342
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
4,360,139✔
343
    if (pTopic == NULL) {
4,360,139✔
344
      code = TSDB_CODE_TMQ_INVALID_TOPIC;
×
345
      goto END;
×
346
    }
347
    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
4,360,139✔
348
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
4,360,139✔
349
    for (int32_t j = 0; j < numOfVgroups; j++) {
11,673,867✔
350
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
7,313,728✔
351
      if (pVg == NULL) {
7,313,728✔
352
        code = terrno;
×
353
        goto END;
×
354
      }
355

356
      code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
7,313,728✔
357
      if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
7,313,728✔
358
        tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
199,146✔
359
                 tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
360
      }
361
    }
362
  }
363
  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
4,432,379✔
364
           numOfTopics);
365
  END:
38,096✔
366
  tmqRUnlock(tmq);
4,432,379✔
367
  return code;
4,432,379✔
368
}
369

370
// ============================================================
371
// async commit all offsets
372
// ============================================================
373
void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
4,432,379✔
374
  if (tmq == NULL) {
4,432,379✔
375
    return;
×
376
  }
377
  int32_t code = 0;
4,432,379✔
378
  SMqCommitCbParamSet* pParamSet = NULL;
4,432,379✔
379
  // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
380
  code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pParamSet);
4,432,379✔
381
  if (code != 0) {
4,432,379✔
382
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
×
383
    if (pCommitFp != NULL) {
×
384
      pCommitFp(tmq, code, userParam);
×
385
    }
386
    return;
×
387
  }
388
  code = innerCommitAll(tmq, pParamSet);
4,432,379✔
389
  if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
4,432,379✔
390
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
38,862✔
391
  }
392

393
  code = commitRspCountDown(pParamSet, tmq->consumerId, "init", -1);
4,432,379✔
394
  if (code != 0) {
4,432,379✔
395
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
×
396
  }
397
  return;
4,432,379✔
398
}
399

400
// ============================================================
401
// sync commit helpers
402
// ============================================================
403
void commitCallBackFn(tmq_t* tmq, int32_t code, void* param) {
231,929✔
404
  if (param == NULL) {
231,929✔
405
    tqErrorC("invalid param in commit cb");
×
406
    return;
×
407
  }
408
  SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
231,929✔
409
  pInfo->code = code;
231,929✔
410
  if (tsem2_post(&pInfo->sem) != 0){
231,929✔
411
    tqErrorC("failed to post rsp sem in commit cb");
×
412
  }
413
}
414

415
// ============================================================
416
// public commit APIs
417
// ============================================================
418
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
6,358✔
419
  if (tmq == NULL) {
6,358✔
420
    tqErrorC("invalid tmq handle, null");
×
421
    if (cb != NULL) {
×
422
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
×
423
    }
424
    return;
×
425
  }
426
  if (pRes == NULL) {  // here needs to commit all offsets.
6,358✔
427
    asyncCommitAllOffsets(tmq, cb, param);
6,358✔
428
  } else {  // only commit one offset
429
    asyncCommitFromResult(tmq, pRes, cb, param);
×
430
  }
431
}
432

433
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
230,581✔
434
  if (tmq == NULL) {
230,581✔
435
    tqErrorC("invalid tmq handle, null");
×
436
    return TSDB_CODE_INVALID_PARA;
×
437
  }
438

439
  int32_t code = 0;
230,581✔
440

441
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
230,581✔
442
  if (pInfo == NULL) {
230,581✔
443
    tqErrorC("failed to allocate memory for sync commit");
×
444
    return terrno;
×
445
  }
446

447
  code = tsem2_init(&pInfo->sem, 0, 0);
230,581✔
448
  if (code != 0) {
230,581✔
449
    tqErrorC("failed to init sem for sync commit");
×
450
    taosMemoryFree(pInfo);
×
451
    return code;
×
452
  }
453
  pInfo->code = 0;
230,581✔
454

455
  if (pRes == NULL) {
230,581✔
456
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
221,144✔
457
  } else {
458
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
9,437✔
459
  }
460

461
  if (tsem2_wait(&pInfo->sem) != 0){
230,581✔
462
    tqErrorC("failed to wait sem for sync commit");
×
463
  }
464
  code = pInfo->code;
230,581✔
465

466
  if(tsem2_destroy(&pInfo->sem) != 0) {
230,581✔
467
    tqErrorC("failed to destroy sem for sync commit");
×
468
  }
469
  taosMemoryFree(pInfo);
230,581✔
470

471
  tqDebugC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
230,581✔
472
  return code;
230,581✔
473
}
474

475
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
476
int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
8,139✔
477
  if (offset == NULL) {
8,139✔
478
    tqErrorC("invalid offset, null");
×
479
    return TSDB_CODE_INVALID_PARA;
×
480
  }
481
  if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
8,139✔
482
    tqErrorC("Assignment or poll interface need to be called first");
×
483
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
×
484
  }
485

486
  if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
8,139✔
487
    tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
61✔
488
             offset->walVerBegin, offset->walVerEnd);
489
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
61✔
490
  }
491

492
  return 0;
8,078✔
493
}
494

495
bool isInSnapshotMode(int8_t type, bool useSnapshot) {
19,155✔
496
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
19,155✔
497
    return true;
×
498
  }
499
  return false;
19,155✔
500
}
501

502
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
9,416✔
503
  if (tmq == NULL || pTopicName == NULL) {
9,416✔
504
    tqErrorC("invalid tmq handle, null");
×
505
    return TSDB_CODE_INVALID_PARA;
×
506
  }
507

508
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
9,416✔
509
  buildTopicFullName(tmq, pTopicName, tname);
9,416✔
510

511
  tmqWlock(tmq);
9,416✔
512
  SMqClientVg* pVg = NULL;
9,416✔
513
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
9,416✔
514
  if (code != 0) {
9,416✔
515
    tmqWUnlock(tmq);
6,732✔
516
    return code;
6,732✔
517
  }
518

519
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
2,684✔
520
  code = checkWalRange(pOffsetInfo, offset);
2,684✔
521
  if (code != 0) {
2,684✔
522
    tmqWUnlock(tmq);
×
523
    return code;
×
524
  }
525
  tmqWUnlock(tmq);
2,684✔
526

527
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
2,684✔
528

529
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
2,684✔
530
  if (pInfo == NULL) {
2,684✔
531
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
×
532
    return terrno;
×
533
  }
534

535
  code = tsem2_init(&pInfo->sem, 0, 0);
2,684✔
536
  if (code != 0) {
2,684✔
537
    taosMemoryFree(pInfo);
×
538
    return code;
×
539
  }
540
  pInfo->code = 0;
2,684✔
541

542
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
2,684✔
543
  if (code == 0) {
2,684✔
544
    if (tsem2_wait(&pInfo->sem) != 0){
1,348✔
545
      tqErrorC("failed to wait sem for sync commit offset");
×
546
    }
547
    code = pInfo->code;
1,348✔
548
  }
549

550
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
2,684✔
551
  if(tsem2_destroy(&pInfo->sem) != 0) {
2,684✔
552
    tqErrorC("failed to destroy sem for sync commit offset");
×
553
  }
554
  taosMemoryFree(pInfo);
2,684✔
555

556
  tqDebugC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
2,684✔
557
          offset, tstrerror(code));
558

559
  return code;
2,684✔
560
}
561

562
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
7,106✔
563
                             void* param) {
564
  int32_t code = 0;
7,106✔
565
  if (tmq == NULL || pTopicName == NULL) {
7,106✔
566
    tqErrorC("invalid tmq handle, null");
×
567
    code = TSDB_CODE_INVALID_PARA;
×
568
    goto end;
×
569
  }
570

571
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
7,106✔
572
  buildTopicFullName(tmq, pTopicName, tname);
7,106✔
573

574
  tmqWlock(tmq);
7,106✔
575
  SMqClientVg* pVg = NULL;
7,106✔
576
  code = getClientVg(tmq, tname, vgId, &pVg);
7,106✔
577
  if (code != 0) {
7,106✔
578
    tmqWUnlock(tmq);
7,106✔
579
    goto end;
7,106✔
580
  }
581

UNCOV
582
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
×
UNCOV
583
  code = checkWalRange(pOffsetInfo, offset);
×
UNCOV
584
  if (code != 0) {
×
585
    tmqWUnlock(tmq);
×
586
    goto end;
×
587
  }
UNCOV
588
  tmqWUnlock(tmq);
×
589

UNCOV
590
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
×
591

UNCOV
592
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
×
593

UNCOV
594
  tqDebugC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
×
595
          offset, tstrerror(code));
596

597
  end:
7,106✔
598
  if (code != 0 && cb != NULL) {
7,106✔
599
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
600
    cb(tmq, code, param);
×
601
  }
602
}
7,106✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc