• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
UNCOV
17
static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t precision) {
×
UNCOV
18
  int32_t    code = TDB_CODE_SUCCESS;
×
UNCOV
19
  int32_t    lino = 0;
×
UNCOV
20
  void*      buf = NULL;
×
21

UNCOV
22
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + *(uint32_t *)rawData + INT_BYTES;
×
UNCOV
23
  buf = taosMemoryCalloc(1, dataStrLen);
×
UNCOV
24
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);
×
25

UNCOV
26
  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
×
UNCOV
27
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION;
×
UNCOV
28
  pRetrieve->precision = precision;
×
UNCOV
29
  pRetrieve->compressed = 0;
×
30

UNCOV
31
  memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES);
×
UNCOV
32
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno);
×
UNCOV
33
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
×
UNCOV
34
  pRsp->blockDataElementFree = true;
×
35

UNCOV
36
  tqTrace("tqAddRawDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
×
UNCOV
37
  END:
×
UNCOV
38
  if (code != TSDB_CODE_SUCCESS) {
×
39
    taosMemoryFree(buf);
×
40
    tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code));
×
41
  }
UNCOV
42
  return code;
×
43
}
44

UNCOV
45
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
×
UNCOV
46
  int32_t code = 0;
×
UNCOV
47
  int32_t lino = 0;
×
48

UNCOV
49
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
×
UNCOV
50
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
×
UNCOV
51
  void*   buf = taosMemoryCalloc(1, dataStrLen);
×
UNCOV
52
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);
×
53

UNCOV
54
  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
×
UNCOV
55
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_VERSION;
×
UNCOV
56
  pRetrieve->precision = precision;
×
UNCOV
57
  pRetrieve->compressed = 0;
×
UNCOV
58
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
×
59

UNCOV
60
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
×
UNCOV
61
  TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno);
×
62

UNCOV
63
  actualLen += sizeof(SRetrieveTableRspForTmq);
×
UNCOV
64
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
×
UNCOV
65
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
×
UNCOV
66
  pRsp->blockDataElementFree = true;
×
UNCOV
67
  tqTrace("tqAddBlockDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
×
68

UNCOV
69
END:
×
UNCOV
70
  if (code != TSDB_CODE_SUCCESS){
×
71
    taosMemoryFree(buf);
×
72
    tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
73
  }
UNCOV
74
  return code;
×
75
}
76

UNCOV
77
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
×
UNCOV
78
  int32_t    code = TDB_CODE_SUCCESS;
×
UNCOV
79
  int32_t    lino = 0;
×
UNCOV
80
  SMetaReader mr = {0};
×
81

UNCOV
82
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
×
UNCOV
83
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
×
84

UNCOV
85
  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);
×
86

UNCOV
87
  code = metaReaderGetTableEntryByUidCache(&mr, uid);
×
UNCOV
88
  TSDB_CHECK_CODE(code, lino, END);
×
89

UNCOV
90
  for (int32_t i = 0; i < n; i++) {
×
UNCOV
91
    char* tbName = taosStrdup(mr.me.name);
×
UNCOV
92
    TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
×
UNCOV
93
    if(taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
×
94
      tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
×
95
      continue;
×
96
    }
UNCOV
97
    tqTrace("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
×
98
  }
99

UNCOV
100
END:
×
UNCOV
101
  if (code != TSDB_CODE_SUCCESS) {
×
102
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
×
103
  }
UNCOV
104
  metaReaderClear(&mr);
×
UNCOV
105
  return code;
×
106
}
107

UNCOV
108
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
×
UNCOV
109
  if (task == NULL || pHandle == NULL || res == NULL) {
×
110
    return TSDB_CODE_INVALID_PARA;
×
111
  }
UNCOV
112
  uint64_t ts = 0;
×
UNCOV
113
  qStreamSetOpen(task);
×
114

UNCOV
115
  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
×
UNCOV
116
  int32_t code = qExecTask(task, res, &ts);
×
UNCOV
117
  if (code != TSDB_CODE_SUCCESS) {
×
118
    tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
×
119
    return code;
×
120
  }
121

UNCOV
122
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
×
UNCOV
123
  return 0;
×
124
}
125

UNCOV
126
static int32_t tqProcessReplayRsp(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, const SMqPollReq* pRequest, SSDataBlock* pDataBlock, qTaskInfo_t task){
×
UNCOV
127
  int32_t code = 0;
×
UNCOV
128
  int32_t lino = 0;
×
129

UNCOV
130
  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
×
UNCOV
131
    blockDataDestroy(pHandle->block);
×
UNCOV
132
    pHandle->block = NULL;
×
133
  }
UNCOV
134
  if (pHandle->block == NULL) {
×
UNCOV
135
    if (pDataBlock == NULL) {
×
UNCOV
136
      goto END;
×
137
    }
138

UNCOV
139
    STqOffsetVal offset = {0};
×
UNCOV
140
    code = qStreamExtractOffset(task, &offset);
×
UNCOV
141
    TSDB_CHECK_CODE(code, lino, END);
×
142

UNCOV
143
    pHandle->block = NULL;
×
144

UNCOV
145
    code = createOneDataBlock(pDataBlock, true, &pHandle->block);
×
UNCOV
146
    TSDB_CHECK_CODE(code, lino, END);
×
147

UNCOV
148
    pHandle->blockTime = offset.ts;
×
UNCOV
149
    tOffsetDestroy(&offset);
×
UNCOV
150
    int32_t vgId = TD_VID(pTq->pVnode);
×
UNCOV
151
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
×
UNCOV
152
    TSDB_CHECK_CODE(code, lino, END);
×
153
  }
154

UNCOV
155
  const STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
156
  code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
×
UNCOV
157
  TSDB_CHECK_CODE(code, lino, END);
×
158

UNCOV
159
  pRsp->blockNum++;
×
UNCOV
160
  if (pDataBlock == NULL) {
×
UNCOV
161
    blockDataDestroy(pHandle->block);
×
UNCOV
162
    pHandle->block = NULL;
×
163
  } else {
UNCOV
164
    code = copyDataBlock(pHandle->block, pDataBlock);
×
UNCOV
165
    TSDB_CHECK_CODE(code, lino, END);
×
166

UNCOV
167
    STqOffsetVal offset = {0};
×
UNCOV
168
    code = qStreamExtractOffset(task, &offset);
×
UNCOV
169
    TSDB_CHECK_CODE(code, lino, END);
×
170

UNCOV
171
    pRsp->sleepTime = offset.ts - pHandle->blockTime;
×
UNCOV
172
    pHandle->blockTime = offset.ts;
×
UNCOV
173
    tOffsetDestroy(&offset);
×
174
  }
175

UNCOV
176
END:
×
UNCOV
177
  if (code != TSDB_CODE_SUCCESS) {
×
178
    tqError("%s failed at %d, failed to process replay response:%s", __FUNCTION__, lino, tstrerror(code));
×
179
  }
UNCOV
180
  return code;
×
181
}
182

UNCOV
183
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
×
UNCOV
184
  int32_t code = 0;
×
UNCOV
185
  int32_t lino = 0;
×
UNCOV
186
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
×
UNCOV
187
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
×
UNCOV
188
  TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
×
UNCOV
189
  TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA);
×
UNCOV
190
  TSDB_CHECK_NULL(pRequest, code, lino, END, TSDB_CODE_INVALID_PARA);
×
191

UNCOV
192
  int32_t vgId = TD_VID(pTq->pVnode);
×
UNCOV
193
  int32_t totalRows = 0;
×
194

UNCOV
195
  const STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
196
  qTaskInfo_t          task = pExec->task;
×
197

UNCOV
198
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
×
UNCOV
199
  TSDB_CHECK_CODE(code, lino, END);
×
200

UNCOV
201
  qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
×
UNCOV
202
  int64_t st = taosGetTimestampMs();
×
UNCOV
203
  while (1) {
×
UNCOV
204
    SSDataBlock* pDataBlock = NULL;
×
UNCOV
205
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
×
UNCOV
206
    TSDB_CHECK_CODE(code, lino, END);
×
207

UNCOV
208
    if (pRequest->enableReplay) {
×
UNCOV
209
      code = tqProcessReplayRsp(pTq, pHandle, pRsp, pRequest, pDataBlock, task);
×
UNCOV
210
      TSDB_CHECK_CODE(code, lino, END);
×
UNCOV
211
      break;
×
212
    }
UNCOV
213
    if (pDataBlock == NULL) {
×
UNCOV
214
      break;
×
215
    }
UNCOV
216
    code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
×
UNCOV
217
    TSDB_CHECK_CODE(code, lino, END);
×
218

UNCOV
219
    pRsp->blockNum++;
×
UNCOV
220
    totalRows += pDataBlock->info.rows;
×
UNCOV
221
    if (totalRows >= pRequest->minPollRows || (taosGetTimestampMs() - st > pRequest->timeout)) {
×
222
      break;
223
    }
224
  }
225

UNCOV
226
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
×
UNCOV
227
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
×
228

UNCOV
229
END:
×
UNCOV
230
  if (code != 0) {
×
UNCOV
231
    tqError("%s failed at %d, tmq task executed error msg:%s", __FUNCTION__, lino, tstrerror(code));
×
232
  }
UNCOV
233
  return code;
×
234
}
235

UNCOV
236
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
×
UNCOV
237
  int32_t code = 0;
×
UNCOV
238
  int32_t lino = 0;
×
UNCOV
239
  char* tbName = NULL;
×
UNCOV
240
  SSchemaWrapper* pSW = NULL;
×
UNCOV
241
  const STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
242
  qTaskInfo_t          task = pExec->task;
×
UNCOV
243
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
×
UNCOV
244
  TSDB_CHECK_CODE(code, lino, END);
×
245

UNCOV
246
  int32_t rowCnt = 0;
×
UNCOV
247
  int64_t st = taosGetTimestampMs();
×
UNCOV
248
  while (1) {
×
UNCOV
249
    SSDataBlock* pDataBlock = NULL;
×
UNCOV
250
    uint64_t     ts = 0;
×
UNCOV
251
    tqDebug("tmqsnap task start to execute");
×
UNCOV
252
    code = qExecTask(task, &pDataBlock, &ts);
×
UNCOV
253
    TSDB_CHECK_CODE(code, lino, END);
×
UNCOV
254
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);
×
255

UNCOV
256
    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
×
UNCOV
257
      if (pRsp->withTbName) {
×
UNCOV
258
        tbName = taosStrdup(qExtractTbnameFromTask(task));
×
UNCOV
259
        TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
×
UNCOV
260
        TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno);
×
UNCOV
261
        tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName);
×
UNCOV
262
        tbName = NULL;
×
263
      }
UNCOV
264
      if (pRsp->withSchema) {
×
UNCOV
265
        pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
×
UNCOV
266
        TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
×
UNCOV
267
        TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
×
UNCOV
268
        pSW = NULL;
×
269
      }
270

UNCOV
271
      code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
×
UNCOV
272
                                 pTq->pVnode->config.tsdbCfg.precision);
×
UNCOV
273
      TSDB_CHECK_CODE(code, lino, END);
×
274

UNCOV
275
      pRsp->blockNum++;
×
UNCOV
276
      rowCnt += pDataBlock->info.rows;
×
UNCOV
277
      if (rowCnt <= pRequest->minPollRows && (taosGetTimestampMs() - st <= pRequest->timeout)) {
×
UNCOV
278
        continue;
×
279
      }
280
    }
281

282
    // get meta
UNCOV
283
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
×
UNCOV
284
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
×
UNCOV
285
      code = qStreamExtractOffset(task, &tmp->rspOffset);
×
UNCOV
286
      TSDB_CHECK_CODE(code, lino, END);
×
UNCOV
287
      *pBatchMetaRsp = *tmp;
×
UNCOV
288
      tqDebug("tmqsnap task get meta");
×
UNCOV
289
      break;
×
290
    }
291

UNCOV
292
    if (pDataBlock == NULL) {
×
UNCOV
293
      code = qStreamExtractOffset(task, pOffset);
×
UNCOV
294
      TSDB_CHECK_CODE(code, lino, END);
×
295

UNCOV
296
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
×
UNCOV
297
        continue;
×
298
      }
299

UNCOV
300
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
×
UNCOV
301
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
×
UNCOV
302
      break;
×
303
    }
304

UNCOV
305
    if (pRsp->blockNum > 0) {
×
UNCOV
306
      tqDebug("tmqsnap task exec exited, get data");
×
UNCOV
307
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
×
UNCOV
308
      break;
×
309
    }
310
  }
UNCOV
311
  tqDebug("%s:%d success", __FUNCTION__, lino);
×
312
END:
×
UNCOV
313
  if (code != 0){
×
314
    tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
×
315
  }
UNCOV
316
  taosMemoryFree(pSW);
×
UNCOV
317
  taosMemoryFree(tbName);
×
UNCOV
318
  return code;
×
319
}
320

UNCOV
321
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows,
×
322
                             const SMqPollReq* pRequest, SArray* rawList){
UNCOV
323
  int32_t code = 0;
×
UNCOV
324
  int32_t lino = 0;
×
UNCOV
325
  SArray* pBlocks = NULL;
×
UNCOV
326
  SArray* pSchemas = NULL;
×
327

UNCOV
328
  STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
329
  STqReader* pReader = pExec->pTqReader;
×
330

UNCOV
331
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
×
UNCOV
332
  TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
×
UNCOV
333
  pSchemas = taosArrayInit(0, sizeof(void*));
×
UNCOV
334
  TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
×
335

UNCOV
336
  SSubmitTbData* pSubmitTbData = NULL;
×
UNCOV
337
  code = tqRetrieveTaosxBlock(pReader, pRsp, pBlocks, pSchemas, &pSubmitTbData, rawList, pHandle->fetchMeta);
×
UNCOV
338
  TSDB_CHECK_CODE(code, lino, END);
×
UNCOV
339
  bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
×
UNCOV
340
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
×
341

342

UNCOV
343
  int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
×
UNCOV
344
  if (pRsp->withTbName) {
×
UNCOV
345
    int64_t uid = pExec->pTqReader->lastBlkUid;
×
UNCOV
346
    code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum);
×
UNCOV
347
    TSDB_CHECK_CODE(code, lino, END);
×
348
  }
349

UNCOV
350
  tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbData->pCreateTbReq == NULL);
×
UNCOV
351
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
×
UNCOV
352
  for (int32_t i = 0; i < blockNum; i++) {
×
UNCOV
353
    if (taosArrayGetSize(pBlocks) == 0){
×
UNCOV
354
      void* rawData = taosArrayGetP(rawList, pReader->nextBlk - 1);
×
UNCOV
355
      if (rawData == NULL) {
×
356
        continue;
×
357
      }
UNCOV
358
      if (tqAddRawDataToRsp(rawData, pRsp, pTq->pVnode->config.tsdbCfg.precision) != 0){
×
359
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
360
        continue;
×
361
      }
UNCOV
362
      *totalRows += *(uint32_t *)rawData + INT_BYTES; // bytes actually
×
363
    } else {
UNCOV
364
      SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
×
UNCOV
365
      if (pBlock == NULL) {
×
366
        continue;
×
367
      }
368

UNCOV
369
      if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){
×
370
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
371
        continue;
×
372
      }
UNCOV
373
      *totalRows += pBlock->info.rows;
×
374
    }
375

UNCOV
376
    void** pSW = taosArrayGet(pSchemas, i);
×
UNCOV
377
    if (taosArrayPush(pRsp->blockSchema, pSW) == NULL){
×
378
      tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
×
379
      continue;
×
380
    }
UNCOV
381
    *pSW = NULL;
×
UNCOV
382
    pRsp->blockNum++;
×
383
  }
UNCOV
384
  tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
×
UNCOV
385
END:
×
UNCOV
386
  if (code != 0) {
×
UNCOV
387
    tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
×
388
  }
UNCOV
389
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
×
UNCOV
390
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
×
UNCOV
391
}
×
392

UNCOV
393
static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
×
UNCOV
394
  STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
395
  STqReader* pReader = pExec->pTqReader;
×
UNCOV
396
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
×
UNCOV
397
  for (int32_t i = 0; i < blockSz; i++){
×
UNCOV
398
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, i);
×
UNCOV
399
    if (pSubmitTbData== NULL){
×
400
      taosArrayDestroy(*rawList);
×
401
      *rawList = NULL;
×
UNCOV
402
      return;
×
403
    }
404

UNCOV
405
    int64_t uid = pSubmitTbData->uid;
×
UNCOV
406
    if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) {
×
UNCOV
407
      tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid);
×
UNCOV
408
      terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
×
UNCOV
409
      return;
×
410
    } else {
UNCOV
411
      int32_t code = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES);
×
UNCOV
412
      if (code != 0){
×
413
        tqError("failed to add table uid to hash, code:%d, uid:%"PRId64, code, uid);
×
414
      }
415
    }
416

UNCOV
417
    if (pSubmitTbData->pCreateTbReq == NULL){
×
UNCOV
418
      continue;
×
419
    }
420

UNCOV
421
    int64_t createTime = INT64_MAX;
×
UNCOV
422
    int64_t *cTime = (int64_t*)taosHashGet(pHandle->tableCreateTimeHash, &uid, LONG_BYTES);
×
UNCOV
423
    if (cTime != NULL){
×
UNCOV
424
      createTime = *cTime;
×
425
    } else{
UNCOV
426
      createTime = metaGetTableCreateTime(pReader->pVnodeMeta, uid, 1);
×
UNCOV
427
      if (createTime != INT64_MAX){
×
UNCOV
428
        int32_t code = taosHashPut(pHandle->tableCreateTimeHash, &uid, LONG_BYTES, &createTime, LONG_BYTES);
×
UNCOV
429
        if (code != 0){
×
430
          tqError("failed to add table create time to hash,code:%d, uid:%"PRId64, code, uid);
×
431
        }
432
      }
433
    }
UNCOV
434
    if (pHandle->fetchMeta == WITH_DATA || pSubmitTbData->ctimeMs > createTime){
×
UNCOV
435
      tDestroySVCreateTbReq(pSubmitTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
×
UNCOV
436
      pSubmitTbData->pCreateTbReq = NULL;
×
437
    } else{
UNCOV
438
      taosArrayDestroy(*rawList);
×
UNCOV
439
      *rawList = NULL;
×
440
    }
441
  }
442
}
443

UNCOV
444
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) {
×
UNCOV
445
  int32_t code = 0;
×
UNCOV
446
  int32_t lino = 0;
×
UNCOV
447
  STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
448
  STqReader* pReader = pExec->pTqReader;
×
UNCOV
449
  SArray *rawList = NULL;
×
UNCOV
450
  if (pRequest->rawData){
×
UNCOV
451
    rawList = taosArrayInit(0, POINTER_BYTES);
×
UNCOV
452
    TSDB_CHECK_NULL(rawList, code, lino, END, terrno);
×
453
  }
UNCOV
454
  code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList);
×
UNCOV
455
  TSDB_CHECK_CODE(code, lino, END);
×
UNCOV
456
  if (pRequest->rawData) {
×
UNCOV
457
    preProcessSubmitMsg(pHandle, pRequest, &rawList);
×
458
  }
459
  // data could not contains same uid data in rawdata mode
UNCOV
460
  if (pRequest->rawData != 0 && terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
×
UNCOV
461
    goto END;
×
462
  }
463

464
  // this submit data is metadata and previous data is rawdata
UNCOV
465
  if (pRequest->rawData != 0 && *totalRows > 0 && pRsp->createTableNum == 0 && rawList == NULL){
×
466
    tqDebug("poll rawdata split,vgId:%d, this wal submit data contains metadata and previous data is data", pTq->pVnode->config.vgId);
×
467
    terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
×
468
    goto END;
×
469
  }
470

471
  // this submit data is rawdata and previous data is metadata
UNCOV
472
  if (pRequest->rawData != 0 && pRsp->createTableNum > 0 && rawList != NULL){
×
473
    tqDebug("poll rawdata split,vgId:%d, this wal submit data is data and previous data is metadata", pTq->pVnode->config.vgId);
×
474
    terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
×
475
    goto END;
×
476
  }
477

UNCOV
478
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
×
UNCOV
479
    while (tqNextBlockImpl(pReader, NULL)) {
×
UNCOV
480
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
×
481
    }
UNCOV
482
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
×
UNCOV
483
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
×
UNCOV
484
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
×
485
    }
486
  }
487

UNCOV
488
END:
×
UNCOV
489
  tqReaderClearSubmitMsg(pReader);
×
UNCOV
490
  taosArrayDestroy(rawList);
×
UNCOV
491
  if (code != 0){
×
492
    tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));
×
493
  }
UNCOV
494
  return code;
×
495
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc