• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4995

18 Mar 2026 06:26AM UTC coverage: 71.996% (-0.2%) from 72.244%
#4995

push

travis-ci

web-flow
merge: from main to 3.0 branch #34835

2 of 4 new or added lines in 2 files covered. (50.0%)

5312 existing lines in 167 files now uncovered.

244665 of 339830 relevant lines covered (72.0%)

135013613.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.31
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
// Prepare a data poll response for use.
//
// Allocates the (initially empty) blockData, blockDataLen and blockSchema arrays, copies
// pOffset into both reqOffset and rspOffset, and sets withTbName = 0 / withSchema = 1.
//
// pRsp    - response to initialize; the callers in this file pass a zero-initialized struct.
// pOffset - offset value copied into both offset fields.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when pRsp is NULL, or terrno when an array
// allocation fails.
//
// On failure every array allocated here is destroyed AND its field is reset to NULL. Callers
// in this file still run tDeleteMqDataRsp() on the response after a failed init, so stale
// pointers must not be left behind.
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = 1;

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__, lino, tstrerror(code));
    // pRsp is NULL when the failure is the parameter check itself; nothing to release then.
    if (pRsp != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
52

53
// Prepare a taosx (db/stable subscription) poll response for use.
//
// Copies pOffset into both reqOffset and rspOffset, sets withTbName = 1 / withSchema = 1 and
// allocates the (initially empty) blockData, blockDataLen, blockTbName and blockSchema arrays.
//
// pRsp    - response to initialize; the caller in this file passes a zero-initialized struct.
// pOffset - offset value copied into both offset fields.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when pRsp is NULL, or terrno when an array
// allocation fails.
//
// On failure the arrays allocated here are destroyed and their fields reset to NULL, because
// the caller still runs tDeleteSTaosxRsp() on the response after a failed init.
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__, lino, tstrerror(code));
    // pRsp is NULL when the failure is the parameter check itself; nothing to release then.
    if (pRsp != NULL) {
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockTbName);
      pRsp->blockTbName = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
85

86
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
569,019✔
87
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
88
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
569,019✔
89
    return TSDB_CODE_INVALID_PARA;
×
90
  }
91
  uint64_t   consumerId = pRequest->consumerId;
570,597✔
92
  STqOffset* pOffset = NULL;
570,597✔
93
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
570,597✔
94
  int32_t    vgId = TD_VID(pTq->pVnode);
569,909✔
95

96
  *pBlockReturned = false;
569,909✔
97
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
98
  if (code == 0) {
568,963✔
99
    tOffsetCopy(pOffsetVal, &pOffset->val);
104,413✔
100

101
    char formatBuf[TSDB_OFFSET_LEN] = {0};
104,413✔
102
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
104,413✔
103
    tqDebug("tmq poll: consumer:0x%" PRIx64
104,413✔
104
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
105
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
106
    return 0;
104,413✔
107
  } else {
108
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
109
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
464,550✔
110
      if (pRequest->useSnapshot) {
438,871✔
111
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
219,361✔
112
                consumerId, pHandle->subKey, vgId);
113
        if (pHandle->fetchMeta) {
219,670✔
114
          tqOffsetResetToMeta(pOffsetVal, 0);
115
        } else {
116
          SValue val = {0};
217,617✔
117
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
×
118
        }
119
      } else {
120
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
219,519✔
121
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
219,523✔
122
      }
123
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
26,352✔
124
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
16,704✔
125
      SMqDataRsp dataRsp = {0};
16,704✔
126
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
16,704✔
127

128
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
16,704✔
129
      if (code != 0) {
16,704✔
130
        return code;
×
131
      }
132
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
16,704✔
133
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
134
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
16,704✔
135
      tDeleteMqDataRsp(&dataRsp);
16,704✔
136

137
      *pBlockReturned = true;
16,704✔
138
      return code;
16,704✔
139
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
9,648✔
140
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
9,648✔
141
                  " in vg %d, subkey %s, reset none failed",
142
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
143
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
9,648✔
144
    }
145
  }
146

147
  return 0;
439,832✔
148
}
149

150
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
25,175,583✔
151
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
152
  int32_t    code = TDB_CODE_SUCCESS;
25,175,583✔
153
  int32_t    lino = 0;
25,175,583✔
154
  tqDebug("%s called", __FUNCTION__ );
25,175,583✔
155
  uint64_t consumerId = pRequest->consumerId;
25,175,933✔
156
  int32_t  vgId = TD_VID(pTq->pVnode);
25,175,887✔
157
  terrno = 0;
25,176,248✔
158

159
  SMqDataRsp dataRsp = {0};
25,176,248✔
160
  code = tqInitDataRsp(&dataRsp, *pOffset);
25,176,248✔
161
  TSDB_CHECK_CODE(code, lino, end);
25,175,546✔
162

163
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
25,175,546✔
164
  TSDB_CHECK_CODE(code, lino, end);
25,175,937✔
165

166
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
25,175,937✔
167
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
25,176,248✔
168
    goto end;
171,770✔
169
  }
170

171
  if (terrno == TSDB_CODE_TMQ_FETCH_TIMEOUT && dataRsp.blockNum == 0) {
25,004,478✔
172
    dataRsp.timeout = true;
×
173
  }
174
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
175
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
25,004,478✔
176
    // lock
177
    taosWLockLatch(&pTq->lock);
11,243,380✔
178
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
11,243,380✔
179
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
11,243,380✔
180
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
10,908,262✔
181
      taosWUnLockLatch(&pTq->lock);
10,908,262✔
182
      goto end;
10,908,262✔
183
    }
184
    taosWUnLockLatch(&pTq->lock);
335,118✔
185
  }
186

187
  // reqOffset represents the current date offset, may be changed if wal not exists
188
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
14,096,216✔
189
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
14,096,216✔
190

191
end:
25,175,544✔
192
  {
193
    char buf[TSDB_OFFSET_LEN] = {0};
25,175,896✔
194
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
25,175,896✔
195
    if (code != 0){
25,174,138✔
196
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
171,770✔
197
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
198
    } else {
199
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
25,002,368✔
200
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
201
    }
202

203
    tDeleteMqDataRsp(&dataRsp);
25,175,544✔
204
    return code;
25,176,248✔
205
  }
206
}
207

208
// Decode the WAL entry referenced by `pHead` as TYPE (skipping its SMsgHead prefix) and, when
// decoding succeeds and the decoded `source` field has TD_REQ_FROM_TAOX set, skip the entry:
// log it, advance `fetchVer` and `continue` the enclosing loop. In every case the decoded
// request is released with DELETE_FUNC and the decoder is cleared.
//
// The expansion declares locals and may `continue`, so it must be used inside its own braces
// within a loop where `pHead`, `pRequest`, `vgId` and `fetchVer` are in scope.
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
  SDecoder excludedDecoder = {0};                                                                          \
  TYPE     excludedReq = {0};                                                                              \
  void*    excludedBody = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                    \
  int32_t  excludedLen = pHead->bodyLen - sizeof(SMsgHead);                                                \
  tDecoderInit(&excludedDecoder, excludedBody, excludedLen);                                               \
  bool fromTaosx =                                                                                         \
      (DECODE_FUNC(&excludedDecoder, &excludedReq) == 0) && ((excludedReq.source & TD_REQ_FROM_TAOX) != 0); \
  if (fromTaosx) {                                                                                         \
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
            " msgType %d",                                                                                 \
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
    fetchVer++;                                                                                            \
  }                                                                                                        \
  DELETE_FUNC(&excludedReq);                                                                               \
  tDecoderClear(&excludedDecoder);                                                                         \
  if (fromTaosx) {                                                                                         \
    continue;                                                                                              \
  }
225

226
// No-op deleter: passed to PROCESS_EXCLUDED_MSG for request types that need no cleanup here.
static void tDeleteCommon(void* parm) { (void)parm; }
654✔
UNCOV
227
// Deleter for a decoded alter-table request: destroys its pMultiTag array.
// req must be non-NULL (it is dereferenced unconditionally).
static void tDeleteAlterTable(SVAlterTbReq* req) { taosArrayDestroy(req->pMultiTag); }
×
230

231
// Select the poll response message type for a taosx response:
//   - TMQ_MSG_TYPE__POLL_DATA_META_RSP when the response carries create-table entries
//     (createTableNum > 0);
//   - otherwise TMQ_MSG_TYPE__POLL_RAW_DATA_RSP when the request sets rawData == 1;
//   - otherwise TMQ_MSG_TYPE__POLL_DATA_RSP.
//
// pRequest is a pointer expression, taosxRsp a struct lvalue. Arguments and the whole
// expansion are parenthesized so the macro is safe inside a larger expression.
#define POLL_RSP_TYPE(pRequest, taosxRsp)          \
  ((taosxRsp).createTableNum > 0                   \
       ? TMQ_MSG_TYPE__POLL_DATA_META_RSP          \
       : ((pRequest)->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP))
234

235
// Append one meta message to a batch meta response.
//
// Wraps (type, bodyLen, body) in a temporary SMqMetaRsp, encodes it into a freshly allocated
// buffer that reserves sizeof(SMqRspHead) bytes in front of the payload, and pushes the buffer
// and its total length onto btMetaRsp->batchMetaReq / batchMetaLen. Both arrays are created on
// first use. `body` is only read during encoding; the caller keeps ownership of it.
//
// Returns 0 on success or an error code. On any failure before the buffer has been handed to
// batchMetaReq, the buffer is freed here; the original leaked it when encoding or the push failed.
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
  int32_t code = 0;
  void*   tBuf = NULL;  // owned by this function until pushed into batchMetaReq

  if (!btMetaRsp->batchMetaReq) {
    btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
    btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
  }

  SMqMetaRsp tmpMetaRsp = {0};
  tmpMetaRsp.resMsgType = type;
  tmpMetaRsp.metaRspLen = bodyLen;
  tmpMetaRsp.metaRsp = body;
  uint32_t len = 0;
  tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }
  int32_t tLen = sizeof(SMqRspHead) + len;
  tBuf = taosMemoryCalloc(1, tLen);
  TQ_NULL_GO_TO_END(tBuf);
  void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, metaBuff, len);
  code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
  tEncoderClear(&encoder);

  if (code < 0) {
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
    goto END;
  }
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
  // The array now holds the pointer; drop the local reference so END does not free it.
  tBuf = NULL;
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));

END:
  // Non-NULL only when a failure happened before ownership moved to batchMetaReq.
  taosMemoryFree(tBuf);
  return code;
}
274

275
// Serialize the create-table requests collected in taosxRsp->createTableReq into one
// SVCreateTbBatchReq binary.
//
// taosxRsp - response whose createTableReq array holds pointers to SVCreateTbReq entries.
// pBuf     - out: newly allocated buffer laid out as [SMsgHead-sized prefix][encoded batch].
//            The caller owns it and must free it, including when this function fails after
//            the allocation.
// len      - out: total buffer length, i.e. sizeof(SMsgHead) + encoded size.
//
// Returns 0 on success or an error code.
//
// Fixes over the previous version:
//   - the allocation check tested `pBuf` (the out-parameter, never NULL) instead of `*pBuf`,
//     so an allocation failure went undetected;
//   - the encoder was given *len as its capacity although it starts sizeof(SMsgHead) bytes
//     into the buffer; it now gets the real payload capacity;
//   - the buffer is zero-filled, because this function never writes the SMsgHead prefix and
//     the caller hands the whole buffer, prefix included, to buildBatchMeta().
static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
  int32_t code = 0;
  SVCreateTbBatchReq pReq = {0};
  pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
  pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  TQ_NULL_GO_TO_END(pReq.pArray);
  for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){
    // Shallow copy: the batch array borrows whatever each request points to.
    void   *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
    TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
  }
  tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
  if (code < 0) {
    goto END;
  }
  int32_t payloadLen = *len;
  *len += sizeof(SMsgHead);
  *pBuf = taosMemoryCalloc(1, *len);
  TQ_NULL_GO_TO_END(*pBuf);
  SEncoder coder = {0};
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), payloadLen);
  code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
  tEncoderClear(&coder);

END:
  // Only the temporary array is released; its elements were shallow copies.
  taosArrayDestroy(pReq.pArray);
  return code;
}
301

302
// Reply with the accumulated batch meta response and leave the function: sets the response
// offset to log version `fetchVer`, sends `btMetaRsp`, stores the result in `code` and jumps
// to the `END` label. Relies on pHandle, pMsg, pRequest, btMetaRsp, fetchVer, vgId, code and
// END being in scope at the expansion site.
//
// The expansion is a single compound statement, so it also behaves correctly under an
// unbraced `if`. Use it without a trailing semicolon.
#define SEND_BATCH_META_RSP                                                   \
  {                                                                           \
    tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);                       \
    code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); \
    goto END;                                                                 \
  }
306

307
// Reply with the accumulated taosx data response and leave the function: sets the response
// offset to log version `fetchVer`, sends `taosxRsp` with the message type chosen by
// POLL_RSP_TYPE, stores the result in `code` and jumps to the `END` label. Relies on pHandle,
// pMsg, pRequest, taosxRsp, fetchVer, vgId, code and END being in scope at the expansion site.
//
// The expansion is a single compound statement, so it also behaves correctly under an
// unbraced `if`. Use it without a trailing semicolon.
#define SEND_DATA_RSP                                                                                  \
  {                                                                                                    \
    tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);                                                 \
    code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId); \
    goto END;                                                                                          \
  }
311

312
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
5,417,794✔
313
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
314
  int32_t         vgId = TD_VID(pTq->pVnode);
5,417,794✔
315
  SMqDataRsp      taosxRsp = {0};
5,417,459✔
316
  SMqBatchMetaRsp btMetaRsp = {0};
5,417,459✔
317
  int32_t         code = 0;
5,417,794✔
318

319
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
5,417,794✔
320
  if (offset->type != TMQ_OFFSET__LOG) {
5,417,114✔
321
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
40,875✔
322

323
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
40,875✔
324
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
2,361✔
325
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
2,361✔
326
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
327
      goto END;
2,361✔
328
    }
329

330
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
38,514✔
331
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
332
    if (taosxRsp.blockNum > 0) {
38,514✔
333
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
36,234✔
334
      goto END;
36,234✔
335
    } else {
336
      tOffsetCopy(offset, &taosxRsp.rspOffset);
2,280✔
337
    }
338
  }
339

340
  if (offset->type == TMQ_OFFSET__LOG) {
5,378,871✔
341
    walReaderVerifyOffset(pHandle->pWalReader, offset);
5,379,199✔
342
    int64_t fetchVer = offset->version;
5,378,921✔
343

344
    uint64_t st = taosGetTimestampMs();
5,377,483✔
345
    int      totalRows = 0;
5,377,483✔
346
    int32_t  totalMetaRows = 0;
5,375,459✔
347
    while (1) {
46,435,229✔
348
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
51,810,688✔
349
      if (savedEpoch > pRequest->epoch) {
51,837,241✔
350
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
351
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
352
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
353
        goto END;
×
354
      }
355

356
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
51,830,743✔
357
        if (totalMetaRows > 0) {
5,196,826✔
UNCOV
358
          SEND_BATCH_META_RSP
×
359
        }
360
        SEND_DATA_RSP
10,393,652✔
361
      }
362

363
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
46,691,161✔
364
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
46,692,385✔
365
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
366

367
      // process meta
368
      if (pHead->msgType != TDMT_VND_SUBMIT) {
46,691,421✔
369
        if (totalRows > 0) {
12,839✔
370
          SEND_DATA_RSP
1,404✔
371
        }
372

373
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
12,137✔
374
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
654✔
UNCOV
375
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
×
376
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
654✔
UNCOV
377
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteAlterTable)
×
378
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
1,308✔
379
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
654✔
UNCOV
380
          } else if (pHead->msgType == TDMT_VND_DELETE) {
×
UNCOV
381
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
×
382
          }
383
        }
384

385
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
12,137✔
386
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
387
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
12,137✔
388
          SMqMetaRsp metaRsp = {0};
12,137✔
389
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
12,137✔
390
          metaRsp.resMsgType = pHead->msgType;
12,137✔
391
          metaRsp.metaRspLen = pHead->bodyLen;
12,137✔
392
          metaRsp.metaRsp = pHead->body;
12,137✔
393
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
12,137✔
394
          goto END;
12,137✔
395
        }
UNCOV
396
        code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
×
UNCOV
397
        fetchVer++;
×
UNCOV
398
        if (code != 0){
×
399
          goto END;
×
400
        }
UNCOV
401
        totalMetaRows++;
×
UNCOV
402
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= pRequest->minPollRows) || (taosGetTimestampMs() - st > pRequest->timeout)) {
×
403
          SEND_BATCH_META_RSP
×
404
        }
UNCOV
405
        continue;
×
406
      }
407

408
      if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
46,679,240✔
409
        SEND_BATCH_META_RSP
×
410
      }
411

412
      // process data
413
      SPackedData submit = {
140,038,638✔
414
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
46,679,240✔
415
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
46,679,546✔
416
          .ver = pHead->version,
46,679,546✔
417
      };
418

419
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
46,679,546✔
420

421
      if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
46,593,229✔
422
        int32_t len = 0;
918✔
423
        void *pBuf = NULL;
918✔
424
        code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
918✔
425
        if (code == 0){
918✔
426
          code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
918✔
427
        }
428
        taosMemoryFree(pBuf);
918✔
429
        for (int i = 0; i < taosArrayGetSize(taosxRsp.createTableReq); i++) {
1,836✔
430
          void* pCreateTbReq = taosArrayGetP(taosxRsp.createTableReq, i);
918✔
431
          if (pCreateTbReq != NULL) {
918✔
432
            tDestroySVSubmitCreateTbReq(pCreateTbReq, TSDB_MSG_FLG_DECODE);
918✔
433
          }
434
          taosMemoryFree(pCreateTbReq);
918✔
435
        }
436
        taosArrayDestroy(taosxRsp.createTableReq);
918✔
437
        taosxRsp.createTableReq = NULL;
918✔
438
        fetchVer++;
918✔
439
        if (code != 0){
918✔
440
          goto END;
×
441
        }
442
        totalMetaRows++;
918✔
443
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= pRequest->minPollRows) ||
918✔
444
            (taosGetTimestampMs() - st > pRequest->timeout) ||
918✔
445
            (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
918✔
446
          SEND_BATCH_META_RSP
1,836✔
447
        }
448
        continue;
×
449
      }
450

451
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
46,654,399✔
452
          (taosGetTimestampMs() - st > pRequest->timeout) ||
46,436,005✔
453
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
46,475,510✔
454
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
×
455
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
211,172✔
456
          terrno = 0;
×
457
        } else{
458
          fetchVer++;
168,616✔
459
        }
460
        SEND_DATA_RSP
337,232✔
461
      } else {
462
        fetchVer++;
46,425,225✔
463
      }
464
    }
465
  }
466

467
END:
5,416,795✔
468
  if (code != 0){
5,415,598✔
469
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
470
            pRequest->subKey);
471
  }
472
  tDeleteMqBatchMetaRsp(&btMetaRsp);
5,415,598✔
473
  tDeleteSTaosxRsp(&taosxRsp);
5,416,541✔
474
  return code;
5,411,803✔
475
}
476

477
// Entry point for answering a consumer poll request.
//
// Works on a private copy of the request offset. A reset-type offset is first resolved through
// extractResetOffsetVal(), which may already have sent a response; an offset of type 0 is
// rejected. The request is then dispatched by subscription type: column subscriptions go to
// extractDataAndRspForNormalSubscribe(), all others to extractDataAndRspForDbStbSubscribe().
//
// Returns TSDB_CODE_TMQ_INVALID_MSG for a NULL argument or a type-0 offset, otherwise the
// result of the resolution / dispatch step. The offset copy is destroyed before returning.
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  if (!pTq || !pHandle || !pRequest || !pMsg) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t      code = 0;
  STqOffsetVal reqOffset = {0};
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // Resolve the reset policy into a concrete offset.
    bool blockReturned = false;
    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
    // Stop on error, or when the resolution step already sent an (empty) response.
    if (code != 0 || blockReturned) {
      goto END;
    }
  } else if (reqOffset.type == 0) {
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto END;
  }

  code = (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN)
             ? extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset)
             : extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);

END:
  if (code != 0) {
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
  }
  tOffsetDestroy(&reqOffset);
  return code;
}
516

517
// Fill the fixed header that precedes every poll response payload.
// Does nothing when pMsgHead is NULL; only the five fields below are written.
//
// type       - message type stored in mqMsgType.
// epoch      - epoch echoed back to the consumer.
// consumerId - id of the consumer being answered.
// sver, ever - values stored in walsver / walever (callers pass the WAL reader's version range).
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->mqMsgType = type;
    pMsgHead->epoch = epoch;
    pMsgHead->consumerId = consumerId;
    pMsgHead->walsver = sver;
    pMsgHead->walever = ever;
  }
}
528

529
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
3,279✔
530
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
531
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
3,279✔
532
    return TSDB_CODE_TMQ_INVALID_MSG;
×
533
  }
534
  int32_t len = 0;
3,279✔
535
  int32_t code = 0;
3,279✔
536
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
3,279✔
537
  if (code < 0) {
3,279✔
538
    return TAOS_GET_TERRNO(code);
×
539
  }
540
  int32_t tlen = sizeof(SMqRspHead) + len;
3,279✔
541
  void*   buf = rpcMallocCont(tlen);
3,279✔
542
  if (buf == NULL) {
3,279✔
543
    return TAOS_GET_TERRNO(terrno);
×
544
  }
545

546
  int64_t sver = 0, ever = 0;
3,279✔
547
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
3,279✔
548
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
3,279✔
549

550
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
3,279✔
551

552
  SEncoder encoder = {0};
3,279✔
553
  tEncoderInit(&encoder, abuf, len);
3,279✔
554
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
3,279✔
555
  tEncoderClear(&encoder);
3,279✔
556
  if (code < 0) {
3,279✔
557
    rpcFreeCont(buf);
×
558
    return TAOS_GET_TERRNO(code);
×
559
  }
560
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
3,279✔
561

562
  tmsgSendRsp(&resp);
3,279✔
563
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
3,279✔
564
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
565

566
  return 0;
3,279✔
567
}
568

569
// Encode a single meta response and send it as the reply to a poll request.
//
// The reply buffer is [SMqRspHead][encoded SMqMetaRsp]; the header carries
// TMQ_MSG_TYPE__POLL_META_RSP, the request's epoch and consumer id, and the version range
// reported by walReaderValidVersionRange() for the handle's WAL reader.
//
// Returns TSDB_CODE_TMQ_INVALID_MSG if pHandle, pMsg, pReq or pRsp is NULL, an error code if
// sizing, allocation or encoding fails (the buffer is released in that case), or 0 once the
// buffer has been passed to tmsgSendRsp().
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  if (!pHandle || !pMsg || !pReq || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  // First pass: measure the encoded payload.
  int32_t payloadLen = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqMetaRsp, pRsp, payloadLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + payloadLen;
  void*   cont = rpcMallocCont(totalLen);
  if (cont == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  initMqRspHead(cont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);

  // Second pass: encode the payload right after the header.
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(cont, sizeof(SMqRspHead)), payloadLen);
  code = tEncodeMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(cont);
    return TAOS_GET_TERRNO(code);
  }

  // cont is handed to the send path here and is not freed in this function afterwards.
  SRpcMsg rsp = {.info = pMsg->info, .pCont = cont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rsp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
  return 0;
}
609

610
// Encode a data response and send it on the given RPC handle.
//
// The encoder is chosen by `type`: POLL_DATA_RSP, WALINFO_RSP and POLL_RAW_DATA_RSP use
// tEncodeMqDataRsp; POLL_DATA_META_RSP uses tEncodeSTaosxRsp. For any other type no payload is
// encoded and only the header is sent. For POLL_RAW_DATA_RSP, pRsp->withSchema is cleared
// before encoding (this modifies the caller's response object).
//
// The reply buffer is [SMqRspHead][encoded payload]; the header is filled from type, epoch,
// consumerId, sver and ever.
//
// Returns TSDB_CODE_TMQ_INVALID_MSG if pRpcHandleInfo or pRsp is NULL, an error code if sizing,
// allocation or encoding fails (the buffer is released in that case), or 0 once the buffer has
// been passed to tmsgSendRsp().
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                        int32_t type, int64_t sver, int64_t ever) {
  if (!pRpcHandleInfo || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
    pRsp->withSchema = 0;
  }

  // Decide once which encoder applies; the same choice drives both encoding passes.
  const bool asDataRsp = (type == TMQ_MSG_TYPE__POLL_DATA_RSP) || (type == TMQ_MSG_TYPE__WALINFO_RSP) ||
                         (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP);
  const bool asTaosxRsp = !asDataRsp && (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP);

  // First pass: measure the encoded payload.
  int32_t payloadLen = 0;
  int32_t code = 0;
  if (asDataRsp) {
    tEncodeSize(tEncodeMqDataRsp, pRsp, payloadLen, code);
  } else if (asTaosxRsp) {
    tEncodeSize(tEncodeSTaosxRsp, pRsp, payloadLen, code);
  }
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + payloadLen;
  void*   cont = rpcMallocCont(totalLen);
  if (cont == NULL) {
    return terrno;
  }

  initMqRspHead((SMqRspHead*)cont, type, epoch, consumerId, sver, ever);

  // Second pass: encode the payload right after the header.
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(cont, sizeof(SMqRspHead)), payloadLen);
  if (asDataRsp) {
    code = tEncodeMqDataRsp(&encoder, pRsp);
  } else if (asTaosxRsp) {
    code = tEncodeSTaosxRsp(&encoder, pRsp);
  }
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(cont);
    return TAOS_GET_TERRNO(code);
  }

  // cont is handed to the send path here and is not freed in this function afterwards.
  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = cont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rsp);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc