• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3578

11 Jan 2025 11:19AM UTC coverage: 63.183% (-0.03%) from 63.211%
#3578

push

travis-ci

web-flow
Merge pull request #29546 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

139873 of 284461 branches covered (49.17%)

Branch coverage included in aggregate %.

20 of 26 new or added lines in 2 files covered. (76.92%)

717 existing lines in 102 files now uncovered.

217827 of 281671 relevant lines covered (77.33%)

19620733.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.43
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
/*
 * Encode one data block into a freshly allocated SRetrieveTableRspForTmq
 * message and append it to the response.
 *
 * The message length is pushed to pRsp->blockDataLen and the message pointer
 * to pRsp->blockData. Once both pushes succeed the buffer belongs to the
 * response; on any failure it is released here.
 *
 * Returns 0 on success, otherwise the error code taken from terrno.
 */
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
  int32_t code = 0;
  int32_t lino = 0;

  // Header plus the encoder's size estimate for the block payload.
  size_t  payloadCap = blockGetEncodeSize(pBlock);
  int32_t allocLen = sizeof(SRetrieveTableRspForTmq) + payloadCap;
  void*   pMsg = taosMemoryCalloc(1, allocLen);
  TSDB_CHECK_NULL(pMsg, code, lino, END, terrno);

  SRetrieveTableRspForTmq* pHead = (SRetrieveTableRspForTmq*)pMsg;
  pHead->version = 1;
  pHead->precision = precision;
  pHead->compressed = 0;
  pHead->numOfRows = htobe64((int64_t)pBlock->info.rows);

  int32_t msgLen = blockEncode(pBlock, pHead->data, payloadCap, numOfCols);
  TSDB_CHECK_CONDITION(msgLen >= 0, code, lino, END, terrno);

  // The recorded length covers the header as well as the encoded payload.
  msgLen += sizeof(SRetrieveTableRspForTmq);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &msgLen), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &pMsg), code, lino, END, terrno);

  // Ownership moved into pRsp->blockData; keep the cleanup below from freeing it.
  pMsg = NULL;

END:
  if (code != 0) {
    tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(pMsg);
  return code;
}
48

49
/*
 * Resolve the table name for `uid` through the vnode's meta reader and append
 * `n` heap-allocated copies of it to pRsp->blockTbName (one per block the
 * caller is about to add).
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL pTq/pRsp, the
 * meta lookup error, or terrno when duplicating the name fails. A failed push
 * into blockTbName is logged and skipped (best effort, as before); the copy
 * that could not be stored is now released instead of leaked.
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SMetaReader mr = {0};

  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);

  code = metaReaderGetTableEntryByUidCache(&mr, uid);
  TSDB_CHECK_CODE(code, lino, END);

  for (int32_t i = 0; i < n; i++) {
    char* tbName = taosStrdup(mr.me.name);
    TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
    if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
      tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
      // The array did not take the pointer, so this function still owns the copy.
      taosMemoryFree(tbName);
      continue;
    }
    tqDebug("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
  }

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }
  // Reached on every path, including the early parameter failures where mr is still zeroed.
  metaReaderClear(&mr);
  return code;
}
79

80
/*
 * Execute the tmq executor task once and return the block it produced in *res.
 *
 * Returns TSDB_CODE_INVALID_PARA when task, pHandle or res is NULL, the
 * qExecTask error code on failure, and 0 on success. What *res holds on
 * success is whatever qExecTask stored there; callers in this file treat NULL
 * as "no more data".
 */
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
  if (!(task != NULL && pHandle != NULL && res != NULL)) {
    return TSDB_CODE_INVALID_PARA;
  }

  uint64_t execTs = 0;  // filled by qExecTask; not used by this function
  qStreamSetOpen(task);

  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
  int32_t rc = qExecTask(task, res, &execTs);
  if (rc == TSDB_CODE_SUCCESS) {
    tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
    return 0;
  }

  tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(rc));
  return rc;
}
97

98
/*
 * Build a replay-mode response.
 *
 * The handle caches one block (pHandle->block) together with its timestamp
 * (pHandle->blockTime). Each call emits the cached block and then refreshes
 * the cache from the block that follows it:
 *   - a reset-type request offset drops any cached block first;
 *   - with nothing cached, pDataBlock is cloned into the cache and the task is
 *     run once more to fetch its successor (nothing is emitted when
 *     pDataBlock is NULL);
 *   - after emitting, a NULL successor clears the cache, otherwise the
 *     successor is copied into the cache and pRsp->sleepTime is set to the
 *     difference between the new and the previous offset timestamps.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tqProcessReplayRsp(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, const SMqPollReq* pRequest, SSDataBlock* pDataBlock, qTaskInfo_t task){
  int32_t code = 0;
  int32_t lino = 0;

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
    blockDataDestroy(pHandle->block);
    pHandle->block = NULL;
  }
  if (pHandle->block == NULL) {
    if (pDataBlock == NULL) {
      goto END;
    }

    STqOffsetVal offset = {0};
    code = qStreamExtractOffset(task, &offset);
    TSDB_CHECK_CODE(code, lino, END);

    code = createOneDataBlock(pDataBlock, true, &pHandle->block);
    if (code != TSDB_CODE_SUCCESS) {
      // The offset was populated above; release it before bailing out so the
      // error path does not leak whatever tOffsetDestroy is responsible for.
      tOffsetDestroy(&offset);
    }
    TSDB_CHECK_CODE(code, lino, END);

    pHandle->blockTime = offset.ts;
    tOffsetDestroy(&offset);

    // Fetch the successor of the block just cached; pDataBlock now refers to it.
    int32_t vgId = TD_VID(pTq->pVnode);
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
    TSDB_CHECK_CODE(code, lino, END);
  }

  const STqExecHandle* pExec = &pHandle->execHandle;
  code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
  TSDB_CHECK_CODE(code, lino, END);

  pRsp->blockNum++;
  if (pDataBlock == NULL) {
    // No successor: the cached block was the last one.
    blockDataDestroy(pHandle->block);
    pHandle->block = NULL;
  } else {
    code = copyDataBlock(pHandle->block, pDataBlock);
    TSDB_CHECK_CODE(code, lino, END);

    STqOffsetVal offset = {0};
    code = qStreamExtractOffset(task, &offset);
    TSDB_CHECK_CODE(code, lino, END);

    pRsp->sleepTime = offset.ts - pHandle->blockTime;
    pHandle->blockTime = offset.ts;
    tOffsetDestroy(&offset);
  }

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to process replay response:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
154

155
/*
 * Run the subscription's executor task from pOffset and pack the produced
 * blocks into pRsp.
 *
 * Blocks are appended until the task returns no block, the accumulated row
 * count reaches tmqRowSize, or the elapsed time exceeds
 * TMIN(TQ_POLL_MAX_TIME, pRequest->timeout). When pRequest->enableReplay is
 * set, the first fetched block is handed to tqProcessReplayRsp and the loop
 * ends. On the normal exit the task's current offset is stored in
 * pRsp->rspOffset.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for a NULL argument, or the
 * first error code encountered.
 */
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
  int32_t code = 0;
  int32_t lino = 0;
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRequest, code, lino, END, TSDB_CODE_INVALID_PARA);

  const STqExecHandle* pExecHandle = &pHandle->execHandle;
  qTaskInfo_t          execTask = pExecHandle->task;
  int32_t              vnodeId = TD_VID(pTq->pVnode);
  int32_t              rowsPacked = 0;

  code = qStreamPrepareScan(execTask, pOffset, pExecHandle->subType);
  TSDB_CHECK_CODE(code, lino, END);

  qStreamSetSourceExcluded(execTask, pRequest->sourceExcluded);
  int64_t startMs = taosGetTimestampMs();
  for (;;) {
    SSDataBlock* pNext = NULL;
    code = getDataBlock(execTask, pHandle, vnodeId, &pNext);
    TSDB_CHECK_CODE(code, lino, END);

    if (pRequest->enableReplay) {
      // Replay mode emits at most one block per poll.
      code = tqProcessReplayRsp(pTq, pHandle, pRsp, pRequest, pNext, execTask);
      TSDB_CHECK_CODE(code, lino, END);
      break;
    }
    if (pNext == NULL) {
      break;
    }

    code = tqAddBlockDataToRsp(pNext, pRsp, pExecHandle->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
    TSDB_CHECK_CODE(code, lino, END);

    pRsp->blockNum++;
    rowsPacked += pNext->info.rows;

    // Stop on the row budget first; the clock is only read when that passes.
    if (rowsPacked >= tmqRowSize) {
      break;
    }
    if (taosGetTimestampMs() - startMs > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) {
      break;
    }
  }

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vnodeId, pRsp->blockNum, rowsPacked);
  code = qStreamExtractOffset(execTask, &pRsp->rspOffset);

END:
  if (code != 0) {
    tqError("%s failed at %d, tmq task executed error msg:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
207

208
/*
 * Snapshot-phase scan for taosx-style subscriptions: run the executor task
 * from pOffset, packing data blocks (optionally with table name and schema)
 * into pRsp, and hand back batched meta through pBatchMetaRsp when the task
 * has some.
 *
 * The loop ends when
 *   - the task reports batched meta (copied into *pBatchMetaRsp), or
 *   - the task returns no block and the offset is no longer of type
 *     TMQ_OFFSET__SNAPSHOT_DATA, or
 *   - the row/time budget is exhausted and at least one block was packed.
 *
 * Returns 0 on success or the first error code encountered. Temporaries that
 * were not yet handed over to pRsp (table-name copy, cloned schema) are
 * released on every exit path.
 */
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, int64_t timeout) {
  int32_t code = 0;
  int32_t lino = 0;
  char* tbName = NULL;
  SSchemaWrapper* pSW = NULL;
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  TSDB_CHECK_CODE(code, lino, END);

  int32_t rowCnt = 0;
  int64_t st = taosGetTimestampMs();
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("tmqsnap task start to execute");
    code = qExecTask(task, &pDataBlock, &ts);
    TSDB_CHECK_CODE(code, lino, END);
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        tbName = taosStrdup(qExtractTbnameFromTask(task));
        TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
        TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno);
        tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName);
        tbName = NULL;  // now owned by pRsp->blockTbName
      }
      if (pRsp->withSchema) {
        // Assign to the function-level pSW (not a shadowing local) so that a
        // failed push below is cleaned up at END.
        pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
        TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
        TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
        pSW = NULL;  // now owned by pRsp->blockSchema
      }

      code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
                                 pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, lino, END);

      pRsp->blockNum++;
      rowCnt += pDataBlock->info.rows;
      if (rowCnt <= tmqRowSize && (taosGetTimestampMs() - st <= TMIN(TQ_POLL_MAX_TIME, timeout))) {
        continue;
      }
    }

    // get meta
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
      code = qStreamExtractOffset(task, &tmp->rspOffset);
      TSDB_CHECK_CODE(code, lino, END);
      *pBatchMetaRsp = *tmp;
      tqDebug("tmqsnap task get meta");
      break;
    }

    if (pDataBlock == NULL) {
      code = qStreamExtractOffset(task, pOffset);
      TSDB_CHECK_CODE(code, lino, END);

      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        continue;
      }

      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      // Check here so a failure records its line and skips the success log.
      TSDB_CHECK_CODE(code, lino, END);
      break;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      TSDB_CHECK_CODE(code, lino, END);
      break;
    }
  }
  tqDebug("%s:%d success", __FUNCTION__, lino);
END:
  if (code != 0){
    tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
  }
  // A cloned schema wrapper is released with the same deleter this file uses
  // for schema arrays elsewhere, not with a flat free.
  if (pSW != NULL) {
    tDeleteSchemaWrapper(pSW);
  }
  taosMemoryFree(tbName);
  return code;
}
292

293
/*
 * Encode pCreateTbReq and append it to the response's create-table lists.
 *
 * When pRsp->createTableNum is 0 the two lists (createTableLen and
 * createTableReq) are created first. The encoded length and the encoded
 * buffer are then pushed and createTableNum is incremented. The buffer is
 * released here only when the function fails; after success it is referenced
 * from pRsp->createTableReq.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for a NULL argument, or the
 * encoder/allocation error.
 */
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
  int32_t code = 0;
  int32_t lino = 0;
  void*   pEncoded = NULL;
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);

  if (0 == pRsp->createTableNum) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
  }

  // First pass: size the encoding; second pass: encode into the buffer.
  uint32_t encodedLen = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, encodedLen, code);
  TSDB_CHECK_CODE(code, lino, END);
  pEncoded = taosMemoryCalloc(1, encodedLen);
  TSDB_CHECK_NULL(pEncoded, code, lino, END, terrno);

  SEncoder encoder = {0};
  tEncoderInit(&encoder, pEncoded, encodedLen);
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
  tEncoderClear(&encoder);
  TSDB_CHECK_CODE(code, lino, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &encodedLen), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &pEncoded), code, lino, END, terrno);
  pRsp->createTableNum++;
  tqDebug("build create table info msg success");

END:
  if (code == 0) {
    return code;
  }
  tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFree(pEncoded);
  return code;
}
330

331
/*
 * Convert the submit data the reader is currently positioned on into response
 * blocks.
 *
 * tqRetrieveTaosxBlock fills pBlocks (SSDataBlock values) and pSchemas
 * (schema-wrapper pointers). Depending on the handle and response settings
 * the function then adds table names, a create-table request, and finally
 * every block together with its schema to pRsp, accumulating the row count in
 * *totalRows.
 *
 * Two conditions end processing early without an error: the submit data's
 * flags intersect sourceExcluded, or the handle is ONLY_META and the data
 * carries no create-table request.
 *
 * Ownership: until the per-block loop starts, everything in pBlocks/pSchemas
 * belongs to this function and is released element by element at END. That
 * now covers the two early success exits too, which previously destroyed only
 * the array shells. Inside the loop each block's resources are always freed,
 * and each schema is either handed to pRsp->blockSchema or deleted.
 *
 * Errors are logged, not returned (the function is void).
 */
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
  int32_t code = 0;
  int32_t lino = 0;
  SArray* pBlocks = NULL;
  SArray* pSchemas = NULL;
  bool    elementsHandled = false;  // true once the per-block loop owns element cleanup

  STqExecHandle* pExec = &pHandle->execHandle;
  STqReader* pReader = pExec->pTqReader;

  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
  pSchemas = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);

  SSubmitTbData* pSubmitTbDataRet = NULL;
  int64_t createTime = INT64_MAX;
  code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
  TSDB_CHECK_CODE(code, lino, END);
  // Skip (successfully) data whose source flags are excluded by the consumer.
  bool tmp = (pSubmitTbDataRet->flags & sourceExcluded) != 0;
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
  if (pRsp->withTbName) {
    int64_t uid = pExec->pTqReader->lastBlkUid;
    code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
    TSDB_CHECK_CODE(code, lino, END);
  }
  if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
    if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) {  // judge if table is already created to avoid sending createTbReq
      code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
      TSDB_CHECK_CODE(code, lino, END);
    }
  }
  // Meta-only consumers get nothing from data that carries no create-table request.
  tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL);
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);

  elementsHandled = true;
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
    SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
    if (pBlock == NULL) {
      continue;
    }
    SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
    if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){
      tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
      // The block is dropped from the response; release it and its schema
      // here because END will only destroy the array shells.
      blockDataFreeRes(pBlock);
      if (pSW != NULL) {
        tDeleteSchemaWrapper(pSW);
      }
      continue;
    }
    *totalRows += pBlock->info.rows;
    blockDataFreeRes(pBlock);
    if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
      tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
      // pRsp did not take the schema, so it must be released here.
      // NOTE(review): the block data was already appended above, so blockData
      // and blockSchema are out of step after this failure — same as before
      // this change; confirm how the response consumer copes.
      if (pSW != NULL) {
        tDeleteSchemaWrapper(pSW);
      }
      continue;
    }
    pRsp->blockNum++;
  }
  tqDebug("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
END:
  if (code != 0){
    tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
  }
  if (!elementsHandled) {
    // Error or early success exit before the loop: the arrays may still hold
    // retrieved blocks and schemas that nobody else references.
    taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
    taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
  } else {
    // Every element was freed or handed over inside the loop.
    taosArrayDestroy(pBlocks);
    taosArrayDestroy(pSchemas);
  }
}
118,657✔
393

394
/*
 * Feed one packed submit message to the handle's reader and convert every
 * block it yields into response data via tqProcessSubData.
 *
 * Table subscriptions iterate with tqNextBlockImpl; database subscriptions
 * iterate with tqNextDataBlockFilterOut using the handle's filter-out uid
 * set. Any other subscription type produces nothing.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for a NULL pRsp/pTq/pHandle/
 * totalRows, or the error from tqReaderSetSubmitMsg. Failures inside
 * tqProcessSubData are not reflected in the return value (it returns void).
 */
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
  int32_t code = 0;
  int32_t lino = 0;
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(totalRows, code, lino, END, TSDB_CODE_INVALID_PARA);

  STqExecHandle* pExecHandle = &pHandle->execHandle;
  STqReader*     pTqReader = pExecHandle->pTqReader;
  code = tqReaderSetSubmitMsg(pTqReader, submit.msgStr, submit.msgLen, submit.ver);
  TSDB_CHECK_CODE(code, lino, END);

  switch (pExecHandle->subType) {
    case TOPIC_SUB_TYPE__TABLE:
      while (tqNextBlockImpl(pTqReader, NULL)) {
        tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
      }
      break;
    case TOPIC_SUB_TYPE__DB:
      while (tqNextDataBlockFilterOut(pTqReader, pExecHandle->execDb.pFilterOutTbUid)) {
        tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
      }
      break;
    default:
      break;
  }

END:
  if (code != 0) {
    tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc