• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3545

02 Dec 2024 06:22AM UTC coverage: 60.839% (-0.04%) from 60.88%
#3545

push

travis-ci

web-flow
Merge pull request #28961 from taosdata/fix/refactor-vnode-management-open-vnode

fix/refactor-vnode-management-open-vnode

120592 of 253473 branches covered (47.58%)

Branch coverage included in aggregate %.

102 of 145 new or added lines in 3 files covered. (70.34%)

477 existing lines in 108 files now uncovered.

201840 of 276506 relevant lines covered (73.0%)

19392204.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.92
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
692,390✔
19
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
692,390✔
20
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
692,176✔
21
  void*   buf = taosMemoryCalloc(1, dataStrLen);
692,176✔
22
  if (buf == NULL) {
692,461!
23
    return terrno;
×
24
  }
25

26
  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
692,461✔
27
  pRetrieve->version = 1;
692,461✔
28
  pRetrieve->precision = precision;
692,461✔
29
  pRetrieve->compressed = 0;
692,461✔
30
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
692,461✔
31

32
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
692,369✔
33
  if(actualLen < 0){
692,355!
34
    taosMemoryFree(buf);
×
35
    return terrno;
×
36
  }
37
  actualLen += sizeof(SRetrieveTableRspForTmq);
692,355✔
38
  if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL){
1,384,730!
39
    taosMemoryFree(buf);
×
40
    return terrno;
×
41
  }
42
  if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
1,384,608!
43
    taosMemoryFree(buf);
×
44
    return terrno;
×
45
  }
46

47
  return TSDB_CODE_SUCCESS;
692,233✔
48
}
49

50
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
×
51
  SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
×
52
  if (pSW == NULL) {
×
53
    return terrno;
×
54
  }
55
  if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
×
56
    return terrno;
×
57
  }
58
  return 0;
×
59
}
60

61
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
117,925✔
62
  SMetaReader mr = {0};
117,925✔
63
  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);
117,925✔
64

65
  int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
118,037✔
66
  if (code < 0) {
117,832!
UNCOV
67
    metaReaderClear(&mr);
×
UNCOV
68
    return code;
×
69
  }
70

71
  for (int32_t i = 0; i < n; i++) {
235,664✔
72
    char* tbName = taosStrdup(mr.me.name);
117,815✔
73
    if (tbName == NULL) {
117,883!
74
      metaReaderClear(&mr);
×
75
      return terrno;
×
76
    }
77
    if(taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
235,715!
78
      tqError("failed to push tbName to blockTbName:%s", tbName);
×
79
      continue;
×
80
    }
81
  }
82
  metaReaderClear(&mr);
117,849✔
83
  return 0;
117,987✔
84
}
85

86
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
632,858✔
87
  uint64_t ts = 0;
632,858✔
88
  qStreamSetOpen(task);
632,858✔
89

90
  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
632,851!
91
  int32_t code = qExecTask(task, res, &ts);
632,936✔
92
  if (code != TSDB_CODE_SUCCESS) {
632,934!
93
    tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
×
94
    return code;
×
95
  }
96

97
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
632,934!
98
  return 0;
632,945✔
99
}
100

101
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
64,089✔
102
  int32_t vgId = TD_VID(pTq->pVnode);
64,089✔
103
  int32_t code = 0;
64,089✔
104
  int32_t line = 0;
64,089✔
105
  int32_t totalRows = 0;
64,089✔
106

107
  const STqExecHandle* pExec = &pHandle->execHandle;
64,089✔
108
  qTaskInfo_t          task = pExec->task;
64,089✔
109

110
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
64,089✔
111
  TSDB_CHECK_CODE(code, line, END);
64,093✔
112

113
  qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
63,432✔
114
  uint64_t st = taosGetTimestampMs();
63,432✔
115
  while (1) {
569,429✔
116
    SSDataBlock* pDataBlock = NULL;
632,861✔
117
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
632,861✔
118
    TSDB_CHECK_CODE(code, line, END);
632,940!
119

120
    if (pRequest->enableReplay) {
632,940✔
121
      if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
33✔
122
        blockDataDestroy(pHandle->block);
1✔
123
        pHandle->block = NULL;
1✔
124
      }
125
      if (pHandle->block == NULL) {
33✔
126
        if (pDataBlock == NULL) {
11✔
127
          break;
6✔
128
        }
129

130
        STqOffsetVal offset = {0};
5✔
131
        code = qStreamExtractOffset(task, &offset);
5✔
132
        TSDB_CHECK_CODE(code, line, END);
5!
133

134
        pHandle->block = NULL;
5✔
135

136
        code = createOneDataBlock(pDataBlock, true, &pHandle->block);
5✔
137
        TSDB_CHECK_CODE(code, line, END);
5!
138

139
        pHandle->blockTime = offset.ts;
5✔
140
        tOffsetDestroy(&offset);
5✔
141
        code = getDataBlock(task, pHandle, vgId, &pDataBlock);
5✔
142
        TSDB_CHECK_CODE(code, line, END);
5!
143
      }
144

145
      code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
27✔
146
      TSDB_CHECK_CODE(code, line, END);
27!
147

148
      pRsp->blockNum++;
27✔
149
      if (pDataBlock == NULL) {
27✔
150
        blockDataDestroy(pHandle->block);
4✔
151
        pHandle->block = NULL;
4✔
152
      } else {
153
        code = copyDataBlock(pHandle->block, pDataBlock);
23✔
154
        TSDB_CHECK_CODE(code, line, END);
23!
155

156
        STqOffsetVal offset = {0};
23✔
157
        code = qStreamExtractOffset(task, &offset);
23✔
158
        TSDB_CHECK_CODE(code, line, END);
23!
159

160
        pRsp->sleepTime = offset.ts - pHandle->blockTime;
23✔
161
        pHandle->blockTime = offset.ts;
23✔
162
        tOffsetDestroy(&offset);
23✔
163
      }
164
      break;
27✔
165
    } else {
166
      if (pDataBlock == NULL) {
632,907✔
167
        break;
60,503✔
168
      }
169
      code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
572,404✔
170
      TSDB_CHECK_CODE(code, line, END);
572,316!
171

172
      pRsp->blockNum++;
572,316✔
173
      totalRows += pDataBlock->info.rows;
572,316✔
174
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
1,142,339✔
175
        break;
176
      }
177
    }
178
  }
179

180
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
63,429!
181
          pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
182
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
63,429✔
183
END:
64,093✔
184
  if (code != 0) {
64,093✔
185
    tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
661!
186
            code);
187
  }
188
  return code;
64,093✔
189
}
190

191
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
77✔
192
  const STqExecHandle* pExec = &pHandle->execHandle;
77✔
193
  qTaskInfo_t          task = pExec->task;
77✔
194
  int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
77✔
195
  if (code != 0) {
77!
196
    return code;
×
197
  }
198

199
  int32_t rowCnt = 0;
77✔
200
  while (1) {
4,181✔
201
    SSDataBlock* pDataBlock = NULL;
4,258✔
202
    uint64_t     ts = 0;
4,258✔
203
    tqDebug("tmqsnap task start to execute");
4,258✔
204
    code = qExecTask(task, &pDataBlock, &ts);
4,262✔
205
    if (code != 0) {
4,260!
206
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
×
207
      return code;
×
208
    }
209

210
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);
4,260!
211

212
    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
4,260!
213
      if (pRsp->withTbName) {
2,127!
214
        char* tbName = taosStrdup(qExtractTbnameFromTask(task));
2,127✔
215
        if (tbName == NULL) {
2,127!
216
          tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
×
217
          return terrno;
×
218
        }
219
        if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
4,254!
220
          tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
×
221
          continue;
×
222
        }
223
      }
224
      if (pRsp->withSchema) {
2,127!
225
        SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
2,127✔
226
        if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
4,250!
227
          tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
×
228
          continue;
×
229
        }
230
      }
231

232
      if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
2,125!
233
                          pTq->pVnode->config.tsdbCfg.precision) != 0) {
2,125✔
234
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
235
        continue;
×
236
      }
237
      pRsp->blockNum++;
2,125✔
238
      rowCnt += pDataBlock->info.rows;
2,125✔
239
      if (rowCnt <= tmqRowSize) continue;
2,125✔
240

241
    }
242

243
    // get meta
244
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
2,165✔
245
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
2,165✔
246
      code = qStreamExtractOffset(task, &tmp->rspOffset);
16✔
247
      if (code) {
16!
248
        return code;
×
249
      }
250

251
      *pBatchMetaRsp = *tmp;
16✔
252
      tqDebug("tmqsnap task get meta");
16!
253
      break;
16✔
254
    }
255

256
    if (pDataBlock == NULL) {
2,149✔
257
      code = qStreamExtractOffset(task, pOffset);
2,117✔
258
      if (code) {
2,117!
259
        break;
×
260
      }
261

262
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
2,117✔
263
        continue;
2,088✔
264
      }
265

266
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
29!
267
              pHandle->snapshotVer + 1);
268
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
29✔
269
      break;
29✔
270
    }
271

272
    if (pRsp->blockNum > 0) {
32!
273
      tqDebug("tmqsnap task exec exited, get data");
32!
274
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
32✔
275
      break;
32✔
276
    }
277
  }
278

279
  return code;
77✔
280
}
281

282
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
30✔
283
  int32_t code = 0;
30✔
284
  void*   createReq = NULL;
30✔
285
  if (pRsp->createTableNum == 0) {
30✔
286
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
13✔
287
    if (pRsp->createTableLen == NULL) {
13!
288
      code = terrno;
×
289
      goto END;
×
290
    }
291
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
13✔
292
    if (pRsp->createTableReq == NULL) {
13!
293
      code = terrno;
×
294
      goto END;
×
295
    }
296
  }
297

298
  uint32_t len = 0;
30✔
299
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
30!
300
  if (TSDB_CODE_SUCCESS != code) {
30!
301
    goto END;
×
302
  }
303
  createReq = taosMemoryCalloc(1, len);
30✔
304
  if (createReq == NULL){
30!
305
    code = terrno;
×
306
    goto END;
×
307
  }
308
  SEncoder encoder = {0};
30✔
309
  tEncoderInit(&encoder, createReq, len);
30✔
310
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
30✔
311
  tEncoderClear(&encoder);
30✔
312
  if (code < 0) {
30!
313
    goto END;
×
314
  }
315
  if (taosArrayPush(pRsp->createTableLen, &len) == NULL){
60!
316
    code = terrno;
×
317
    goto END;
×
318
  }
319
  if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){
60!
320
    code = terrno;
×
321
    goto END;
×
322
  }
323
  pRsp->createTableNum++;
30✔
324

325
  return 0;
30✔
326
END:
×
327
  taosMemoryFree(createReq);
×
328
  return code;
×
329
}
330

331
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
118,232✔
332
  int32_t code = 0;
118,232✔
333
  STqExecHandle* pExec = &pHandle->execHandle;
118,232✔
334
  STqReader* pReader = pExec->pTqReader;
118,232✔
335
  SArray* pBlocks = NULL;
118,232✔
336
  SArray* pSchemas = NULL;
118,232✔
337
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
118,232✔
338
  if (pBlocks == NULL) {
118,226!
339
    code = terrno;
×
340
    goto END;
×
341
  }
342
  pSchemas = taosArrayInit(0, sizeof(void*));
118,226✔
343
  if(pSchemas == NULL){
118,238!
344
    code = terrno;
×
345
    goto END;
×
346
  }
347

348
  SSubmitTbData* pSubmitTbDataRet = NULL;
118,238✔
349
  int64_t createTime = INT64_MAX;
118,238✔
350
  code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
118,238✔
351
  if (code != 0) {
118,081✔
352
    tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId);
1!
353
    goto END;
22✔
354
  }
355

356
  if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
118,080✔
357
    goto END;
113✔
358
  }
359
  if (pRsp->withTbName) {
117,967✔
360
    int64_t uid = pExec->pTqReader->lastBlkUid;
117,964✔
361
    code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
117,964✔
362
    if (code != 0) {
117,983!
UNCOV
363
      tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
×
UNCOV
364
      goto END;
×
365
    }
366
  }
367
  if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
117,986✔
368
    if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) {  // judge if table is already created to avoid sending crateTbReq
30!
369
      code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
30✔
370
      if (code != 0){
30!
371
        tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId);
54!
372
        goto END;
×
373
      }
374
    }
375
  }
376
  if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
117,932✔
377
    goto END;
5✔
378
  }
379
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
235,884✔
380
    SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
117,904✔
381
    if (pBlock == NULL) {
117,885!
382
      continue;
×
383
    }
384
    if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock),
117,889!
385
                            pTq->pVnode->config.tsdbCfg.precision) != 0){
117,885✔
386
      tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
387
      continue;
×
388
    }
389
    *totalRows += pBlock->info.rows;
117,816✔
390
    blockDataFreeRes(pBlock);
117,816✔
391
    SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
118,010✔
392
    if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
235,967!
393
      tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
×
394
      continue;
×
395
    }
396
    pRsp->blockNum++;
117,957✔
397
  }
398

399
  taosArrayDestroy(pBlocks);
117,712✔
400
  taosArrayDestroy(pSchemas);
118,021✔
401
  return;
118,032✔
402

403
END:
140✔
404
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
140✔
405
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
140✔
406
}
407

408
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows,
122,068✔
409
                       int8_t sourceExcluded) {
410
  STqExecHandle* pExec = &pHandle->execHandle;
122,068✔
411
  int32_t        code = 0;
122,068✔
412
  STqReader* pReader = pExec->pTqReader;
122,068✔
413
  code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
122,068✔
414
  if (code != 0) {
122,059!
415
    return code;
×
416
  }
417

418
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
122,059✔
419
    while (tqNextBlockImpl(pReader, NULL)) {
69,949✔
420
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
33,227✔
421
    }
422
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
85,328!
423
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
170,261✔
424
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
84,867✔
425
    }
426
  }
427

428
  return code;
121,992✔
429
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc