• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.92
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
/**
 * Encode one data block and append it to a poll response.
 *
 * A buffer large enough for an SRetrieveTableRspForTmq header followed by the encoded
 * block (size taken from blockGetEncodeSize) is allocated and filled. The total length
 * is appended to pRsp->blockDataLen and the buffer pointer to pRsp->blockData.
 * pRsp->blockNum is NOT touched here; callers increment it themselves.
 *
 * On any failure the buffer is released and both arrays are left with the same number
 * of entries they had on entry.
 *
 * @param pBlock     block to encode
 * @param pRsp       response that receives the encoded buffer and its length
 * @param numOfCols  column count forwarded to blockEncode
 * @param precision  value stored in the header's precision field
 * @return TSDB_CODE_SUCCESS, or the terrno value observed at the failing step
 */
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  // Keep the size in size_t so it is not narrowed before the allocation.
  size_t bufSize = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
  void*  buf = taosMemoryCalloc(1, bufSize);
  if (buf == NULL) {
    return terrno;
  }

  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
  pRetrieve->version = 1;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
  if (actualLen < 0) {
    code = terrno;  // capture before any cleanup call can overwrite it
    taosMemoryFree(buf);
    return code;
  }
  actualLen += sizeof(SRetrieveTableRspForTmq);

  if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL) {
    code = terrno;
    taosMemoryFree(buf);
    return code;
  }
  if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
    code = terrno;
    // Undo the length pushed just above so blockDataLen and blockData stay aligned.
    (void)taosArrayPop(pRsp->blockDataLen);
    taosMemoryFree(buf);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
49

50
/**
 * Clone the reader's schema wrapper and append the clone to pRsp->blockSchema.
 *
 * The clone is released again if it cannot be appended, so nothing is leaked on the
 * error path.
 *
 * @param pExec  exec handle whose pTqReader->pSchemaWrapper is cloned
 * @param pRsp   response whose blockSchema array receives the clone
 * @return 0 on success, otherwise the terrno value observed at the failing step
 */
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
  if (pSW == NULL) {
    return terrno;
  }
  if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
    int32_t code = terrno;  // capture before cleanup
    tDeleteSchemaWrapper(pSW);
    return code;
  }
  return 0;
}
60

61
/**
 * Look up a table by uid and append n copies of its name to pRsp->blockTbName.
 *
 * Each entry is a separately duplicated string. If duplicating or appending fails,
 * the string that could not be appended is freed and the error is returned instead
 * of being skipped. Entries appended by earlier iterations are left in place.
 *
 * @param pTq   supplies pVnode->pMeta for the meta reader
 * @param uid   table uid to resolve
 * @param pRsp  response whose blockTbName array is extended
 * @param n     number of copies to append
 * @return 0 on success, a negative code from the meta lookup, or terrno on failure
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
  SMetaReader mr = {0};
  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);

  int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
  if (code < 0) {
    metaReaderClear(&mr);
    return code;
  }

  code = 0;  // any non-negative lookup result counts as success
  for (int32_t i = 0; i < n; i++) {
    char* tbName = taosStrdup(mr.me.name);
    if (tbName == NULL) {
      code = terrno;
      break;
    }
    if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
      code = terrno;  // capture before logging/cleanup
      tqError("failed to push tbName to blockTbName:%s", tbName);
      taosMemoryFree(tbName);  // not owned by the array, so release it here
      break;
    }
  }

  metaReaderClear(&mr);
  return code;
}
85

86
/**
 * Run the task once and hand back the block it produced.
 *
 * The task is marked open via qStreamSetOpen and then executed with qExecTask. The
 * result pointer is written through res exactly as qExecTask leaves it.
 *
 * @param task     task to execute
 * @param pHandle  only used for the consumer id in log messages
 * @param vgId     vgroup id, only used in log messages
 * @param res      out: block pointer produced by qExecTask
 * @return 0 on success, otherwise the code returned by qExecTask
 */
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
  uint64_t execTs = 0;

  qStreamSetOpen(task);
  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);

  int32_t rc = qExecTask(task, res, &execTs);
  if (rc == TSDB_CODE_SUCCESS) {
    tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
    return 0;
  }

  tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(rc));
  return rc;
}
100

101
/**
 * Execute the handle's task repeatedly and collect the produced blocks into pRsp.
 *
 * Normal mode: every non-NULL block is encoded into pRsp until the task yields NULL,
 * the accumulated row count reaches tmqRowSize, or more than 1000 ms have elapsed.
 *
 * Replay mode (pRequest->enableReplay): at most one block is returned per call. The
 * block to send is kept in pHandle->block; the following block is fetched ahead so
 * that pRsp->sleepTime can be set to the difference between the two offsets' ts.
 *
 * On success pRsp->rspOffset is filled from the task. On any failure the error is
 * logged with the source line that detected it and the code is returned.
 *
 * @return 0 on success, otherwise the first error code encountered
 */
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;
  int32_t line = 0;
  int32_t totalRows = 0;

  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;

  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  TSDB_CHECK_CODE(code, line, END);

  qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
  uint64_t st = taosGetTimestampMs();
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
    TSDB_CHECK_CODE(code, line, END);

    if (pRequest->enableReplay) {
      // A reset-type request offset discards any block cached by a previous poll.
      if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
        blockDataDestroy(pHandle->block);
        pHandle->block = NULL;
      }
      if (pHandle->block == NULL) {
        if (pDataBlock == NULL) {
          break;
        }

        STqOffsetVal offset = {0};
        code = qStreamExtractOffset(task, &offset);
        TSDB_CHECK_CODE(code, line, END);

        code = createOneDataBlock(pDataBlock, true, &pHandle->block);
        if (code != TSDB_CODE_SUCCESS) {
          // The offset was extracted successfully; release it before bailing out.
          tOffsetDestroy(&offset);
        }
        TSDB_CHECK_CODE(code, line, END);

        pHandle->blockTime = offset.ts;
        tOffsetDestroy(&offset);

        // Fetch the next block ahead so the sleep time can be computed below.
        code = getDataBlock(task, pHandle, vgId, &pDataBlock);
        TSDB_CHECK_CODE(code, line, END);
      }

      code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, line, END);

      pRsp->blockNum++;
      if (pDataBlock == NULL) {
        // Nothing follows: drop the cached block.
        blockDataDestroy(pHandle->block);
        pHandle->block = NULL;
      } else {
        // Cache the look-ahead block for the next poll.
        code = copyDataBlock(pHandle->block, pDataBlock);
        TSDB_CHECK_CODE(code, line, END);

        STqOffsetVal offset = {0};
        code = qStreamExtractOffset(task, &offset);
        TSDB_CHECK_CODE(code, line, END);

        pRsp->sleepTime = offset.ts - pHandle->blockTime;
        pHandle->blockTime = offset.ts;
        tOffsetDestroy(&offset);
      }
      break;
    } else {
      if (pDataBlock == NULL) {
        break;
      }
      code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, line, END);

      pRsp->blockNum++;
      totalRows += pDataBlock->info.rows;
      // Stop once enough rows were gathered or the scan has run for over a second.
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
        break;
      }
    }
  }

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
          pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
END:
  if (code != 0) {
    tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
            code);
  }
  return code;
}
190

191
/**
 * Execute the handle's task and collect either data blocks or a batch of meta
 * messages.
 *
 * For each non-empty block the table name (if pRsp->withTbName), the schema (if
 * pRsp->withSchema) and the encoded block are appended to pRsp. Scanning continues
 * while the accumulated row count is <= tmqRowSize. The loop ends when:
 *   - the task has pending meta requests: they are copied into *pBatchMetaRsp;
 *   - the task yields no block and the extracted offset is not of type
 *     TMQ_OFFSET__SNAPSHOT_DATA: pRsp->rspOffset is filled;
 *   - a block was returned and pRsp already holds data: pRsp->rspOffset is filled.
 *
 * A failure to append the name, schema or data of a block is returned to the caller
 * instead of being skipped; skipping would drop that block and leave the name,
 * schema and data arrays with different lengths. Temporaries that were not handed
 * over to pRsp are released before returning.
 *
 * @return 0 on success, otherwise the first error code encountered
 */
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  int32_t              code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  if (code != 0) {
    return code;
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("tmqsnap task start to execute");
    code = qExecTask(task, &pDataBlock, &ts);
    if (code != 0) {
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
      return code;
    }

    tqDebug("tmqsnap task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        char* tbName = taosStrdup(qExtractTbnameFromTask(task));
        if (tbName == NULL) {
          code = terrno;  // capture before logging
          tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
          return code;
        }
        if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
          code = terrno;
          tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
          taosMemoryFree(tbName);  // not owned by the array
          return code;
        }
      }
      if (pRsp->withSchema) {
        SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
        if (pSW == NULL) {
          // Do not append a NULL schema entry.
          code = terrno;
          tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
          return code;
        }
        if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
          code = terrno;
          tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
          tDeleteSchemaWrapper(pSW);  // not owned by the array
          return code;
        }
      }

      code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
                                 pTq->pVnode->config.tsdbCfg.precision);
      if (code != 0) {
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
        return code;
      }
      pRsp->blockNum++;
      rowCnt += pDataBlock->info.rows;
      if (rowCnt <= tmqRowSize) continue;
    }

    // get meta
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
      code = qStreamExtractOffset(task, &tmp->rspOffset);
      if (code) {
        return code;
      }

      *pBatchMetaRsp = *tmp;
      tqDebug("tmqsnap task get meta");
      break;
    }

    if (pDataBlock == NULL) {
      code = qStreamExtractOffset(task, pOffset);
      if (code) {
        break;
      }

      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        continue;
      }

      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }
  }

  return code;
}
281

282
/**
 * Encode a create-table request and append it to the response.
 *
 * When pRsp->createTableNum is 0 the createTableLen / createTableReq arrays are
 * created first. The request is encoded into a freshly allocated buffer; its length
 * is appended to createTableLen, the buffer pointer to createTableReq, and
 * createTableNum is incremented.
 *
 * On failure the response is left as it was on entry: arrays created by this call
 * are destroyed and reset to NULL (otherwise the next call, still seeing
 * createTableNum == 0, would overwrite and leak them), and a length that was already
 * appended to a pre-existing array is removed again.
 *
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
  int32_t code = 0;
  void*   createReq = NULL;
  bool    lenPushed = false;
  bool    arraysCreatedHere = (pRsp->createTableNum == 0);

  if (arraysCreatedHere) {
    // Start from a known state so the cleanup at END only touches what this call made.
    pRsp->createTableLen = NULL;
    pRsp->createTableReq = NULL;

    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    if (pRsp->createTableLen == NULL) {
      code = terrno;
      goto END;
    }
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    if (pRsp->createTableReq == NULL) {
      code = terrno;
      goto END;
    }
  }

  uint32_t len = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    goto END;
  }
  createReq = taosMemoryCalloc(1, len);
  if (createReq == NULL){
    code = terrno;
    goto END;
  }
  SEncoder encoder = {0};
  tEncoderInit(&encoder, createReq, len);
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
  tEncoderClear(&encoder);
  if (code < 0) {
    goto END;
  }
  if (taosArrayPush(pRsp->createTableLen, &len) == NULL){
    code = terrno;
    goto END;
  }
  lenPushed = true;
  if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){
    code = terrno;
    goto END;
  }
  pRsp->createTableNum++;

  return 0;
END:
  if (arraysCreatedHere) {
    // Neither array holds an owned buffer at this point, so a plain destroy suffices.
    taosArrayDestroy(pRsp->createTableLen);
    taosArrayDestroy(pRsp->createTableReq);
    pRsp->createTableLen = NULL;
    pRsp->createTableReq = NULL;
  } else if (lenPushed) {
    // Keep createTableLen and createTableReq the same length.
    (void)taosArrayPop(pRsp->createTableLen);
  }
  taosMemoryFree(createReq);
  return code;
}
330

331
/**
 * Convert the reader's current submit block into response entries.
 *
 * Blocks and their schemas are retrieved through tqRetrieveTaosxBlock. Nothing is
 * added when retrieval fails, when the submit data's flags intersect sourceExcluded,
 * or when the handle wants ONLY_META and there is no create-table request. Otherwise
 * table names (if pRsp->withTbName), an optional create-table request and every
 * retrieved block plus its schema are appended to pRsp, and *totalRows is increased
 * by the rows of each block that was fully added.
 *
 * Errors are logged, not returned. A block that cannot be fully added is skipped,
 * and its resources and schema wrapper are released so the skip does not leak or
 * leave a data entry without a matching schema entry.
 */
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
  int32_t code = 0;
  STqExecHandle* pExec = &pHandle->execHandle;
  STqReader* pReader = pExec->pTqReader;
  SArray* pBlocks = NULL;
  SArray* pSchemas = NULL;
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
  if (pBlocks == NULL) {
    code = terrno;
    goto END;
  }
  pSchemas = taosArrayInit(0, sizeof(void*));
  if(pSchemas == NULL){
    code = terrno;
    goto END;
  }

  SSubmitTbData* pSubmitTbDataRet = NULL;
  int64_t createTime = INT64_MAX;
  code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
  if (code != 0) {
    tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId);
    goto END;
  }

  if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
    goto END;
  }
  if (pRsp->withTbName) {
    int64_t uid = pExec->pTqReader->lastBlkUid;
    code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
    if (code != 0) {
      tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
      goto END;
    }
  }
  if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
    if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) {  // judge if table is already created to avoid sending crateTbReq
      code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
      if (code != 0){
        tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId);
        goto END;
      }
    }
  }
  if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
    goto END;
  }
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
    SSDataBlock*    pBlock = taosArrayGet(pBlocks, i);
    SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
    if (pBlock == NULL) {
      tDeleteSchemaWrapper(pSW);
      continue;
    }
    if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision) != 0){
      tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
      // The arrays are destroyed shallowly after the loop, so release this entry now.
      blockDataFreeRes(pBlock);
      tDeleteSchemaWrapper(pSW);
      continue;
    }
    if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
      tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
      // Remove the data entry appended just above so data and schema stay paired.
      void** ppBuf = taosArrayPop(pRsp->blockData);
      if (ppBuf != NULL) {
        taosMemoryFree(*ppBuf);
      }
      (void)taosArrayPop(pRsp->blockDataLen);
      blockDataFreeRes(pBlock);
      tDeleteSchemaWrapper(pSW);
      continue;
    }
    *totalRows += pBlock->info.rows;
    blockDataFreeRes(pBlock);
    pRsp->blockNum++;
  }

  // Every element was either handed to pRsp or released in the loop above.
  taosArrayDestroy(pBlocks);
  taosArrayDestroy(pSchemas);
  return;

END:
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
}
407

408
/**
 * Feed one packed submit message to the handle's reader and process every block
 * the reader yields.
 *
 * Table subscriptions iterate with tqNextBlockImpl; database subscriptions iterate
 * with tqNextDataBlockFilterOut using the handle's filter-out uid set. Other
 * subscription types are not processed here. Each yielded block is passed to
 * tqProcessSubData.
 *
 * @return the code from tqReaderSetSubmitMsg if it is non-zero, otherwise 0
 */
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows,
                       int8_t sourceExcluded) {
  STqExecHandle* pExecHandle = &pHandle->execHandle;
  STqReader*     pTqReader = pExecHandle->pTqReader;

  int32_t rc = tqReaderSetSubmitMsg(pTqReader, submit.msgStr, submit.msgLen, submit.ver);
  if (rc != 0) {
    return rc;
  }

  switch (pExecHandle->subType) {
    case TOPIC_SUB_TYPE__TABLE:
      while (tqNextBlockImpl(pTqReader, NULL)) {
        tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
      }
      break;
    case TOPIC_SUB_TYPE__DB:
      while (tqNextDataBlockFilterOut(pTqReader, pExecHandle->execDb.pFilterOutTbUid)) {
        tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
      }
      break;
    default:
      break;
  }

  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc