• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.65
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
// Encodes one data block into a newly allocated buffer (an SRetrieveTableRspForTmq header followed by the
// encoded block) and appends the buffer pointer and its total length to pRsp->blockData / pRsp->blockDataLen.
//
//   pBlock     block to encode; must not be NULL
//   pRsp       response that receives the buffer and its length; must not be NULL
//   numOfCols  column count forwarded to blockEncode()
//   precision  value stored in the header's precision field
//
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pBlock or pRsp is NULL, or terrno when the
// allocation, the encoding or one of the array pushes fails. On every failure path the buffer is freed and
// neither array keeps an entry for this block, so blockDataLen[i] and blockData[i] stay paired.
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
  if (pBlock == NULL || pRsp == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  size_t  dataEncodeBufSize = blockGetEncodeSize(pBlock);
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) {
    return terrno;
  }

  // The header lives at the start of the buffer; the encoded block follows in pRetrieve->data.
  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
  pRetrieve->version = 1;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
  if (actualLen < 0) {
    int32_t code = terrno;  // read the error before the cleanup call below can touch it
    taosMemoryFree(buf);
    return code;
  }
  actualLen += sizeof(SRetrieveTableRspForTmq);
  if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL) {
    int32_t code = terrno;
    taosMemoryFree(buf);
    return code;
  }
  if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
    int32_t code = terrno;
    // Undo the length pushed just above; otherwise a stale length would be paired with the next block.
    (void)taosArrayPop(pRsp->blockDataLen);
    taosMemoryFree(buf);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
52

53
// Looks up the table entry for `uid` through a meta reader and appends `n` separately allocated copies of its
// name to pRsp->blockTbName.
//
//   pTq   queue handle whose vnode meta is read; must not be NULL
//   uid   table uid to look up
//   pRsp  response receiving the names; must not be NULL
//   n     number of copies to append
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, the (negative) lookup code when the entry
// cannot be read, or terrno when duplicating the name fails. A failed array push is logged and skipped
// (best effort, as before); the copy that could not be stored is freed instead of being leaked.
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
  if (pRsp == NULL || pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMetaReader mr = {0};
  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);

  int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
  if (code < 0) {
    metaReaderClear(&mr);
    return code;
  }

  for (int32_t i = 0; i < n; i++) {
    char* tbName = taosStrdup(mr.me.name);
    if (tbName == NULL) {
      code = terrno;  // read the error before the reader cleanup runs
      metaReaderClear(&mr);
      return code;
    }
    if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
      tqError("failed to push tbName to blockTbName:%s", tbName);
      // The array did not take the pointer, so this copy has no other owner.
      taosMemoryFree(tbName);
      continue;
    }
  }
  metaReaderClear(&mr);
  return 0;
}
80

81
// Runs the stream task once and hands back the block it produced through *res.
//
//   task     executor task to run; must not be NULL
//   pHandle  subscription handle, used here only for the consumer id in log lines; must not be NULL
//   vgId     vgroup id, used only in log lines
//   res      out: receives whatever qExecTask() stores (callers in this file treat NULL as "no more data")
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the qExecTask() error code.
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
  if (task == NULL || pHandle == NULL || res == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  uint64_t execOut = 0;  // filled in by qExecTask(); not consumed here
  qStreamSetOpen(task);

  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
  int32_t rc = qExecTask(task, res, &execOut);
  if (rc == TSDB_CODE_SUCCESS) {
    tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
    return 0;
  }

  tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(rc));
  return rc;
}
98

99
// Prepares the handle's task at *pOffset, runs it and collects the produced data blocks into pRsp.
//
// Two modes, selected by pRequest->enableReplay:
//   * replay:  at most one block is added per call. The block added is the one cached in pHandle->block; the
//              block fetched after it is copied into the cache for the next call, and pRsp->sleepTime is set to
//              the difference between the new and the previous offset timestamps. When no further block is
//              available the cache is destroyed.
//   * normal:  blocks are added until the task returns no block, the accumulated row count reaches tmqRowSize,
//              or more than 1000 (in taosGetTimestampMs() units) has elapsed since the loop started.
//
// After the loop ends normally, the task's current offset is extracted into pRsp->rspOffset.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the first failing step's code
// (also logged together with the source line recorded by TSDB_CHECK_CODE).
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
  if (pTq == NULL || pHandle == NULL || pRsp == NULL || pOffset == NULL || pRequest == NULL){
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;
  int32_t line = 0;
  int32_t totalRows = 0;

  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;

  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  TSDB_CHECK_CODE(code, line, END);

  qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
  uint64_t st = taosGetTimestampMs();
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
    TSDB_CHECK_CODE(code, line, END);

    if (pRequest->enableReplay) {
      // An offset-reset request discards whatever block was cached by a previous call.
      if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
        blockDataDestroy(pHandle->block);
        pHandle->block = NULL;
      }
      if (pHandle->block == NULL) {
        if (pDataBlock == NULL) {
          break;
        }

        // Nothing cached yet: cache the block just read, remember its offset timestamp,
        // then read one more block so the "next" block is known below.
        STqOffsetVal offset = {0};
        code = qStreamExtractOffset(task, &offset);
        TSDB_CHECK_CODE(code, line, END);

        pHandle->block = NULL;

        code = createOneDataBlock(pDataBlock, true, &pHandle->block);
        if (code != 0) {
          // The success path releases the offset with tOffsetDestroy(); do the same before bailing out.
          tOffsetDestroy(&offset);
        }
        TSDB_CHECK_CODE(code, line, END);

        pHandle->blockTime = offset.ts;
        tOffsetDestroy(&offset);
        code = getDataBlock(task, pHandle, vgId, &pDataBlock);
        TSDB_CHECK_CODE(code, line, END);
      }

      code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, line, END);

      pRsp->blockNum++;
      if (pDataBlock == NULL) {
        // No following block: the cached block has been sent, drop the cache.
        blockDataDestroy(pHandle->block);
        pHandle->block = NULL;
      } else {
        // Keep the following block in the cache and report how long the consumer should wait before it.
        code = copyDataBlock(pHandle->block, pDataBlock);
        TSDB_CHECK_CODE(code, line, END);

        STqOffsetVal offset = {0};
        code = qStreamExtractOffset(task, &offset);
        TSDB_CHECK_CODE(code, line, END);

        pRsp->sleepTime = offset.ts - pHandle->blockTime;
        pHandle->blockTime = offset.ts;
        tOffsetDestroy(&offset);
      }
      break;
    } else {
      if (pDataBlock == NULL) {
        break;
      }
      code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, line, END);

      pRsp->blockNum++;
      totalRows += pDataBlock->info.rows;
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
        break;
      }
    }
  }

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
          pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
END:
  if (code != 0) {
    tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
            code);
  }
  return code;
}
191

192
// Prepares the handle's task at *pOffset and runs it repeatedly, collecting data blocks (with optional table
// name and schema per block) into pRsp, or a batch of meta messages into *pBatchMetaRsp.
//
// The loop ends when:
//   * the task has pending meta messages: *pBatchMetaRsp receives a shallow copy of the task's batch-meta
//     response, with its rspOffset set to the task's current offset;
//   * the task returns no block and the extracted offset type is not TMQ_OFFSET__SNAPSHOT_DATA:
//     pRsp->rspOffset is set to the task's current offset;
//   * the row count collected so far exceeds tmqRowSize (or an empty block was returned) and pRsp already
//     holds at least one block: pRsp->rspOffset is set to the task's current offset.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the code of the failing step.
// Failed array pushes are logged and the loop continues (best effort, as before); the table-name copy or
// cloned schema that could not be stored is released instead of being leaked.
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
  if (pTq == NULL || pHandle == NULL || pRsp == NULL || pBatchMetaRsp == NULL || pOffset == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  if (code != 0) {
    return code;
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("tmqsnap task start to execute");
    code = qExecTask(task, &pDataBlock, &ts);
    if (code != 0) {
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
      return code;
    }

    tqDebug("tmqsnap task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        char* tbName = taosStrdup(qExtractTbnameFromTask(task));
        if (tbName == NULL) {
          tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
          return terrno;
        }
        if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
          tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
          // The array did not take the pointer, so this copy has no other owner.
          taosMemoryFree(tbName);
          continue;
        }
      }
      if (pRsp->withSchema) {
        // NOTE(review): the clone result is pushed without a NULL check — confirm whether
        // tCloneSSchemaWrapper() can return NULL here and how consumers of blockSchema handle it.
        SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
        if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
          tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
          // The clone was not stored anywhere; release it here.
          if (pSW != NULL) {
            tDeleteSchemaWrapper(pSW);
          }
          continue;
        }
      }

      if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
                          pTq->pVnode->config.tsdbCfg.precision) != 0) {
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
        continue;
      }
      pRsp->blockNum++;
      rowCnt += pDataBlock->info.rows;
      // Keep pulling blocks until the row budget is exceeded.
      if (rowCnt <= tmqRowSize) continue;

    }

    // get meta
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
      code = qStreamExtractOffset(task, &tmp->rspOffset);
      if (code) {
        return code;
      }

      *pBatchMetaRsp = *tmp;
      tqDebug("tmqsnap task get meta");
      break;
    }

    if (pDataBlock == NULL) {
      code = qStreamExtractOffset(task, pOffset);
      if (code) {
        break;
      }

      // Still inside snapshot data: run the task again.
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        continue;
      }

      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }
  }

  return code;
}
285

286
// Serialises *pCreateTbReq into a newly allocated buffer and appends the buffer and its length to
// pRsp->createTableReq / pRsp->createTableLen, then increments pRsp->createTableNum.
// Both arrays are created on the first call, i.e. while pRsp->createTableNum is still 0.
//
//   pRsp          response receiving the serialised request; must not be NULL
//   pCreateTbReq  create-table request to serialise; must not be NULL
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the code of the failing step
// (terrno for allocation / push failures, the encoder's code otherwise). On failure the buffer is freed,
// createTableNum is unchanged and neither array keeps an entry for this request.
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
  if (pRsp == NULL || pCreateTbReq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  void*   createReq = NULL;
  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    if (pRsp->createTableLen == NULL) {
      code = terrno;
      goto END;
    }
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    if (pRsp->createTableReq == NULL) {
      code = terrno;
      // createTableNum stays 0, so the next call would allocate createTableLen again;
      // release this one now so it is not overwritten and leaked.
      taosArrayDestroy(pRsp->createTableLen);
      pRsp->createTableLen = NULL;
      goto END;
    }
  }

  // First pass computes the encoded size, second pass encodes into a buffer of exactly that size.
  uint32_t len = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    goto END;
  }
  createReq = taosMemoryCalloc(1, len);
  if (createReq == NULL){
    code = terrno;
    goto END;
  }
  SEncoder encoder = {0};
  tEncoderInit(&encoder, createReq, len);
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
  tEncoderClear(&encoder);
  if (code < 0) {
    goto END;
  }
  if (taosArrayPush(pRsp->createTableLen, &len) == NULL){
    code = terrno;
    goto END;
  }
  if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){
    code = terrno;
    // Undo the length pushed just above so the two arrays stay index-aligned.
    (void)taosArrayPop(pRsp->createTableLen);
    goto END;
  }
  pRsp->createTableNum++;

  return 0;
END:
  taosMemoryFree(createReq);
  return code;
}
337

338
// Retrieves the data blocks (and their schemas) for the reader's current submit entry and appends them to pRsp,
// together with table names and, when requested, the serialised create-table request.
//
//   pTq             queue handle (vnode config / meta access); must not be NULL
//   pHandle         subscription handle providing the reader and the fetchMeta mode; must not be NULL
//   pRsp            response being filled; must not be NULL
//   totalRows       in/out: incremented by the row count of every block added; must not be NULL
//   sourceExcluded  bit mask tested against the submit entry's flags; a non-zero intersection skips the entry
//
// Returns nothing: failures are logged and the entry is dropped. On the early-exit paths every retrieved block
// and schema is released. In the per-block loop, a block that cannot be added is released together with its
// schema, and a schema that cannot be stored is released, so nothing is leaked when the containers are
// destroyed at the end.
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
  if (pTq == NULL || pHandle == NULL || pRsp == NULL || totalRows == NULL) {
    return;
  }
  int32_t code = 0;
  STqExecHandle* pExec = &pHandle->execHandle;
  STqReader* pReader = pExec->pTqReader;
  SArray* pBlocks = NULL;
  SArray* pSchemas = NULL;
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
  if (pBlocks == NULL) {
    code = terrno;
    goto END;
  }
  pSchemas = taosArrayInit(0, sizeof(void*));
  if(pSchemas == NULL){
    code = terrno;
    goto END;
  }

  SSubmitTbData* pSubmitTbDataRet = NULL;
  int64_t createTime = INT64_MAX;
  code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
  if (code != 0) {
    tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId);
    goto END;
  }

  if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
    goto END;
  }
  if (pRsp->withTbName) {
    int64_t uid = pExec->pTqReader->lastBlkUid;
    code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
    if (code != 0) {
      tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
      goto END;
    }
  }
  if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
    if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) {  // judge if table is already created to avoid sending crateTbReq
      code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
      if (code != 0){
        tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId);
        goto END;
      }
    }
  }
  if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
    goto END;
  }
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
    SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
    if (pBlock == NULL) {
      continue;
    }
    if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision) != 0){
      tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
      // The containers are destroyed below without per-element cleanup, so a skipped block and its
      // schema must be released here.
      blockDataFreeRes(pBlock);
      SSchemaWrapper* pSkipped = taosArrayGetP(pSchemas, i);
      if (pSkipped != NULL) {
        tDeleteSchemaWrapper(pSkipped);
      }
      continue;
    }
    *totalRows += pBlock->info.rows;
    blockDataFreeRes(pBlock);
    // The schema pointer is handed over to pRsp->blockSchema.
    SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
    if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
      tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
      // The hand-over failed, so the schema still belongs to this function; release it.
      if (pSW != NULL) {
        tDeleteSchemaWrapper(pSW);
      }
      continue;
    }
    pRsp->blockNum++;
  }

  // Every element has either been handed over or released above; only the containers remain.
  taosArrayDestroy(pBlocks);
  taosArrayDestroy(pSchemas);
  return;

END:
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
}
417

418
// Feeds one packed submit message to the handle's reader and processes every block the reader yields,
// appending the results to pRsp through tqProcessSubData().
//
//   pTq             queue handle; must not be NULL
//   pHandle         subscription handle providing the reader and the subscription type; must not be NULL
//   submit          packed submit message (msgStr / msgLen / ver are passed to the reader)
//   pRsp            response being filled; must not be NULL
//   totalRows       in/out row counter forwarded to tqProcessSubData(); must not be NULL
//   sourceExcluded  flag mask forwarded to tqProcessSubData()
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, the tqReaderSetSubmitMsg() code when it fails, and 0
// otherwise. Subscription types other than TABLE and DB are not iterated here.
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows,
                       int8_t sourceExcluded) {
  if (pTq == NULL || pHandle == NULL || pRsp == NULL || totalRows == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  STqExecHandle* execHandle = &pHandle->execHandle;
  STqReader*     reader = execHandle->pTqReader;

  int32_t rc = tqReaderSetSubmitMsg(reader, submit.msgStr, submit.msgLen, submit.ver);
  if (rc != 0) {
    return rc;
  }

  if (execHandle->subType == TOPIC_SUB_TYPE__TABLE) {
    // Table subscription: take every block the reader yields.
    while (tqNextBlockImpl(reader, NULL)) {
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
    }
    return rc;
  }

  if (execHandle->subType == TOPIC_SUB_TYPE__DB) {
    // Database subscription: the reader is given the handle's filter-out uid set.
    while (tqNextDataBlockFilterOut(reader, execHandle->execDb.pFilterOutTbUid)) {
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
    }
  }

  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc