• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.92
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
/**
 * Encode one data block and append it to a poll response.
 *
 * A buffer large enough for an SRetrieveTableRspForTmq header followed by the
 * encoded block is allocated, the header is filled in, the block is encoded
 * right after the header, and then the total length and the buffer pointer are
 * appended to pRsp->blockDataLen and pRsp->blockData respectively.
 *
 * This function does not touch pRsp->blockNum; callers bump it themselves.
 *
 * @param pBlock    block to encode
 * @param pRsp      response whose blockDataLen/blockData arrays receive the entry
 * @param numOfCols column count forwarded to blockEncode()
 * @param precision value stored in the header's precision field
 * @return TSDB_CODE_SUCCESS on success; terrno when allocation, encoding or
 *         either array push fails (the buffer is released in those cases).
 */
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
  size_t  payloadCap = blockGetEncodeSize(pBlock);
  int32_t allocLen = sizeof(SRetrieveTableRspForTmq) + payloadCap;
  void*   rspBuf = taosMemoryCalloc(1, allocLen);
  if (rspBuf == NULL) {
    return terrno;
  }

  // The header sits at the front of the buffer; the encoded block follows it.
  SRetrieveTableRspForTmq* pHdr = rspBuf;
  pHdr->version = 1;
  pHdr->precision = precision;
  pHdr->compressed = 0;
  pHdr->numOfRows = htobe64((int64_t)pBlock->info.rows);

  int32_t entryLen = blockEncode(pBlock, pHdr->data, payloadCap, numOfCols);
  if (entryLen >= 0) {
    // Recorded length covers header + encoded payload.
    entryLen += sizeof(SRetrieveTableRspForTmq);
    if (taosArrayPush(pRsp->blockDataLen, &entryLen) != NULL && taosArrayPush(pRsp->blockData, &rspBuf) != NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }

  // Shared failure exit: encoding or one of the pushes failed.
  taosMemoryFree(rspBuf);
  return terrno;
}
49

50
/**
 * Clone the reader's schema wrapper and append the clone to pRsp->blockSchema.
 *
 * @param pExec execution handle; its pTqReader->pSchemaWrapper is the source
 * @param pRsp  response whose blockSchema array receives the cloned pointer
 * @return 0 on success, terrno if cloning or the array push fails. On a failed
 *         push the clone is released here, because nothing else holds it.
 */
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
  if (pSW == NULL) {
    return terrno;
  }
  if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
    // Capture the error before cleanup so the release cannot disturb it.
    int32_t code = terrno;
    tDeleteSchemaWrapper(pSW);
    return code;
  }
  return 0;
}
60

61
/**
 * Look up the table entry for uid and append n copies of its name to
 * pRsp->blockTbName (one heap-allocated string per copy).
 *
 * @param pTq  tq instance; pTq->pVnode->pMeta is used for the lookup
 * @param uid  table uid to resolve
 * @param pRsp response whose blockTbName array receives the names
 * @param n    number of name copies to append
 * @return 0 on success; the lookup error code if the entry cannot be read;
 *         terrno if duplicating the name fails. A failed array push is only
 *         logged (best effort), and the unowned copy is freed.
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
  SMetaReader mr = {0};
  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);

  int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
  if (code < 0) {
    metaReaderClear(&mr);
    return code;
  }

  for (int32_t i = 0; i < n; i++) {
    char* tbName = taosStrdup(mr.me.name);
    if (tbName == NULL) {
      metaReaderClear(&mr);
      return terrno;
    }
    if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
      tqError("failed to push tbName to blockTbName:%s", tbName);
      // The array did not take the pointer, so this copy would otherwise leak.
      taosMemoryFree(tbName);
      continue;
    }
  }
  metaReaderClear(&mr);
  return 0;
}
85

86
/**
 * Run the query task once and hand back the block it produced.
 *
 * @param task    task to execute (marked open via qStreamSetOpen first)
 * @param pHandle handle; only its consumerId is read, for logging
 * @param vgId    vgroup id, for logging
 * @param res     receives the block pointer written by qExecTask()
 * @return 0 on success, otherwise the error code returned by qExecTask().
 */
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
  uint64_t execTs = 0;
  qStreamSetOpen(task);

  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
  int32_t rc = qExecTask(task, res, &execTs);
  if (rc == TSDB_CODE_SUCCESS) {
    tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
    return 0;
  }

  tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(rc));
  return rc;
}
100

101
/**
 * Execute the subscription task and fill pRsp with encoded data blocks.
 *
 * Normal mode: blocks are appended until the task yields no block, the
 * accumulated row count reaches tmqRowSize, or more than 1000 ms have elapsed.
 *
 * Replay mode (pRequest->enableReplay): exactly one block is returned per call.
 * The block to send is kept in pHandle->block; the task is run one step ahead
 * so that pRsp->sleepTime can be set to the difference between the offset
 * timestamp of the next block and that of the block being sent.
 *
 * On the success path pRsp->rspOffset is filled from the task's current offset.
 *
 * @param pTq      tq instance (vnode id and precision are read from it)
 * @param pHandle  subscription handle; block/blockTime are updated in replay mode
 * @param pRsp     response to fill (blockData, blockDataLen, blockNum, ...)
 * @param pOffset  offset passed to qStreamPrepareScan()
 * @param pRequest poll request (sourceExcluded, enableReplay, reqOffset)
 * @return 0 on success, otherwise the first error code encountered.
 */
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;
  int32_t line = 0;
  int32_t totalRows = 0;

  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;

  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  TSDB_CHECK_CODE(code, line, END);

  qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
  uint64_t st = taosGetTimestampMs();
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
    TSDB_CHECK_CODE(code, line, END);

    if (pRequest->enableReplay) {
      // A reset-type request offset discards any block buffered by a previous poll.
      if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
        blockDataDestroy(pHandle->block);
        pHandle->block = NULL;
      }
      if (pHandle->block == NULL) {
        if (pDataBlock == NULL) {
          break;
        }

        STqOffsetVal offset = {0};
        code = qStreamExtractOffset(task, &offset);
        TSDB_CHECK_CODE(code, line, END);

        code = createOneDataBlock(pDataBlock, true, &pHandle->block);
        if (code != TSDB_CODE_SUCCESS) {
          // The offset was extracted successfully; release it before bailing out.
          tOffsetDestroy(&offset);
        }
        TSDB_CHECK_CODE(code, line, END);

        pHandle->blockTime = offset.ts;
        tOffsetDestroy(&offset);
        // Look one block ahead so the gap to the next block can be reported.
        code = getDataBlock(task, pHandle, vgId, &pDataBlock);
        TSDB_CHECK_CODE(code, line, END);
      }

      code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, line, END);

      pRsp->blockNum++;
      if (pDataBlock == NULL) {
        // Nothing follows: drop the buffered block.
        blockDataDestroy(pHandle->block);
        pHandle->block = NULL;
      } else {
        // Buffer the look-ahead block for the next poll.
        code = copyDataBlock(pHandle->block, pDataBlock);
        TSDB_CHECK_CODE(code, line, END);

        STqOffsetVal offset = {0};
        code = qStreamExtractOffset(task, &offset);
        TSDB_CHECK_CODE(code, line, END);

        pRsp->sleepTime = offset.ts - pHandle->blockTime;
        pHandle->blockTime = offset.ts;
        tOffsetDestroy(&offset);
      }
      break;
    } else {
      if (pDataBlock == NULL) {
        break;
      }
      code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, line, END);

      pRsp->blockNum++;
      totalRows += pDataBlock->info.rows;
      // Stop once enough rows were gathered or the scan ran for over a second.
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
        break;
      }
    }
  }

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
          pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
END:
  if (code != 0) {
    tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
            code);
  }
  return code;
}
190

191
/**
 * Execute the snapshot ("tmqsnap") task and collect data blocks and/or meta.
 *
 * For every non-empty block the table name (if pRsp->withTbName), a cloned
 * schema (if pRsp->withSchema) and the encoded block are appended to pRsp.
 * Blocks keep being collected while the accumulated row count is
 * <= tmqRowSize. The loop ends when:
 *   - the task exposes batched meta requests: *pBatchMetaRsp is filled;
 *   - the task yields no block and the extracted offset is no longer of type
 *     TMQ_OFFSET__SNAPSHOT_DATA: pRsp->rspOffset is filled;
 *   - at least one block has been collected: pRsp->rspOffset is filled.
 *
 * Any failure while appending a name, schema or block is returned to the
 * caller. Carrying on would drop a block that was already fetched from the
 * task and leave the name/schema arrays out of step with the data arrays.
 *
 * @param pTq           tq instance (vgId/precision are read from it)
 * @param pHandle       subscription handle (execHandle, snapshotVer)
 * @param pRsp          data response to fill
 * @param pBatchMetaRsp receives a shallow copy of the task's meta response
 * @param pOffset       scan start offset; also overwritten while scanning
 * @return 0 on success, otherwise an error code.
 */
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  if (code != 0) {
    return code;
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("tmqsnap task start to execute");
    code = qExecTask(task, &pDataBlock, &ts);
    if (code != 0) {
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
      return code;
    }

    tqDebug("tmqsnap task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        char* tbName = taosStrdup(qExtractTbnameFromTask(task));
        if (tbName == NULL) {
          tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
          return terrno;
        }
        if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
          code = terrno;
          tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
          taosMemoryFree(tbName);  // not owned by the array, release it here
          return code;
        }
      }
      if (pRsp->withSchema) {
        SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
        if (pSW == NULL) {
          code = terrno;
          tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
          return code;
        }
        if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
          code = terrno;
          tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
          tDeleteSchemaWrapper(pSW);  // not owned by the array, release it here
          return code;
        }
      }

      code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
                                 pTq->pVnode->config.tsdbCfg.precision);
      if (code != 0) {
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
        return code;
      }
      pRsp->blockNum++;
      rowCnt += pDataBlock->info.rows;
      if (rowCnt <= tmqRowSize) continue;
    }

    // get meta
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
      code = qStreamExtractOffset(task, &tmp->rspOffset);
      if (code) {
        return code;
      }

      *pBatchMetaRsp = *tmp;
      tqDebug("tmqsnap task get meta");
      break;
    }

    if (pDataBlock == NULL) {
      code = qStreamExtractOffset(task, pOffset);
      if (code) {
        break;
      }

      // Still inside snapshot data: keep executing the task.
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        continue;
      }

      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }
  }

  return code;
}
281

282
/**
 * Serialize a create-table request and append it to the response.
 *
 * The request is encoded with tEncodeSVCreateTbReq() into a freshly allocated
 * buffer; the encoded length goes to pRsp->createTableLen and the buffer
 * pointer to pRsp->createTableReq, after which pRsp->createTableNum is bumped.
 *
 * The two arrays are created lazily while createTableNum is still 0. Each is
 * only created when its pointer is NULL, so a call that failed after creating
 * them does not make the next call leak and replace the existing arrays.
 * NOTE(review): this assumes pRsp starts zero-initialized - confirm at callers.
 *
 * @param pRsp         response that accumulates create-table requests
 * @param pCreateTbReq request to encode
 * @return 0 on success, otherwise an error code; the encode buffer is freed on
 *         every failure path.
 */
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t code = 0;
  void*   createReq = NULL;
  if (pRsp->createTableNum == 0) {
    if (pRsp->createTableLen == NULL) {
      pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
      if (pRsp->createTableLen == NULL) {
        code = terrno;
        goto END;
      }
    }
    if (pRsp->createTableReq == NULL) {
      pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
      if (pRsp->createTableReq == NULL) {
        code = terrno;
        goto END;
      }
    }
  }

  // First pass computes the encoded size, second pass writes into the buffer.
  uint32_t len = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    goto END;
  }
  createReq = taosMemoryCalloc(1, len);
  if (createReq == NULL) {
    code = terrno;
    goto END;
  }
  SEncoder encoder = {0};
  tEncoderInit(&encoder, createReq, len);
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
  tEncoderClear(&encoder);
  if (code < 0) {
    goto END;
  }
  if (taosArrayPush(pRsp->createTableLen, &len) == NULL) {
    code = terrno;
    goto END;
  }
  if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL) {
    code = terrno;
    goto END;
  }
  pRsp->createTableNum++;

  return 0;
END:
  taosMemoryFree(createReq);
  return code;
}
330

331
/**
 * Turn the reader's current submit-table data into response entries.
 *
 * Blocks and their schemas are retrieved with tqRetrieveTaosxBlock(). Unless
 * the data is filtered out, the function then:
 *   - appends one table name per block when pRsp->withTbName is set;
 *   - when fetchMeta != WITH_DATA and a create-table request is attached,
 *     records that request if ctimeMs - createTime <= 1000;
 *   - appends each block's encoded data and schema and bumps pRsp->blockNum
 *     and *totalRows.
 * Data is skipped when (flags & sourceExcluded) != 0, or when fetchMeta is
 * ONLY_META and no create-table request is attached.
 *
 * Errors are logged and not returned (void function). Every exit releases what
 * was not handed over to pRsp: block resources and schema wrappers.
 *
 * @param pTq            tq instance (vgId/precision/meta are read from it)
 * @param pHandle        subscription handle (execHandle, fetchMeta)
 * @param pRsp           response to fill
 * @param totalRows      in/out running row counter
 * @param sourceExcluded bit mask tested against the submit data's flags
 */
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
  int32_t        code = 0;
  STqExecHandle* pExec = &pHandle->execHandle;
  STqReader*     pReader = pExec->pTqReader;
  SArray*        pBlocks = NULL;
  SArray*        pSchemas = NULL;
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
  if (pBlocks == NULL) {
    code = terrno;
    goto END;
  }
  pSchemas = taosArrayInit(0, sizeof(void*));
  if (pSchemas == NULL) {
    code = terrno;
    goto END;
  }

  SSubmitTbData* pSubmitTbDataRet = NULL;
  int64_t        createTime = INT64_MAX;
  code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
  if (code != 0) {
    tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId);
    goto END;
  }

  if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
    goto END;
  }
  if (pRsp->withTbName) {
    int64_t uid = pExec->pTqReader->lastBlkUid;
    code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
    if (code != 0) {
      tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
      goto END;
    }
  }
  if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
    if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) {  // judge if table is already created to avoid sending createTbReq
      code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
      if (code != 0) {
        tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId);
        goto END;
      }
    }
  }
  if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
    goto END;
  }
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
    SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
    if (pBlock == NULL) {
      continue;
    }
    // Schema that belongs to this block; handed to pRsp only if everything succeeds.
    SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
    if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision) != 0) {
      tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
      // The arrays are destroyed below without per-element cleanup, so free here.
      blockDataFreeRes(pBlock);
      if (pSW != NULL) {
        tDeleteSchemaWrapper(pSW);
      }
      continue;
    }
    *totalRows += pBlock->info.rows;
    blockDataFreeRes(pBlock);
    if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
      tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
      // NOTE(review): block data was already appended without a schema and
      // without bumping blockNum; only the leak is addressed here.
      if (pSW != NULL) {
        tDeleteSchemaWrapper(pSW);
      }
      continue;
    }
    pRsp->blockNum++;
  }

  // Elements were either handed to pRsp or released above; drop the containers only.
  taosArrayDestroy(pBlocks);
  taosArrayDestroy(pSchemas);
  return;

END:
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
}
407

408
/**
 * Feed one packed submit message to the reader and process every block in it.
 *
 * For a TOPIC_SUB_TYPE__TABLE subscription, blocks are iterated with
 * tqNextBlockImpl(); for TOPIC_SUB_TYPE__DB, with tqNextDataBlockFilterOut()
 * using the handle's filter-out uid set. Each block is passed to
 * tqProcessSubData(). Other subscription types are not iterated.
 *
 * @param pTq            tq instance, forwarded to tqProcessSubData()
 * @param pHandle        subscription handle (execHandle is used)
 * @param submit         packed submit message (msgStr, msgLen, ver)
 * @param pRsp           response to fill
 * @param totalRows      in/out running row counter
 * @param sourceExcluded source filter mask, forwarded to tqProcessSubData()
 * @return the result of tqReaderSetSubmitMsg(); per-block failures are not
 *         reported here because tqProcessSubData() returns void.
 */
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows,
                       int8_t sourceExcluded) {
  STqExecHandle* pExecHandle = &pHandle->execHandle;
  STqReader*     pTqReader = pExecHandle->pTqReader;

  int32_t rc = tqReaderSetSubmitMsg(pTqReader, submit.msgStr, submit.msgLen, submit.ver);
  if (rc != 0) {
    return rc;
  }

  if (pExecHandle->subType == TOPIC_SUB_TYPE__TABLE) {
    for (; tqNextBlockImpl(pTqReader, NULL);) {
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
    }
    return rc;
  }

  if (pExecHandle->subType == TOPIC_SUB_TYPE__DB) {
    for (; tqNextDataBlockFilterOut(pTqReader, pExecHandle->execDb.pFilterOutTbUid);) {
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
    }
  }

  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc