• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.11
/source/libs/executor/src/querytask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "os.h"
20
#include "querynodes.h"
21
#include "tfill.h"
22
#include "tname.h"
23

24
#include "tdatablock.h"
25
#include "tmsg.h"
26

27
#include "executorInt.h"
28
#include "index.h"
29
#include "operator.h"
30
#include "query.h"
31
#include "querytask.h"
32
#include "storageapi.h"
33
#include "thash.h"
34
#include "ttypes.h"
35

36
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
37

38
int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI,
8,130,045✔
39
                     SExecTaskInfo** pTaskInfo) {
40
  if (pTaskInfo == NULL) {
8,130,045!
41
    return TSDB_CODE_SUCCESS;
×
42
  }
43

44
  SExecTaskInfo* p = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
8,130,045!
45
  if (p == NULL) {
8,130,374!
46
    return terrno;
×
47
  }
48

49
  setTaskStatus(p, TASK_NOT_COMPLETED);
8,130,374✔
50
  p->cost.created = taosGetTimestampUs();
8,134,567✔
51

52
  p->execModel = model;
8,134,567✔
53
  p->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
8,134,567✔
54
  p->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
8,136,011✔
55
  if (p->stopInfo.pStopInfo == NULL || p->pResultBlockList == NULL) {
8,129,502!
56
    doDestroyTask(p);
×
57
    return terrno;
×
58
  }
59

60
  p->storageAPI = *pAPI;
8,130,469✔
61
  taosInitRWLatch(&p->lock);
8,130,469✔
62

63
  p->id.vgId = vgId;
8,136,101✔
64
  p->id.queryId = queryId;
8,136,101✔
65
  p->id.taskId = taskId;
8,136,101✔
66
  p->id.str = taosMemoryMalloc(64);
8,136,101!
67
  if (p->id.str == NULL) {
8,135,955!
68
    doDestroyTask(p);
×
69
    return terrno;
×
70
  }
71

72
  buildTaskId(taskId, queryId, p->id.str);
8,135,955✔
73
  p->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo));
8,135,360✔
74
  if (p->id.str == NULL || p->schemaInfos == NULL) {
8,133,278!
75
    doDestroyTask(p);
×
76
    return terrno;
×
77
  }
78

79
  *pTaskInfo = p;
8,133,780✔
80
  return TSDB_CODE_SUCCESS;
8,133,780✔
81
}
82

83
bool isTaskKilled(void* pTaskInfo) { return (0 != ((SExecTaskInfo*)pTaskInfo)->code); }
46,904,955✔
84

85
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
10,617✔
86
  pTaskInfo->code = rspCode;
10,617✔
87
  (void)stopTableScanOperator(pTaskInfo->pRoot, pTaskInfo->id.str, &pTaskInfo->storageAPI);
10,617✔
88
}
10,619✔
89

90
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
25,443,949✔
91
  if (status == TASK_NOT_COMPLETED) {
25,443,949✔
92
    pTaskInfo->status = status;
8,131,980✔
93
  } else {
94
    // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
95
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
17,311,969✔
96
    pTaskInfo->status |= status;
17,311,969✔
97
  }
98
}
25,443,949✔
99

100
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
8,129,766✔
101
                           int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
102
  int32_t code = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api, pTaskInfo);
8,129,766✔
103
  if (*pTaskInfo == NULL || code != 0) {
8,133,736!
104
    nodesDestroyNode((SNode*)pPlan);
×
105
    return code;
×
106
  }
107

108
  if (pHandle) {
8,134,125✔
109
    if (pHandle->pStateBackend) {
8,132,739✔
110
      (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
7,530✔
111
      (*pTaskInfo)->streamInfo.pOtherState = pHandle->pOtherBackend;
7,530✔
112
    }
113
  }
114

115
  if (NULL != sql) {
8,134,125✔
116
    (*pTaskInfo)->sql = taosStrdup(sql);
7,986,426!
117
    if (NULL == (*pTaskInfo)->sql) {
7,979,341!
118
      code = terrno;
×
119
      nodesDestroyNode((SNode*)pPlan);
×
120
      doDestroyTask(*pTaskInfo);
×
121
      (*pTaskInfo) = NULL;
×
UNCOV
122
      return code;
×
123
    }
124
  }
125

126
  (*pTaskInfo)->pSubplan = pPlan;
8,127,040✔
127
  (*pTaskInfo)->pWorkerCb = pHandle->pWorkerCb;
8,127,040✔
128
  code = createOperator(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user,
8,127,040✔
129
                        pPlan->dbFName, &((*pTaskInfo)->pRoot), model);
8,127,040✔
130

131
  if (NULL == (*pTaskInfo)->pRoot || code != 0) {
8,125,485!
UNCOV
132
    doDestroyTask(*pTaskInfo);
×
133
    (*pTaskInfo) = NULL;
328✔
134
  }
135
  return code;
8,133,280✔
136
}
137

138
void cleanupQueriedTableScanInfo(void* p) {
4,769,001✔
139
  SSchemaInfo* pSchemaInfo = p;
4,769,001✔
140

141
  taosMemoryFreeClear(pSchemaInfo->dbname);
4,769,001!
142
  taosMemoryFreeClear(pSchemaInfo->tablename);
4,768,882!
143
  tDeleteSchemaWrapper(pSchemaInfo->sw);
4,769,095!
144
  tDeleteSchemaWrapper(pSchemaInfo->qsw);
4,769,459✔
145
}
4,769,346✔
146

147
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName,
4,765,364✔
148
                                   SExecTaskInfo* pTaskInfo) {
149
  SMetaReader mr = {0};
4,765,364✔
150
  if (pHandle == NULL) {
4,765,364!
UNCOV
151
    return TSDB_CODE_INVALID_PARA;
×
152
  }
153

154
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;
4,765,364✔
155

156
  pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pAPI->metaFn);
4,765,364✔
157
  int32_t code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, pScanNode->uid);
4,768,623✔
158
  if (code != TSDB_CODE_SUCCESS) {
4,756,748✔
159
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
120!
160
           GET_TASKID(pTaskInfo));
161

162
    pAPI->metaReaderFn.clearReader(&mr);
120✔
163
    return code;
120✔
164
  }
165

166
  SSchemaInfo schemaInfo = {0};
4,756,628✔
167

168
  schemaInfo.tablename = taosStrdup(mr.me.name);
4,756,628✔
169
  schemaInfo.dbname = taosStrdup(dbName);
4,761,187✔
170
  if (schemaInfo.tablename == NULL || schemaInfo.dbname == NULL) {
4,765,777!
171
    pAPI->metaReaderFn.clearReader(&mr);
×
172
    cleanupQueriedTableScanInfo(&schemaInfo);
×
UNCOV
173
    return terrno;
×
174
  }
175

176
  if (mr.me.type == TSDB_SUPER_TABLE) {
4,765,824✔
177
    schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
4,037,670✔
178
    schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
4,037,670✔
179
  } else if (mr.me.type == TSDB_CHILD_TABLE || mr.me.type == TSDB_VIRTUAL_CHILD_TABLE) {
727,139✔
180
    tDecoderClear(&mr.coder);
450,306✔
181

182
    tb_uid_t suid = mr.me.ctbEntry.suid;
444,905✔
183
    code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid);
444,905✔
184
    if (code != TSDB_CODE_SUCCESS) {
444,862!
185
      pAPI->metaReaderFn.clearReader(&mr);
×
186
      cleanupQueriedTableScanInfo(&schemaInfo);
×
UNCOV
187
      return code;
×
188
    }
189

190
    schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
444,881✔
191
    schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
444,881✔
192
  } else {
193
    schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
276,914✔
194
  }
195

196
  pAPI->metaReaderFn.clearReader(&mr);
4,759,465✔
197

198
  if (schemaInfo.sw == NULL) {
4,766,399!
199
    cleanupQueriedTableScanInfo(&schemaInfo);
×
UNCOV
200
    return terrno;
×
201
  }
202

203
  schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
4,766,399✔
204
  if (schemaInfo.qsw == NULL) {
4,763,501!
205
    cleanupQueriedTableScanInfo(&schemaInfo);
×
UNCOV
206
    return terrno;
×
207
  }
208

209
  void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
4,763,501✔
210
  if (p == NULL) {
4,762,402!
211
    cleanupQueriedTableScanInfo(&schemaInfo);
×
UNCOV
212
    return terrno;
×
213
  }
214

215
  return code;
4,762,402✔
216
}
217

218
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
4,760,773✔
219
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
4,760,773!
220
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);
4,760,773✔
221

222
  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
4,760,773!
223
  if (pqSw == NULL) {
4,767,282!
UNCOV
224
    return NULL;
×
225
  }
226

227
  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
4,767,282!
228
  if (pqSw->pSchema == NULL) {
4,760,391!
229
    taosMemoryFree(pqSw);
×
UNCOV
230
    return NULL;
×
231
  }
232

233
  for (int32_t i = 0; i < numOfCols; ++i) {
27,832,989✔
234
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
23,071,590✔
235
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
23,072,598✔
236

237
    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
23,072,598✔
238
    pSchema->colId = pColNode->colId;
23,072,598✔
239
    pSchema->type = pColNode->node.resType.type;
23,072,598✔
240
    pSchema->bytes = pColNode->node.resType.bytes;
23,072,598✔
241
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
23,072,598✔
242
  }
243

244
  // this the tags and pseudo function columns, we only keep the tag columns
245
  for (int32_t i = 0; i < numOfTags; ++i) {
11,650,676✔
246
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);
6,890,402✔
247

248
    int32_t type = nodeType(pNode->pExpr);
6,889,277✔
249
    if (type == QUERY_NODE_COLUMN) {
6,889,277✔
250
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
6,265,275✔
251

252
      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
6,265,275✔
253
      pSchema->colId = pColNode->colId;
6,265,275✔
254
      pSchema->type = pColNode->node.resType.type;
6,265,275✔
255
      pSchema->bytes = pColNode->node.resType.bytes;
6,265,275✔
256
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
6,265,275✔
257
    }
258
  }
259

260
  return pqSw;
4,760,274✔
261
}
262

263
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
8,138,187✔
264
  tDeleteSchemaWrapper(pStreamInfo->schema);
8,138,187✔
265
  tOffsetDestroy(&pStreamInfo->currentOffset);
8,138,057✔
266
  tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema);
8,138,654!
267
  taosMemoryFree(pStreamInfo->stbFullName);
8,137,585✔
268
}
8,138,282✔
269

270
static void freeBlock(void* pParam) {
10,561,151✔
271
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
10,561,151✔
272
  blockDataDestroy(pBlock);
10,561,151✔
273
}
10,561,359✔
274

275
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
8,136,259✔
276
  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
8,136,259✔
277
  destroyOperator(pTaskInfo->pRoot);
8,136,259✔
278
  pTaskInfo->pRoot = NULL;
8,139,075✔
279

280
  taosArrayDestroyEx(pTaskInfo->schemaInfos, cleanupQueriedTableScanInfo);
8,139,075✔
281
  cleanupStreamInfo(&pTaskInfo->streamInfo);
8,139,290✔
282

283
  if (!pTaskInfo->localFetch.localExec) {
8,137,989✔
284
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
8,066,702✔
285
    pTaskInfo->pSubplan = NULL;
8,067,706✔
286
  }
287

288
  taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
8,138,993✔
289
  taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
8,139,203✔
290
  taosMemoryFreeClear(pTaskInfo->sql);
8,139,448!
291
  taosMemoryFreeClear(pTaskInfo->id.str);
8,139,343!
292
  taosMemoryFreeClear(pTaskInfo);
8,139,434!
293
}
8,139,358✔
294

295
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst) {
8,394,045✔
296
  char* p = dst;
8,394,045✔
297

298
  int32_t offset = 6;
8,394,045✔
299
  memcpy(p, "TID:0x", offset);
8,394,045✔
300
  offset += tintToHex(taskId, &p[offset]);
8,394,045✔
301

302
  memcpy(&p[offset], " QID:0x", 7);
8,397,435✔
303
  offset += 7;
8,397,435✔
304
  offset += tintToHex(queryId, &p[offset]);
8,397,435✔
305

306
  p[offset] = 0;
8,396,683✔
307
}
8,396,683✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc