• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3522

07 Nov 2024 05:59AM UTC coverage: 58.216% (+1.3%) from 56.943%
#3522

push

travis-ci

web-flow
Merge pull request #28663 from taosdata/fix/3_liaohj

fix(stream): stop the underlying scan operations for stream

111884 of 248391 branches covered (45.04%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

1164 existing lines in 134 files now uncovered.

191720 of 273118 relevant lines covered (70.2%)

13088725.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.3
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
/**
 * Encode one data block and append it to a poll response.
 *
 * A buffer is allocated that holds an SRetrieveTableRspForTmq header followed
 * by the encoded block. The total length (header + encoded bytes) is pushed to
 * pRsp->blockDataLen and the buffer pointer to pRsp->blockData; the buffer is
 * not freed here on success. pRsp->blockNum is NOT touched; callers bump it.
 *
 * @param pBlock    block to encode
 * @param pRsp      response whose blockDataLen / blockData arrays are extended
 * @param numOfCols column count handed to blockEncode()
 * @param precision value stored in the header's precision field
 * @return TSDB_CODE_SUCCESS on success, otherwise the value of terrno
 */
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
  size_t  dataEncodeBufSize = blockGetEncodeSize(pBlock);
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
  void*   buf = taosMemoryCalloc(1, dataStrLen);
  if (buf == NULL) {
    return terrno;
  }

  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
  pRetrieve->version = 1;
  pRetrieve->precision = precision;
  pRetrieve->compressed = 0;
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);

  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
  if (actualLen < 0) {
    taosMemoryFree(buf);
    return terrno;
  }
  actualLen += sizeof(SRetrieveTableRspForTmq);

  if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL) {
    taosMemoryFree(buf);
    return terrno;
  }
  if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
    // Save the error first, then undo the length pushed just above so that
    // blockDataLen and blockData keep the same number of entries; some callers
    // only log this failure and keep filling the same response.
    int32_t code = terrno;
    (void)taosArrayPop(pRsp->blockDataLen);
    taosMemoryFree(buf);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
49

50
/**
 * Clone the reader's schema wrapper and append the clone to pRsp->blockSchema.
 *
 * @param pExec exec handle; pExec->pTqReader->pSchemaWrapper is the clone source
 * @param pRsp  response whose blockSchema array receives the cloned pointer
 * @return 0 on success, otherwise the value of terrno
 */
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
  if (pSW == NULL) {
    return terrno;
  }
  if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
    // The array did not take the pointer, so the clone must be released here.
    int32_t code = terrno;
    tDeleteSchemaWrapper(pSW);
    return code;
  }
  return 0;
}
60

61
/**
 * Look up a table by uid and append n copies of its name to pRsp->blockTbName.
 *
 * The meta reader is initialised with META_READER_LOCK and cleared on every
 * return path. Each pushed name is a separate taosStrdup() copy.
 *
 * @param pTq  tq instance; pTq->pVnode->pMeta is the meta store that is read
 * @param uid  table uid to resolve
 * @param pRsp response whose blockTbName array is extended
 * @param n    number of copies to push
 * @return 0 on success; the lookup's negative code or terrno on failure.
 *         A failed array push is only logged and the loop continues.
 */
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
  SMetaReader mr = {0};
  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);

  int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
  if (code < 0) {
    metaReaderClear(&mr);
    return code;
  }

  for (int32_t i = 0; i < n; i++) {
    char* tbName = taosStrdup(mr.me.name);
    if (tbName == NULL) {
      metaReaderClear(&mr);
      return terrno;
    }
    if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
      tqError("failed to push tbName to blockTbName:%s", tbName);
      // The array did not store the pointer; free the copy instead of leaking it.
      taosMemoryFree(tbName);
      continue;
    }
  }
  metaReaderClear(&mr);
  return 0;
}
85

86
/**
 * Run the executor task once and hand back the block it produced.
 *
 * @param task    executor task; marked open via qStreamSetOpen() before running
 * @param pHandle handle; only consumerId is read, for logging
 * @param vgId    vgroup id, used for logging only
 * @param res     out: receives the block pointer written by qExecTask()
 * @return 0 on success, otherwise the code returned by qExecTask()
 */
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
  uint64_t execTs = 0;

  qStreamSetOpen(task);
  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);

  int32_t rc = qExecTask(task, res, &execTs);
  if (rc == TSDB_CODE_SUCCESS) {
    tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
    return 0;
  }

  tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(rc));
  return rc;
}
100

101
/**
 * Execute the handle's scan task from pOffset and fill pRsp with encoded blocks.
 *
 * Normal mode: blocks are appended until the task returns no block, the row
 * total reaches tmqRowSize, or more than 1000 ms have elapsed.
 *
 * Replay mode (pRequest->enableReplay): exactly one block is returned per call.
 * pHandle->block holds the block to send and pHandle->blockTime its offset
 * timestamp; the following block is fetched as well so that pRsp->sleepTime can
 * be set to the timestamp gap between the two.
 *
 * On the success path pRsp->rspOffset is filled from the task before returning.
 *
 * @param pTq      tq instance (vnode config supplies the precision)
 * @param pHandle  subscription handle; block / blockTime are updated in replay mode
 * @param pRsp     response being built
 * @param pOffset  offset at which the scan is prepared
 * @param pRequest poll request (sourceExcluded, enableReplay, reqOffset are read)
 * @return 0 on success, otherwise the first failing call's error code
 */
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;
  int32_t line = 0;
  int32_t totalRows = 0;

  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;

  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  TSDB_CHECK_CODE(code, line, END);

  qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
  uint64_t st = taosGetTimestampMs();
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
    TSDB_CHECK_CODE(code, line, END);

    if (pRequest->enableReplay) {
      // A reset-type request offset discards any block kept from a previous poll.
      if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
        blockDataDestroy(pHandle->block);
        pHandle->block = NULL;
      }
      if (pHandle->block == NULL) {
        if (pDataBlock == NULL) {
          break;
        }

        STqOffsetVal offset = {0};
        code = qStreamExtractOffset(task, &offset);
        TSDB_CHECK_CODE(code, line, END);

        pHandle->block = NULL;

        code = createOneDataBlock(pDataBlock, true, &pHandle->block);
        if (code != TSDB_CODE_SUCCESS) {
          // The offset was extracted successfully; release it before bailing
          // out, otherwise it would never be destroyed on this path.
          tOffsetDestroy(&offset);
        }
        TSDB_CHECK_CODE(code, line, END);

        pHandle->blockTime = offset.ts;
        tOffsetDestroy(&offset);
        // Fetch the next block so the gap to it can be computed below.
        code = getDataBlock(task, pHandle, vgId, &pDataBlock);
        TSDB_CHECK_CODE(code, line, END);
      }

      code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, line, END);

      pRsp->blockNum++;
      if (pDataBlock == NULL) {
        // Nothing follows: drop the kept block.
        blockDataDestroy(pHandle->block);
        pHandle->block = NULL;
      } else {
        // Keep the following block for the next poll and record the time gap.
        code = copyDataBlock(pHandle->block, pDataBlock);
        TSDB_CHECK_CODE(code, line, END);

        STqOffsetVal offset = {0};
        code = qStreamExtractOffset(task, &offset);
        TSDB_CHECK_CODE(code, line, END);

        pRsp->sleepTime = offset.ts - pHandle->blockTime;
        pHandle->blockTime = offset.ts;
        tOffsetDestroy(&offset);
      }
      break;
    } else {
      if (pDataBlock == NULL) {
        break;
      }
      code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
      TSDB_CHECK_CODE(code, line, END);

      pRsp->blockNum++;
      totalRows += pDataBlock->info.rows;
      // Stop once enough rows were collected or the 1000 ms budget is used up.
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
        break;
      }
    }
  }

  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
          pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
END:
  if (code != 0) {
    tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
            code);
  }
  return code;
}
190

191
/**
 * Execute the handle's scan task from pOffset, collecting data blocks into pRsp
 * and, when the task has produced meta requests, copying them into
 * pBatchMetaRsp.
 *
 * Per iteration the task is executed once. A non-empty block is appended to
 * pRsp (optionally with table name and schema) and the loop keeps going while
 * the accumulated row count is <= tmqRowSize. The loop ends when batched meta is
 * available, when the task returns no block and the extracted offset type is not
 * TMQ_OFFSET__SNAPSHOT_DATA, or when a block was returned and pRsp already
 * holds data.
 *
 * @param pTq           tq instance (vnode config supplies vgId and precision)
 * @param pHandle       subscription handle (execHandle and snapshotVer are read)
 * @param pRsp          data response being built
 * @param pBatchMetaRsp out: receives a struct copy of the task's batched meta rsp
 * @param pOffset       offset at which the scan is prepared; rewritten when the
 *                      task returns no block
 * @return 0 on success, otherwise an error code
 */
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
  const STqExecHandle* pExec = &pHandle->execHandle;
  qTaskInfo_t          task = pExec->task;
  int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
  if (code != 0) {
    return code;
  }

  int32_t rowCnt = 0;
  while (1) {
    SSDataBlock* pDataBlock = NULL;
    uint64_t     ts = 0;
    tqDebug("tmqsnap task start to execute");
    code = qExecTask(task, &pDataBlock, &ts);
    if (code != 0) {
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
      return code;
    }

    tqDebug("tmqsnap task execute end, get %p", pDataBlock);

    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
      if (pRsp->withTbName) {
        char* tbName = taosStrdup(qExtractTbnameFromTask(task));
        if (tbName == NULL) {
          tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
          return terrno;
        }
        if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL) {
          tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
          // The array did not store the pointer; free the copy instead of leaking it.
          taosMemoryFree(tbName);
          continue;
        }
      }
      if (pRsp->withSchema) {
        // NOTE(review): the clone result is not checked for NULL before it is
        // pushed; confirm whether consumers of blockSchema tolerate NULL entries.
        SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
        if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
          tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
          // The array did not store the pointer; release the clone.
          tDeleteSchemaWrapper(pSW);
          continue;
        }
      }

      if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
                              pTq->pVnode->config.tsdbCfg.precision) != 0) {
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
        continue;
      }
      pRsp->blockNum++;
      rowCnt += pDataBlock->info.rows;
      if (rowCnt <= tmqRowSize) continue;
    }

    // get meta
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
      code = qStreamExtractOffset(task, &tmp->rspOffset);
      if (code) {
        return code;
      }

      *pBatchMetaRsp = *tmp;
      tqDebug("tmqsnap task get meta");
      break;
    }

    if (pDataBlock == NULL) {
      code = qStreamExtractOffset(task, pOffset);
      if (code) {
        break;
      }

      // Still in snapshot-data mode: run the task again.
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
        continue;
      }

      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
              pHandle->snapshotVer + 1);
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }

    if (pRsp->blockNum > 0) {
      tqDebug("tmqsnap task exec exited, get data");
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
      break;
    }
  }

  return code;
}
281

282
/**
 * Encode a create-table request and append it to the response.
 *
 * When pRsp->createTableNum is 0 the createTableLen / createTableReq arrays are
 * created first. The encoded length is pushed to createTableLen, the encoded
 * buffer pointer to createTableReq, and createTableNum is incremented.
 *
 * @param pRsp         response being built
 * @param pCreateTbReq request to encode with tEncodeSVCreateTbReq()
 * @return 0 on success, otherwise an error code; the encode buffer is freed on
 *         every failure path
 */
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t code = 0;
  void*   createReq = NULL;
  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    if (pRsp->createTableLen == NULL) {
      code = terrno;
      goto END;
    }
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    if (pRsp->createTableReq == NULL) {
      code = terrno;
      goto END;
    }
  }

  // First pass computes the encoded size, second pass writes into the buffer.
  uint32_t len = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    goto END;
  }
  createReq = taosMemoryCalloc(1, len);
  if (createReq == NULL) {
    code = terrno;
    goto END;
  }
  SEncoder encoder = {0};
  tEncoderInit(&encoder, createReq, len);
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
  tEncoderClear(&encoder);
  if (code < 0) {
    goto END;
  }
  if (taosArrayPush(pRsp->createTableLen, &len) == NULL) {
    code = terrno;
    goto END;
  }
  if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL) {
    code = terrno;
    // Undo the length pushed just above so both arrays stay the same size;
    // otherwise later entries would be paired with the wrong length.
    (void)taosArrayPop(pRsp->createTableLen);
    goto END;
  }
  pRsp->createTableNum++;

  return 0;
END:
  taosMemoryFree(createReq);
  return code;
}
330

331
/**
 * Turn the reader's current submit-table data into response blocks.
 *
 * Blocks and their schema wrappers are retrieved with tqRetrieveTaosxBlock().
 * Depending on the response/handle flags the table name and a create-table
 * request are added as well. Each block is then encoded into pRsp, its
 * resources are released, and its schema wrapper pointer is handed over to
 * pRsp->blockSchema.
 *
 * Failures are logged but not reported to the caller (the function is void).
 *
 * @param pTq            tq instance (vnode config supplies vgId and precision)
 * @param pHandle        subscription handle (execHandle and fetchMeta are read)
 * @param pRsp           response being built
 * @param totalRows      in/out: incremented by the rows of every encoded block
 * @param sourceExcluded bit mask; data whose flags intersect it is skipped
 */
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
  int32_t        code = 0;
  STqExecHandle* pExec = &pHandle->execHandle;
  STqReader*     pReader = pExec->pTqReader;
  SArray*        pBlocks = NULL;
  SArray*        pSchemas = NULL;
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
  if (pBlocks == NULL) {
    code = terrno;
    goto END;
  }
  pSchemas = taosArrayInit(0, sizeof(void*));
  if (pSchemas == NULL) {
    code = terrno;
    goto END;
  }

  SSubmitTbData* pSubmitTbDataRet = NULL;
  int64_t        createTime = INT64_MAX;
  code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
  if (code != 0) {
    tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId);
    goto END;
  }

  if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
    goto END;
  }
  if (pRsp->withTbName) {
    int64_t uid = pExec->pTqReader->lastBlkUid;
    code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
    if (code != 0) {
      tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
      goto END;
    }
  }
  if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
    if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) {  // judge if table is already created to avoid sending createTbReq
      code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
      if (code != 0) {
        tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId);
        goto END;
      }
    }
  }
  if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
    goto END;
  }
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
    SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
    if (pBlock == NULL) {
      continue;
    }
    SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
    if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock),
                            pTq->pVnode->config.tsdbCfg.precision) != 0) {
      tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
      // The arrays are destroyed below without element destructors, so a
      // skipped block and its schema wrapper have to be released here.
      blockDataFreeRes(pBlock);
      tDeleteSchemaWrapper(pSW);
      continue;
    }
    *totalRows += pBlock->info.rows;
    blockDataFreeRes(pBlock);
    if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
      tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
      // The response did not take the wrapper; release it instead of leaking it.
      tDeleteSchemaWrapper(pSW);
      continue;
    }
    pRsp->blockNum++;
  }

  // Elements were already released or handed over above; free only the arrays.
  taosArrayDestroy(pBlocks);
  taosArrayDestroy(pSchemas);
  return;

END:
  // Nothing was consumed from the arrays on these paths: free the elements too.
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
}
407

408
/**
 * Feed one packed submit message to the handle's reader and convert every
 * block it yields into response data via tqProcessSubData().
 *
 * Table subscriptions iterate with tqNextBlockImpl(); database subscriptions
 * iterate with tqNextDataBlockFilterOut() using the handle's filter-out uid
 * set. Any other subscription type processes nothing.
 *
 * @param pTq            tq instance, forwarded to tqProcessSubData()
 * @param pHandle        subscription handle (execHandle is read)
 * @param submit         packed submit message (msgStr, msgLen, ver)
 * @param pRsp           response being built
 * @param totalRows      in/out row counter, forwarded to tqProcessSubData()
 * @param sourceExcluded source filter mask, forwarded to tqProcessSubData()
 * @return the non-zero code from tqReaderSetSubmitMsg() on failure, else 0
 */
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows,
                       int8_t sourceExcluded) {
  STqExecHandle* pExecHandle = &pHandle->execHandle;
  STqReader*     pTqReader = pExecHandle->pTqReader;

  int32_t rc = tqReaderSetSubmitMsg(pTqReader, submit.msgStr, submit.msgLen, submit.ver);
  if (rc != 0) {
    return rc;
  }

  if (pExecHandle->subType == TOPIC_SUB_TYPE__TABLE) {
    while (tqNextBlockImpl(pTqReader, NULL)) {
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
    }
    return rc;
  }

  if (pExecHandle->subType == TOPIC_SUB_TYPE__DB) {
    while (tqNextDataBlockFilterOut(pTqReader, pExecHandle->execDb.pFilterOutTbUid)) {
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
    }
  }

  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc