• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.12
/source/client/src/clientTmqOffset.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "clientTmq.h"
17
#include "taos.h"
18
#include "tmsg.h"
19

20
// ============================================================
21
// tmq_committed callback
22
// ============================================================
23
static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) {
391✔
24
  if (param == NULL) {
391✔
25
    return code;
×
26
  }
27
  SMqCommittedParam* pParam = param;
391✔
28
  SDecoder decoder = {0};
391✔
29

30
  if (code != 0) {
391✔
31
    goto end;
60✔
32
  }
33
  if (pMsg) {
331✔
34
    tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len);
331✔
35
    int32_t err = tDecodeMqVgOffset(&decoder, &pParam->vgOffset);
331✔
36
    if (err < 0) {
331✔
37
      tOffsetDestroy(&pParam->vgOffset.offset);
×
38
      code = err;
×
39
      goto end;
×
40
    }
41
  }
42

43
end:
331✔
44
  tDecoderClear(&decoder);
391✔
45

46
  if (pMsg) {
391✔
47
    taosMemoryFree(pMsg->pData);
391✔
48
    taosMemoryFree(pMsg->pEpSet);
391✔
49
  }
50
  pParam->code = code;
391✔
51
  if (tsem2_post(&pParam->sem) != 0){
391✔
52
    tqErrorC("failed to post semaphore in tmCommittedCb");
×
53
  }
54
  return code;
391✔
55
}
56

57
// ============================================================
58
// get committed from server
59
// ============================================================
60
int32_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* epSet, int64_t* committed) {
391✔
61
  if (tmq == NULL || tname == NULL || epSet == NULL) {
391✔
62
    return TSDB_CODE_INVALID_PARA;
×
63
  }
64
  int32_t     code = 0;
391✔
65
  SMqVgOffset pOffset = {0};
391✔
66

67
  pOffset.consumerId = tmq->consumerId;
391✔
68
  (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, tname);
391✔
69

70
  int32_t len = 0;
391✔
71
  tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code);
391✔
72
  if (code < 0) {
391✔
73
    return TSDB_CODE_INVALID_PARA;
×
74
  }
75

76
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
391✔
77
  if (buf == NULL) {
391✔
78
    return terrno;
×
79
  }
80

81
  ((SMsgHead*)buf)->vgId = htonl(vgId);
391✔
82

83
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
391✔
84

85
  SEncoder encoder = {0};
391✔
86
  tEncoderInit(&encoder, abuf, len);
391✔
87
  code = tEncodeMqVgOffset(&encoder, &pOffset);
391✔
88
  if (code < 0) {
391✔
89
    taosMemoryFree(buf);
×
90
    tEncoderClear(&encoder);
×
91
    return code;
×
92
  }
93
  tEncoderClear(&encoder);
391✔
94

95
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
391✔
96
  if (sendInfo == NULL) {
391✔
97
    taosMemoryFree(buf);
×
98
    return terrno;
×
99
  }
100

101
  SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam));
391✔
102
  if (pParam == NULL) {
391✔
103
    taosMemoryFree(buf);
×
104
    taosMemoryFree(sendInfo);
×
105
    return terrno;
×
106
  }
107
  if (tsem2_init(&pParam->sem, 0, 0) != 0) {
391✔
108
    taosMemoryFree(buf);
×
109
    taosMemoryFree(sendInfo);
×
110
    taosMemoryFree(pParam);
×
111
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
112
  }
113

114
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
391✔
115
  sendInfo->requestId = generateRequestId();
391✔
116
  sendInfo->requestObjRefId = 0;
391✔
117
  sendInfo->param = pParam;
391✔
118
  sendInfo->fp = tmCommittedCb;
391✔
119
  sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
391✔
120

121
  code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
391✔
122
  if (code != 0) {
391✔
123
    if(tsem2_destroy(&pParam->sem) != 0) {
×
124
      tqErrorC("failed to destroy semaphore in get committed from server1");
×
125
    }
126
    taosMemoryFree(pParam);
×
127
    return code;
×
128
  }
129

130
  if (tsem2_wait(&pParam->sem) != 0){
391✔
131
    tqErrorC("failed to wait semaphore in get committed from server");
×
132
  }
133
  code = pParam->code;
391✔
134
  if (code == TSDB_CODE_SUCCESS) {
391✔
135
    if (pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG) {
331✔
136
      *committed = pParam->vgOffset.offset.val.version;
331✔
137
    } else {
138
      tOffsetDestroy(&pParam->vgOffset.offset);
×
139
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
140
    }
141
  }
142
  if(tsem2_destroy(&pParam->sem) != 0) {
391✔
143
    tqErrorC("failed to destroy semaphore in get committed from server2");
×
144
  }
145
  taosMemoryFree(pParam);
391✔
146

147
  return code;
391✔
148
}
149

150
// ============================================================
151
// tmq_position
152
// ============================================================
153
int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
12,288✔
154
  if (tmq == NULL || pTopicName == NULL) {
12,288✔
155
    tqErrorC("invalid tmq handle, null");
×
156
    return TSDB_CODE_INVALID_PARA;
×
157
  }
158

159
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
12,288✔
160
  buildTopicFullName(tmq, pTopicName, tname);
12,288✔
161

162
  tmqWlock(tmq);
12,288✔
163

164
  SMqClientVg* pVg = NULL;
12,288✔
165
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
12,288✔
166
  if (code != 0) {
12,288✔
167
    tmqWUnlock(tmq);
8,316✔
168
    return code;
8,316✔
169
  }
170

171
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
3,972✔
172
  int32_t        type = pOffsetInfo->endOffset.type;
3,972✔
173
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
3,972✔
174
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type);
×
175
    tmqWUnlock(tmq);
×
176
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
177
  }
178

179
  code = checkWalRange(pOffsetInfo, -1);
3,972✔
180
  if (code != 0) {
3,972✔
181
    tmqWUnlock(tmq);
×
182
    return code;
×
183
  }
184
  SEpSet  epSet = pVg->epSet;
3,972✔
185
  int64_t begin = pVg->offsetInfo.walVerBegin;
3,972✔
186
  int64_t end = pVg->offsetInfo.walVerEnd;
3,972✔
187
  tmqWUnlock(tmq);
3,972✔
188

189
  int64_t position = 0;
3,972✔
190
  if (type == TMQ_OFFSET__LOG) {
3,972✔
191
    position = pOffsetInfo->endOffset.version;
3,641✔
192
  } else if (type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST) {
331✔
193
    code = getCommittedFromServer(tmq, tname, vgId, &epSet, &position);
331✔
194
    if (code == TSDB_CODE_TMQ_NO_COMMITTED) {
331✔
195
      if (type == TMQ_OFFSET__RESET_EARLIEST) {
×
196
        position = begin;
×
197
      } else if (type == TMQ_OFFSET__RESET_LATEST) {
×
198
        position = end;
×
199
      } else {
200
        tqErrorC("consumer:0x%" PRIx64 " invalid offset type:%d", tmq->consumerId, type);
×
201
        return TSDB_CODE_INTERNAL_ERROR;
×
202
      }
203
    } else if(code != 0) {
331✔
204
      tqErrorC("consumer:0x%" PRIx64 " getCommittedFromServer error,%d", tmq->consumerId, code);
×
205
      return code;
×
206
    }
207
  } else {
208
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
×
209
    return TSDB_CODE_INTERNAL_ERROR;
×
210
  }
211

212
  tqDebugC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
3,972✔
213
  return position;
3,972✔
214
}
215

216
// ============================================================
217
// tmq_committed
218
// ============================================================
219
int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) {
14,144✔
220
  if (tmq == NULL || pTopicName == NULL) {
14,144✔
221
    tqErrorC("invalid tmq handle, null");
×
222
    return TSDB_CODE_INVALID_PARA;
×
223
  }
224

225
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
14,144✔
226
  buildTopicFullName(tmq, pTopicName, tname);
14,144✔
227

228
  tmqWlock(tmq);
14,144✔
229

230
  SMqClientVg* pVg = NULL;
14,144✔
231
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
14,144✔
232
  if (code != 0) {
14,144✔
233
    tmqWUnlock(tmq);
9,450✔
234
    return code;
9,450✔
235
  }
236

237
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
4,694✔
238
  if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) {
4,694✔
239
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
240
             pOffsetInfo->endOffset.type);
241
    tmqWUnlock(tmq);
×
242
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
243
  }
244

245
  if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) {
4,694✔
246
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
×
247
             pOffsetInfo->committedOffset.type);
248
    tmqWUnlock(tmq);
×
249
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
250
  }
251

252
  int64_t committed = 0;
4,694✔
253
  if (pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG) {
4,694✔
254
    committed = pOffsetInfo->committedOffset.version;
4,634✔
255
    tmqWUnlock(tmq);
4,634✔
256
    goto end;
4,634✔
257
  }
258
  SEpSet epSet = pVg->epSet;
60✔
259
  tmqWUnlock(tmq);
60✔
260

261
  code = getCommittedFromServer(tmq, tname, vgId, &epSet, &committed);
60✔
262
  if (code != 0) {
60✔
263
    tqErrorC("consumer:0x%" PRIx64 " getCommittedFromServer error,%d", tmq->consumerId, code);
60✔
264
    return code;
60✔
265
  }
266

267
  end:
4,634✔
268
  tqDebugC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
4,634✔
269
  return committed;
4,634✔
270
}
271

272
// ============================================================
273
// helpers for assignment
274
// ============================================================
275
void destroyCommonInfo(SMqVgCommon* pCommon) {
2,187✔
276
  if (pCommon == NULL) {
2,187✔
277
    return;
1,747✔
278
  }
279
  taosArrayDestroy(pCommon->pList);
440✔
280
  pCommon->pList = NULL;
440✔
281
  if(tsem2_destroy(&pCommon->rsp) != 0) {
440✔
282
    tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
×
283
  }
284
  taosMemoryFreeClear(pCommon->pTopicName);
440✔
285
  (void)taosThreadMutexDestroy(&pCommon->mutex);
440✔
286
  taosMemoryFree(pCommon);
440✔
287
}
288

289
// ============================================================
290
// get wal info callback (for assignment)
291
// ============================================================
292
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
820✔
293
  if (param == NULL || pMsg == NULL) {
820✔
294
    return code;
×
295
  }
296
  SMqVgWalInfoParam* pParam = param;
820✔
297
  SMqVgCommon*       pCommon = pParam->pCommon;
820✔
298

299
  if (code != TSDB_CODE_SUCCESS) {
820✔
300
    tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
301
             pParam->vgId, pCommon->pTopicName);
302

303
  } else {
304
    SMqDataRsp rsp = {0};
820✔
305
    SDecoder   decoder = {0};
820✔
306
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
820✔
307
    code = tDecodeMqDataRsp(&decoder, &rsp);
820✔
308
    tDecoderClear(&decoder);
820✔
309
    if (code != 0) {
820✔
310
      goto END;
×
311
    }
312

313
    SMqRspHead*          pHead = pMsg->pData;
820✔
314
    tmq_topic_assignment assignment = {.begin = pHead->walsver,
1,482✔
315
        .end = pHead->walever + 1,
820✔
316
        .currentOffset = rsp.rspOffset.version,
820✔
317
        .vgId = pParam->vgId};
820✔
318

319
    (void)taosThreadMutexLock(&pCommon->mutex);
820✔
320
    if (taosArrayPush(pCommon->pList, &assignment) == NULL) {
1,640✔
321
      tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId,
×
322
               pParam->vgId, pCommon->pTopicName);
323
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
×
324
    }
325
    (void)taosThreadMutexUnlock(&pCommon->mutex);
820✔
326
  }
327

328
  END:
820✔
329
  pCommon->code = code;
820✔
330
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
820✔
331
  if (total == pParam->totalReq) {
820✔
332
    if (tsem2_post(&pCommon->rsp) != 0) {
440✔
333
      tqErrorC("failed to post semaphore in get wal cb");
×
334
    }
335
  }
336

337
  if (pMsg) {
820✔
338
    taosMemoryFree(pMsg->pData);
820✔
339
    taosMemoryFree(pMsg->pEpSet);
820✔
340
  }
341

342
  return code;
820✔
343
}
344

345
// ============================================================
346
// tmq_get_topic_assignment
347
// ============================================================
348
static int32_t fetchRemoteAssignments(tmq_t* tmq, SMqClientTopic* pTopic, int32_t numOfVgs,
440✔
349
                                      tmq_topic_assignment** assignment, int32_t* numOfAssignment,
350
                                      SMqVgCommon** ppCommon) {
351
  int32_t code = 0;
440✔
352
  SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
440✔
353
  if (pCommon == NULL) {
440✔
354
    return terrno;
×
355
  }
356
  *ppCommon = pCommon;
440✔
357

358
  pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
440✔
359
  if (pCommon->pList == NULL) {
440✔
360
    return terrno;
×
361
  }
362

363
  code = tsem2_init(&pCommon->rsp, 0, 0);
440✔
364
  if (code != 0) {
440✔
365
    return code;
×
366
  }
367
  (void)taosThreadMutexInit(&pCommon->mutex, 0);
440✔
368
  pCommon->pTopicName = taosStrdup(pTopic->topicName);
440✔
369
  if (pCommon->pTopicName == NULL) {
440✔
370
    return terrno;
×
371
  }
372
  pCommon->consumerId = tmq->consumerId;
440✔
373

374
  for (int32_t i = 0; i < numOfVgs; ++i) {
1,260✔
375
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
820✔
376
    if (pClientVg == NULL) {
820✔
377
      continue;
×
378
    }
379
    SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
820✔
380
    if (pParam == NULL) {
820✔
381
      return terrno;
×
382
    }
383

384
    pParam->epoch = atomic_load_32(&tmq->epoch);
820✔
385
    pParam->vgId = pClientVg->vgId;
820✔
386
    pParam->totalReq = numOfVgs;
820✔
387
    pParam->pCommon = pCommon;
820✔
388

389
    SMqPollReq req = {0};
820✔
390
    tmqBuildConsumeReqImpl(&req, tmq, pTopic, pClientVg);
820✔
391
    req.reqOffset = pClientVg->offsetInfo.beginOffset;
820✔
392

393
    int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
820✔
394
    if (msgSize < 0) {
820✔
395
      taosMemoryFree(pParam);
×
396
      return msgSize;
×
397
    }
398

399
    char* msg = taosMemoryCalloc(1, msgSize);
820✔
400
    if (NULL == msg) {
820✔
401
      taosMemoryFree(pParam);
×
402
      return terrno;
×
403
    }
404

405
    msgSize = tSerializeSMqPollReq(msg, msgSize, &req);
820✔
406
    if (msgSize < 0) {
820✔
407
      taosMemoryFree(msg);
×
408
      taosMemoryFree(pParam);
×
409
      return msgSize;
×
410
    }
411

412
    SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
820✔
413
    if (sendInfo == NULL) {
820✔
414
      taosMemoryFree(pParam);
×
415
      taosMemoryFree(msg);
×
416
      return terrno;
×
417
    }
418

419
    sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
820✔
420
    sendInfo->requestId = req.reqId;
820✔
421
    sendInfo->requestObjRefId = 0;
820✔
422
    sendInfo->param = pParam;
820✔
423
    sendInfo->paramFreeFp = taosAutoMemoryFree;
820✔
424
    sendInfo->fp = tmqGetWalInfoCb;
820✔
425
    sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
820✔
426

427
    char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
820✔
428
    tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
820✔
429

430
    tqDebugC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, QID:0x%" PRIx64, tmq->consumerId,
820✔
431
            pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
432
    code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
820✔
433
    if (code != 0) {
820✔
434
      return code;
×
435
    }
436
  }
437

438
  if (tsem2_wait(&pCommon->rsp) != 0){
440✔
439
    tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
×
440
  }
441
  code = pCommon->code;
440✔
442
  if (code != TSDB_CODE_SUCCESS) {
440✔
443
    return code;
×
444
  }
445

446
  int32_t num = taosArrayGetSize(pCommon->pList);
440✔
447
  for (int32_t i = 0; i < num; ++i) {
1,260✔
448
    (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
820✔
449
  }
450
  *numOfAssignment = num;
440✔
451

452
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
1,260✔
453
    tmq_topic_assignment* p = &(*assignment)[j];
820✔
454

455
    for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
2,400✔
456
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
1,580✔
457
      if (pClientVg == NULL) {
1,580✔
458
        continue;
×
459
      }
460
      if (pClientVg->vgId != p->vgId) {
1,580✔
461
        continue;
760✔
462
      }
463

464
      SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
820✔
465
      tqDebugC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
820✔
466
              p->vgId, p->currentOffset);
467

468
      pOffsetInfo->walVerBegin = p->begin;
820✔
469
      pOffsetInfo->walVerEnd = p->end;
820✔
470
    }
471
  }
472

473
  return TSDB_CODE_SUCCESS;
440✔
474
}
475

476
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
11,637✔
477
                                 int32_t* numOfAssignment) {
478
  if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) {
11,637✔
479
    tqErrorC("invalid tmq handle, null");
9,450✔
480
    return TSDB_CODE_INVALID_PARA;
9,450✔
481
  }
482
  *numOfAssignment = 0;
2,187✔
483
  *assignment = NULL;
2,187✔
484
  SMqVgCommon* pCommon = NULL;
2,187✔
485

486
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
2,187✔
487
  buildTopicFullName(tmq, pTopicName, tname);
2,187✔
488

489
  tmqWlock(tmq);
2,187✔
490

491
  SMqClientTopic* pTopic = NULL;
2,187✔
492
  int32_t         code = getTopicByName(tmq, tname, &pTopic);
2,187✔
493
  if (code != 0) {
2,187✔
UNCOV
494
    tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
×
UNCOV
495
    goto end;
×
496
  }
497

498
  // in case of snapshot is opened, no valid offset will return
499
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
2,187✔
500
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
6,441✔
501
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
4,254✔
502
    if (pClientVg == NULL) {
4,254✔
503
      continue;
×
504
    }
505
    int32_t type = pClientVg->offsetInfo.beginOffset.type;
4,254✔
506
    if (isInSnapshotMode(type, tmq->useSnapshot)) {
4,254✔
507
      tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
×
508
      code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
509
      goto end;
×
510
    }
511
  }
512

513
  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
2,187✔
514
  if (*assignment == NULL) {
2,187✔
515
    tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
×
516
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
517
    code = terrno;
×
518
    goto end;
×
519
  }
520

521
  bool needFetch = false;
2,187✔
522

523
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
5,952✔
524
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
4,205✔
525
    if (pClientVg == NULL) {
4,205✔
526
      continue;
×
527
    }
528
    if (pClientVg->offsetInfo.beginOffset.type != TMQ_OFFSET__LOG) {
4,205✔
529
      needFetch = true;
440✔
530
      break;
440✔
531
    }
532

533
    tmq_topic_assignment* pAssignment = &(*assignment)[j];
3,765✔
534
    pAssignment->currentOffset = pClientVg->offsetInfo.beginOffset.version;
3,765✔
535
    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
3,765✔
536
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
3,765✔
537
    pAssignment->vgId = pClientVg->vgId;
3,765✔
538
    tqDebugC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
3,765✔
539
            pAssignment->currentOffset);
540
  }
541

542
  if (needFetch) {
2,187✔
543
    code = fetchRemoteAssignments(tmq, pTopic, *numOfAssignment, assignment, numOfAssignment, &pCommon);
440✔
544
    if (code != 0) {
440✔
545
      goto end;
×
546
    }
547
  }
548

549
  end:
2,187✔
550
  if (code != TSDB_CODE_SUCCESS) {
2,187✔
UNCOV
551
    taosMemoryFree(*assignment);
×
UNCOV
552
    *assignment = NULL;
×
UNCOV
553
    *numOfAssignment = 0;
×
UNCOV
554
    tqDebugC("consumer:0x%" PRIx64 " get assignment for topic:%s failed, code:%s", tmq->consumerId, pTopicName, tstrerror(code));
×
555
  } else {
556
    tqDebugC("consumer:0x%" PRIx64 " get assignment for topic:%s, numOfAssignment:%d", tmq->consumerId, pTopicName, *numOfAssignment);
2,187✔
557
    for (int32_t i = 0; i < *numOfAssignment; ++i) {
6,441✔
558
      tmq_topic_assignment* p = &(*assignment)[i];
4,254✔
559
      tqTraceC("consumer:0x%" PRIx64 " assignment[%d] vgId:%d offset:%" PRId64 " begin:%" PRId64 " end:%" PRId64,
4,254✔
560
               tmq->consumerId, i, p->vgId, p->currentOffset, p->begin, p->end);
561
    }
562
  }
563
  destroyCommonInfo(pCommon);
2,187✔
564
  tmqWUnlock(tmq);
2,187✔
565
  return code;
2,187✔
566
}
567

568
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
5,103✔
569
  if (pAssignment == NULL) {
5,103✔
570
    return;
3,024✔
571
  }
572

573
  taosMemoryFree(pAssignment);
2,079✔
574
}
575

576
// ============================================================
577
// tmq_offset_seek
578
// ============================================================
579
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
6,878✔
580
  if (tmq == NULL || pTopicName == NULL) {
6,878✔
581
    tqErrorC("invalid tmq handle, null");
×
582
    return TSDB_CODE_INVALID_PARA;
×
583
  }
584

585
  char tname[TSDB_TOPIC_FNAME_LEN] = {0};
6,878✔
586
  buildTopicFullName(tmq, pTopicName, tname);
6,878✔
587

588
  tmqWlock(tmq);
6,878✔
589

590
  SMqClientVg* pVg = NULL;
6,878✔
591
  int32_t      code = getClientVg(tmq, tname, vgId, &pVg);
6,878✔
592
  if (code != 0) {
6,878✔
593
    tmqWUnlock(tmq);
5,412✔
594
    return code;
5,412✔
595
  }
596

597
  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
1,466✔
598

599
  int32_t type = pOffsetInfo->endOffset.type;
1,466✔
600
  if (isInSnapshotMode(type, tmq->useSnapshot)) {
1,466✔
601
    tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
×
602
    tmqWUnlock(tmq);
×
603
    return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
×
604
  }
605

606
  code = checkWalRange(pOffsetInfo, offset);
1,466✔
607
  if (code != 0) {
1,466✔
608
    tmqWUnlock(tmq);
60✔
609
    return code;
60✔
610
  }
611

612
  tqDebugC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
1,406✔
613
  // update the offset, and then commit to vnode
614
  pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG;
1,406✔
615
  pOffsetInfo->endOffset.version = offset;
1,406✔
616
  pOffsetInfo->beginOffset = pOffsetInfo->endOffset;
1,406✔
617
  pVg->seekUpdated = true;
1,406✔
618
  tmqWUnlock(tmq);
1,406✔
619

620
  return code;
1,406✔
621
}
622

623
// ============================================================
624
// tmqGetNextResInfo (get next result block)
625
// ============================================================
626
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) {
137,285,995✔
627
  if (res == NULL || pResInfo == NULL) {
137,285,995✔
628
    return TSDB_CODE_INVALID_PARA;
×
629
  }
630
  SMqRspObj*  pRspObj = (SMqRspObj*)res;
137,287,495✔
631
  SMqDataRsp* data = &pRspObj->dataRsp;
137,287,495✔
632

633
  pRspObj->resIter++;
137,287,120✔
634
  if (pRspObj->resIter < data->blockNum) {
137,287,495✔
635
    doFreeReqResultInfo(&pRspObj->resInfo);
82,310,252✔
636
    SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
82,309,127✔
637
    if (pSW) {
82,309,877✔
638
      TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL, false));
82,309,877✔
639
    }
640
    void*   pRetrieve = taosArrayGetP(data->blockData, pRspObj->resIter);
82,311,002✔
641
    void*   rawData = NULL;
82,311,002✔
642
    int64_t rows = 0;
82,311,002✔
643
    int32_t precision = 0;
82,311,002✔
644

645
    // inline helper to get raw data rows and precision
646
    if (pRetrieve != NULL) {
82,311,002✔
647
      if (*(int64_t*)pRetrieve == 0) {
82,311,002✔
648
        rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
649
        rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
×
650
        precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
×
651
      } else if (*(int64_t*)pRetrieve == 1) {
82,311,002✔
652
        rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
82,310,252✔
653
        rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
82,311,377✔
654
        precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
82,310,627✔
655
      }
656
    }
657

658
    pRspObj->resInfo.pData = rawData;
82,311,752✔
659
    pRspObj->resInfo.numOfRows = rows;
82,311,002✔
660
    pRspObj->resInfo.current = 0;
82,310,627✔
661
    pRspObj->resInfo.precision = precision;
82,310,252✔
662

663
    pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
82,309,877✔
664
    int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4, false);
82,310,252✔
665
    if (code != 0) {
82,307,252✔
666
      return code;
×
667
    }
668
    *pResInfo = &pRspObj->resInfo;
82,307,252✔
669
    return code;
82,309,127✔
670
  }
671

672
  return TSDB_CODE_TSC_INTERNAL_ERROR;
54,976,493✔
673
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc