• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.24
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
1,861,628✔
24
  int32_t    code = TDB_CODE_SUCCESS;
1,861,628✔
25
  int32_t    lino = 0;
1,861,628✔
26
  tqDebug("%s called", __FUNCTION__ );
1,861,628✔
27
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
1,861,628!
28

29
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
1,861,628✔
30
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);
1,861,628!
31

32
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
1,861,628✔
33
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);
1,861,628!
34

35
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
1,861,628✔
36
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
1,861,628✔
37
  pRsp->withTbName = 0;
1,861,628✔
38
  pRsp->withSchema = false;
1,861,628✔
39

40
END:
1,861,628✔
41
  if (code != 0){
1,861,628!
42
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
×
43
  }
44
  return code;
1,861,628✔
45
}
46

47
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
259,896✔
48
  int32_t    code = TDB_CODE_SUCCESS;
259,896✔
49
  int32_t    lino = 0;
259,896✔
50
  tqDebug("%s called", __FUNCTION__ );
259,896✔
51
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
259,896!
52
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
259,896✔
53
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
259,896✔
54

55
  pRsp->withTbName = 1;
259,896✔
56
  pRsp->withSchema = 1;
259,896✔
57
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
259,896✔
58
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);\
259,896!
59

60
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
259,896✔
61
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);
259,896!
62

63
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
259,896✔
64
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);
259,896!
65

66
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
259,896✔
67
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);
259,725!
68

69
END:
259,725✔
70
  if (code != 0){
259,725!
71
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
×
72
    taosArrayDestroy(pRsp->blockData);
×
73
    taosArrayDestroy(pRsp->blockDataLen);
×
74
    taosArrayDestroy(pRsp->blockTbName);
×
75
    taosArrayDestroy(pRsp->blockSchema);
×
76
  }
77
  return code;
259,554✔
78
}
79

80
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
63,692✔
81
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
82
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
63,692!
UNCOV
83
    return TSDB_CODE_INVALID_PARA;
×
84
  }
85
  uint64_t   consumerId = pRequest->consumerId;
64,053✔
86
  STqOffset* pOffset = NULL;
64,053✔
87
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
64,053✔
88
  int32_t    vgId = TD_VID(pTq->pVnode);
64,053✔
89

90
  *pBlockReturned = false;
64,053✔
91
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
92
  if (code == 0) {
63,710✔
93
    tOffsetCopy(pOffsetVal, &pOffset->val);
4,308✔
94

95
    char formatBuf[TSDB_OFFSET_LEN] = {0};
4,308✔
96
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
4,308✔
97
    tqDebug("tmq poll: consumer:0x%" PRIx64
4,308!
98
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
99
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
100
    return 0;
4,308✔
101
  } else {
102
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
103
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
59,402✔
104
      if (pRequest->useSnapshot) {
58,027✔
105
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
37,330✔
106
                consumerId, pHandle->subKey, vgId);
107
        if (pHandle->fetchMeta) {
37,668✔
108
          tqOffsetResetToMeta(pOffsetVal, 0);
109
        } else {
110
          SValue val = {0};
37,230!
111
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
×
112
        }
113
      } else {
114
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
20,777✔
115
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
20,777✔
116
      }
117
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
1,380!
118
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
1,380✔
119
      SMqDataRsp dataRsp = {0};
1,380✔
120
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
1,380✔
121

122
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
1,380✔
123
      if (code != 0) {
1,380!
124
        return code;
×
125
      }
126
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
1,380!
127
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
128
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
1,380!
129
      tDeleteMqDataRsp(&dataRsp);
1,380✔
130

131
      *pBlockReturned = true;
1,380✔
132
      return code;
1,380✔
UNCOV
133
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
×
UNCOV
134
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
×
135
                  " in vg %d, subkey %s, reset none failed",
136
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
UNCOV
137
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
×
138
    }
139
  }
140

141
  return 0;
58,365✔
142
}
143

144
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
1,843,554✔
145
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
146
  int32_t    code = TDB_CODE_SUCCESS;
1,843,554✔
147
  int32_t    lino = 0;
1,843,554✔
148
  tqDebug("%s called", __FUNCTION__ );
1,843,554!
149
  uint64_t consumerId = pRequest->consumerId;
1,843,736✔
150
  int32_t  vgId = TD_VID(pTq->pVnode);
1,843,736✔
151
  terrno = 0;
1,843,736✔
152

153
  SMqDataRsp dataRsp = {0};
1,843,736✔
154
  code = tqInitDataRsp(&dataRsp, *pOffset);
1,843,416✔
155
  TSDB_CHECK_CODE(code, lino, end);
1,843,736!
156

157
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
1,843,736✔
158
  TSDB_CHECK_CODE(code, lino, end);
1,843,580!
159

160
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
1,843,580✔
161
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
1,843,736✔
162
    goto end;
25,617✔
163
  }
164

165
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
166
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
1,818,119✔
167
    // lock
168
    taosWLockLatch(&pTq->lock);
842,613✔
169
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
842,613✔
170
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
842,613✔
171
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
829,784✔
172
      taosWUnLockLatch(&pTq->lock);
829,784✔
173
      goto end;
829,784✔
174
    }
175
    taosWUnLockLatch(&pTq->lock);
12,829✔
176
  }
177

178
  // reqOffset represents the current date offset, may be changed if wal not exists
179
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
988,335✔
180
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
988,335✔
181

182
end:
1,843,736✔
183
  {
184
    char buf[TSDB_OFFSET_LEN] = {0};
1,843,736✔
185
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
1,843,736✔
186
    if (code != 0){
1,843,736✔
187
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
25,617!
188
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
189
    } else {
190
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
1,818,119!
191
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
192
    }
193

194
    tDeleteMqDataRsp(&dataRsp);
1,843,736✔
195
    return code;
1,843,736✔
196
  }
197
}
198

199
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
200
  SDecoder decoder = {0};                                                                                  \
201
  TYPE     req = {0};                                                                                      \
202
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                            \
203
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                        \
204
  tDecoderInit(&decoder, data, len);                                                                       \
205
  if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) {                          \
206
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
207
            " msgType %d",                                                                                 \
208
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
209
    fetchVer++;                                                                                            \
210
    DELETE_FUNC(&req);                                                                                     \
211
    tDecoderClear(&decoder);                                                                               \
212
    continue;                                                                                              \
213
  }                                                                                                        \
214
  DELETE_FUNC(&req);                                                                                       \
215
  tDecoderClear(&decoder);
216

UNCOV
217
static void tDeleteCommon(void* parm) {}
×
218

219
#define POLL_RSP_TYPE(pRequest,taosxRsp) \
220
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \
221
(pRequest->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP)
222

UNCOV
223
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
×
UNCOV
224
  int32_t         code = 0;
×
225

UNCOV
226
  if (!btMetaRsp->batchMetaReq) {
×
UNCOV
227
    btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
×
UNCOV
228
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
×
UNCOV
229
    btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
×
UNCOV
230
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
×
231
  }
232

UNCOV
233
  SMqMetaRsp tmpMetaRsp = {0};
×
UNCOV
234
  tmpMetaRsp.resMsgType = type;
×
UNCOV
235
  tmpMetaRsp.metaRspLen = bodyLen;
×
UNCOV
236
  tmpMetaRsp.metaRsp = body;
×
UNCOV
237
  uint32_t len = 0;
×
UNCOV
238
  tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
×
UNCOV
239
  if (TSDB_CODE_SUCCESS != code) {
×
240
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
241
    goto END;
×
242
  }
UNCOV
243
  int32_t tLen = sizeof(SMqRspHead) + len;
×
UNCOV
244
  void*   tBuf = taosMemoryCalloc(1, tLen);
×
UNCOV
245
  TQ_NULL_GO_TO_END(tBuf);
×
UNCOV
246
  void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
×
UNCOV
247
  SEncoder encoder = {0};
×
UNCOV
248
  tEncoderInit(&encoder, metaBuff, len);
×
UNCOV
249
  code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
×
UNCOV
250
  tEncoderClear(&encoder);
×
251

UNCOV
252
  if (code < 0) {
×
253
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
254
    goto END;
×
255
  }
UNCOV
256
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
×
UNCOV
257
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));
×
258

UNCOV
259
END:
×
UNCOV
260
  return code;
×
261
}
262

UNCOV
263
static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
×
UNCOV
264
  int32_t code = 0;
×
UNCOV
265
  SVCreateTbBatchReq pReq = {0};
×
UNCOV
266
  pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
×
UNCOV
267
  pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
×
UNCOV
268
  TQ_NULL_GO_TO_END(pReq.pArray);
×
UNCOV
269
  for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){
×
UNCOV
270
    void   *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
×
UNCOV
271
    TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
×
272
  }
UNCOV
273
  tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
×
UNCOV
274
  if (code < 0) {
×
275
    goto END;
×
276
  }
UNCOV
277
  *len += sizeof(SMsgHead);
×
UNCOV
278
  *pBuf = taosMemoryMalloc(*len);
×
UNCOV
279
  TQ_NULL_GO_TO_END(pBuf);
×
UNCOV
280
  SEncoder coder = {0};
×
UNCOV
281
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *len);
×
UNCOV
282
  code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
×
UNCOV
283
  tEncoderClear(&coder);
×
284

UNCOV
285
END:
×
UNCOV
286
  taosArrayDestroy(pReq.pArray);
×
UNCOV
287
  return code;
×
288
}
289

290
#define SEND_BATCH_META_RSP \
291
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);\
292
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);\
293
goto END;
294

295
#define SEND_DATA_RSP \
296
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);\
297
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);\
298
goto END;
299
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
259,896✔
300
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
301
  int32_t         vgId = TD_VID(pTq->pVnode);
259,896✔
302
  SMqDataRsp      taosxRsp = {0};
259,896✔
303
  SMqBatchMetaRsp btMetaRsp = {0};
259,896✔
304
  int32_t         code = 0;
259,704✔
305

306
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
259,704!
307
  if (offset->type != TMQ_OFFSET__LOG) {
259,554✔
308
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
14,264!
309

310
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
14,264✔
311
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
358✔
312
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
358!
313
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
314
      goto END;
358✔
315
    }
316

317
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
13,906!
318
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
319
    if (taosxRsp.blockNum > 0) {
13,906✔
320
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
13,205✔
321
      goto END;
13,031✔
322
    } else {
323
      tOffsetCopy(offset, &taosxRsp.rspOffset);
701✔
324
    }
325
  }
326

327
  if (offset->type == TMQ_OFFSET__LOG) {
246,162!
328
    walReaderVerifyOffset(pHandle->pWalReader, offset);
246,162✔
329
    int64_t fetchVer = offset->version;
246,162✔
330

331
    uint64_t st = taosGetTimestampMs();
246,333✔
332
    int      totalRows = 0;
246,333✔
333
    int32_t  totalMetaRows = 0;
246,162✔
334
    while (1) {
2,360,365✔
335
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
2,606,527✔
336
      if (savedEpoch > pRequest->epoch) {
2,606,869!
337
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
338
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
339
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
340
        goto END;
×
341
      }
342

343
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
2,606,344✔
344
        if (totalMetaRows > 0) {
231,985!
UNCOV
345
          SEND_BATCH_META_RSP
×
346
        }
347
        SEND_DATA_RSP
463,970!
348
      }
349

350
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
2,378,478✔
351
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
2,378,478!
352
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
353

354
      // process meta
355
      if (pHead->msgType != TDMT_VND_SUBMIT) {
2,378,478✔
356
        if (totalRows > 0) {
182!
UNCOV
357
          SEND_DATA_RSP
×
358
        }
359

360
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
182!
UNCOV
361
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
×
UNCOV
362
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
×
UNCOV
363
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
×
UNCOV
364
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
×
UNCOV
365
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
×
UNCOV
366
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
×
UNCOV
367
          } else if (pHead->msgType == TDMT_VND_DELETE) {
×
UNCOV
368
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
×
369
          }
370
        }
371

372
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
182!
373
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
374
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
182!
375
          SMqMetaRsp metaRsp = {0};
182✔
376
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
182✔
377
          metaRsp.resMsgType = pHead->msgType;
182✔
378
          metaRsp.metaRspLen = pHead->bodyLen;
182✔
379
          metaRsp.metaRsp = pHead->body;
182✔
380
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
182✔
381
          goto END;
182✔
382
        }
UNCOV
383
        code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
×
UNCOV
384
        fetchVer++;
×
UNCOV
385
        if (code != 0){
×
386
          goto END;
×
387
        }
UNCOV
388
        totalMetaRows++;
×
UNCOV
389
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
×
390
          SEND_BATCH_META_RSP
×
391
        }
UNCOV
392
        continue;
×
393
      }
394

395
      if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
2,378,296!
396
        SEND_BATCH_META_RSP
×
397
      }
398

399
      // process data
400
      SPackedData submit = {
7,134,888✔
401
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
2,378,296✔
402
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
2,378,296✔
403
          .ver = pHead->version,
2,378,296✔
404
      };
405

406
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
2,378,296!
407

408
      if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
2,374,870!
UNCOV
409
        int32_t len = 0;
×
UNCOV
410
        void *pBuf = NULL;
×
UNCOV
411
        code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
×
UNCOV
412
        if (code == 0){
×
UNCOV
413
          code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
×
414
        }
UNCOV
415
        taosMemoryFree(pBuf);
×
UNCOV
416
        for (int i = 0; i < taosArrayGetSize(taosxRsp.createTableReq); i++) {
×
UNCOV
417
          void* pCreateTbReq = taosArrayGetP(taosxRsp.createTableReq, i);
×
UNCOV
418
          if (pCreateTbReq != NULL) {
×
UNCOV
419
            tDestroySVSubmitCreateTbReq(pCreateTbReq, TSDB_MSG_FLG_DECODE);
×
420
          }
UNCOV
421
          taosMemoryFree(pCreateTbReq);
×
422
        }
UNCOV
423
        taosArrayDestroy(taosxRsp.createTableReq);
×
UNCOV
424
        taosxRsp.createTableReq = NULL;
×
UNCOV
425
        fetchVer++;
×
UNCOV
426
        if (code != 0){
×
427
          goto END;
×
428
        }
UNCOV
429
        totalMetaRows++;
×
UNCOV
430
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) ||
×
UNCOV
431
            (taosGetTimestampMs() - st > pRequest->timeout) ||
×
UNCOV
432
            (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
×
UNCOV
433
          SEND_BATCH_META_RSP
×
434
        }
435
        continue;
×
436
      }
437

438
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
2,377,438!
439
          (taosGetTimestampMs() - st > pRequest->timeout) ||
2,361,049✔
440
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
2,361,883!
441
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
×
442
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
17,076!
443
          terrno = 0;
×
444
        } else{
445
          fetchVer++;
14,166✔
446
        }
447
        SEND_DATA_RSP
28,332!
448
      } else {
449
        fetchVer++;
2,359,852✔
450
      }
451
    }
452
  }
453

454
END:
259,551✔
455
  if (code != 0){
259,896!
456
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
457
            pRequest->subKey);
458
  }
459
  tDeleteMqBatchMetaRsp(&btMetaRsp);
259,896✔
460
  tDeleteSTaosxRsp(&taosxRsp);
259,539✔
461
  return code;
259,713✔
462
}
463

464
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
2,105,012✔
465
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL) {
2,105,012!
466
    return TSDB_CODE_TMQ_INVALID_MSG;
×
467
  }
468
  int32_t      code = 0;
2,105,012✔
469
  STqOffsetVal reqOffset = {0};
2,105,012✔
470
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);
2,105,012✔
471

472
  // reset the offset if needed
473
  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
2,104,833✔
474
    bool blockReturned = false;
64,053✔
475
    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
64,053✔
476
    if (code != 0) {
64,053!
UNCOV
477
      goto END;
×
478
    }
479

480
    // empty block returned, quit
481
    if (blockReturned) {
64,053!
482
      goto END;
1,380✔
483
    }
484
  } else if (reqOffset.type == 0) {  // use the consumer specified offset
2,040,959!
485
    uError("req offset type is 0");
×
486
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
487
    goto END;
×
488
  }
489

490
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
2,103,445✔
491
    code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
1,843,554✔
492
  } else {
493
    code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
259,896✔
494
  }
495

496
END:
2,105,016✔
497
  if (code != 0){
2,104,829✔
498
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
25,617!
499
  }
500
  tOffsetDestroy(&reqOffset);
2,104,829✔
501
  return code;
2,104,829✔
502
}
503

504
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
1,265,940✔
505
                          int64_t ever) {
506
  if (pMsgHead == NULL) {
1,265,940!
507
    return;
×
508
  }
509
  pMsgHead->consumerId = consumerId;
1,265,940✔
510
  pMsgHead->epoch = epoch;
1,266,123✔
511
  pMsgHead->mqMsgType = type;
1,266,123✔
512
  pMsgHead->walsver = sver;
1,265,940✔
513
  pMsgHead->walever = ever;
1,265,940✔
514
}
515

516
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
358✔
517
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
518
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
358!
519
    return TSDB_CODE_TMQ_INVALID_MSG;
×
520
  }
521
  int32_t len = 0;
358✔
522
  int32_t code = 0;
358✔
523
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
358!
524
  if (code < 0) {
358!
525
    return TAOS_GET_TERRNO(code);
×
526
  }
527
  int32_t tlen = sizeof(SMqRspHead) + len;
358✔
528
  void*   buf = rpcMallocCont(tlen);
358✔
529
  if (buf == NULL) {
358!
530
    return TAOS_GET_TERRNO(terrno);
×
531
  }
532

533
  int64_t sver = 0, ever = 0;
358✔
534
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
358✔
535
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
358✔
536

537
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
358✔
538

539
  SEncoder encoder = {0};
358✔
540
  tEncoderInit(&encoder, abuf, len);
358✔
541
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
358✔
542
  tEncoderClear(&encoder);
358✔
543
  if (code < 0) {
358!
544
    rpcFreeCont(buf);
×
545
    return TAOS_GET_TERRNO(code);
×
546
  }
547
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
358✔
548

549
  tmsgSendRsp(&resp);
358✔
550
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
358!
551
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
552

553
  return 0;
358✔
554
}
555

556
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
182✔
557
                          int32_t vgId) {
558
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
182!
559
    return TSDB_CODE_TMQ_INVALID_MSG;
×
560
  }
561
  int32_t len = 0;
182✔
562
  int32_t code = 0;
182✔
563
  tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code);
182!
564
  if (code < 0) {
182!
565
    return TAOS_GET_TERRNO(code);
×
566
  }
567
  int32_t tlen = sizeof(SMqRspHead) + len;
182✔
568
  void*   buf = rpcMallocCont(tlen);
182✔
569
  if (buf == NULL) {
182!
570
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
571
  }
572

573
  int64_t sver = 0, ever = 0;
182✔
574
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
182✔
575
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
182✔
576

577
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
182✔
578

579
  SEncoder encoder = {0};
182✔
580
  tEncoderInit(&encoder, abuf, len);
182✔
581
  code = tEncodeMqMetaRsp(&encoder, pRsp);
182✔
582
  tEncoderClear(&encoder);
182✔
583
  if (code < 0) {
182!
584
    rpcFreeCont(buf);
×
585
    return TAOS_GET_TERRNO(code);
×
586
  }
587

588
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
182✔
589

590
  tmsgSendRsp(&resp);
182✔
591
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
182!
592
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
593

594
  return 0;
182✔
595
}
596

597
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
1,265,583✔
598
                        int32_t type, int64_t sver, int64_t ever) {
599
  if (pRpcHandleInfo == NULL || pRsp == NULL) {
1,265,583!
600
    return TSDB_CODE_TMQ_INVALID_MSG;
×
601
  }
602
  int32_t len = 0;
1,265,583✔
603
  int32_t code = 0;
1,265,583✔
604

605
  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
1,265,583!
606
    pRsp->withSchema = 0;
×
607
  }
608
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
1,265,583!
609
      type == TMQ_MSG_TYPE__WALINFO_RSP ||
182!
610
      type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
611
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
1,265,401!
612
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
182!
613
    tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
182!
614
  }
615

616
  if (code < 0) {
1,265,400!
617
    return TAOS_GET_TERRNO(code);
×
618
  }
619

620
  int32_t tlen = sizeof(SMqRspHead) + len;
1,265,400✔
621
  void*   buf = rpcMallocCont(tlen);
1,265,400✔
622
  if (buf == NULL) {
1,265,400!
623
    return terrno;
×
624
  }
625

626
  SMqRspHead* pHead = (SMqRspHead*)buf;
1,265,400✔
627
  initMqRspHead(pHead, type, epoch, consumerId, sver, ever);
1,265,400✔
628

629
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
1,265,400✔
630

631
  SEncoder encoder = {0};
1,265,400✔
632
  tEncoderInit(&encoder, abuf, len);
1,265,400✔
633

634
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
1,265,405!
635
      type == TMQ_MSG_TYPE__WALINFO_RSP ||
182!
636
      type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
637
    code = tEncodeMqDataRsp(&encoder, pRsp);
1,265,223✔
638
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
182!
639
    code = tEncodeSTaosxRsp(&encoder, pRsp);
182✔
640
  }
641
  tEncoderClear(&encoder);
1,265,583✔
642
  if (code < 0) {
1,265,400!
643
    rpcFreeCont(buf);
×
644
    return TAOS_GET_TERRNO(code);
×
645
  }
646
  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0};
1,265,400✔
647

648
  tmsgSendRsp(&rsp);
1,265,400✔
649
  return 0;
1,265,409✔
650
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc