• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4824

27 Oct 2025 05:39AM UTC coverage: 61.409% (+0.2%) from 61.242%
#4824

push

travis-ci

web-flow
Merge pull request #33376 from taosdata/3.0

merge 3.0

156919 of 324854 branches covered (48.3%)

Branch coverage included in aggregate %.

371 of 493 new or added lines in 25 files covered. (75.25%)

2822 existing lines in 108 files now uncovered.

208404 of 270043 relevant lines covered (77.17%)

229356519.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.98
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
/**
 * Wrap one raw payload in a SRetrieveTableRspForTmq frame and append it to the poll response.
 *
 * rawData is read as a length-prefixed buffer: its first uint32_t is the payload length, and
 * that prefix plus the payload (length + INT_BYTES bytes in total) is copied verbatim behind
 * the frame header.
 *
 * @param rawData    length-prefixed raw buffer; not modified
 * @param pRsp       response whose blockDataLen / blockData arrays receive the frame
 * @param precision  stored in the frame header
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_PARA for a NULL argument or a length
 *         that does not fit the int32_t frame size; terrno when allocation or an array push fails
 */
static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t precision) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   buf = NULL;

  TSDB_CHECK_NULL(rawData, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  // Read the length prefix with memcpy: no const is cast away and no alignment is assumed.
  uint32_t payloadLen = 0;
  memcpy(&payloadLen, rawData, sizeof(payloadLen));

  // The frame size is stored as int32_t, so reject lengths that would overflow it.
  const size_t overhead = sizeof(SRetrieveTableRspForTmq) + INT_BYTES;
  TSDB_CHECK_CONDITION(payloadLen <= INT32_MAX - overhead, code, lino, END, TSDB_CODE_INVALID_PARA);

  const size_t rawLen = (size_t)payloadLen + INT_BYTES;  // prefix + payload
  int32_t      dataStrLen = (int32_t)(sizeof(SRetrieveTableRspForTmq) + rawLen);
  buf = taosMemoryCalloc(1, dataStrLen);
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);

  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;

  memcpy(pRetrieve->data, rawData, rawLen);
  // NOTE(review): if the second push fails, the length pushed by the first one stays in
  // blockDataLen; confirm that callers discard the whole response on error.
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
  pRsp->blockDataElementFree = true;

  tqTrace("tqAddRawDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);

END:
  if (code != TSDB_CODE_SUCCESS) {
    // buf is released here on every failure path.
    taosMemoryFree(buf);
    tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
44

45
/**
 * Encode one data block behind a SRetrieveTableRspForTmq header and append it to the response.
 *
 * The buffer is sized from blockGetEncodeSize(); the length recorded in pRsp->blockDataLen is
 * the header size plus the byte count actually returned by blockEncode().
 *
 * @param pBlock     block to encode
 * @param pRsp       response whose blockDataLen / blockData arrays receive the frame
 * @param numOfCols  passed through to blockEncode()
 * @param precision  stored in the frame header
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_PARA for a NULL argument or an encode
 *         size that does not fit int32_t; terrno when allocation, encoding or a push fails
 */
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   buf = NULL;

  TSDB_CHECK_NULL(pBlock, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  // The frame length is kept in an int32_t; refuse sizes that would be truncated.
  TSDB_CHECK_CONDITION(dataEncodeBufSize <= INT32_MAX - sizeof(SRetrieveTableRspForTmq), code, lino, END,
                       TSDB_CODE_INVALID_PARA);

  int32_t dataStrLen = (int32_t)(sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize);
  buf = taosMemoryCalloc(1, dataStrLen);
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);

  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_VERSION;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
  TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno);

  actualLen += sizeof(SRetrieveTableRspForTmq);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
  pRsp->blockDataElementFree = true;
  // Trace the length that was actually recorded, not the allocated capacity.
  tqTrace("tqAddBlockDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", actualLen, buf);

END:
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(buf);
    tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  }
  return code;
}
76

77
/**
 * Look up the table name for uid and append n copies of it to pRsp->blockTbName.
 *
 * Each appended entry is a separately duplicated string. A failed push is logged and skipped
 * (best effort); the duplicate that could not be stored is freed instead of being leaked.
 *
 * @param pTq   TQ instance; its vnode meta is used for the lookup
 * @param uid   table uid to resolve
 * @param pRsp  response receiving the names
 * @param n     number of copies to append
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL argument, the meta lookup error,
 *         or terrno when duplicating the name fails
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SMetaReader mr = {0};

  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);

  code = metaReaderGetTableEntryByUidCache(&mr, uid);
  TSDB_CHECK_CODE(code, lino, END);

  for (int32_t i = 0; i < n; i++) {
    char* tbName = taosStrdup(mr.me.name);
    TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
    if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
      tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
      // The array did not take the pointer, so this copy must be released here.
      taosMemoryFree(tbName);
      continue;
    }
    tqTrace("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
  }

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }
  // Reached on every path, including the early argument checks where mr is still zeroed.
  metaReaderClear(&mr);
  return code;
}
107

108
/**
 * Execute the query task once and hand back the block it produced.
 *
 * @param task     task to run; marked open via qStreamSetOpen() before execution
 * @param pHandle  subscription handle, used here only for the consumer id in log lines
 * @param vgId     vgroup id, used for logging
 * @param res      out: block pointer written by qExecTask()
 * @return TSDB_CODE_INVALID_PARA if task, pHandle or res is NULL; the qExecTask() error on
 *         failure; 0 otherwise
 */
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
  if (!(task != NULL && pHandle != NULL && res != NULL)) {
    return TSDB_CODE_INVALID_PARA;
  }

  uint64_t execTs = 0;  // filled by qExecTask(); not used by this function
  qStreamSetOpen(task);
  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);

  const int32_t rc = qExecTask(task, res, &execTs);
  if (rc == TSDB_CODE_SUCCESS) {
    tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
    return 0;
  }

  tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(rc));
  return rc;
}
125

126
/**
 * Build the response for a replay-mode poll.
 *
 * The handle caches one block (pHandle->block) and its timestamp (pHandle->blockTime). Each call
 * sends the cached block. If another block follows, that block replaces the cached one and
 * pRsp->sleepTime is set to the difference between the two offset timestamps. If none follows,
 * the cache is dropped.
 *
 * @param pDataBlock  block just fetched by the caller; may be NULL
 * @return 0 on success, otherwise the first failing callee's error code
 */
static int32_t tqProcessReplayRsp(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, const SMqPollReq* pRequest, SSDataBlock* pDataBlock, qTaskInfo_t task){
  int32_t      code = 0;
  int32_t      lino = 0;
  // Declared at function scope and destroyed at END, so no error path can leak it.
  STqOffsetVal offset = {0};

  // A reset-type request offset discards whatever block was cached before.
  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
    blockDataDestroy(pHandle->block);
    pHandle->block = NULL;
  }
  if (pHandle->block == NULL) {
    if (pDataBlock == NULL) {
      goto END;
    }

    code = qStreamExtractOffset(task, &offset);
    TSDB_CHECK_CODE(code, lino, END);

    code = createOneDataBlock(pDataBlock, true, &pHandle->block);
    TSDB_CHECK_CODE(code, lino, END);

    pHandle->blockTime = offset.ts;
    tOffsetDestroy(&offset);
    offset = (STqOffsetVal){0};  // start from a clean value if it is filled again below

    // Fetch the following block so the code below knows whether more data comes after this one.
    int32_t vgId = TD_VID(pTq->pVnode);
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
    TSDB_CHECK_CODE(code, lino, END);
  }

  const STqExecHandle* pExec = &pHandle->execHandle;
  code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
  TSDB_CHECK_CODE(code, lino, END);

  pRsp->blockNum++;
  if (pDataBlock == NULL) {
    // Nothing follows: the cached block has been sent and is no longer needed.
    blockDataDestroy(pHandle->block);
    pHandle->block = NULL;
  } else {
    code = copyDataBlock(pHandle->block, pDataBlock);
    TSDB_CHECK_CODE(code, lino, END);

    code = qStreamExtractOffset(task, &offset);
    TSDB_CHECK_CODE(code, lino, END);

    pRsp->sleepTime = offset.ts - pHandle->blockTime;
    pHandle->blockTime = offset.ts;
  }

END:
  tOffsetDestroy(&offset);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to process replay response:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
182

183
/**
 * Run the handle's query task starting at pOffset and pack the produced blocks into pRsp.
 *
 * Blocks are appended until the task returns no block, the accumulated row count reaches
 * pRequest->minPollRows, or more than pRequest->timeout ms have elapsed. In replay mode a single
 * tqProcessReplayRsp() call replaces the accumulation. On normal exit the task's current offset
 * is stored in pRsp->rspOffset.
 *
 * @return 0 on success; TSDB_CODE_INVALID_PARA if any pointer argument is NULL; otherwise the
 *         error code of the first failing step
 */
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
  int32_t code = 0;
  int32_t lino = 0;
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRequest, code, lino, END, TSDB_CODE_INVALID_PARA);

  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  int32_t              vgId = TD_VID(pTq->pVnode);
  int32_t              rowsSoFar = 0;

  code = qStreamPrepareScan(task, pOffset, pExec->subType);
  TSDB_CHECK_CODE(code, lino, END);

  qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
  int64_t startMs = taosGetTimestampMs();

  for (bool more = true; more;) {
    SSDataBlock* pBlk = NULL;
    code = getDataBlock(task, pHandle, vgId, &pBlk);
    TSDB_CHECK_CODE(code, lino, END);

    if (pRequest->enableReplay) {
      // Replay mode handles exactly one fetch per poll.
      code = tqProcessReplayRsp(pTq, pHandle, pRsp, pRequest, pBlk, task);
      TSDB_CHECK_CODE(code, lino, END);
      more = false;
    } else if (pBlk == NULL) {
      // The task produced nothing more.
      more = false;
    } else {
      code = tqAddBlockDataToRsp(pBlk, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, lino, END);

      pRsp->blockNum++;
      rowsSoFar += pBlk->info.rows;

      // The clock is consulted only while the row budget has not been reached.
      const bool enoughRows = rowsSoFar >= pRequest->minPollRows;
      more = !(enoughRows || (taosGetTimestampMs() - startMs > pRequest->timeout));
    }
  }

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->blockNum, rowsSoFar);
  code = qStreamExtractOffset(task, &pRsp->rspOffset);

END:
  if (code != 0) {
    tqError("%s failed at %d, tmq task executed error msg:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
235

236
/**
 * Run the handle's task from pOffset and collect either data blocks (into pRsp) or a batch of
 * meta messages (into pBatchMetaRsp).
 *
 * Per iteration:
 *  - a block with rows is appended to pRsp (plus table name / schema when pRsp asks for them);
 *    the loop keeps pulling while rowCnt <= minPollRows and the timeout has not elapsed;
 *  - if the task holds meta messages, they are copied into *pBatchMetaRsp and the loop ends;
 *  - if no block was produced, the loop continues while the offset is still of type
 *    TMQ_OFFSET__SNAPSHOT_DATA, otherwise it stores the offset in pRsp->rspOffset and ends;
 *  - otherwise it ends once pRsp holds at least one block.
 *
 * @return 0 on success, otherwise the error code of the first failing step
 */
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
  int32_t code = 0;
  int32_t lino = 0;
  char* tbName = NULL;          // owned here until pushed into pRsp->blockTbName
  SSchemaWrapper* pSW = NULL;   // owned here until pushed into pRsp->blockSchema
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  TSDB_CHECK_CODE(code, lino, END);

  int32_t rowCnt = 0;
  int64_t st = taosGetTimestampMs();
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("tmqsnap task start to execute");
    code = qExecTask(task, &pDataBlock, &ts);
    TSDB_CHECK_CODE(code, lino, END);
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        tbName = taosStrdup(qExtractTbnameFromTask(task));
        TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
        TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno);
        tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName);
        tbName = NULL;  // now stored in the array; must not be freed at END
      }
      if (pRsp->withSchema) {
        pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
        TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
        TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
        pSW = NULL;  // now stored in the array; must not be freed at END
      }

      code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
                                 pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, lino, END);

      pRsp->blockNum++;
      rowCnt += pDataBlock->info.rows;
      if (rowCnt <= pRequest->minPollRows && (taosGetTimestampMs() - st <= pRequest->timeout)) {
        continue;
      }
    }

    // get meta
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
      code = qStreamExtractOffset(task, &tmp->rspOffset);
      TSDB_CHECK_CODE(code, lino, END);
      // Struct copy: *pBatchMetaRsp now holds the same member pointers as *tmp.
      *pBatchMetaRsp = *tmp;
      tqDebug("tmqsnap task get meta");
      break;
    }

    if (pDataBlock == NULL) {
      code = qStreamExtractOffset(task, pOffset);
      TSDB_CHECK_CODE(code, lino, END);

      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        continue;
      }

      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      // Checked so a failure records its line and skips the "success" log below.
      TSDB_CHECK_CODE(code, lino, END);
      break;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      // Checked so a failure records its line and skips the "success" log below.
      TSDB_CHECK_CODE(code, lino, END);
      break;
    }
  }
  tqDebug("%s:%d success", __FUNCTION__, lino);
END:
  if (code != 0){
    tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
  }
  // Non-NULL only when a push failed after the allocation succeeded.
  tDeleteSchemaWrapper(pSW);
  taosMemoryFree(tbName);
  return code;
}
320

321
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows,
100,972,347✔
322
                             const SMqPollReq* pRequest, SArray* rawList){
323
  int32_t code = 0;
100,972,347✔
324
  int32_t lino = 0;
100,972,347✔
325
  SArray* pBlocks = NULL;
100,972,347✔
326
  SArray* pSchemas = NULL;
100,972,347✔
327

328
  STqExecHandle* pExec = &pHandle->execHandle;
100,972,347✔
329
  STqReader* pReader = pExec->pTqReader;
101,020,627✔
330

331
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
101,011,515✔
332
  TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
100,993,333!
333
  pSchemas = taosArrayInit(0, sizeof(void*));
100,993,333✔
334
  TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
101,007,012!
335

336
  SSubmitTbData* pSubmitTbData = NULL;
101,007,012✔
337
  code = tqRetrieveTaosxBlock(pReader, pRsp, pBlocks, pSchemas, &pSubmitTbData, rawList, pHandle->fetchMeta);
101,018,328✔
338
  TSDB_CHECK_CODE(code, lino, END);
100,767,304✔
339
  bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
100,302,917✔
340
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
100,525,349✔
341

342
  if (pHandle->fetchMeta == ONLY_META){
100,453,905✔
343
    goto END;
14,828✔
344
  }
345

346
  int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
100,454,558!
347
  if (pRsp->withTbName) {
100,439,473!
348
    int64_t uid = pExec->pTqReader->lastBlkUid;
100,464,303✔
349
    code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum);
100,462,455✔
350
    TSDB_CHECK_CODE(code, lino, END);
100,212,008!
351
  }
352

353
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
100,197,189!
354
  for (int32_t i = 0; i < blockNum; i++) {
200,645,909✔
355
    if (taosArrayGetSize(pBlocks) == 0){
100,116,802!
356
      void* rawData = taosArrayGetP(rawList, pReader->nextBlk - 1);
×
357
      if (rawData == NULL) {
×
358
        continue;
×
359
      }
360
      if (tqAddRawDataToRsp(rawData, pRsp, pTq->pVnode->config.tsdbCfg.precision) != 0){
×
361
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
362
        continue;
×
363
      }
364
      *totalRows += *(uint32_t *)rawData + INT_BYTES; // bytes actually
×
365
    } else {
366
      SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
100,277,049✔
367
      if (pBlock == NULL) {
100,281,550!
368
        continue;
×
369
      }
370

371
      if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){
100,281,550!
372
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
373
        continue;
×
374
      }
375
      *totalRows += pBlock->info.rows;
100,189,709✔
376
    }
377

378
    void** pSW = taosArrayGet(pSchemas, i);
100,407,715✔
379
    if (taosArrayPush(pRsp->blockSchema, pSW) == NULL){
200,843,472!
380
      tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
×
381
      continue;
×
382
    }
383
    *pSW = NULL;
100,446,630✔
384
    pRsp->blockNum++;
100,262,631✔
385
  }
386
  tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
100,529,107!
387
END:
101,079,766✔
388
  if (code != 0) {
100,748,797✔
389
    tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
465,120!
390
  }
391
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
100,748,797✔
392
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
100,710,569✔
393
}
100,898,712✔
394

395
/**
 * Walk every table entry of the reader's current submit message before it is scanned.
 *
 * For each entry:
 *  - in raw-data mode, a uid already present in pRequest->uidHash sets terrno to
 *    TSDB_CODE_TMQ_RAW_DATA_SPLIT and returns; otherwise the uid is recorded in the hash;
 *  - if the entry carries a create-table request, the table's create time is taken from
 *    pHandle->tableCreateTimeHash or, on a miss, looked up via metaGetTableCreateTime() and
 *    cached. The request is dropped when fetchMeta is WITH_DATA or the entry's ctimeMs is later
 *    than that create time; otherwise *rawList is destroyed and set to NULL.
 *
 * A NULL entry also destroys *rawList and returns.
 *
 * @param rawList  in/out: may be destroyed and reset to NULL by this function
 */
static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
  STqReader*    pReader = pHandle->execHandle.pTqReader;
  const int32_t nTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);

  for (int32_t idx = 0; idx < nTbData; ++idx) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, idx);
    if (pTbData == NULL) {
      taosArrayDestroy(*rawList);
      *rawList = NULL;
      return;
    }

    int64_t uid = pTbData->uid;
    if (pRequest->rawData) {
      if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) {
        tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid);
        terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
        return;
      }
      int32_t putCode = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES);
      if (putCode != 0) {
        tqError("failed to add table uid to hash, code:%d, uid:%" PRId64, putCode, uid);
      }
    }

    if (pTbData->pCreateTbReq != NULL) {
      // Resolve the table's create time: cached value first, meta lookup on a miss.
      int64_t  createTime = INT64_MAX;
      int64_t* pCached = (int64_t*)taosHashGet(pHandle->tableCreateTimeHash, &uid, LONG_BYTES);
      if (pCached == NULL) {
        createTime = metaGetTableCreateTime(pReader->pVnodeMeta, uid, 1);
        if (createTime != INT64_MAX) {
          int32_t cacheCode = taosHashPut(pHandle->tableCreateTimeHash, &uid, LONG_BYTES, &createTime, LONG_BYTES);
          if (cacheCode != 0) {
            tqError("failed to add table create time to hash,code:%d, uid:%"PRId64, cacheCode, uid);
          }
        }
      } else {
        createTime = *pCached;
      }

      if (pHandle->fetchMeta != WITH_DATA && pTbData->ctimeMs <= createTime) {
        // The create-table request is kept, so the raw list is dropped.
        taosArrayDestroy(*rawList);
        *rawList = NULL;
      } else {
        tDestroySVSubmitCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
        taosMemoryFreeClear(pTbData->pCreateTbReq);
      }
    }
  }
}
447

448
/**
 * Decode one WAL submit message and append its contents to the poll response.
 *
 * In raw-data mode a list for raw payloads is created and passed to the reader. After
 * preProcessSubmitMsg() the message is left unprocessed, and terrno carries
 * TSDB_CODE_TMQ_RAW_DATA_SPLIT, when a split is required: one was already flagged, metadata
 * follows data, or data follows metadata. Otherwise every block the reader yields is passed to
 * tqProcessSubData(), iterating with tqNextBlockImpl() for table subscriptions and
 * tqNextDataBlockFilterOut() for database subscriptions.
 *
 * @param submit     packed WAL submit message (buffer, length, version)
 * @param totalRows  in/out counter updated by tqProcessSubData()
 * @return 0 on success, or the error from list allocation / tqReaderSetSubmitMsg(); a split is
 *         reported through terrno, not through the return value
 */
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SArray*        rawList = NULL;
  SDecoder       decoder = {0};
  STqExecHandle* pExec = &pHandle->execHandle;
  STqReader*     pReader = pExec->pTqReader;

  if (pRequest->rawData){
    rawList = taosArrayInit(0, POINTER_BYTES);
    TSDB_CHECK_NULL(rawList, code, lino, END, terrno);
  }

  code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList, &decoder);
  TSDB_CHECK_CODE(code, lino, END);

  preProcessSubmitMsg(pHandle, pRequest, &rawList);

  if (pRequest->rawData != 0) {
    // data could not contains same uid data in rawdata mode
    if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT) {
      goto END;
    }

    // this submit data is metadata and previous data is rawdata
    if (rawList == NULL && pRsp->createTableNum == 0 && *totalRows > 0) {
      tqDebug("poll rawdata split,vgId:%d, this wal submit data contains metadata and previous data is data", pTq->pVnode->config.vgId);
      terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
      goto END;
    }

    // this submit data is rawdata and previous data is metadata
    if (rawList != NULL && pRsp->createTableNum > 0) {
      tqDebug("poll rawdata split,vgId:%d, this wal submit data is data and previous data is metadata", pTq->pVnode->config.vgId);
      terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
      goto END;
    }
  }

  switch (pExec->subType) {
    case TOPIC_SUB_TYPE__TABLE:
      while (tqNextBlockImpl(pReader, NULL)) {
        tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
      }
      break;
    case TOPIC_SUB_TYPE__DB:
      while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
        tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
      }
      break;
    default:
      // Other subscription types produce nothing here.
      break;
  }

END:
  // Cleanup runs on every path; rawList may already be NULL.
  tDecoderClear(&decoder);
  tqReaderClearSubmitMsg(pReader);
  taosArrayDestroy(rawList);
  if (code != 0){
    tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc