• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5001

24 Mar 2026 01:11AM UTC coverage: 72.254% (-0.05%) from 72.307%
#5001

push

travis-ci

web-flow
fix: possible memory leak in tdb; (#34872)

1 of 20 new or added lines in 2 files covered. (5.0%)

770 existing lines in 143 files now uncovered.

253287 of 350550 relevant lines covered (72.25%)

130300301.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.63
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
10,137,228✔
24
  int32_t    code = TDB_CODE_SUCCESS;
10,137,228✔
25
  int32_t    lino = 0;
10,137,228✔
26
  tqDebug("%s called", __FUNCTION__ );
10,137,228✔
27
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
10,137,581✔
28

29
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
10,137,581✔
30
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);
10,137,228✔
31

32
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
10,136,567✔
33
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);
10,136,636✔
34

35
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
10,135,270✔
36
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);
10,134,312✔
37
  
38
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
10,133,536✔
39
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
10,136,981✔
40
  pRsp->withTbName = 0;
10,137,581✔
41
  pRsp->withSchema = 1;
10,137,581✔
42

43
END:
10,134,592✔
44
  if (code != 0){
10,134,592✔
45
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
×
46
    taosArrayDestroy(pRsp->blockData);
×
47
    taosArrayDestroy(pRsp->blockDataLen);
×
48
    taosArrayDestroy(pRsp->blockSchema);
×
49
  }
50
  return code;
10,134,155✔
51
}
52

53
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
5,717,051✔
54
  int32_t    code = TDB_CODE_SUCCESS;
5,717,051✔
55
  int32_t    lino = 0;
5,717,051✔
56
  tqDebug("%s called", __FUNCTION__ );
5,717,051✔
57
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
5,717,051✔
58
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
5,717,051✔
59
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
5,716,680✔
60

61
  pRsp->withTbName = 1;
5,717,051✔
62
  pRsp->withSchema = 1;
5,717,051✔
63
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
5,717,051✔
64
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);\
5,717,051✔
65

66
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
5,717,051✔
67
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);
5,717,051✔
68

69
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
5,716,752✔
70
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);
5,716,112✔
71

72
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
5,716,072✔
73
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);
5,716,047✔
74

75
END:
5,715,751✔
76
  if (code != 0){
5,715,751✔
77
    tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
×
78
    taosArrayDestroy(pRsp->blockData);
×
79
    taosArrayDestroy(pRsp->blockDataLen);
×
80
    taosArrayDestroy(pRsp->blockTbName);
×
81
    taosArrayDestroy(pRsp->blockSchema);
×
82
  }
83
  return code;
5,716,388✔
84
}
85

86
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
600,684✔
87
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
88
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
600,684✔
89
    return TSDB_CODE_INVALID_PARA;
×
90
  }
91
  uint64_t   consumerId = pRequest->consumerId;
603,562✔
92
  STqOffset* pOffset = NULL;
603,562✔
93
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
603,562✔
94
  int32_t    vgId = TD_VID(pTq->pVnode);
600,881✔
95

96
  *pBlockReturned = false;
602,202✔
97
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
98
  if (code == 0) {
599,534✔
99
    tOffsetCopy(pOffsetVal, &pOffset->val);
100,769✔
100

101
    char formatBuf[TSDB_OFFSET_LEN] = {0};
100,769✔
102
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
100,769✔
103
    tqDebug("tmq poll: consumer:0x%" PRIx64
100,769✔
104
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
105
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
106
    return 0;
100,769✔
107
  } else {
108
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
109
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
498,765✔
110
      if (pRequest->useSnapshot) {
474,195✔
111
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
242,988✔
112
                consumerId, pHandle->subKey, vgId);
113
        if (pHandle->fetchMeta) {
243,654✔
114
          tqOffsetResetToMeta(pOffsetVal, 0);
115
        } else {
116
          SValue val = {0};
241,823✔
117
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
×
118
        }
119
      } else {
120
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
229,509✔
121
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
230,877✔
122
      }
123
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
26,905✔
124
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
17,122✔
125
      SMqDataRsp dataRsp = {0};
17,465✔
126
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
17,465✔
127

128
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
16,769✔
129
      if (code != 0) {
17,465✔
130
        return code;
×
131
      }
132
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
17,465✔
133
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
134
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
17,465✔
135
      tDeleteMqDataRsp(&dataRsp);
17,465✔
136

137
      *pBlockReturned = true;
17,112✔
138
      return code;
17,112✔
139
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
9,783✔
140
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
10,136✔
141
                  " in vg %d, subkey %s, reset none failed",
142
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
143
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
10,136✔
144
    }
145
  }
146

147
  return 0;
473,880✔
148
}
149

150
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
10,118,817✔
151
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
152
  int32_t    code = TDB_CODE_SUCCESS;
10,118,817✔
153
  int32_t    lino = 0;
10,118,817✔
154
  tqDebug("%s called", __FUNCTION__ );
10,118,817✔
155
  uint64_t consumerId = pRequest->consumerId;
10,119,162✔
156
  int32_t  vgId = TD_VID(pTq->pVnode);
10,119,452✔
157
  terrno = 0;
10,119,452✔
158

159
  SMqDataRsp dataRsp = {0};
10,119,452✔
160
  code = tqInitDataRsp(&dataRsp, *pOffset);
10,119,452✔
161
  TSDB_CHECK_CODE(code, lino, end);
10,115,456✔
162

163
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
10,115,456✔
164
  TSDB_CHECK_CODE(code, lino, end);
10,119,138✔
165

166
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
10,119,138✔
167
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
10,119,138✔
168
    goto end;
193,349✔
169
  }
170

171
  if (terrno == TSDB_CODE_TMQ_FETCH_TIMEOUT && dataRsp.blockNum == 0) {
9,925,789✔
172
    dataRsp.timeout = true;
×
173
  }
174
  
175
  // reqOffset represents the current date offset, may be changed if wal not exists
176
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
9,925,125✔
177
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
9,925,813✔
178

179
end:
10,115,734✔
180
  {
181
    char buf[TSDB_OFFSET_LEN] = {0};
10,116,699✔
182
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
10,117,279✔
183
    if (code != 0){
10,111,699✔
184
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
193,349✔
185
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
186
    } else {
187
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
9,918,350✔
188
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
189
    }
190

191
    tDeleteMqDataRsp(&dataRsp);
10,119,215✔
192
    return code;
10,119,410✔
193
  }
194
}
195

196
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
197
  SDecoder decoder = {0};                                                                                  \
198
  TYPE     req = {0};                                                                                      \
199
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                            \
200
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                        \
201
  tDecoderInit(&decoder, data, len);                                                                       \
202
  if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) {                          \
203
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
204
            " msgType %d",                                                                                 \
205
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
206
    fetchVer++;                                                                                            \
207
    DELETE_FUNC(&req);                                                                                     \
208
    tDecoderClear(&decoder);                                                                               \
209
    continue;                                                                                              \
210
  }                                                                                                        \
211
  DELETE_FUNC(&req);                                                                                       \
212
  tDecoderClear(&decoder);
213

214
static void tDeleteCommon(void* parm) {}
676✔
215

216
#define POLL_RSP_TYPE(pRequest,taosxRsp) \
217
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \
218
(pRequest->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP)
219

220
static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
999✔
221
  int32_t         code = 0;
999✔
222

223
  if (!btMetaRsp->batchMetaReq) {
999✔
224
    btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
999✔
225
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
999✔
226
    btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
999✔
227
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
999✔
228
  }
229

230
  SMqMetaRsp tmpMetaRsp = {0};
999✔
231
  tmpMetaRsp.resMsgType = type;
999✔
232
  tmpMetaRsp.metaRspLen = bodyLen;
999✔
233
  tmpMetaRsp.metaRsp = body;
999✔
234
  uint32_t len = 0;
999✔
235
  tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
999✔
236
  if (TSDB_CODE_SUCCESS != code) {
999✔
237
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
238
    goto END;
×
239
  }
240
  int32_t tLen = sizeof(SMqRspHead) + len;
999✔
241
  void*   tBuf = taosMemoryCalloc(1, tLen);
999✔
242
  TQ_NULL_GO_TO_END(tBuf);
999✔
243
  void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
999✔
244
  SEncoder encoder = {0};
999✔
245
  tEncoderInit(&encoder, metaBuff, len);
999✔
246
  code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
999✔
247
  tEncoderClear(&encoder);
999✔
248

249
  if (code < 0) {
999✔
250
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
251
    goto END;
×
252
  }
253
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
1,998✔
254
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));
1,998✔
255

256
END:
999✔
257
  return code;
999✔
258
}
259

260
static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
999✔
261
  int32_t code = 0;
999✔
262
  SVCreateTbBatchReq pReq = {0};
999✔
263
  pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
999✔
264
  pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
999✔
265
  TQ_NULL_GO_TO_END(pReq.pArray);
999✔
266
  for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){
1,998✔
267
    void   *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
999✔
268
    TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
1,998✔
269
  }
270
  tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
999✔
271
  if (code < 0) {
999✔
272
    goto END;
×
273
  }
274
  *len += sizeof(SMsgHead);
999✔
275
  *pBuf = taosMemoryMalloc(*len);
999✔
276
  TQ_NULL_GO_TO_END(pBuf);
999✔
277
  SEncoder coder = {0};
999✔
278
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *len);
999✔
279
  code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
999✔
280
  tEncoderClear(&coder);
999✔
281

282
END:
999✔
283
  taosArrayDestroy(pReq.pArray);
999✔
284
  return code;
999✔
285
}
286

287
#define SEND_BATCH_META_RSP \
288
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);\
289
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);\
290
goto END;
291

292
#define SEND_DATA_RSP \
293
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);\
294
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);\
295
goto END;
296

297
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
5,716,727✔
298
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
299
  int32_t         vgId = TD_VID(pTq->pVnode);
5,716,727✔
300
  SMqDataRsp      taosxRsp = {0};
5,716,356✔
301
  SMqBatchMetaRsp btMetaRsp = {0};
5,716,356✔
302
  int32_t         code = 0;
5,716,680✔
303

304
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
5,716,680✔
305
  if (offset->type != TMQ_OFFSET__LOG) {
5,715,381✔
306
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
46,558✔
307

308
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
46,558✔
309
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
2,492✔
310
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
2,492✔
311
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
312
      goto END;
2,492✔
313
    }
314

315
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
44,066✔
316
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
317
    if (taosxRsp.blockNum > 0) {
44,066✔
318
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
41,977✔
319
      goto END;
41,977✔
320
    } else {
321
      tOffsetCopy(offset, &taosxRsp.rspOffset);
2,089✔
322
    }
323
  }
324

325
  if (offset->type == TMQ_OFFSET__LOG) {
5,671,620✔
326
    walReaderVerifyOffset(pHandle->pWalReader, offset);
5,672,582✔
327
    int64_t fetchVer = offset->version;
5,670,238✔
328

329
    uint64_t st = taosGetTimestampMs();
5,669,573✔
330
    int      totalRows = 0;
5,669,573✔
331
    int32_t  totalMetaRows = 0;
5,668,611✔
332
    while (1) {
48,952,190✔
333
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
54,620,801✔
334
      if (savedEpoch > pRequest->epoch) {
54,654,760✔
335
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
336
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
337
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
338
        goto END;
×
339
      }
340

341
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
54,625,321✔
342
        if (totalMetaRows > 0) {
5,462,973✔
343
          SEND_BATCH_META_RSP
×
344
        }
345
        SEND_DATA_RSP
10,925,946✔
346
      }
347

348
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
49,280,106✔
349
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
49,280,106✔
350
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
351

352
      // process meta
353
      if (pHead->msgType != TDMT_VND_SUBMIT) {
49,287,208✔
354
        if (totalRows > 0) {
28,987✔
355
          SEND_DATA_RSP
11,950✔
356
        }
357

358
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
23,012✔
359
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
676✔
360
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
×
361
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
676✔
362
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, destroyAlterTbReq)
×
363
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
1,352✔
364
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
676✔
365
          } else if (pHead->msgType == TDMT_VND_DELETE) {
×
366
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
×
367
          }
368
        }
369

370
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
23,012✔
371
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
372
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
23,012✔
373
          SMqMetaRsp metaRsp = {0};
23,012✔
374
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
23,012✔
375
          metaRsp.resMsgType = pHead->msgType;
23,012✔
376
          metaRsp.metaRspLen = pHead->bodyLen;
23,012✔
377
          metaRsp.metaRsp = pHead->body;
23,012✔
378
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
23,012✔
379
          goto END;
23,012✔
380
        }
381
        code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
×
382
        fetchVer++;
×
383
        if (code != 0){
×
384
          goto END;
×
385
        }
386
        totalMetaRows++;
×
387
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= pRequest->minPollRows) || (taosGetTimestampMs() - st > pRequest->timeout)) {
×
388
          SEND_BATCH_META_RSP
×
389
        }
390
        continue;
×
391
      }
392

393
      if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
49,250,795✔
394
        SEND_BATCH_META_RSP
×
395
      }
396

397
      // process data
398
      SPackedData submit = {
147,752,338✔
399
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
49,250,795✔
400
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
49,250,795✔
401
          .ver = pHead->version,
49,250,795✔
402
      };
403

404
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
49,250,748✔
405

406
      if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
49,140,633✔
407
        int32_t len = 0;
999✔
408
        void *pBuf = NULL;
999✔
409
        code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
999✔
410
        if (code == 0){
999✔
411
          code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
999✔
412
        }
413
        taosMemoryFree(pBuf);
999✔
414
        for (int i = 0; i < taosArrayGetSize(taosxRsp.createTableReq); i++) {
1,998✔
415
          void* pCreateTbReq = taosArrayGetP(taosxRsp.createTableReq, i);
999✔
416
          if (pCreateTbReq != NULL) {
999✔
417
            tDestroySVSubmitCreateTbReq(pCreateTbReq, TSDB_MSG_FLG_DECODE);
999✔
418
          }
419
          taosMemoryFree(pCreateTbReq);
999✔
420
        }
421
        taosArrayDestroy(taosxRsp.createTableReq);
999✔
422
        taosxRsp.createTableReq = NULL;
999✔
423
        fetchVer++;
999✔
424
        if (code != 0){
999✔
425
          goto END;
×
426
        }
427
        totalMetaRows++;
999✔
428
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= pRequest->minPollRows) ||
999✔
429
            (taosGetTimestampMs() - st > pRequest->timeout) ||
999✔
430
            (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
999✔
431
          SEND_BATCH_META_RSP
1,998✔
432
        }
433
        continue;
×
434
      }
435

436
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
49,220,157✔
437
          (taosGetTimestampMs() - st > pRequest->timeout) ||
49,034,371✔
438
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
49,037,532✔
439
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
×
440
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
292,622✔
441
          terrno = 0;
×
442
        } else{
443
          fetchVer++;
179,623✔
444
        }
445
        SEND_DATA_RSP
359,246✔
446
      } else {
447
        fetchVer++;
48,948,349✔
448
      }
449
    }
450
  }
451

452
END:
5,715,201✔
453
  if (code != 0){
5,715,194✔
454
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
455
            pRequest->subKey);
456
  }
457
  tDeleteMqBatchMetaRsp(&btMetaRsp);
5,715,194✔
458
  tDeleteSTaosxRsp(&taosxRsp);
5,711,478✔
459
  return code;
5,701,596✔
460
}
461

462
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
15,862,558✔
463
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL) {
15,862,558✔
464
    return TSDB_CODE_TMQ_INVALID_MSG;
×
465
  }
466
  int32_t      code = 0;
15,864,104✔
467
  STqOffsetVal reqOffset = {0};
15,864,104✔
468
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);
15,864,104✔
469

470
  // reset the offset if needed
471
  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
15,864,104✔
472
    bool blockReturned = false;
603,238✔
473
    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
602,906✔
474
    if (code != 0) {
602,235✔
475
      goto END;
10,136✔
476
    }
477

478
    // empty block returned, quit
479
    if (blockReturned) {
592,099✔
480
      goto END;
17,112✔
481
    }
482
  } else if (reqOffset.type == 0) {  // use the consumer specified offset
15,259,581✔
483
    uError("req offset type is 0");
×
484
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
485
    goto END;
×
486
  }
487

488
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
15,834,244✔
489
    code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
10,118,834✔
490
  } else {
491
    code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
5,716,727✔
492
  }
493

494
END:
15,857,041✔
495
  if (code != 0){
15,857,070✔
496
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
203,485✔
497
  }
498
  tOffsetDestroy(&reqOffset);
15,857,070✔
499
  return code;
15,858,716✔
500
}
501

502
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
15,655,365✔
503
                          int64_t ever) {
504
  if (pMsgHead == NULL) {
15,655,365✔
505
    return;
×
506
  }
507
  pMsgHead->consumerId = consumerId;
15,655,365✔
508
  pMsgHead->epoch = epoch;
15,658,910✔
509
  pMsgHead->mqMsgType = type;
15,653,394✔
510
  pMsgHead->walsver = sver;
15,645,345✔
511
  pMsgHead->walever = ever;
15,655,369✔
512
}
513

514
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
3,491✔
515
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
516
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
3,491✔
517
    return TSDB_CODE_TMQ_INVALID_MSG;
×
518
  }
519
  int32_t len = 0;
3,491✔
520
  int32_t code = 0;
3,491✔
521
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
3,491✔
522
  if (code < 0) {
3,491✔
523
    return TAOS_GET_TERRNO(code);
×
524
  }
525
  int32_t tlen = sizeof(SMqRspHead) + len;
3,491✔
526
  void*   buf = rpcMallocCont(tlen);
3,491✔
527
  if (buf == NULL) {
3,491✔
528
    return TAOS_GET_TERRNO(terrno);
×
529
  }
530

531
  int64_t sver = 0, ever = 0;
3,491✔
532
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
3,491✔
533
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
3,491✔
534

535
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
3,491✔
536

537
  SEncoder encoder = {0};
3,491✔
538
  tEncoderInit(&encoder, abuf, len);
3,491✔
539
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
3,491✔
540
  tEncoderClear(&encoder);
3,491✔
541
  if (code < 0) {
3,491✔
542
    rpcFreeCont(buf);
×
543
    return TAOS_GET_TERRNO(code);
×
544
  }
545
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
3,491✔
546

547
  tmsgSendRsp(&resp);
3,491✔
548
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
3,491✔
549
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
550

551
  return 0;
3,491✔
552
}
553

554
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
23,012✔
555
                          int32_t vgId) {
556
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
23,012✔
557
    return TSDB_CODE_TMQ_INVALID_MSG;
×
558
  }
559
  int32_t len = 0;
23,012✔
560
  int32_t code = 0;
23,012✔
561
  tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code);
23,012✔
562
  if (code < 0) {
23,012✔
563
    return TAOS_GET_TERRNO(code);
×
564
  }
565
  int32_t tlen = sizeof(SMqRspHead) + len;
23,012✔
566
  void*   buf = rpcMallocCont(tlen);
23,012✔
567
  if (buf == NULL) {
23,012✔
568
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
569
  }
570

571
  int64_t sver = 0, ever = 0;
23,012✔
572
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
23,012✔
573
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
23,012✔
574

575
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
23,012✔
576

577
  SEncoder encoder = {0};
23,012✔
578
  tEncoderInit(&encoder, abuf, len);
23,012✔
579
  code = tEncodeMqMetaRsp(&encoder, pRsp);
23,012✔
580
  tEncoderClear(&encoder);
23,012✔
581
  if (code < 0) {
23,012✔
582
    rpcFreeCont(buf);
×
583
    return TAOS_GET_TERRNO(code);
×
584
  }
585

586
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
23,012✔
587

588
  tmsgSendRsp(&resp);
23,012✔
589
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
23,012✔
590
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
591

592
  return 0;
23,012✔
593
}
594

595
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
15,634,477✔
596
                        int32_t type, int64_t sver, int64_t ever) {
597
  if (pRpcHandleInfo == NULL || pRsp == NULL) {
15,634,477✔
UNCOV
598
    return TSDB_CODE_TMQ_INVALID_MSG;
×
599
  }
600
  int32_t len = 0;
15,634,780✔
601
  int32_t code = 0;
15,634,780✔
602

603
  if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
15,634,780✔
604
    pRsp->withSchema = 0;
×
605
  }
606
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
15,634,780✔
607
      type == TMQ_MSG_TYPE__WALINFO_RSP ||
1,395✔
608
      type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
609
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
15,633,385✔
610
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
1,395✔
611
    tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
1,395✔
612
  }
613

614
  if (code < 0) {
15,628,668✔
615
    return TAOS_GET_TERRNO(code);
×
616
  }
617

618
  int32_t tlen = sizeof(SMqRspHead) + len;
15,628,668✔
619
  void*   buf = rpcMallocCont(tlen);
15,628,668✔
620
  if (buf == NULL) {
15,626,956✔
621
    return terrno;
×
622
  }
623

624
  SMqRspHead* pHead = (SMqRspHead*)buf;
15,626,956✔
625
  initMqRspHead(pHead, type, epoch, consumerId, sver, ever);
15,626,956✔
626

627
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
15,629,770✔
628

629
  SEncoder encoder = {0};
15,632,009✔
630
  tEncoderInit(&encoder, abuf, len);
15,631,846✔
631

632
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
15,625,029✔
633
      type == TMQ_MSG_TYPE__WALINFO_RSP ||
1,395✔
634
      type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
635
    code = tEncodeMqDataRsp(&encoder, pRsp);
15,623,634✔
636
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
1,395✔
637
    code = tEncodeSTaosxRsp(&encoder, pRsp);
1,395✔
638
  }
639
  tEncoderClear(&encoder);
15,619,831✔
640
  if (code < 0) {
15,618,291✔
641
    rpcFreeCont(buf);
×
642
    return TAOS_GET_TERRNO(code);
×
643
  }
644
  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0};
15,618,291✔
645

646
  tmsgSendRsp(&rsp);
15,616,293✔
647
  return 0;
15,631,719✔
648
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc