• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4969

27 Feb 2026 07:19AM UTC coverage: 67.69% (+0.8%) from 66.902%
#4969

push

travis-ci

web-flow
merge: from main to 3.0 #34603

15 of 58 new or added lines in 2 files covered. (25.86%)

5075 existing lines in 154 files now uncovered.

208337 of 307781 relevant lines covered (67.69%)

129686642.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.63
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
/*
 * Wrap one raw payload in an SRetrieveTableRspForTmq header and append it to pRsp.
 *
 * rawData starts with a uint32_t length; that length field plus the bytes it announces
 * (INT_BYTES + length in total) are copied verbatim behind the header.
 *
 * @param rawData    length-prefixed raw payload, must not be NULL
 * @param pRsp       response whose blockDataLen / blockData arrays receive the new entry
 * @param precision  value stored in the header's precision field
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_PARA for NULL/oversized input;
 *         terrno when an allocation or array push fails. On failure pRsp is left unchanged.
 */
static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t precision) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   buf = NULL;
  bool    lenPushed = false;

  TSDB_CHECK_NULL(rawData, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  // Read the length prefix with memcpy: no const-discarding cast, no alignment assumption.
  uint32_t payloadLen = 0;
  memcpy(&payloadLen, rawData, sizeof(payloadLen));

  size_t rawLen = (size_t)payloadLen + INT_BYTES;
  size_t totalLen = sizeof(SRetrieveTableRspForTmq) + rawLen;
  // blockDataLen stores int32_t entries, so refuse sizes that would not survive the narrowing.
  TSDB_CHECK_CONDITION(totalLen <= (size_t)INT32_MAX, code, lino, END, TSDB_CODE_INVALID_PARA);
  int32_t dataStrLen = (int32_t)totalLen;

  buf = taosMemoryCalloc(1, totalLen);
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);

  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;
  memcpy(pRetrieve->data, rawData, rawLen);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno);
  lenPushed = true;
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
  pRsp->blockDataElementFree = true;

  tqTrace("tqAddRawDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);

END:
  if (code != TSDB_CODE_SUCCESS) {
    // Keep blockDataLen and blockData the same length: drop the length whose buffer was never stored.
    // NOTE(review): relies on taosArrayPop removing the last element - confirm against tarray.h.
    if (lenPushed) {
      (void)taosArrayPop(pRsp->blockDataLen);
    }
    taosMemoryFree(buf);
    tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
44

45
/*
 * Encode pBlock behind an SRetrieveTableRspForTmq header and append the encoded buffer,
 * its length and a clone of pSW to pRsp (blockData, blockDataLen, blockSchema).
 *
 * Ownership: once all three pushes succeed the buffer and the cloned schema belong to pRsp.
 * On any failure every entry pushed by this call is removed again and the buffer / clone are
 * released here, so pRsp never keeps a pointer to freed memory and its arrays stay aligned.
 *
 * @param pBlock     data block to encode, must not be NULL
 * @param pRsp       response being filled, must not be NULL
 * @param pSW        schema describing the block; pSW->nCols is passed to blockEncode
 * @param precision  value stored in the header's precision field
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, or terrno on failure
 */
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, const SSchemaWrapper* pSW, int8_t precision) {
  int32_t         code = 0;
  int32_t         lino = 0;
  void*           buf = NULL;
  SSchemaWrapper* pSchema = NULL;
  bool            lenPushed = false;
  bool            dataPushed = false;

  TSDB_CHECK_NULL(pBlock, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pSW, code, lino, END, TSDB_CODE_INVALID_PARA);

  size_t  dataEncodeBufSize = blockGetEncodeSize(pBlock);
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
  buf = taosMemoryCalloc(1, dataStrLen);
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);

  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_VERSION;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, pSW->nCols);
  TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno);
  actualLen += sizeof(SRetrieveTableRspForTmq);

  // Clone the schema before touching pRsp so an allocation failure cannot strand a half-added block.
  pSchema = tCloneSSchemaWrapper(pSW);
  TSDB_CHECK_NULL(pSchema, code, lino, END, terrno);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
  lenPushed = true;
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
  dataPushed = true;
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSchema), code, lino, END, terrno);
  pSchema = NULL;  // the clone is now referenced by pRsp->blockSchema
  pRsp->blockDataElementFree = true;
  tqTrace("tqAddBlockDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", actualLen, buf);

END:
  if (code != TSDB_CODE_SUCCESS) {
    // Undo partial pushes before freeing buf; otherwise pRsp->blockData would keep a dangling pointer.
    // NOTE(review): relies on taosArrayPop removing the last element - confirm against tarray.h.
    if (dataPushed) {
      (void)taosArrayPop(pRsp->blockData);
    }
    if (lenPushed) {
      (void)taosArrayPop(pRsp->blockDataLen);
    }
    taosMemoryFree(buf);
    tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  }
  tDeleteSchemaWrapper(pSchema);
  return code;
}
82

83
/*
 * Look up the table entry for uid and append n copies of its name to pRsp->blockTbName.
 *
 * @param pTq   tq instance; its vnode meta is used for the lookup, must not be NULL
 * @param uid   table uid to resolve
 * @param pRsp  response whose blockTbName array receives the names, must not be NULL
 * @param n     number of copies to append
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, the lookup error,
 *         or terrno when duplicating the name fails. A failed array push is only logged
 *         and that copy is skipped.
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
  int32_t     code = TDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SMetaReader mr = {0};

  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);

  code = metaReaderGetTableEntryByUidCache(&mr, uid);
  TSDB_CHECK_CODE(code, lino, END);

  for (int32_t i = 0; i < n; i++) {
    char* tbName = taosStrdup(mr.me.name);
    TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
    if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
      tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
      // The array did not take the pointer, so this copy is still ours to release.
      taosMemoryFree(tbName);
      continue;
    }
    tqTrace("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
  }

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }
  // Runs on every path, including the early parameter-check exits where mr is still zeroed.
  metaReaderClear(&mr);
  return code;
}
113

114
/*
 * Run the query task once for a consumer and hand the produced block back through res.
 *
 * @param task     executor task to run
 * @param pHandle  tq handle; only its consumerId is read, for logging
 * @param vgId     vgroup id, used for logging
 * @param res      out: block pointer written by qExecTask
 * @return TSDB_CODE_INVALID_PARA when task, pHandle or res is NULL, otherwise the
 *         qExecTask result
 */
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
  if (!task || !pHandle || !res) {
    return TSDB_CODE_INVALID_PARA;
  }

  uint64_t execTs = 0;  // filled by qExecTask, not used afterwards
  qStreamSetOpen(task);

  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);

  const int32_t rc = qExecTask(task, res, &execTs);
  if (TSDB_CODE_SUCCESS != rc) {
    tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(rc));
  }

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
  return rc;
}
130

131
/*
 * Replay-mode handling of one poll: answer with the block cached in pHandle->block and
 * cache the next one, reporting the timestamp gap between the two in pRsp->sleepTime.
 *
 * Flow as implemented below:
 *  - an offset-reset request discards any cached block;
 *  - with no cached block, pDataBlock (if any) is copied into the cache, its offset
 *    timestamp is remembered in pHandle->blockTime, and the next block is fetched;
 *  - the cached block is appended to pRsp;
 *  - if no further block exists the cache is dropped, otherwise the new block replaces
 *    the cached one and sleepTime is the difference of the two offset timestamps.
 *
 * @return TSDB_CODE_SUCCESS or the first error hit by a callee
 */
static int32_t tqProcessReplayRsp(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, const SMqPollReq* pRequest, SSDataBlock* pDataBlock, qTaskInfo_t task){
  int32_t code = 0;
  int32_t lino = 0;

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
    blockDataDestroy(pHandle->block);
    pHandle->block = NULL;
  }
  if (pHandle->block == NULL) {
    if (pDataBlock == NULL) {
      goto END;  // nothing cached and nothing new: empty response
    }

    STqOffsetVal offset = {0};
    code = qStreamExtractOffset(task, &offset);
    TSDB_CHECK_CODE(code, lino, END);

    code = createOneDataBlock(pDataBlock, true, &pHandle->block);
    if (code != TSDB_CODE_SUCCESS) {
      // The offset was extracted successfully, so release it before bailing out.
      tOffsetDestroy(&offset);
    }
    TSDB_CHECK_CODE(code, lino, END);

    pHandle->blockTime = offset.ts;
    tOffsetDestroy(&offset);
    int32_t vgId = TD_VID(pTq->pVnode);
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
    TSDB_CHECK_CODE(code, lino, END);
  }

  const STqExecHandle* pExec = &pHandle->execHandle;
  code = tqAddBlockDataToRsp(pHandle->block, pRsp, &pExec->execCol.pSW, pTq->pVnode->config.tsdbCfg.precision);
  TSDB_CHECK_CODE(code, lino, END);

  pRsp->blockNum++;
  if (pDataBlock == NULL) {
    // No follow-up block: the cached one has been delivered, drop it.
    blockDataDestroy(pHandle->block);
    pHandle->block = NULL;
  } else {
    code = copyDataBlock(pHandle->block, pDataBlock);
    TSDB_CHECK_CODE(code, lino, END);

    STqOffsetVal offset = {0};
    code = qStreamExtractOffset(task, &offset);
    TSDB_CHECK_CODE(code, lino, END);

    pRsp->sleepTime = offset.ts - pHandle->blockTime;
    pHandle->blockTime = offset.ts;
    tOffsetDestroy(&offset);
  }

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to process replay response:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
187

188
/*
 * Execute the handle's query task once for a poll request and put at most one block into pRsp.
 *
 * The scan is positioned at pOffset, the request's parameters are applied to the task, and one
 * block is fetched. In replay mode the block goes through tqProcessReplayRsp; otherwise a
 * non-NULL block is encoded straight into pRsp. Finally the task's offset is stored in
 * pRsp->rspOffset.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when any argument is NULL, or the first
 *         error returned by a callee
 */
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
  int32_t code = 0;
  int32_t lino = 0;
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRequest, code, lino, END, TSDB_CODE_INVALID_PARA);

  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t totalRows = 0;

  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;

  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  TSDB_CHECK_CODE(code, lino, END);

  qStreamSetParams(task, pRequest->sourceExcluded, pRequest->minPollRows, pRequest->timeout, pRequest->enableReplay);
  // Single-pass block: do/while(0) only exists so the branches can leave with break.
  do {
    SSDataBlock* pDataBlock = NULL;
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
    TSDB_CHECK_CODE(code, lino, END);

    if (pRequest->enableReplay) {
      code = tqProcessReplayRsp(pTq, pHandle, pRsp, pRequest, pDataBlock, task);
      TSDB_CHECK_CODE(code, lino, END);
      break;
    }
    if (pDataBlock == NULL) {
      break;
    }
    code = tqAddBlockDataToRsp(pDataBlock, pRsp, &pExec->execCol.pSW, pTq->pVnode->config.tsdbCfg.precision);
    TSDB_CHECK_CODE(code, lino, END);

    pRsp->blockNum++;
    totalRows += pDataBlock->info.rows;
  } while(0);

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
  // Record the failing line so the error log below does not report line 0.
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != 0) {
    tqError("%s failed at %d, tmq task executed error msg:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
236

237
/*
 * Snapshot scan for a taosx-style subscription: run the task repeatedly, collecting data
 * blocks into pRsp until enough rows were gathered or the timeout elapsed, until meta
 * messages are available (returned through pBatchMetaRsp), or until the snapshot is drained.
 *
 * Loop exits, as implemented below:
 *  - meta available: *pBatchMetaRsp receives a struct copy of the task's meta message;
 *  - no block and the offset is no longer TMQ_OFFSET__SNAPSHOT_DATA: rspOffset is updated
 *    (the debug log describes this as switching to the WAL);
 *  - a block was returned and pRsp already holds blocks: rspOffset is updated.
 *
 * @return TSDB_CODE_SUCCESS or the first error returned by a callee
 */
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
  int32_t code = 0;
  int32_t lino = 0;
  char* tbName = NULL;
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  TSDB_CHECK_CODE(code, lino, END);

  qStreamSetParams(task, pRequest->sourceExcluded, pRequest->minPollRows, pRequest->timeout, false);

  int32_t rowCnt = 0;
  int64_t st = taosGetTimestampMs();
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("tmqsnap task start to execute");
    code = qExecTask(task, &pDataBlock, &ts);
    TSDB_CHECK_CODE(code, lino, END);
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        tbName = taosStrdup(qExtractTbnameFromTask(task));
        TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
        TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno);
        tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName);
        tbName = NULL;  // now referenced by pRsp->blockTbName; must not be freed at END
      }

      code = tqAddBlockDataToRsp(pDataBlock, pRsp, qExtractSchemaFromTask(task), pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, lino, END);

      pRsp->blockNum++;
      rowCnt += pDataBlock->info.rows;
      // Keep batching while below the row target and within the poll timeout.
      if (rowCnt <= pRequest->minPollRows && (taosGetTimestampMs() - st <= pRequest->timeout)) {
        continue;
      }
    }

    // get meta
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
      code = qStreamExtractOffset(task, &tmp->rspOffset);
      TSDB_CHECK_CODE(code, lino, END);
      *pBatchMetaRsp = *tmp;
      tqDebug("tmqsnap task get meta");
      break;
    }

    if (pDataBlock == NULL) {
      code = qStreamExtractOffset(task, pOffset);
      TSDB_CHECK_CODE(code, lino, END);

      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        continue;
      }

      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      // Go straight to END on failure so the line is recorded and "success" is not logged.
      TSDB_CHECK_CODE(code, lino, END);
      break;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      TSDB_CHECK_CODE(code, lino, END);
      break;
    }
  }
  tqDebug("%s:%d success", __FUNCTION__, lino);
END:
  if (code != 0){
    tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
  }
  // Non-NULL only when the name was duplicated but could not be handed to pRsp.
  taosMemoryFree(tbName);
  return code;
}
314

315
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows,
43,624,916✔
316
                             const SMqPollReq* pRequest, SArray* rawList){
317
  int32_t code = 0;
43,624,916✔
318
  int32_t lino = 0;
43,624,916✔
319
  SArray* pBlocks = NULL;
43,624,916✔
320
  SArray* pSchemas = NULL;
43,624,916✔
321

322
  STqExecHandle* pExec = &pHandle->execHandle;
43,624,916✔
323
  STqReader* pReader = pExec->pTqReader;
43,631,896✔
324

325
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
43,624,075✔
326
  TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
43,637,319✔
327
  pSchemas = taosArrayInit(0, sizeof(void*));
43,637,319✔
328
  TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
43,643,917✔
329

330
  SSubmitTbData* pSubmitTbData = NULL;
43,643,917✔
331
  code = tqRetrieveTaosxBlock(pReader, pRsp, pBlocks, pSchemas, &pSubmitTbData, rawList, pHandle->fetchMeta);
43,639,732✔
332
  TSDB_CHECK_CODE(code, lino, END);
43,592,289✔
333
  bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
43,393,226✔
334
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
43,422,596✔
335

336
  if (pHandle->fetchMeta == ONLY_META){
43,394,930✔
337
    goto END;
6,624✔
338
  }
339

340
  int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
43,407,501✔
341
  if (pRsp->withTbName) {
43,409,094✔
342
    int64_t uid = pExec->pTqReader->lastBlkUid;
43,404,395✔
343
    code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum);
43,408,171✔
344
    TSDB_CHECK_CODE(code, lino, END);
43,331,655✔
345
  }
346

347
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
43,333,961✔
348
  for (int32_t i = 0; i < blockNum; i++) {
86,724,516✔
349
    if (taosArrayGetSize(pBlocks) == 0){
43,322,745✔
UNCOV
350
      void* rawData = taosArrayGetP(rawList, pReader->nextBlk - 1);
×
UNCOV
351
      if (rawData == NULL) {
×
UNCOV
352
        continue;
×
353
      }
354
      if (tqAddRawDataToRsp(rawData, pRsp, pTq->pVnode->config.tsdbCfg.precision) != 0){
×
355
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
UNCOV
356
        continue;
×
357
      }
358
      *totalRows += *(uint32_t *)rawData + INT_BYTES; // bytes actually
×
359
    } else {
360
      SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
43,383,774✔
361
      if (pBlock == NULL) {
43,362,988✔
UNCOV
362
        continue;
×
363
      }
364

365
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pSchemas, i);
43,362,988✔
366
      if (tqAddBlockDataToRsp(pBlock, pRsp, pSW, pTq->pVnode->config.tsdbCfg.precision) != 0){
43,402,173✔
UNCOV
367
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
UNCOV
368
        continue;
×
369
      }
370
      *totalRows += pBlock->info.rows;
43,371,819✔
371
    }
372

373
    pRsp->blockNum++;
43,398,861✔
374
  }
375
  tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
43,401,771✔
376
END:
43,635,124✔
377
  if (code != 0) {
43,567,856✔
378
    tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
199,063✔
379
  }
380
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
43,567,856✔
381
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
43,573,992✔
382
}
43,614,479✔
383

384
/*
 * Walk every table entry of the reader's current submit message before it is turned into
 * response blocks.
 *
 * Per entry:
 *  - raw-data mode: a uid already present in pRequest->uidHash sets terrno to
 *    TSDB_CODE_TMQ_RAW_DATA_SPLIT and stops the walk; otherwise the uid is recorded;
 *  - entries without a create-table request need nothing more;
 *  - otherwise the table's create time is taken from pHandle->tableCreateTimeHash, or looked
 *    up in meta and cached there; the create-table request is then either discarded (handle
 *    fetches WITH_DATA, or the entry's ctimeMs is later than the create time) or kept, in
 *    which case *rawList is destroyed and set to NULL.
 *
 * A NULL entry destroys *rawList and aborts the walk. Hash insertion failures are only logged.
 */
static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
  STqReader* pReader = pHandle->execHandle.pTqReader;

  const int32_t numOfTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (int32_t idx = 0; idx < numOfTbData; ++idx) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, idx);
    if (NULL == pTbData) {
      taosArrayDestroy(*rawList);
      *rawList = NULL;
      return;
    }

    int64_t uid = pTbData->uid;
    if (pRequest->rawData) {
      if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) {
        tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid);
        terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
        return;
      }

      int32_t putCode = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES);
      if (putCode != 0) {
        tqError("failed to add table uid to hash, code:%d, uid:%" PRId64, putCode, uid);
      }
    }

    if (NULL == pTbData->pCreateTbReq) {
      continue;
    }

    // Resolve the table's create time: cached value first, meta lookup (then cache) otherwise.
    int64_t  createTime = INT64_MAX;
    int64_t* pCached = (int64_t*)taosHashGet(pHandle->tableCreateTimeHash, &uid, LONG_BYTES);
    if (pCached == NULL) {
      createTime = metaGetTableCreateTime(pReader->pVnode->pMeta, uid, 1);
      if (createTime != INT64_MAX) {
        int32_t putCode = taosHashPut(pHandle->tableCreateTimeHash, &uid, LONG_BYTES, &createTime, LONG_BYTES);
        if (putCode != 0) {
          tqError("failed to add table create time to hash,code:%d, uid:%"PRId64, putCode, uid);
        }
      }
    } else {
      createTime = *pCached;
    }

    if (pHandle->fetchMeta != WITH_DATA && pTbData->ctimeMs <= createTime) {
      // The create-table request is kept, so the raw list is dropped.
      taosArrayDestroy(*rawList);
      *rawList = NULL;
    } else {
      tDestroySVSubmitCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
      taosMemoryFreeClear(pTbData->pCreateTbReq);
    }
  }
}
436

437
/*
 * Decode one WAL submit message and append its table data to pRsp.
 *
 * In raw-data mode a list for raw payloads is created and handed to the reader. After
 * preProcessSubmitMsg, processing of this message is skipped with terrno set to
 * TSDB_CODE_TMQ_RAW_DATA_SPLIT (the return code stays success) when
 *  - terrno already equals that value,
 *  - rows were collected before, pRsp has no create-table entries and the raw list is gone, or
 *  - pRsp already holds create-table entries while the raw list is still present.
 * Otherwise every block the reader yields for the subscription type (table or db) is passed
 * to tqProcessSubData.
 *
 * @param submit     packed submit message (msgStr, msgLen, ver)
 * @param totalRows  in/out running total maintained by tqProcessSubData
 * @return TSDB_CODE_SUCCESS, terrno if the raw list cannot be allocated, or the
 *         tqReaderSetSubmitMsg error
 */
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SDecoder       decoder = {0};
  SArray*        rawList = NULL;
  STqExecHandle* pExec = &pHandle->execHandle;
  STqReader*     pReader = pExec->pTqReader;

  if (pRequest->rawData) {
    rawList = taosArrayInit(0, POINTER_BYTES);
    TSDB_CHECK_NULL(rawList, code, lino, END, terrno);
  }

  code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList, &decoder);
  TSDB_CHECK_CODE(code, lino, END);

  preProcessSubmitMsg(pHandle, pRequest, &rawList);

  if (pRequest->rawData != 0) {
    // data could not contains same uid data in rawdata mode
    if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT) {
      goto END;
    }

    // this submit data is metadata and previous data is rawdata
    if (*totalRows > 0 && pRsp->createTableNum == 0 && rawList == NULL) {
      tqDebug("poll rawdata split,vgId:%d, this wal submit data contains metadata and previous data is data", pTq->pVnode->config.vgId);
      terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
      goto END;
    }

    // this submit data is rawdata and previous data is metadata
    if (pRsp->createTableNum > 0 && rawList != NULL) {
      tqDebug("poll rawdata split,vgId:%d, this wal submit data is data and previous data is metadata", pTq->pVnode->config.vgId);
      terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
      goto END;
    }
  }

  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
    while (tqNextBlockImpl(pReader, NULL)) {
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
    }
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
    }
  }

END:
  // Cleanup runs on every path.
  tDecoderClear(&decoder);
  tqReaderClearSubmitMsg(pReader);
  taosArrayDestroy(rawList);
  if (code != 0) {
    tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc