• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4939

26 Jan 2026 07:26AM UTC coverage: 66.931% (+0.1%) from 66.8%
#4939

push

travis-ci

web-flow
fix: interp func order (#34402)

* fix: interp func order

* fix: iterp sort

5 of 5 new or added lines in 1 file covered. (100.0%)

464 existing lines in 112 files now uncovered.

204592 of 305676 relevant lines covered (66.93%)

126181350.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.67
/source/libs/executor/src/dataDispatcher.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dataSinkInt.h"
17
#include "dataSinkMgt.h"
18
#include "executorInt.h"
19
#include "planner.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24

25
extern SDataSinkStat gDataSinkStat;
26

27
/* One output buffer as it travels through the dispatcher queue. */
typedef struct SDataDispatchBuf {
  int32_t useSize;    // bytes in use: sizeof(SDataCacheEntry) plus the stored payload length
  int32_t allocSize;  // bytes allocated for pData
  char*   pData;      // heap buffer; its start is interpreted as an SDataCacheEntry
} SDataDispatchBuf;
32

33
/* Header placed at the front of every dispatch buffer, followed by the payload. */
typedef struct SDataCacheEntry {
  int32_t rawLen;      // length of the encoded block before any compression
  int32_t dataLen;     // number of bytes actually stored in data[]
  int32_t numOfRows;   // row count of the source block
  int32_t numOfCols;   // number of output columns that were encoded
  int8_t  compressed;  // 1 when data[] holds compressed bytes, 0 otherwise
  char    data[];      // encoded (and possibly compressed) block
} SDataCacheEntry;
41

42
typedef struct SDataDispatchHandle {
43
  SDataSinkHandle     sink;
44
  SDataSinkManager*   pManager;
45
  SDataBlockDescNode* pSchema;
46
  SDataSinkNode*      pSinkNode;
47
  STaosQueue*         pDataBlocks;
48
  SDataDispatchBuf    nextOutput;
49
  int32_t             outPutColCounts;
50
  int32_t             status;
51
  bool                queryEnd;
52
  uint64_t            useconds;
53
  uint64_t            cachedSize;
54
  uint64_t            flags;
55
  void*               pCompressBuf;
56
  int32_t             bufSize;
57
  TdThreadMutex       mutex;
58
} SDataDispatchHandle;
59

60
/*
 * Validate an input block against the dispatcher schema before encoding it.
 *
 * Returns TSDB_CODE_SUCCESS immediately when tsSafetyCheckLevel is
 * TSDB_SAFETY_CHECK_LEVELL_NEVER. Otherwise checks, in order:
 *   - the input and its block exist and carry at least one row;
 *   - the schema exists and its totalRowSize equals the block's rowSize;
 *   - the block has at least outPutColCounts columns;
 *   - for every output slot, the matching column has a valid type, a
 *     non-negative width, the fixed width its type requires, and the same
 *     type and width as the slot.
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT or TSDB_CODE_TSC_INTERNAL_ERROR on the
 * first violation.
 */
static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* pInput) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }
  if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) {
    qError("invalid input data");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }
  SDataBlockDescNode* pSchema = pHandle->pSchema;
  if (pSchema == NULL || pSchema->totalRowSize != pInput->pData->info.rowSize) {
    qError("invalid schema");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (pHandle->outPutColCounts > taosArrayGetSize(pInput->pData->pDataBlock)) {
    qError("invalid column number, schema:%d, input:%zu", pHandle->outPutColCounts,
           taosArrayGetSize(pInput->pData->pDataBlock));
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SNode*  pNode;
  int32_t colNum = 0;  // index of the next output column inside the input block
  FOREACH(pNode, pSchema->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
    if (!pSlotDesc->output) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pInput->pData->pDataBlock, colNum);
    if (pColInfoData == NULL) {
      qError("failed to get column %d from input block", colNum);
      return TSDB_CODE_QRY_INVALID_INPUT;
    }

    // Reject invalid types first: the type is used as an index into TYPE_BYTES below.
    if (IS_INVALID_TYPE(pColInfoData->info.type)) {
      qError("invalid column type, type:%d", pColInfoData->info.type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pColInfoData->info.bytes < 0) {
      qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (!IS_VAR_DATA_TYPE(pColInfoData->info.type) &&
        TYPE_BYTES[pColInfoData->info.type] != pColInfoData->info.bytes) {
      qError("invalid column bytes, schema:%d, input:%d", TYPE_BYTES[pColInfoData->info.type],
             pColInfoData->info.bytes);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pColInfoData->info.type != pSlotDesc->dataType.type) {
      qError("invalid column type, schema:%d, input:%d", pSlotDesc->dataType.type, pColInfoData->info.type);
      return TSDB_CODE_QRY_INVALID_INPUT;
    }
    if (pColInfoData->info.bytes != pSlotDesc->dataType.bytes) {
      qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
      return TSDB_CODE_QRY_INVALID_INPUT;
    }

    ++colNum;
  }

  return TSDB_CODE_SUCCESS;
}
118

119
// clang-format off
120
// data format:
121
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
122
// |SDataCacheEntry |  version         | total length | numOfRows    |     group id     | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data |
123
// |                |  sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols        | actual size |           |                         |
124
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
125
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
126
// recorded in the first segment, next to the struct header
127
// clang-format on
128
/*
 * Encode pInput into pBuf as an SDataCacheEntry header followed by the
 * encoded block, compressing the payload when that is enabled and pays off.
 *
 * pBuf->pData must already be allocated (see allocBuf). On success
 * pBuf->useSize is the header size plus the stored payload length, and that
 * payload length is added to both the per-handle and the global cachedSize
 * counters.
 *
 * Compression is attempted only when pBuf->allocSize exceeds a positive
 * tsCompressMsgSize and the manager config enables it. The block is first
 * encoded into pHandle->pCompressBuf (grown on demand) and then compressed
 * into the entry. If the compressed form is not smaller, or compression
 * reports a non-positive length, the raw encoding is stored instead.
 *
 * Returns TSDB_CODE_SUCCESS, the inputSafetyCheck error, or terrno when
 * allocation or encoding fails.
 */
static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  int32_t numOfCols = 0;
  SNode*  pNode;

  int32_t code = inputSafetyCheck(pHandle, pInput);
  if (code) {
    qError("failed to check input data, code:%d", code);
    return code;
  }

  FOREACH(pNode, pHandle->pSchema->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
    if (pSlotDesc->output) {
      ++numOfCols;
    }
  }

  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
  pEntry->compressed = 0;
  pEntry->numOfRows = pInput->pData->info.rows;
  pEntry->numOfCols = numOfCols;
  pEntry->dataLen = 0;
  pEntry->rawLen = 0;

  pBuf->useSize = sizeof(SDataCacheEntry);

  // NOTE(review): pEntry->data has allocSize - sizeof(SDataCacheEntry) bytes, yet
  // allocSize is passed as its capacity to blockEncode/tsCompressString below.
  // Left unchanged; confirm against those functions' bounds handling.
  bool tryCompress = (pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress;
  if (!tryCompress) {
    pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, pBuf->allocSize, numOfCols);
    if (pEntry->dataLen < 0) {
      qError("failed to encode data block, code: %d", pEntry->dataLen);
      return terrno;
    }
    pEntry->rawLen = pEntry->dataLen;
  } else {
    // allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
    size_t dataEncodeBufSize = pBuf->allocSize + 8;

    if (pHandle->pCompressBuf == NULL) {
      pHandle->pCompressBuf = taosMemoryMalloc(dataEncodeBufSize);
      if (NULL == pHandle->pCompressBuf) {
        QRY_RET(terrno);
      }
      pHandle->bufSize = dataEncodeBufSize;
    } else if ((size_t)pHandle->bufSize < dataEncodeBufSize) {
      void* p = taosMemoryRealloc(pHandle->pCompressBuf, dataEncodeBufSize);
      if (p == NULL) {
        // The old buffer is still valid; keep bufSize describing it.
        qError("failed to prepare compress buf:%d, code: %x", (int32_t)dataEncodeBufSize, terrno);
        return terrno;
      }
      pHandle->pCompressBuf = p;
      pHandle->bufSize = dataEncodeBufSize;  // record the new capacity only once it exists
    }

    int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, dataEncodeBufSize, numOfCols);
    if (dataLen < 0) {
      qError("failed to encode data block, code: %d", dataLen);
      return terrno;
    }

    int32_t len =
        tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
    if (len > 0 && len < dataLen) {
      pEntry->compressed = 1;
      pEntry->dataLen = len;
      pEntry->rawLen = dataLen;
    } else {  // compression failed or did not shrink the data: store the raw encoding
      pEntry->compressed = 0;
      pEntry->dataLen = dataLen;
      pEntry->rawLen = dataLen;
      TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen);
    }
  }

  pBuf->useSize += pEntry->dataLen;

  (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
  (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);

  return TSDB_CODE_SUCCESS;
}
211

212
/*
 * Size and allocate pBuf->pData for one input block: room for the
 * SDataCacheEntry header plus blockGetEncodeSize() bytes of payload.
 * A queue-capacity check against cfg.maxDataBlockNumPerQuery used to sit here
 * and is currently disabled, so pDispatcher is not consulted.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
static int32_t allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData);
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);

  if (pBuf->pData != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  qError("SinkNode failed to malloc memory, size:%d, code:%x", pBuf->allocSize, terrno);
  return terrno;
}
232

233
/*
 * Recompute the buffer status from the current queue length, store it in the
 * handle under the mutex, and return it:
 *   DS_BUF_EMPTY - no queued blocks;
 *   DS_BUF_LOW   - fewer than cfg.maxDataBlockNumPerQuery queued blocks;
 *   DS_BUF_FULL  - otherwise.
 */
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
  int32_t newStatus = DS_BUF_FULL;

  (void)taosThreadMutexLock(&pDispatcher->mutex);
  int32_t queued = taosQueueItemSize(pDispatcher->pDataBlocks);
  if (0 == queued) {
    newStatus = DS_BUF_EMPTY;
  } else if (queued < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
    newStatus = DS_BUF_LOW;
  }
  pDispatcher->status = newStatus;
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);

  return newStatus;
}
243

244
/* Return the status last stored by updateStatus, read under the handle mutex. */
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
  int32_t snapshot;

  (void)taosThreadMutexLock(&pDispatcher->mutex);
  snapshot = pDispatcher->status;
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);

  return snapshot;
}
250

251
/*
 * Sink fPut callback: encode pInput into a fresh buffer and append it to the
 * dispatcher queue.
 *
 * On success *pContinue is true when the refreshed status is DS_BUF_LOW or
 * DS_BUF_EMPTY and DS_FLAG_PROCESS_ONE_BLOCK is not set. On failure the queue
 * item and its data buffer are released, the error code is returned and
 * *pContinue is left untouched.
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  SDataDispatchBuf*    pBuf = NULL;
  int32_t              code = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0, (void**)&pBuf);
  if (code) {
    return code;
  }

  code = allocBuf(pDispatcher, pInput, pBuf);
  if (code) {
    // pData was never allocated, only the queue item needs releasing
    taosFreeQitem(pBuf);
    return code;
  }

  // Both steps jump to _return on failure (QRY_ERR_JRET uses `code` and the label).
  QRY_ERR_JRET(toDataCacheEntry(pDispatcher, pInput, pBuf));
  QRY_ERR_JRET(taosWriteQitem(pDispatcher->pDataBlocks, pBuf));

  int32_t status = updateStatus(pDispatcher);
  bool    hasRoom = (status == DS_BUF_LOW || status == DS_BUF_EMPTY);
  if (hasRoom && !(pDispatcher->flags & DS_FLAG_PROCESS_ONE_BLOCK)) {
    *pContinue = true;
  } else {
    *pContinue = false;
  }
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFreeClear(pBuf->pData);
  taosFreeQitem(pBuf);
  return code;
}
280

281
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
289,402,422✔
282
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
289,402,422✔
283
  (void)taosThreadMutexLock(&pDispatcher->mutex);
289,402,422✔
284
  pDispatcher->queryEnd = true;
289,491,584✔
285
  pDispatcher->useconds = useconds;
289,498,424✔
286
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
289,469,760✔
287
}
289,494,138✔
288

289
static void resetDispatcher(struct SDataSinkHandle* pHandle) {
33,256,352✔
290
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
33,256,352✔
291
  (void)taosThreadMutexLock(&pDispatcher->mutex);
33,256,352✔
292
  pDispatcher->queryEnd = false;
33,256,944✔
293
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
33,256,944✔
294
}
33,256,824✔
295

296
/*
 * Sink fGetLen callback: dequeue the next buffer into nextOutput and report
 * its sizes.
 *
 * Outputs:
 *   *pLen      - stored payload length (dataLen), or 0 when nothing is available;
 *   *pRowLen   - payload length before compression (rawLen), or 0 when
 *                nothing is available;
 *   *pQueryEnd - current queryEnd flag.
 *
 * When the queue is empty, or the read yields no item and no buffer is already
 * pending in nextOutput, both lengths are reported as 0 instead of
 * dereferencing a NULL entry.
 *
 * NOTE(review): a buffer already pending in nextOutput is overwritten without
 * being freed when a new item is dequeued - presumably callers always pair
 * this with getDataBlock; confirm.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
    *pQueryEnd = pDispatcher->queryEnd;
    *pLen = 0;
    *pRowLen = 0;
    return;
  }

  SDataDispatchBuf* pBuf = NULL;
  taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
  if (pBuf != NULL) {
    // Copy the descriptor; ownership of pBuf->pData moves to nextOutput.
    TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
    taosFreeQitem(pBuf);
  }

  SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
  if (pEntry == NULL) {
    // Nothing was dequeued and nothing is pending: report "no data".
    *pQueryEnd = pDispatcher->queryEnd;
    *pLen = 0;
    *pRowLen = 0;
    return;
  }

  *pLen = pEntry->dataLen;
  *pRowLen = pEntry->rawLen;

  *pQueryEnd = pDispatcher->queryEnd;
  qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, pEntry->numOfRows);
}
319

320
/*
 * Sink fGetData callback: hand the pending buffer (nextOutput) to the caller.
 *
 * With a pending buffer, its payload is copied into pOutput->pData, the row,
 * column and compression fields are filled in, both cachedSize counters are
 * reduced by the payload length, the buffer is freed, and bufStatus, queryEnd,
 * useconds and precision are reported.
 *
 * Without a pending buffer, this is only valid after the query has ended:
 * pOutput then reports DS_BUF_EMPTY plus the end-of-query fields. Otherwise
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR is returned.
 */
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
  SDataDispatchHandle* pSelf = (SDataDispatchHandle*)pHandle;

  if (NULL != pSelf->nextOutput.pData) {
    SDataCacheEntry* pEntry = (SDataCacheEntry*)(pSelf->nextOutput.pData);

    TAOS_MEMCPY(pOutput->pData, pEntry->data, pEntry->dataLen);
    pOutput->numOfRows = pEntry->numOfRows;
    pOutput->numOfCols = pEntry->numOfCols;
    pOutput->compressed = pEntry->compressed;

    (void)atomic_sub_fetch_64(&pSelf->cachedSize, pEntry->dataLen);
    (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);

    taosMemoryFreeClear(pSelf->nextOutput.pData);  // todo persistent
    pOutput->bufStatus = updateStatus(pSelf);

    (void)taosThreadMutexLock(&pSelf->mutex);
    pOutput->queryEnd = pSelf->queryEnd;
    pOutput->useconds = pSelf->useconds;
    pOutput->precision = pSelf->pSchema->precision;
    (void)taosThreadMutexUnlock(&pSelf->mutex);

    return TSDB_CODE_SUCCESS;
  }

  // No pending buffer: only legal once the producer has called endPut.
  if (!pSelf->queryEnd) {
    qError("empty res while query not end in data dispatcher");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pOutput->useconds = pSelf->useconds;
  pOutput->precision = pSelf->pSchema->precision;
  pOutput->bufStatus = DS_BUF_EMPTY;
  pOutput->queryEnd = pSelf->queryEnd;
  return TSDB_CODE_SUCCESS;
}
355

356
/*
 * Sink fDestroy callback: release everything the dispatcher holds.
 *
 * Subtracts this handle's cachedSize from the global counter, frees the
 * pending output buffer, destroys the sink node, drains and closes the queue
 * (freeing every queued buffer), frees the compression scratch buffer,
 * destroys the mutex and frees the manager. The handle itself is not freed
 * here. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pSelf = (SDataDispatchHandle*)pHandle;

  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSelf->cachedSize);
  taosMemoryFreeClear(pSelf->nextOutput.pData);
  nodesDestroyNode((SNode*)pSelf->pSinkNode);

  // Drain whatever the consumer never fetched.
  while (!taosQueueEmpty(pSelf->pDataBlocks)) {
    SDataDispatchBuf* pItem = NULL;
    taosReadQitem(pSelf->pDataBlocks, (void**)&pItem);
    if (NULL == pItem) {
      continue;
    }
    taosMemoryFreeClear(pItem->pData);
    taosFreeQitem(pItem);
  }
  taosCloseQueue(pSelf->pDataBlocks);

  taosMemoryFreeClear(pSelf->pCompressBuf);
  pSelf->bufSize = 0;

  (void)taosThreadMutexDestroy(&pSelf->mutex);
  taosMemoryFree(pSelf->pManager);
  return TSDB_CODE_SUCCESS;
}
379

380
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
381
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
×
382

383
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
384
  return TSDB_CODE_SUCCESS;
×
385
}
386

387
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
338,189,244✔
388
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
338,189,244✔
389

390
  *pFlags = atomic_load_64(&pDispatcher->flags);
338,189,244✔
391
  return TSDB_CODE_SUCCESS;
338,125,823✔
392
}
393

394
/*
 * Check that the leading output slots of a block description add up to its
 * declared outputRowSize.
 *
 * Skipped (returns success) when tsSafetyCheckLevel is
 * TSDB_SAFETY_CHECK_LEVELL_NEVER. Returns TSDB_CODE_QRY_INVALID_INPUT for a
 * NULL description or a size mismatch.
 */
static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pInputDataBlockDesc) {
    qError("invalid schema");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t summedBytes = 0;
  SNode*  pNode;
  FOREACH(pNode, pInputDataBlockDesc->pSlots) {
    SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
    if (!pSlot->output) {
      // Slots must be sorted, and slots with 'output' set to true must come first
      break;
    }
    summedBytes += pSlot->dataType.bytes;
  }

  if (summedBytes == pInputDataBlockDesc->outputRowSize) {
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", summedBytes, pInputDataBlockDesc->outputRowSize);
  return TSDB_CODE_QRY_INVALID_INPUT;
}
421

422
/*
 * Count the output slots at the front of a block description's slot list.
 * Counting stops at the first slot whose 'output' flag is false.
 * Returns 0 (after logging) when the description is NULL.
 */
int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) {
  if (NULL == pInputDataBlockDesc) {
    qError("invalid schema");
    return 0;
  }

  int32_t outputCount = 0;
  SNode*  pNode;
  FOREACH(pNode, pInputDataBlockDesc->pSlots) {
    SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
    if (!pSlot->output) {
      // Slots must be sorted, and slots with 'output' set to true must come first
      break;
    }
    ++outputCount;
  }
  return outputCount;
}
440

441
/*
 * Create a data dispatcher sink.
 *
 * pManager        - manager/config; ownership always passes to this function
 *                   (stored in the handle on success, freed on failure).
 * ppDataSink      - in/out sink plan node; ownership passes to the handle and
 *                   *ppDataSink is set to NULL on every path.
 * pHandle         - receives the new handle on success; untouched on failure.
 * processOneBlock - when true, DS_FLAG_PROCESS_ONE_BLOCK is set on the handle.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code (also stored in terrno) when
 * the schema check, the allocation, the queue or the mutex setup fails.
 */
int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, bool processOneBlock) {
  int32_t              code = TSDB_CODE_SUCCESS;
  SDataSinkNode*       pDataSink = *ppDataSink;
  // Must start as NULL: the cleanup path tests it and can be reached before allocation.
  SDataDispatchHandle* dispatcher = NULL;

  code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
  if (code) {
    qError("failed to check input data block desc, code:%d", code);
    goto _return;
  }

  dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
  if (NULL == dispatcher) {
    code = terrno;
    goto _return;
  }

  dispatcher->sink.fPut = putDataBlock;
  dispatcher->sink.fEndPut = endPut;
  dispatcher->sink.fReset = resetDispatcher;
  dispatcher->sink.fGetLen = getDataLength;
  dispatcher->sink.fGetData = getDataBlock;
  dispatcher->sink.fDestroy = destroyDataSinker;
  dispatcher->sink.fGetCacheSize = getCacheSize;
  dispatcher->sink.fGetFlags = getSinkFlags;

  // From here on the handle owns the manager and the sink node; clear the
  // caller-side references so the cleanup path does not release them twice.
  dispatcher->pManager = pManager;
  pManager = NULL;
  dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
  dispatcher->pSinkNode = pDataSink;
  *ppDataSink = NULL;

  dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema);
  dispatcher->status = DS_BUF_EMPTY;
  dispatcher->queryEnd = false;

  code = taosOpenQueue(&dispatcher->pDataBlocks);
  if (code) {
    goto _return;
  }
  code = taosThreadMutexInit(&dispatcher->mutex, NULL);
  if (code) {
    goto _return;
  }

  dispatcher->flags = DS_FLAG_USE_MEMPOOL;
  if (processOneBlock) {
    dispatcher->flags |= DS_FLAG_PROCESS_ONE_BLOCK;
  }

  *pHandle = dispatcher;
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFree(pManager);

  // NOTE(review): when the queue or mutex setup failed, the destroy path runs on a
  // handle whose queue/mutex may not be initialised - confirm it tolerates that.
  if (dispatcher) {
    dsDestroyDataSinker(dispatcher);
  }

  nodesDestroyNode((SNode*)*ppDataSink);
  *ppDataSink = NULL;

  // Report the failure that brought us here, even if cleanup touched terrno.
  terrno = code;
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc