• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.3
/source/libs/executor/src/dataDispatcher.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dataSinkInt.h"
17
#include "dataSinkMgt.h"
18
#include "executorInt.h"
19
#include "planner.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24

25
extern SDataSinkStat gDataSinkStat;
26

27
typedef struct SDataDispatchBuf {
28
  int32_t useSize;
29
  int32_t allocSize;
30
  char*   pData;
31
} SDataDispatchBuf;
32

33
typedef struct SDataCacheEntry {
34
  int32_t rawLen;
35
  int32_t dataLen;
36
  int32_t numOfRows;
37
  int32_t numOfCols;
38
  int8_t  compressed;
39
  char    data[];
40
} SDataCacheEntry;
41

42
typedef struct SDataDispatchHandle {
43
  SDataSinkHandle     sink;
44
  SDataSinkManager*   pManager;
45
  SDataBlockDescNode* pSchema;
46
  SDataSinkNode*      pSinkNode;
47
  STaosQueue*         pDataBlocks;
48
  SDataDispatchBuf    nextOutput;
49
  int32_t             outPutColCounts;
50
  int32_t             status;
51
  bool                queryEnd;
52
  uint64_t            useconds;
53
  uint64_t            cachedSize;
54
  uint64_t            flags;
55
  void*               pCompressBuf;
56
  int32_t             bufSize;
57
  TdThreadMutex       mutex;
58
} SDataDispatchHandle;
59

60
static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* pInput)  {
17,296,669✔
61
  if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
17,296,669!
62
    return TSDB_CODE_SUCCESS;
×
63
  }
64
  if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) {
17,296,669!
65
    qError("invalid input data");
×
66
    return TSDB_CODE_QRY_INVALID_INPUT;
×
67
  }
68
  SDataBlockDescNode* pSchema = pHandle->pSchema;
17,297,216✔
69
  if (pSchema == NULL || pSchema->totalRowSize != pInput->pData->info.rowSize) {
17,297,216!
70
    qError("invalid schema");
×
71
    return TSDB_CODE_QRY_INVALID_INPUT;
×
72
  }
73

74
  if (pHandle->outPutColCounts > taosArrayGetSize(pInput->pData->pDataBlock)) {
17,297,793✔
75
    qError("invalid column number, schema:%d, input:%zu", pHandle->outPutColCounts, taosArrayGetSize(pInput->pData->pDataBlock));
125!
76
    return TSDB_CODE_QRY_INVALID_INPUT;
×
77
  }
78

79
  SNode*  pNode;
80
  int32_t colNum = 0;
17,294,566✔
81
  FOREACH(pNode, pHandle->pSchema->pSlots) {
66,995,233!
82
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
49,714,009✔
83
    if (pSlotDesc->output) {
49,714,009✔
84
      SColumnInfoData* pColInfoData = taosArrayGet(pInput->pData->pDataBlock, colNum);
49,057,228✔
85
      if (pColInfoData == NULL) {
49,041,239!
86
        return -1;
×
87
      }
88
      if (pColInfoData->info.bytes < 0) {
49,041,239!
89
        qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
×
90
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
91
      }
92
      if (!IS_VAR_DATA_TYPE(pColInfoData->info.type) &&
49,041,239!
93
          TYPE_BYTES[pColInfoData->info.type] != pColInfoData->info.bytes) {
40,129,022!
94
        qError("invalid column bytes, schema:%d, input:%d", TYPE_BYTES[pColInfoData->info.type],
×
95
               pColInfoData->info.bytes);
96
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
97
      }
98
      if (pColInfoData->info.type != pSlotDesc->dataType.type) {
49,041,239!
99
        qError("invalid column type, schema:%d, input:%d", pSlotDesc->dataType.type, pColInfoData->info.type);
×
100
        return TSDB_CODE_QRY_INVALID_INPUT;
×
101
      }
102
      if (pColInfoData->info.bytes != pSlotDesc->dataType.bytes) {
49,041,239!
103
        qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
×
104
        return TSDB_CODE_QRY_INVALID_INPUT;
×
105
      }
106

107
      if (IS_INVALID_TYPE(pColInfoData->info.type)) {
49,041,239!
108
        qError("invalid column type, type:%d", pColInfoData->info.type);
×
109
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
110
      }
111
      ++colNum;
49,043,886✔
112
    }
113
  }
114

115

116
  return TSDB_CODE_SUCCESS;
17,281,224✔
117
}
118

119
// clang-format off
120
// data format:
121
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
122
// |SDataCacheEntry |  version         | total length | numOfRows    |     group id     | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data |
123
// |                |  sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols        | actual size |           |                         |
124
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
125
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
126
// recorded in the first segment, next to the struct header
127
// clang-format on
128
static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
17,297,397✔
129
  int32_t numOfCols = 0;
17,297,397✔
130
  SNode*  pNode;
131

132
  int32_t code = inputSafetyCheck(pHandle, pInput);
17,297,397✔
133
  if (code) {
17,259,247!
134
    qError("failed to check input data, code:%d", code);
×
135
    return code;
×
136
  }
137

138
  FOREACH(pNode, pHandle->pSchema->pSlots) {
67,076,402!
139
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
49,816,242✔
140
    if (pSlotDesc->output) {
49,816,242✔
141
      ++numOfCols;
49,155,777✔
142
    }
143
  }
144

145
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
17,260,160✔
146
  pEntry->compressed = 0;
17,260,160✔
147
  pEntry->numOfRows = pInput->pData->info.rows;
17,260,160✔
148
  pEntry->numOfCols = numOfCols;
17,260,160✔
149
  pEntry->dataLen = 0;
17,260,160✔
150
  pEntry->rawLen = 0;
17,260,160✔
151

152
  pBuf->useSize = sizeof(SDataCacheEntry);
17,260,160✔
153

154
  {
155
    // allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
156
    size_t dataEncodeBufSize = pBuf->allocSize + 8;
17,260,160✔
157
    if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
17,260,160!
158
      if (pHandle->pCompressBuf == NULL) {
×
159
        pHandle->pCompressBuf = taosMemoryMalloc(dataEncodeBufSize);
×
160
        if (NULL == pHandle->pCompressBuf) {
×
161
          QRY_RET(terrno);
×
162
        }
163
        pHandle->bufSize = dataEncodeBufSize;
×
164
      } else {
165
        if (pHandle->bufSize < dataEncodeBufSize) {
×
166
          pHandle->bufSize = dataEncodeBufSize;
×
167
          void* p = taosMemoryRealloc(pHandle->pCompressBuf, pHandle->bufSize);
×
168
          if (p != NULL) {
×
169
            pHandle->pCompressBuf = p;
×
170
          } else {
171
            qError("failed to prepare compress buf:%d, code: %x", pHandle->bufSize, terrno);
×
172
            return terrno;
×
173
          }
174
        }
175
      }
176

177
      int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, dataEncodeBufSize, numOfCols);
×
178
      if(dataLen < 0) {
×
179
        qError("failed to encode data block, code: %d", dataLen);
×
180
        return terrno;
×
181
      }
182
      int32_t len =
183
          tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
×
184
      if (len < dataLen) {
×
185
        pEntry->compressed = 1;
×
186
        pEntry->dataLen = len;
×
187
        pEntry->rawLen = dataLen;
×
188
      } else {  // no need to compress data
189
        pEntry->compressed = 0;
×
190
        pEntry->dataLen = dataLen;
×
191
        pEntry->rawLen = dataLen;
×
192
        TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen);
×
193
      }
194
    } else {
195
      pEntry->dataLen = blockEncode(pInput->pData, pEntry->data,  pBuf->allocSize, numOfCols);
17,260,160✔
196
      if(pEntry->dataLen < 0) {
17,271,974!
197
        qError("failed to encode data block, code: %d", pEntry->dataLen);
×
198
        return terrno;
×
199
      }
200
      pEntry->rawLen = pEntry->dataLen;
17,274,527✔
201
    }
202
  }
203

204
  pBuf->useSize += pEntry->dataLen;
17,274,527✔
205

206
  (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
17,274,527✔
207
  (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
17,304,263✔
208

209
  return TSDB_CODE_SUCCESS;
17,306,721✔
210
}
211

212
static int32_t allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
17,296,876✔
213
  /*
214
    uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
215
    if (taosQueueItemSize(pDispatcher->pDataBlocks) > capacity) {
216
      qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
217
             taosQueueItemSize(pDispatcher->pDataBlocks));
218
      return false;
219
    }
220
  */
221

222
  pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData);
17,296,876✔
223

224
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
17,251,307!
225
  if (pBuf->pData == NULL) {
17,299,307!
226
    qError("SinkNode failed to malloc memory, size:%d, code:%x", pBuf->allocSize, terrno);
×
227
    return terrno;
×
228
  }
229

230
  return TSDB_CODE_SUCCESS;
17,299,307✔
231
}
232

233
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
34,135,817✔
234
  (void)taosThreadMutexLock(&pDispatcher->mutex);
34,135,817✔
235
  int32_t blockNums = taosQueueItemSize(pDispatcher->pDataBlocks);
34,153,000✔
236
  int32_t status =
34,159,062✔
237
      (0 == blockNums ? DS_BUF_EMPTY
238
                      : (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
34,159,062✔
239
  pDispatcher->status = status;
34,159,062✔
240
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
34,159,062✔
241
  return status;
34,160,089✔
242
}
243

244
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
×
245
  (void)taosThreadMutexLock(&pDispatcher->mutex);
×
246
  int32_t status = pDispatcher->status;
×
247
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
×
248
  return status;
×
249
}
250

251
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
17,288,253✔
252
  int32_t              code = 0;
17,288,253✔
253
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
17,288,253✔
254
  SDataDispatchBuf*    pBuf = NULL;
17,288,253✔
255

256
  code = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0, (void**)&pBuf);
17,288,253✔
257
  if (code) {
17,298,820!
258
    return code;
×
259
  }
260

261
  code = allocBuf(pDispatcher, pInput, pBuf);
17,298,820✔
262
  if (code) {
17,298,790!
263
    taosFreeQitem(pBuf);
×
264
    return code;
×
265
  }
266

267
  QRY_ERR_JRET(toDataCacheEntry(pDispatcher, pInput, pBuf));
17,298,790!
268
  QRY_ERR_JRET(taosWriteQitem(pDispatcher->pDataBlocks, pBuf));
17,305,458!
269

270
  int32_t status = updateStatus(pDispatcher);
17,305,766✔
271
  *pContinue = (status == DS_BUF_LOW || status == DS_BUF_EMPTY) && !(pDispatcher->flags & DS_FLAG_PROCESS_ONE_BLOCK);
17,306,260!
272
  return TSDB_CODE_SUCCESS;
17,306,260✔
273

274
_return:
×
275

276
  taosMemoryFreeClear(pBuf->pData);
×
277
  taosFreeQitem(pBuf);
×
278
  return code;
×
279
}
280

281
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
8,009,609✔
282
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
8,009,609✔
283
  (void)taosThreadMutexLock(&pDispatcher->mutex);
8,009,609✔
284
  pDispatcher->queryEnd = true;
8,011,523✔
285
  pDispatcher->useconds = useconds;
8,011,523✔
286
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
8,011,523✔
287
}
8,011,495✔
288

289
static void resetDispatcher(struct SDataSinkHandle* pHandle) {
973✔
290
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
973✔
291
  (void)taosThreadMutexLock(&pDispatcher->mutex);
973✔
292
  pDispatcher->queryEnd = false;
974✔
293
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
974✔
294
}
974✔
295

296
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd) {
19,184,324✔
297
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
19,184,324✔
298
  if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
19,184,324✔
299
    *pQueryEnd = pDispatcher->queryEnd;
2,362,842✔
300
    *pLen = 0;
2,362,842✔
301
    return;
2,362,842✔
302
  }
303

304
  SDataDispatchBuf* pBuf = NULL;
16,858,509✔
305
  taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
16,858,509✔
306
  if (pBuf != NULL) {
16,857,943!
307
    TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
16,858,286✔
308
    taosFreeQitem(pBuf);
16,858,286✔
309
  }
310

311
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
16,852,872✔
312
  *pLen = pEntry->dataLen;
16,852,872✔
313
  *pRowLen = pEntry->rawLen;
16,852,872✔
314

315
  *pQueryEnd = pDispatcher->queryEnd;
16,852,872✔
316
  qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
16,852,872✔
317
         ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows);
318
}
319

320
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
19,047,777✔
321
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
19,047,777✔
322
  if (NULL == pDispatcher->nextOutput.pData) {
19,047,777✔
323
    if (!pDispatcher->queryEnd) {
2,221,757!
324
      qError("empty res while query not end in data dispatcher");
×
325
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
326
    }
327

328
    pOutput->useconds = pDispatcher->useconds;
2,221,757✔
329
    pOutput->precision = pDispatcher->pSchema->precision;
2,221,757✔
330
    pOutput->bufStatus = DS_BUF_EMPTY;
2,221,757✔
331
    pOutput->queryEnd = pDispatcher->queryEnd;
2,221,757✔
332
    return TSDB_CODE_SUCCESS;
2,221,757✔
333
  }
334

335
  SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
16,826,020✔
336
  TAOS_MEMCPY(pOutput->pData, pEntry->data, pEntry->dataLen);
16,826,020✔
337
  pOutput->numOfRows = pEntry->numOfRows;
16,826,020✔
338
  pOutput->numOfCols = pEntry->numOfCols;
16,826,020✔
339
  pOutput->compressed = pEntry->compressed;
16,826,020✔
340

341
  (void)atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
16,826,020✔
342
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
16,861,796✔
343

344
  taosMemoryFreeClear(pDispatcher->nextOutput.pData);  // todo persistent
16,854,803!
345
  pOutput->bufStatus = updateStatus(pDispatcher);
16,850,266✔
346
  
347
  (void)taosThreadMutexLock(&pDispatcher->mutex);
16,853,538✔
348
  pOutput->queryEnd = pDispatcher->queryEnd;
16,856,763✔
349
  pOutput->useconds = pDispatcher->useconds;
16,856,763✔
350
  pOutput->precision = pDispatcher->pSchema->precision;
16,856,763✔
351
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
16,856,763✔
352

353
  return TSDB_CODE_SUCCESS;
16,856,685✔
354
}
355

356
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
8,058,470✔
357
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
8,058,470✔
358
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize);
8,058,470✔
359
  taosMemoryFreeClear(pDispatcher->nextOutput.pData);
8,061,379!
360
  nodesDestroyNode((SNode*)pDispatcher->pSinkNode);
8,061,379✔
361

362
  while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
8,491,867✔
363
    SDataDispatchBuf* pBuf = NULL;
431,863✔
364
    taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
431,863✔
365
    if (pBuf != NULL) {
431,810!
366
      taosMemoryFreeClear(pBuf->pData);
431,884!
367
      taosFreeQitem(pBuf);
430,913✔
368
    }
369
  }
370

371
  taosCloseQueue(pDispatcher->pDataBlocks);
8,060,249✔
372
  taosMemoryFreeClear(pDispatcher->pCompressBuf);
8,060,792!
373
  pDispatcher->bufSize = 0;
8,060,792✔
374

375
  (void)taosThreadMutexDestroy(&pDispatcher->mutex);
8,060,792✔
376
  taosMemoryFree(pDispatcher->pManager);
8,059,088!
377
  return TSDB_CODE_SUCCESS;
8,060,638✔
378
}
379

380
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
381
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
×
382

383
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
384
  return TSDB_CODE_SUCCESS;
×
385
}
386

387
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
8,578,738✔
388
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
8,578,738✔
389

390
  *pFlags = atomic_load_64(&pDispatcher->flags);
8,578,738✔
391
  return TSDB_CODE_SUCCESS;
8,577,033✔
392
}
393

394
static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc)  {
8,044,736✔
395
  if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
8,044,736!
396
    return TSDB_CODE_SUCCESS;
×
397
  }
398

399
  if (pInputDataBlockDesc == NULL) {
8,044,736!
400
    qError("invalid schema");
×
401
    return TSDB_CODE_QRY_INVALID_INPUT;
×
402
  }
403

404
  SNode*  pNode;
405
  int32_t realOutputRowSize = 0;
8,044,736✔
406
  FOREACH(pNode, pInputDataBlockDesc->pSlots) {
41,769,503!
407
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
34,076,367✔
408
    if (pSlotDesc->output) {
34,076,367✔
409
      realOutputRowSize += pSlotDesc->dataType.bytes;
33,724,767✔
410
    } else {
411
      // Slots must be sorted, and slots with 'output' set to true must come first
412
      break;
351,600✔
413
    }
414
  }
415
  if (realOutputRowSize !=  pInputDataBlockDesc->outputRowSize) {
8,044,736!
416
    qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", realOutputRowSize, pInputDataBlockDesc->outputRowSize);
×
417
    return TSDB_CODE_QRY_INVALID_INPUT;
×
418
  }
419
  return TSDB_CODE_SUCCESS;
8,044,736✔
420
}
421

422
int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) {
8,053,621✔
423
  if (pInputDataBlockDesc == NULL) {
8,053,621!
424
    qError("invalid schema");
×
425
    return 0;
×
426
  }
427
  SNode*  pNode;
428
  int32_t numOfCols = 0;
8,053,621✔
429
  FOREACH(pNode, pInputDataBlockDesc->pSlots) {
41,782,831!
430
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
34,080,787✔
431
    if (pSlotDesc->output) {
34,080,787✔
432
      ++numOfCols;
33,729,210✔
433
    } else {
434
      // Slots must be sorted, and slots with 'output' set to true must come first
435
      break;
351,577✔
436
    }
437
  }
438
  return numOfCols;
8,053,621✔
439
}
440

441
int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, bool processOneBlock) {
8,044,560✔
442
  int32_t code;
443
  SDataSinkNode* pDataSink = *ppDataSink;
8,044,560✔
444
  code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
8,044,560✔
445
  if (code) {
8,051,775!
446
    qError("failed to check input data block desc, code:%d", code);
×
447
    goto _return;
×
448
  }
449

450
  SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
8,051,775✔
451
  if (NULL == dispatcher) {
8,044,441!
452
    goto _return;
×
453
  }
454

455
  dispatcher->sink.fPut = putDataBlock;
8,044,441✔
456
  dispatcher->sink.fEndPut = endPut;
8,044,441✔
457
  dispatcher->sink.fReset = resetDispatcher;
8,044,441✔
458
  dispatcher->sink.fGetLen = getDataLength;
8,044,441✔
459
  dispatcher->sink.fGetData = getDataBlock;
8,044,441✔
460
  dispatcher->sink.fDestroy = destroyDataSinker;
8,044,441✔
461
  dispatcher->sink.fGetCacheSize = getCacheSize;
8,044,441✔
462
  dispatcher->sink.fGetFlags = getSinkFlags;
8,044,441✔
463
  dispatcher->pManager = pManager;
8,044,441✔
464
  pManager = NULL;
8,044,441✔
465
  dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
8,044,441✔
466
  dispatcher->pSinkNode = pDataSink;
8,044,441✔
467
  *ppDataSink = NULL;
8,044,441✔
468
  dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema);
8,044,441✔
469
  dispatcher->status = DS_BUF_EMPTY;
8,049,529✔
470
  dispatcher->queryEnd = false;
8,049,529✔
471
  code = taosOpenQueue(&dispatcher->pDataBlocks);
8,049,529✔
472
  if (code) {
8,051,573!
473
    terrno = code;
×
474
    goto _return;
×
475
  }
476
  code = taosThreadMutexInit(&dispatcher->mutex, NULL);
8,051,573✔
477
  if (code) {
8,053,778!
478
    terrno = code;
×
479
    goto _return;
×
480
  }
481

482
  dispatcher->flags = DS_FLAG_USE_MEMPOOL;
8,056,373✔
483
  if (processOneBlock) {
8,056,373!
UNCOV
484
    dispatcher->flags |= DS_FLAG_PROCESS_ONE_BLOCK;
×
485
  }
486

487
  *pHandle = dispatcher;
8,056,373✔
488
  return TSDB_CODE_SUCCESS;
8,056,373✔
489

UNCOV
490
_return:
×
491

492
  taosMemoryFree(pManager);
×
493
  
UNCOV
494
  if (dispatcher) {
×
495
    dsDestroyDataSinker(dispatcher);
×
496
  }
497

498
  nodesDestroyNode((SNode *)*ppDataSink);
×
UNCOV
499
  *ppDataSink = NULL;
×
500
  
UNCOV
501
  return terrno;
×
502
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc