• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5053

13 May 2026 12:00PM UTC coverage: 73.397% (+0.06%) from 73.338%
#5053

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

627 existing lines in 131 files now uncovered.

281694 of 383795 relevant lines covered (73.4%)

132505311.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.57
/source/client/src/clientTmqCommit.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "clientTmq.h"
17
#include "taos.h"
18
#include "tmsg.h"
19

20
// ============================================================
21
// commit done / count down
22
// ============================================================
23
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
5,239,172✔
24
  if (pParamSet == NULL) {
5,239,172✔
25
    return TSDB_CODE_INVALID_PARA;
×
26
  }
27
  int64_t refId = pParamSet->refId;
5,239,172✔
28
  int32_t code = 0;
5,239,172✔
29
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
5,239,172✔
30
  if (tmq == NULL) {
5,239,172✔
31
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
×
32
  }
33

34
  // if no more waiting rsp
35
  if (pParamSet->callbackFn != NULL) {
5,239,172✔
36
    pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
5,229,811✔
37
  }
38

39
  taosMemoryFree(pParamSet);
5,239,172✔
40
  if (tmq != NULL) {
5,239,172✔
41
    code = taosReleaseRef(tmqMgmt.rsetId, refId);
5,239,172✔
42
  }
43

44
  return code;
5,239,172✔
45
}
46

47
int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
11,038,006✔
48
  if (pParamSet == NULL) {
11,038,006✔
49
    return TSDB_CODE_INVALID_PARA;
×
50
  }
51
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
11,038,006✔
52
  if (waitingRspNum == 0) {
11,038,015✔
53
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
5,239,172✔
54
             vgId);
55
    return tmqCommitDone(pParamSet);
5,239,172✔
56
  } else {
57
    tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
5,798,843✔
58
             waitingRspNum);
59
  }
60
  return 0;
5,798,843✔
61
}
62

63
// ============================================================
64
// commit callback
65
// ============================================================
66
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
5,811,514✔
67
  if (pBuf){
5,811,514✔
68
    taosMemoryFreeClear(pBuf->pData);
5,811,514✔
69
    taosMemoryFreeClear(pBuf->pEpSet);
5,811,514✔
70
  }
71
  if(param == NULL){
5,811,514✔
72
    return TSDB_CODE_INVALID_PARA;
×
73
  }
74
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
5,811,514✔
75
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
5,811,514✔
76

77
  return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
5,811,514✔
78
}
79

80
// ============================================================
81
// send commit msg
82
// ============================================================
83
int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
5,811,514✔
84
                        SMqCommitCbParamSet* pParamSet) {
85
  if (tmq == NULL || epSet == NULL || offset == NULL || pTopicName == NULL || pParamSet == NULL) {
5,811,514✔
86
    return TSDB_CODE_INVALID_PARA;
×
87
  }
88
  SMqVgOffset pOffset = {0};
5,811,514✔
89

90
  pOffset.consumerId = tmq->consumerId;
5,811,514✔
91
  pOffset.offset.val = *offset;
5,811,514✔
92
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName);
5,811,514✔
93
  int32_t len = 0;
5,811,514✔
94
  int32_t code = 0;
5,811,514✔
95
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
5,811,514✔
96
  if (code < 0) {
5,811,514✔
97
    return TSDB_CODE_INVALID_PARA;
×
98
  }
99

100
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
5,811,514✔
101
  if (buf == NULL) {
5,811,514✔
102
    return terrno;
×
103
  }
104

105
  ((SMsgHead*)buf)->vgId = htonl(vgId);
5,811,514✔
106

107
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
5,811,514✔
108

109
  SEncoder encoder = {0};
5,811,514✔
110
  tEncoderInit(&encoder, abuf, len);
5,811,514✔
111
  if (tEncodeMqVgOffset(&encoder, &pOffset) < 0) {
5,811,514✔
112
    tEncoderClear(&encoder);
×
113
    taosMemoryFree(buf);
×
114
    return TSDB_CODE_INVALID_PARA;
×
115
  }
116
  tEncoderClear(&encoder);
5,811,514✔
117

118
  // build param
119
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
5,811,514✔
120
  if (pParam == NULL) {
5,811,514✔
121
    taosMemoryFree(buf);
×
122
    return terrno;
×
123
  }
124

125
  pParam->params = pParamSet;
5,811,514✔
126
  pParam->vgId = vgId;
5,811,514✔
127
  pParam->consumerId = tmq->consumerId;
5,811,514✔
128

129
  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
5,811,514✔
130

131
  // build send info
132
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
5,811,514✔
133
  if (pMsgSendInfo == NULL) {
5,811,514✔
134
    taosMemoryFree(buf);
×
135
    taosMemoryFree(pParam);
×
136
    return terrno;
×
137
  }
138

139
  pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
5,811,514✔
140

141
  pMsgSendInfo->requestId = generateRequestId();
5,811,514✔
142
  pMsgSendInfo->requestObjRefId = 0;
5,811,514✔
143
  pMsgSendInfo->param = pParam;
5,811,514✔
144
  pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
5,811,514✔
145
  pMsgSendInfo->fp = tmqCommitCb;
5,811,514✔
146
  pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
5,811,514✔
147

148
  // int64_t transporterId = 0;
149
  (void)atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
5,811,514✔
150
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, pMsgSendInfo);
5,811,514✔
151
  if (code != 0) {
5,811,514✔
152
    (void)atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
×
153
  }
154
  return code;
5,811,514✔
155
}
156

157
// ============================================================
158
// prepare commit param set
159
// ============================================================
160
int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum,
5,240,668✔
161
                                SMqCommitCbParamSet** ppParamSet) {
162
  if (tmq == NULL || ppParamSet == NULL) {
5,240,668✔
163
    return TSDB_CODE_INVALID_PARA;
×
164
  }
165
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
5,240,681✔
166
  if (pParamSet == NULL) {
5,240,681✔
167
    return terrno;
×
168
  }
169

170
  pParamSet->refId = tmq->refId;
5,240,681✔
171
  pParamSet->epoch = atomic_load_32(&tmq->epoch);
5,240,681✔
172
  pParamSet->callbackFn = pCommitFp;
5,240,668✔
173
  pParamSet->userParam = userParam;
5,240,668✔
174
  pParamSet->waitingRspNum = rspNum;
5,240,668✔
175
  *ppParamSet = pParamSet;
5,240,668✔
176
  return 0;
5,240,668✔
177
}
178

179
// ============================================================
180
// wal marker
181
// ============================================================
182
static int32_t sendWalMarkMsgToMnodeCb(void* param, SDataBuf* pMsg, int32_t code) {
×
183
  if (pMsg) {
×
184
    taosMemoryFreeClear(pMsg->pEpSet);
×
185
    taosMemoryFreeClear(pMsg->pData);
×
186
  }
187
  tqDebugC("sendWalMarkMsgToMnodeCb code:%d", code);
×
188
  return 0;
×
189
}
190

191
void asyncSendWalMarkMsgToMnode(tmq_t* tmq, int32_t vgId, int64_t keepVersion) {
×
192
  if (tmq == NULL) return ;
×
193
  void*           buf = NULL;
×
194
  SMsgSendInfo*   sendInfo = NULL;
×
195
  SMndSetVgroupKeepVersionReq req = {0};
×
196

197
  tqDebugC("consumer:0x%" PRIx64 " send vgId:%d keepVersion:%"PRId64, tmq->consumerId, vgId, keepVersion);
×
198
  req.vgId = vgId;
×
199
  req.keepVersion = keepVersion;
×
200

201
  int32_t tlen = tSerializeSMndSetVgroupKeepVersionReq(NULL, 0, &req);
×
202
  buf = taosMemoryMalloc(tlen);
×
203
  if (buf == NULL) {
×
204
    return;
×
205
  }
206
  tlen = tSerializeSMndSetVgroupKeepVersionReq(buf, tlen, &req);
×
207

208
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
×
209
  if (sendInfo == NULL) {
×
210
    taosMemoryFree(buf);
×
211
    return;
×
212
  }
213

214
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
×
215
  sendInfo->requestId = generateRequestId();
×
216
  sendInfo->fp = sendWalMarkMsgToMnodeCb;
×
217
  sendInfo->msgType = TDMT_MND_SET_VGROUP_KEEP_VERSION;
×
218

219
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
×
220

221
  int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
×
222
  if (code != 0) {
×
223
    tqErrorC("consumer:0x%" PRIx64 " send wal mark msg to mnode failed, code:%s", tmq->consumerId,
×
224
             tstrerror(terrno));
225
  }
226
}
227

228
// ============================================================
229
// inner commit
230
// ============================================================
231
int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){
8,212,485✔
232
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL || pVg == NULL || pParamSet == NULL) {
8,212,485✔
233
    return TSDB_CODE_INVALID_PARA;
×
234
  }
235
  int32_t code = 0;
8,212,485✔
236
  if (offsetVal->type <= 0) {
8,212,485✔
237
    code = TSDB_CODE_TMQ_INVALID_MSG;
266,067✔
238
    return code;
266,067✔
239
  }
240
  if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
7,946,418✔
241
    code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
2,134,917✔
242
    return code;
2,134,917✔
243
  }
244
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
5,811,514✔
245
  tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
5,811,514✔
246

247
  char commitBuf[TSDB_OFFSET_LEN] = {0};
5,811,514✔
248
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
5,811,514✔
249

250
  code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
5,811,514✔
251
  if (code != TSDB_CODE_SUCCESS) {
5,811,514✔
252
    tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
×
253
             tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
254
    return code;
×
255
  }
256

257
  if (tmq->enableWalMarker && offsetVal->type == TMQ_OFFSET__LOG) {
5,811,514✔
258
    asyncSendWalMarkMsgToMnode(tmq, pVg->vgId, offsetVal->version);
×
259
  }
260
  tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
5,811,514✔
261
           tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
262
  tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
5,811,514✔
263
  return code;
5,811,514✔
264
}
265

266
// ============================================================
267
// async commit offset (single)
268
// ============================================================
269
int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
14,180✔
270
                          tmq_commit_cb* pCommitFp, void* userParam) {
271
  if (tmq == NULL || pTopicName == NULL || offsetVal == NULL) {
14,180✔
272
    return TSDB_CODE_INVALID_PARA;
×
273
  }
274
  tqDebugC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
14,180✔
275
  SMqCommitCbParamSet* pParamSet = NULL;
14,180✔
276
  int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet);
14,180✔
277
  if (code != 0){
14,180✔
278
    return code;
×
279
  }
280

281
  tmqRlock(tmq);
14,180✔
282
  SMqClientVg* pVg = NULL;
14,180✔
283
  code = getClientVg(tmq, pTopicName, vgId, &pVg);
14,180✔
284
  if (code == 0) {
14,180✔
285
    code = innerCommit(tmq, pTopicName, offsetVal, pVg, pParamSet);
14,180✔
286
  }
287
  tmqRUnlock(tmq);
14,180✔
288

289
  if (code != 0){
14,180✔
290
    taosMemoryFree(pParamSet);
1,509✔
291
  }
292
  return code;
14,180✔
293
}
294

295
// ============================================================
296
// async commit from result
297
// ============================================================
298
void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
11,162✔
299
  char*        pTopicName = NULL;
11,162✔
300
  int32_t      vgId = 0;
11,162✔
301
  STqOffsetVal offsetVal = {0};
11,162✔
302
  int32_t      code = 0;
11,162✔
303

304
  if (pRes == NULL || tmq == NULL) {
11,162✔
305
    code = TSDB_CODE_INVALID_PARA;
×
306
    goto end;
×
307
  }
308

309
  if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) ||
11,162✔
310
      TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
×
311
    SMqRspObj* pRspObj = (SMqRspObj*)pRes;
11,162✔
312
    pTopicName = pRspObj->topic;
11,162✔
313
    vgId = pRspObj->vgId;
11,162✔
314
    offsetVal = pRspObj->rspOffset;
11,162✔
315
  } else {
316
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
317
    goto end;
×
318
  }
319

320
  code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
11,162✔
321

322
  end:
11,162✔
323
  if (code != TSDB_CODE_SUCCESS && pCommitFp != NULL) {
11,162✔
324
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
325
    pCommitFp(tmq, code, userParam);
×
326
  }
327
}
11,162✔
328

329
// ============================================================
330
// inner commit all
331
// ============================================================
332
static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
5,226,488✔
333
  if (tmq == NULL || pParamSet == NULL) {
5,226,488✔
334
    return TSDB_CODE_INVALID_PARA;
×
335
  }
336
  int32_t code = 0;
5,226,501✔
337
  tmqRlock(tmq);
5,226,501✔
338
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
5,226,501✔
339
  tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
5,226,501✔
340

341
  for (int32_t i = 0; i < numOfTopics; i++) {
10,375,045✔
342
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
5,148,544✔
343
    if (pTopic == NULL) {
5,148,544✔
344
      code = TSDB_CODE_TMQ_INVALID_TOPIC;
×
345
      goto END;
×
346
    }
347
    int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
5,148,544✔
348
    tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
5,148,544✔
349
    for (int32_t j = 0; j < numOfVgroups; j++) {
13,346,862✔
350
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
8,198,318✔
351
      if (pVg == NULL) {
8,198,318✔
352
        code = terrno;
×
353
        goto END;
×
354
      }
355

356
      code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
8,198,318✔
357
      if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
8,198,318✔
358
        tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
266,067✔
359
                 tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
360
      }
361
    }
362
  }
363
  tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT,
5,226,501✔
364
           numOfTopics);
365
  END:
45,068✔
366
  tmqRUnlock(tmq);
5,226,501✔
367
  return code;
5,226,501✔
368
}
369

370
// ============================================================
371
// async commit all offsets
372
// ============================================================
373
void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
5,226,488✔
374
  if (tmq == NULL) {
5,226,488✔
375
    return;
×
376
  }
377
  int32_t code = 0;
5,226,488✔
378
  SMqCommitCbParamSet* pParamSet = NULL;
5,226,488✔
379
  // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue
380
  code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pParamSet);
5,226,488✔
381
  if (code != 0) {
5,226,488✔
382
    tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code));
×
383
    if (pCommitFp != NULL) {
×
384
      pCommitFp(tmq, code, userParam);
×
385
    }
386
    return;
×
387
  }
388
  code = innerCommitAll(tmq, pParamSet);
5,226,488✔
389
  if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
5,226,501✔
390
    tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
65,539✔
391
  }
392

393
  code = commitRspCountDown(pParamSet, tmq->consumerId, "init", -1);
5,226,501✔
394
  if (code != 0) {
5,226,501✔
395
    tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code));
×
396
  }
397
  return;
5,226,501✔
398
}
399

400
// ============================================================
401
// sync commit helpers
402
// ============================================================
403
void commitCallBackFn(tmq_t* tmq, int32_t code, void* param) {
253,407✔
404
  if (param == NULL) {
253,407✔
405
    tqErrorC("invalid param in commit cb");
×
406
    return;
×
407
  }
408
  SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
253,407✔
409
  pInfo->code = code;
253,407✔
410
  if (tsem2_post(&pInfo->sem) != 0){
253,407✔
411
    tqErrorC("failed to post rsp sem in commit cb");
×
412
  }
413
}
414

415
// ============================================================
416
// public commit APIs
417
// ============================================================
418
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
9,361✔
419
  if (tmq == NULL) {
9,361✔
420
    tqErrorC("invalid tmq handle, null");
×
421
    if (cb != NULL) {
×
422
      cb(tmq, TSDB_CODE_INVALID_PARA, param);
×
423
    }
424
    return;
×
425
  }
426
  if (pRes == NULL) {  // here needs to commit all offsets.
9,361✔
427
    asyncCommitAllOffsets(tmq, cb, param);
9,361✔
428
  } else {  // only commit one offset
429
    asyncCommitFromResult(tmq, pRes, cb, param);
×
430
  }
431
}
432

433
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
251,898✔
434
  if (tmq == NULL) {
251,898✔
435
    tqErrorC("invalid tmq handle, null");
×
436
    return TSDB_CODE_INVALID_PARA;
×
437
  }
438

439
  int32_t code = 0;
251,898✔
440

441
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
251,898✔
442
  if (pInfo == NULL) {
251,898✔
443
    tqErrorC("failed to allocate memory for sync commit");
×
444
    return terrno;
×
445
  }
446

447
  code = tsem2_init(&pInfo->sem, 0, 0);
251,898✔
448
  if (code != 0) {
251,898✔
449
    tqErrorC("failed to init sem for sync commit");
×
450
    taosMemoryFree(pInfo);
×
451
    return code;
×
452
  }
453
  pInfo->code = 0;
251,898✔
454

455
  if (pRes == NULL) {
251,898✔
456
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
240,736✔
457
  } else {
458
    asyncCommitFromResult(tmq, pRes, commitCallBackFn, pInfo);
11,162✔
459
  }
460

461
  if (tsem2_wait(&pInfo->sem) != 0){
251,898✔
462
    tqErrorC("failed to wait sem for sync commit");
×
463
  }
464
  code = pInfo->code;
251,898✔
465

466
  if(tsem2_destroy(&pInfo->sem) != 0) {
251,898✔
467
    tqErrorC("failed to destroy sem for sync commit");
×
468
  }
469
  taosMemoryFree(pInfo);
251,898✔
470

471
  tqDebugC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
251,898✔
472
  return code;
251,898✔
473
}
474

475
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
476
int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) {
9,171✔
477
  if (offset == NULL) {
9,171✔
478
    tqErrorC("invalid offset, null");
×
479
    return TSDB_CODE_INVALID_PARA;
×
480
  }
481
  if (offset->walVerBegin == -1 || offset->walVerEnd == -1) {
9,171✔
UNCOV
482
    tqErrorC("Assignment or poll interface need to be called first");
×
UNCOV
483
    return TSDB_CODE_TMQ_NEED_INITIALIZED;
×
484
  }
485

486
  if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
9,171✔
487
    tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
61✔
488
             offset->walVerBegin, offset->walVerEnd);
489
    return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
61✔
490
  }
491

492
  return 0;
9,110✔
493
}
494

495
bool isInSnapshotMode(int8_t type, bool useSnapshot) {
21,579✔
496
  if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) {
21,579✔
497
    return true;
×
498
  }
499
  return false;
21,579✔
500
}
501

502
int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
9,530✔
503
  if (tmq == NULL || pTopicName == NULL) {
9,530✔
504
    tqErrorC("invalid tmq handle, null");
×
505
    return TSDB_CODE_INVALID_PARA;
×
506
  }
507

508
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
9,530✔
509
  buildTopicFullName(tmq, pTopicName, tname);
9,530✔
510

511
  tmqWlock(tmq);
9,530✔
512
  SMqClientVg* pVg = NULL;
9,530✔
513
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
9,530✔
514
  if (code != 0) {
9,530✔
515
    tmqWUnlock(tmq);
6,512✔
516
    return code;
6,512✔
517
  }
518

519
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
3,018✔
520
  code = checkWalRange(pOffsetInfo, offset);
3,018✔
521
  if (code != 0) {
3,018✔
522
    tmqWUnlock(tmq);
×
523
    return code;
×
524
  }
525
  tmqWUnlock(tmq);
3,018✔
526

527
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
3,018✔
528

529
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
3,018✔
530
  if (pInfo == NULL) {
3,018✔
531
    tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
×
532
    return terrno;
×
533
  }
534

535
  code = tsem2_init(&pInfo->sem, 0, 0);
3,018✔
536
  if (code != 0) {
3,018✔
537
    taosMemoryFree(pInfo);
×
538
    return code;
×
539
  }
540
  pInfo->code = 0;
3,018✔
541

542
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
3,018✔
543
  if (code == 0) {
3,018✔
544
    if (tsem2_wait(&pInfo->sem) != 0){
1,509✔
545
      tqErrorC("failed to wait sem for sync commit offset");
×
546
    }
547
    code = pInfo->code;
1,509✔
548
  }
549

550
  if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
3,018✔
551
  if(tsem2_destroy(&pInfo->sem) != 0) {
3,018✔
552
    tqErrorC("failed to destroy sem for sync commit offset");
×
553
  }
554
  taosMemoryFree(pInfo);
3,018✔
555

556
  tqDebugC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
3,018✔
557
          offset, tstrerror(code));
558

559
  return code;
3,018✔
560
}
561

562
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
6,105✔
563
                             void* param) {
564
  int32_t code = 0;
6,105✔
565
  if (tmq == NULL || pTopicName == NULL) {
6,105✔
566
    tqErrorC("invalid tmq handle, null");
×
567
    code = TSDB_CODE_INVALID_PARA;
×
568
    goto end;
×
569
  }
570

571
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
6,105✔
572
  buildTopicFullName(tmq, pTopicName, tname);
6,105✔
573

574
  tmqWlock(tmq);
6,105✔
575
  SMqClientVg* pVg = NULL;
6,105✔
576
  code = getClientVg(tmq, tname, vgId, &pVg);
6,105✔
577
  if (code != 0) {
6,105✔
578
    tmqWUnlock(tmq);
6,105✔
579
    goto end;
6,105✔
580
  }
581

UNCOV
582
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
×
UNCOV
583
  code = checkWalRange(pOffsetInfo, offset);
×
UNCOV
584
  if (code != 0) {
×
UNCOV
585
    tmqWUnlock(tmq);
×
UNCOV
586
    goto end;
×
587
  }
588
  tmqWUnlock(tmq);
×
589

590
  STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset};
×
591

592
  code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
×
593

594
  tqDebugC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
×
595
          offset, tstrerror(code));
596

597
  end:
6,105✔
598
  if (code != 0 && cb != NULL) {
6,105✔
599
    if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
×
600
    cb(tmq, code, param);
×
601
  }
602
}
6,105✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc