• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4898

26 Dec 2025 09:58AM UTC coverage: 65.061% (-0.7%) from 65.717%
#4898

push

travis-ci

web-flow
feat: support encryption of configuration files, data files and metadata files (#33801)

350 of 1333 new or added lines in 31 files covered. (26.26%)

2796 existing lines in 159 files now uncovered.

184024 of 282850 relevant lines covered (65.06%)

113940470.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.49
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
UNCOV
17
static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t precision) {
×
UNCOV
18
  int32_t    code = TDB_CODE_SUCCESS;
×
UNCOV
19
  int32_t    lino = 0;
×
UNCOV
20
  void*      buf = NULL;
×
21

UNCOV
22
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + *(uint32_t *)rawData + INT_BYTES;
×
UNCOV
23
  buf = taosMemoryCalloc(1, dataStrLen);
×
UNCOV
24
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);
×
25

UNCOV
26
  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
×
UNCOV
27
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION;
×
UNCOV
28
  pRetrieve->precision = precision;
×
UNCOV
29
  pRetrieve->compressed = 0;
×
30

UNCOV
31
  memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES);
×
UNCOV
32
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno);
×
UNCOV
33
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
×
UNCOV
34
  pRsp->blockDataElementFree = true;
×
35

UNCOV
36
  tqTrace("tqAddRawDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
×
UNCOV
37
  END:
×
UNCOV
38
  if (code != TSDB_CODE_SUCCESS) {
×
39
    taosMemoryFree(buf);
×
40
    tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code));
×
41
  }
UNCOV
42
  return code;
×
43
}
44

45
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, const SSchemaWrapper* pSW, int8_t precision) {
143,187,298✔
46
  int32_t code = 0;
143,187,298✔
47
  int32_t lino = 0;
143,187,298✔
48
  SSchemaWrapper* pSchema = NULL;
143,187,298✔
49
  
50
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
143,203,026✔
51
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
143,255,018✔
52
  void*   buf = taosMemoryCalloc(1, dataStrLen);
143,255,018✔
53
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);
143,157,270✔
54

55
  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
143,157,270✔
56
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_VERSION;
143,157,270✔
57
  pRetrieve->precision = precision;
143,140,113✔
58
  pRetrieve->compressed = 0;
143,175,695✔
59
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
143,176,200✔
60

61
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, pSW->nCols);
143,193,254✔
62
  TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno);
143,200,030✔
63

64
  actualLen += sizeof(SRetrieveTableRspForTmq);
143,200,030✔
65
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
286,440,284✔
66
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
286,479,353✔
67
  pSchema = tCloneSSchemaWrapper(pSW);
143,228,714✔
68
  TSDB_CHECK_NULL(pSchema, code, lino, END, terrno);
143,228,714✔
69
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSchema), code, lino, END, terrno);
286,461,900✔
70
  pSchema = NULL;
143,233,186✔
71
  pRsp->blockDataElementFree = true;
143,233,186✔
72
  tqTrace("tqAddBlockDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
143,192,792✔
73

74
END:
143,192,792✔
75
  tDeleteSchemaWrapper(pSchema);
143,194,122✔
76
  if (code != TSDB_CODE_SUCCESS){
143,134,294✔
77
    taosMemoryFree(buf);
×
78
    tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
79
  }
80
  return code;
143,134,294✔
81
}
82

83
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
40,935,337✔
84
  int32_t    code = TDB_CODE_SUCCESS;
40,935,337✔
85
  int32_t    lino = 0;
40,935,337✔
86
  SMetaReader mr = {0};
40,935,337✔
87

88
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
40,943,852✔
89
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
40,943,852✔
90

91
  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);
40,943,852✔
92

93
  code = metaReaderGetTableEntryByUidCache(&mr, uid);
40,926,247✔
94
  TSDB_CHECK_CODE(code, lino, END);
40,898,670✔
95

96
  for (int32_t i = 0; i < n; i++) {
81,876,842✔
97
    char* tbName = taosStrdup(mr.me.name);
40,868,055✔
98
    TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
40,908,056✔
99
    if(taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
81,883,556✔
100
      tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
×
101
      continue;
×
102
    }
103
    tqTrace("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
40,975,500✔
104
  }
105

106
END:
41,008,173✔
107
  if (code != TSDB_CODE_SUCCESS) {
40,958,413✔
108
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
×
109
  }
110
  metaReaderClear(&mr);
40,958,413✔
111
  return code;
40,890,159✔
112
}
113

114
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
119,784,723✔
115
  if (task == NULL || pHandle == NULL || res == NULL) {
119,784,723✔
116
    return TSDB_CODE_INVALID_PARA;
×
117
  }
118
  uint64_t ts = 0;
119,795,407✔
119
  qStreamSetOpen(task);
119,792,896✔
120

121
  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
119,792,047✔
122
  int32_t code = qExecTask(task, res, &ts);
119,800,930✔
123
  if (code != TSDB_CODE_SUCCESS) {
119,804,819✔
124
    tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
×
125
    return code;
×
126
  }
127

128
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
119,804,819✔
129
  return 0;
119,804,143✔
130
}
131

132
static int32_t tqProcessReplayRsp(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, const SMqPollReq* pRequest, SSDataBlock* pDataBlock, qTaskInfo_t task){
8,712✔
133
  int32_t code = 0;
8,712✔
134
  int32_t lino = 0;
8,712✔
135

136
  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
8,712✔
137
    blockDataDestroy(pHandle->block);
264✔
138
    pHandle->block = NULL;
264✔
139
  }
140
  if (pHandle->block == NULL) {
8,712✔
141
    if (pDataBlock == NULL) {
2,904✔
142
      goto END;
1,584✔
143
    }
144

145
    STqOffsetVal offset = {0};
1,320✔
146
    code = qStreamExtractOffset(task, &offset);
1,320✔
147
    TSDB_CHECK_CODE(code, lino, END);
1,320✔
148

149
    pHandle->block = NULL;
1,320✔
150

151
    code = createOneDataBlock(pDataBlock, true, &pHandle->block);
1,320✔
152
    TSDB_CHECK_CODE(code, lino, END);
1,320✔
153

154
    pHandle->blockTime = offset.ts;
1,320✔
155
    tOffsetDestroy(&offset);
1,320✔
156
    int32_t vgId = TD_VID(pTq->pVnode);
1,320✔
157
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
1,320✔
158
    TSDB_CHECK_CODE(code, lino, END);
1,320✔
159
  }
160

161
  const STqExecHandle* pExec = &pHandle->execHandle;
7,128✔
162
  code = tqAddBlockDataToRsp(pHandle->block, pRsp, &pExec->execCol.pSW, pTq->pVnode->config.tsdbCfg.precision);
7,128✔
163
  TSDB_CHECK_CODE(code, lino, END);
7,128✔
164

165
  pRsp->blockNum++;
7,128✔
166
  if (pDataBlock == NULL) {
7,128✔
167
    blockDataDestroy(pHandle->block);
1,056✔
168
    pHandle->block = NULL;
1,056✔
169
  } else {
170
    code = copyDataBlock(pHandle->block, pDataBlock);
6,072✔
171
    TSDB_CHECK_CODE(code, lino, END);
6,072✔
172

173
    STqOffsetVal offset = {0};
6,072✔
174
    code = qStreamExtractOffset(task, &offset);
6,072✔
175
    TSDB_CHECK_CODE(code, lino, END);
6,072✔
176

177
    pRsp->sleepTime = offset.ts - pHandle->blockTime;
6,072✔
178
    pHandle->blockTime = offset.ts;
6,072✔
179
    tOffsetDestroy(&offset);
6,072✔
180
  }
181

182
END:
8,712✔
183
  if (code != TSDB_CODE_SUCCESS) {
8,712✔
184
    tqError("%s failed at %d, failed to process replay response:%s", __FUNCTION__, lino, tstrerror(code));
×
185
  }
186
  return code;
8,712✔
187
}
188

189
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
19,468,588✔
190
  int32_t code = 0;
19,468,588✔
191
  int32_t lino = 0;
19,468,588✔
192
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
19,468,588✔
193
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
19,468,588✔
194
  TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
19,468,588✔
195
  TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA);
19,468,588✔
196
  TSDB_CHECK_NULL(pRequest, code, lino, END, TSDB_CODE_INVALID_PARA);
19,468,588✔
197

198
  int32_t vgId = TD_VID(pTq->pVnode);
19,468,588✔
199
  int32_t totalRows = 0;
19,468,893✔
200

201
  const STqExecHandle* pExec = &pHandle->execHandle;
19,468,893✔
202
  qTaskInfo_t          task = pExec->task;
19,468,893✔
203

204
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
19,467,995✔
205
  TSDB_CHECK_CODE(code, lino, END);
19,467,113✔
206

207
  qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
19,274,704✔
208
  int64_t st = taosGetTimestampMs();
19,271,596✔
209
  while (1) {
100,519,230✔
210
    SSDataBlock* pDataBlock = NULL;
119,790,826✔
211
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
119,781,971✔
212
    TSDB_CHECK_CODE(code, lino, END);
119,799,817✔
213

214
    if (pRequest->enableReplay) {
119,799,817✔
215
      code = tqProcessReplayRsp(pTq, pHandle, pRsp, pRequest, pDataBlock, task);
8,712✔
216
      TSDB_CHECK_CODE(code, lino, END);
8,712✔
217
      break;
8,712✔
218
    }
219
    if (pDataBlock == NULL) {
119,795,781✔
220
      break;
18,263,914✔
221
    }
222
    code = tqAddBlockDataToRsp(pDataBlock, pRsp, &pExec->execCol.pSW, pTq->pVnode->config.tsdbCfg.precision);
101,531,867✔
223
    TSDB_CHECK_CODE(code, lino, END);
101,515,863✔
224

225
    pRsp->blockNum++;
101,515,863✔
226
    totalRows += pDataBlock->info.rows;
101,516,591✔
227
    if (totalRows >= pRequest->minPollRows || (taosGetTimestampMs() - st > pRequest->timeout)) {
202,328,601✔
228
      break;
229
    }
230
  }
231

232
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
19,276,823✔
233
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
19,276,823✔
234

235
END:
19,469,232✔
236
  if (code != 0) {
19,469,232✔
237
    tqError("%s failed at %d, tmq task executed error msg:%s", __FUNCTION__, lino, tstrerror(code));
192,409✔
238
  }
239
  return code;
19,469,232✔
240
}
241

242
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
47,702✔
243
  int32_t code = 0;
47,702✔
244
  int32_t lino = 0;
47,702✔
245
  char* tbName = NULL;
47,702✔
246
  const STqExecHandle* pExec = &pHandle->execHandle;
47,702✔
247
  qTaskInfo_t          task = pExec->task;
47,702✔
248
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
47,702✔
249
  TSDB_CHECK_CODE(code, lino, END);
47,403✔
250

251
  int32_t rowCnt = 0;
47,403✔
252
  int64_t st = taosGetTimestampMs();
47,403✔
253
  while (1) {
1,411,191✔
254
    SSDataBlock* pDataBlock = NULL;
1,458,594✔
255
    uint64_t     ts = 0;
1,458,295✔
256
    tqDebug("tmqsnap task start to execute");
1,458,295✔
257
    code = qExecTask(task, &pDataBlock, &ts);
1,458,594✔
258
    TSDB_CHECK_CODE(code, lino, END);
1,458,893✔
259
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);
1,458,893✔
260

261
    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
1,458,606✔
262
      if (pRsp->withTbName) {
730,692✔
263
        tbName = taosStrdup(qExtractTbnameFromTask(task));
730,692✔
264
        TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
730,692✔
265
        TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno);
1,461,384✔
266
        tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName);
730,692✔
267
        tbName = NULL;
730,405✔
268
      }
269

270
      code = tqAddBlockDataToRsp(pDataBlock, pRsp, qExtractSchemaFromTask(task), pTq->pVnode->config.tsdbCfg.precision);
730,405✔
271
      TSDB_CHECK_CODE(code, lino, END);
730,692✔
272

273
      pRsp->blockNum++;
730,692✔
274
      rowCnt += pDataBlock->info.rows;
730,393✔
275
      if (rowCnt <= pRequest->minPollRows && (taosGetTimestampMs() - st <= pRequest->timeout)) {
1,426,787✔
276
        continue;
695,796✔
277
      }
278
    }
279

280
    // get meta
281
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
762,511✔
282
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
762,798✔
283
      code = qStreamExtractOffset(task, &tmp->rspOffset);
4,084✔
284
      TSDB_CHECK_CODE(code, lino, END);
4,084✔
285
      *pBatchMetaRsp = *tmp;
4,084✔
286
      tqDebug("tmqsnap task get meta");
4,084✔
287
      break;
4,084✔
288
    }
289

290
    if (pDataBlock == NULL) {
758,714✔
291
      code = qStreamExtractOffset(task, pOffset);
724,117✔
292
      TSDB_CHECK_CODE(code, lino, END);
724,117✔
293

294
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
724,117✔
295
        continue;
715,096✔
296
      }
297

298
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
9,021✔
299
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
9,021✔
300
      break;
9,021✔
301
    }
302

303
    if (pRsp->blockNum > 0) {
34,597✔
304
      tqDebug("tmqsnap task exec exited, get data");
34,597✔
305
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
34,597✔
306
      break;
34,597✔
307
    }
308
  }
309
  tqDebug("%s:%d success", __FUNCTION__, lino);
47,702✔
310
END:
47,702✔
311
  if (code != 0){
47,702✔
312
    tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
×
313
  }
314
  taosMemoryFree(tbName);
47,702✔
315
  return code;
47,702✔
316
}
317

318
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows,
41,193,502✔
319
                             const SMqPollReq* pRequest, SArray* rawList){
320
  int32_t code = 0;
41,193,502✔
321
  int32_t lino = 0;
41,193,502✔
322
  SArray* pBlocks = NULL;
41,193,502✔
323
  SArray* pSchemas = NULL;
41,193,502✔
324

325
  STqExecHandle* pExec = &pHandle->execHandle;
41,193,502✔
326
  STqReader* pReader = pExec->pTqReader;
41,213,454✔
327

328
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
41,214,238✔
329
  TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
41,209,857✔
330
  pSchemas = taosArrayInit(0, sizeof(void*));
41,209,857✔
331
  TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
41,214,266✔
332

333
  SSubmitTbData* pSubmitTbData = NULL;
41,214,266✔
334
  code = tqRetrieveTaosxBlock(pReader, pRsp, pBlocks, pSchemas, &pSubmitTbData, rawList, pHandle->fetchMeta);
41,213,341✔
335
  TSDB_CHECK_CODE(code, lino, END);
41,135,987✔
336
  bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
40,941,976✔
337
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
41,006,423✔
338

339
  if (pHandle->fetchMeta == ONLY_META){
40,979,817✔
340
    goto END;
6,392✔
341
  }
342

343
  int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
40,986,400✔
344
  if (pRsp->withTbName) {
40,986,726✔
345
    int64_t uid = pExec->pTqReader->lastBlkUid;
40,987,070✔
346
    code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum);
40,987,646✔
347
    TSDB_CHECK_CODE(code, lino, END);
40,897,579✔
348
  }
349

350
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
40,897,849✔
351
  for (int32_t i = 0; i < blockNum; i++) {
81,876,714✔
352
    if (taosArrayGetSize(pBlocks) == 0){
40,870,235✔
UNCOV
353
      void* rawData = taosArrayGetP(rawList, pReader->nextBlk - 1);
×
UNCOV
354
      if (rawData == NULL) {
×
355
        continue;
×
356
      }
UNCOV
357
      if (tqAddRawDataToRsp(rawData, pRsp, pTq->pVnode->config.tsdbCfg.precision) != 0){
×
358
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
359
        continue;
×
360
      }
UNCOV
361
      *totalRows += *(uint32_t *)rawData + INT_BYTES; // bytes actually
×
362
    } else {
363
      SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
40,939,366✔
364
      if (pBlock == NULL) {
40,921,258✔
365
        continue;
×
366
      }
367

368
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pSchemas, i);
40,921,258✔
369
      if (tqAddBlockDataToRsp(pBlock, pRsp, pSW, pTq->pVnode->config.tsdbCfg.precision) != 0){
40,976,714✔
370
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
371
        continue;
×
372
      }
373
      *totalRows += pBlock->info.rows;
40,874,461✔
374
    }
375

376
    pRsp->blockNum++;
40,967,545✔
377
  }
378
  tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
41,006,479✔
379
END:
41,233,488✔
380
  if (code != 0) {
41,097,939✔
381
    tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
194,011✔
382
  }
383
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
41,097,939✔
384
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
41,132,597✔
385
}
41,187,358✔
386

387
static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
42,301,634✔
388
  STqExecHandle* pExec = &pHandle->execHandle;
42,301,634✔
389
  STqReader* pReader = pExec->pTqReader;
42,309,221✔
390
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
42,310,480✔
391
  for (int32_t i = 0; i < blockSz; i++){
84,636,453✔
392
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, i);
42,324,599✔
393
    if (pSubmitTbData== NULL){
42,327,529✔
394
      taosArrayDestroy(*rawList);
×
395
      *rawList = NULL;
×
396
      return;
×
397
    }
398

399
    int64_t uid = pSubmitTbData->uid;
42,327,529✔
400
    if (pRequest->rawData) {
42,331,457✔
UNCOV
401
      if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) {
×
UNCOV
402
        tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid);
×
UNCOV
403
        terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
×
UNCOV
404
        return;
×
405
      } else {
UNCOV
406
        int32_t code = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES);
×
UNCOV
407
        if (code != 0) {
×
408
          tqError("failed to add table uid to hash, code:%d, uid:%" PRId64, code, uid);
×
409
        }
410
      }
411
    }
412

413
    if (pSubmitTbData->pCreateTbReq == NULL){
42,320,732✔
414
      continue;
42,219,978✔
415
    }
416

417
    int64_t createTime = INT64_MAX;
104,640✔
418
    int64_t *cTime = (int64_t*)taosHashGet(pHandle->tableCreateTimeHash, &uid, LONG_BYTES);
104,640✔
419
    if (cTime != NULL){
104,640✔
420
      createTime = *cTime;
909✔
421
    } else{
422
      createTime = metaGetTableCreateTime(pReader->pVnodeMeta, uid, 1);
103,731✔
423
      if (createTime != INT64_MAX){
103,428✔
424
        int32_t code = taosHashPut(pHandle->tableCreateTimeHash, &uid, LONG_BYTES, &createTime, LONG_BYTES);
103,428✔
425
        if (code != 0){
103,731✔
426
          tqError("failed to add table create time to hash,code:%d, uid:%"PRId64, code, uid);
×
427
        }
428
      }
429
    }
430
    if (pHandle->fetchMeta == WITH_DATA || pSubmitTbData->ctimeMs > createTime){
104,640✔
431
      tDestroySVSubmitCreateTbReq(pSubmitTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
94,454✔
432
      taosMemoryFreeClear(pSubmitTbData->pCreateTbReq);
94,145✔
433
    } else{
434
      taosArrayDestroy(*rawList);
10,186✔
435
      *rawList = NULL;
10,186✔
436
    }
437
  }
438
}
439

440
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) {
42,317,278✔
441
  int32_t code = 0;
42,317,278✔
442
  int32_t lino = 0;
42,317,278✔
443
  SDecoder decoder = {0};
42,317,278✔
444
  STqExecHandle* pExec = &pHandle->execHandle;
42,317,854✔
445
  STqReader* pReader = pExec->pTqReader;
42,317,854✔
446
  SArray *rawList = NULL;
42,317,854✔
447
  if (pRequest->rawData){
42,317,535✔
UNCOV
448
    rawList = taosArrayInit(0, POINTER_BYTES);
×
UNCOV
449
    TSDB_CHECK_NULL(rawList, code, lino, END, terrno);
×
450
  }
451
  code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList, &decoder);
42,317,823✔
452
  TSDB_CHECK_CODE(code, lino, END);
42,312,429✔
453
  preProcessSubmitMsg(pHandle, pRequest, &rawList);
42,312,429✔
454
  // data could not contains same uid data in rawdata mode
455
  if (pRequest->rawData != 0 && terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
42,300,767✔
UNCOV
456
    goto END;
×
457
  }
458

459
  // this submit data is metadata and previous data is rawdata
460
  if (pRequest->rawData != 0 && *totalRows > 0 && pRsp->createTableNum == 0 && rawList == NULL){
42,312,325✔
461
    tqDebug("poll rawdata split,vgId:%d, this wal submit data contains metadata and previous data is data", pTq->pVnode->config.vgId);
×
462
    terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
×
463
    goto END;
×
464
  }
465

466
  // this submit data is rawdata and previous data is metadata
467
  if (pRequest->rawData != 0 && pRsp->createTableNum > 0 && rawList != NULL){
42,305,548✔
468
    tqDebug("poll rawdata split,vgId:%d, this wal submit data is data and previous data is metadata", pTq->pVnode->config.vgId);
×
469
    terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
×
470
    goto END;
×
471
  }
472

473
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
42,296,982✔
474
    while (tqNextBlockImpl(pReader, NULL)) {
22,243,739✔
475
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
10,563,669✔
476
    }
477
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
30,614,276✔
478
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
61,180,282✔
479
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
30,642,324✔
480
    }
481
  }
482

483
END:
42,203,073✔
484
  tDecoderClear(&decoder);
42,260,489✔
485
  tqReaderClearSubmitMsg(pReader);
42,268,015✔
486
  taosArrayDestroy(rawList);
42,306,030✔
487
  if (code != 0){
42,307,863✔
488
    tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));
×
489
  }
490
  return code;
42,307,863✔
491
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc