• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.36
/source/libs/executor/src/dataDispatcher.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dataSinkInt.h"
17
#include "dataSinkMgt.h"
18
#include "executorInt.h"
19
#include "planner.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24

25
extern SDataSinkStat gDataSinkStat;
26

27
// One queued output buffer. pData points at a heap block that starts with an
// SDataCacheEntry header followed by the encoded (optionally compressed) rows.
typedef struct SDataDispatchBuf {
28
  int32_t useSize;    // bytes in use: sizeof(SDataCacheEntry) plus the entry's dataLen
29
  int32_t allocSize;  // bytes allocated: sizeof(SDataCacheEntry) + blockGetEncodeSize() of the input block
30
  char*   pData;      // heap block owned by this buffer; interpreted as SDataCacheEntry
31
} SDataDispatchBuf;
32

33
// Header placed at the start of every SDataDispatchBuf::pData block; the
// payload follows immediately in the flexible array member `data`.
typedef struct SDataCacheEntry {
34
  int32_t rawLen;      // length of the encoded block before any compression
35
  int32_t dataLen;     // number of payload bytes actually stored in data[]
36
  int32_t numOfRows;   // copied from the input block's info.rows
37
  int32_t numOfCols;   // number of schema slots flagged as output
38
  int8_t  compressed;  // 1 when data[] holds compressed output, 0 when it holds the raw encoding
39
  char    data[];      // encoded payload (layout described by the "data format" comment in this file)
40
} SDataCacheEntry;
41

42
// State of one data dispatcher sink. The callbacks in this file receive a
// SDataSinkHandle* and cast it to this type, so `sink` has to stay the first member.
typedef struct SDataDispatchHandle {
43
  SDataSinkHandle     sink;             // callback table filled in by createDataDispatcher()
44
  SDataSinkManager*   pManager;         // released with taosMemoryFree() in destroyDataSinker()
45
  SDataBlockDescNode* pSchema;          // input block description; not freed by this file
46
  STaosQueue*         pDataBlocks;      // queue of SDataDispatchBuf items waiting to be fetched
47
  SDataDispatchBuf    nextOutput;       // buffer popped by getDataLength(), consumed by getDataBlock()
48
  int32_t             outPutColCounts;  // value returned by getOutputColCounts(pSchema)
49
  int32_t             status;           // last DS_BUF_* value computed by updateStatus()
50
  bool                queryEnd;         // set by endPut(), cleared by resetDispatcher()
51
  uint64_t            useconds;         // value handed to endPut(), reported back through getDataBlock()
52
  uint64_t            cachedSize;       // payload bytes currently held by this sink (updated atomically)
53
  void*               pCompressBuf;     // scratch buffer used by toDataCacheEntry() on the compression path
54
  int32_t             bufSize;          // size in bytes of pCompressBuf
55
  TdThreadMutex       mutex;            // taken around accesses to status, queryEnd and useconds
56
} SDataDispatchHandle;
57

58
/*
 * Validate an input block against the dispatcher's schema before it is encoded.
 *
 * The whole check is skipped (success is returned) when tsSafetyCheckLevel is
 * TSDB_SAFETY_CHECK_LEVELL_NEVER.
 *
 * Checks performed, in order:
 *   - the input, its block and its row count are present and positive;
 *   - the schema exists and its totalRowSize matches the block's rowSize;
 *   - the block carries at least outPutColCounts columns;
 *   - for every schema slot flagged as output, the matching block column has
 *     a valid type, a non-negative width, the fixed width expected for
 *     fixed-size types, and the same type and width as the slot.
 *
 * Returns TSDB_CODE_SUCCESS when everything matches, otherwise
 * TSDB_CODE_QRY_INVALID_INPUT or TSDB_CODE_TSC_INTERNAL_ERROR.
 */
static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* pInput) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }
  if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) {
    qError("invalid input data");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }
  SDataBlockDescNode* pSchema = pHandle->pSchema;
  if (pSchema == NULL || pSchema->totalRowSize != pInput->pData->info.rowSize) {
    qError("invalid schema");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (pHandle->outPutColCounts > taosArrayGetSize(pInput->pData->pDataBlock)) {
    qError("invalid column number, schema:%d, input:%zu", pHandle->outPutColCounts, taosArrayGetSize(pInput->pData->pDataBlock));
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SNode*  pNode;
  int32_t colNum = 0;  // index of the next output column inside the input block
  FOREACH(pNode, pSchema->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
    if (!pSlotDesc->output) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pInput->pData->pDataBlock, colNum);
    if (pColInfoData == NULL) {
      // Report a regular query error code instead of a bare -1.
      qError("failed to get column %d from input block", colNum);
      return TSDB_CODE_QRY_INVALID_INPUT;
    }

    // The type must be validated before it is used as an index into TYPE_BYTES below.
    if (IS_INVALID_TYPE(pColInfoData->info.type)) {
      qError("invalid column type, type:%d", pColInfoData->info.type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pColInfoData->info.bytes < 0) {
      qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (!IS_VAR_DATA_TYPE(pColInfoData->info.type) &&
        TYPE_BYTES[pColInfoData->info.type] != pColInfoData->info.bytes) {
      qError("invalid column bytes, schema:%d, input:%d", TYPE_BYTES[pColInfoData->info.type],
             pColInfoData->info.bytes);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pColInfoData->info.type != pSlotDesc->dataType.type) {
      qError("invalid column type, schema:%d, input:%d", pSlotDesc->dataType.type, pColInfoData->info.type);
      return TSDB_CODE_QRY_INVALID_INPUT;
    }
    if (pColInfoData->info.bytes != pSlotDesc->dataType.bytes) {
      qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
      return TSDB_CODE_QRY_INVALID_INPUT;
    }

    ++colNum;
  }

  return TSDB_CODE_SUCCESS;
}
116

117
// clang-format off
118
// data format:
119
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
120
// |SDataCacheEntry |  version         | total length | numOfRows    |     group id     | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data |
121
// |                |  sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols        | actual size |           |                         |
122
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
123
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
124
// recorded in the first segment, next to the struct header
125
// clang-format on
126
/*
 * Encode the input block into pBuf->pData, which must already be allocated by allocBuf().
 *
 * The buffer receives an SDataCacheEntry header followed by the encoded block.
 * When pBuf->allocSize exceeds a positive tsCompressMsgSize and compression is
 * enabled in the manager configuration, the block is first encoded into the
 * handle's scratch buffer and then compressed into the entry. The compressed
 * form is kept only if it is strictly smaller than the raw encoding; otherwise
 * the raw encoding is copied into the entry.
 *
 * On success pBuf->useSize is set and the per-sink and global cachedSize
 * counters are increased by the stored payload length.
 *
 * Returns TSDB_CODE_SUCCESS, the error from inputSafetyCheck(), or terrno when
 * an allocation or the block encoding fails.
 */
static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  int32_t numOfCols = 0;
  SNode*  pNode;

  int32_t code = inputSafetyCheck(pHandle, pInput);
  if (code) {
    qError("failed to check input data, code:%d", code);
    return code;
  }

  FOREACH(pNode, pHandle->pSchema->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
    if (pSlotDesc->output) {
      ++numOfCols;
    }
  }

  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
  pEntry->compressed = 0;
  pEntry->numOfRows = pInput->pData->info.rows;
  pEntry->numOfCols = numOfCols;
  pEntry->dataLen = 0;
  pEntry->rawLen = 0;

  pBuf->useSize = sizeof(SDataCacheEntry);

  {
    // allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
    size_t dataEncodeBufSize = pBuf->allocSize + 8;
    if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
      if (pHandle->pCompressBuf == NULL) {
        pHandle->pCompressBuf = taosMemoryMalloc(dataEncodeBufSize);
        if (NULL == pHandle->pCompressBuf) {
          QRY_RET(terrno);
        }
        pHandle->bufSize = dataEncodeBufSize;
      } else if (pHandle->bufSize < dataEncodeBufSize) {
        void* p = taosMemoryRealloc(pHandle->pCompressBuf, dataEncodeBufSize);
        if (p == NULL) {
          // The old scratch buffer is still valid and still has its old size, so
          // bufSize must not be touched here; otherwise a later call would believe
          // the buffer is large enough and write past its end.
          qError("failed to prepare compress buf:%d, code: %x", (int32_t)dataEncodeBufSize, terrno);
          return terrno;
        }
        pHandle->pCompressBuf = p;
        pHandle->bufSize = dataEncodeBufSize;
      }

      int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, dataEncodeBufSize, numOfCols);
      if (dataLen < 0) {
        qError("failed to encode data block, code: %d", dataLen);
        return terrno;
      }
      int32_t len =
          tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
      // A non-positive result cannot be a usable compressed payload; fall back to
      // the raw encoding instead of recording a zero or negative dataLen.
      if (len > 0 && len < dataLen) {
        pEntry->compressed = 1;
        pEntry->dataLen = len;
        pEntry->rawLen = dataLen;
      } else {  // no need to compress data
        pEntry->compressed = 0;
        pEntry->dataLen = dataLen;
        pEntry->rawLen = dataLen;
        TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen);
      }
    } else {
      pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, pBuf->allocSize, numOfCols);
      if (pEntry->dataLen < 0) {
        qError("failed to encode data block, code: %d", pEntry->dataLen);
        return terrno;
      }
      pEntry->rawLen = pEntry->dataLen;
    }
  }

  pBuf->useSize += pEntry->dataLen;

  (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
  (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);

  return TSDB_CODE_SUCCESS;
}
209

210
/*
 * Size and allocate the payload buffer for one input block.
 *
 * pBuf->allocSize becomes sizeof(SDataCacheEntry) plus the encode size reported
 * by blockGetEncodeSize(), and pBuf->pData receives a fresh allocation of that
 * many bytes. pDispatcher is not consulted; the queue capacity is not checked here.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
static int32_t allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData);
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);

  if (NULL != pBuf->pData) {
    return TSDB_CODE_SUCCESS;
  }

  qError("SinkNode failed to malloc memory, size:%d, code:%x", pBuf->allocSize, terrno);
  return terrno;
}
230

231
/*
 * Recompute the buffer status from the current queue length, store it in the
 * handle and return it. The whole computation runs with the handle mutex held.
 *
 *   queue empty                               -> DS_BUF_EMPTY
 *   fewer than cfg.maxDataBlockNumPerQuery    -> DS_BUF_LOW
 *   otherwise                                 -> DS_BUF_FULL
 */
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
  int32_t newStatus;

  (void)taosThreadMutexLock(&pDispatcher->mutex);

  int32_t queued = taosQueueItemSize(pDispatcher->pDataBlocks);
  if (0 == queued) {
    newStatus = DS_BUF_EMPTY;
  } else if (queued < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
    newStatus = DS_BUF_LOW;
  } else {
    newStatus = DS_BUF_FULL;
  }
  pDispatcher->status = newStatus;

  (void)taosThreadMutexUnlock(&pDispatcher->mutex);

  return newStatus;
}
241

242
/* Return the status last stored by updateStatus(), read under the handle mutex. */
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
  int32_t snapshot;

  (void)taosThreadMutexLock(&pDispatcher->mutex);
  snapshot = pDispatcher->status;
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);

  return snapshot;
}
248

249
/*
 * Sink callback (fPut): encode one input block and append it to the output queue.
 *
 * A queue item is allocated, its payload buffer is sized and filled through
 * allocBuf() and toDataCacheEntry(), and the item is written to pDataBlocks.
 * On success *pContinue tells the producer whether the refreshed buffer status
 * is still DS_BUF_LOW or DS_BUF_EMPTY. On any failure the item and its payload
 * are released, the error code is returned and *pContinue is left untouched.
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  SDataDispatchBuf*    pItem = NULL;
  int32_t              code = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0, (void**)&pItem);
  if (code != 0) {
    return code;
  }

  code = allocBuf(pDispatcher, pInput, pItem);
  if (code != 0) {
    // No payload was allocated, only the queue item needs to go.
    taosFreeQitem(pItem);
    return code;
  }

  QRY_ERR_JRET(toDataCacheEntry(pDispatcher, pInput, pItem));
  QRY_ERR_JRET(taosWriteQitem(pDispatcher->pDataBlocks, pItem));

  int32_t bufState = updateStatus(pDispatcher);
  *pContinue = !(bufState != DS_BUF_LOW && bufState != DS_BUF_EMPTY);
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFreeClear(pItem->pData);
  taosFreeQitem(pItem);
  return code;
}
278

279
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
7,279,193✔
280
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
7,279,193✔
281
  (void)taosThreadMutexLock(&pDispatcher->mutex);
7,279,193✔
282
  pDispatcher->queryEnd = true;
7,279,960✔
283
  pDispatcher->useconds = useconds;
7,279,960✔
284
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
7,279,960✔
285
}
7,279,941✔
286

287
static void resetDispatcher(struct SDataSinkHandle* pHandle) {
1,064✔
288
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
1,064✔
289
  (void)taosThreadMutexLock(&pDispatcher->mutex);
1,064✔
290
  pDispatcher->queryEnd = false;
1,064✔
291
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
1,064✔
292
}
1,064✔
293

294
/*
 * Sink callback (fGetLen): pop the next queued buffer into nextOutput and
 * report its sizes.
 *
 *   pLen      - receives the stored payload length (entry dataLen), 0 if nothing is pending
 *   pRowLen   - receives the raw (uncompressed) length (entry rawLen), 0 if nothing is pending
 *   pQueryEnd - receives the current query-end flag
 *
 * The popped buffer stays in nextOutput until getDataBlock() consumes it.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
    *pQueryEnd = pDispatcher->queryEnd;
    *pLen = 0;
    *pRowLen = 0;  // keep both length outputs defined on the empty path
    return;
  }

  SDataDispatchBuf* pBuf = NULL;
  taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
  if (pBuf != NULL) {
    TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
    taosFreeQitem(pBuf);
  }

  *pQueryEnd = pDispatcher->queryEnd;

  SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
  if (pEntry == NULL) {
    // The read returned no item even though the queue looked non-empty, and no
    // earlier buffer is pending: report "no data" instead of dereferencing NULL.
    *pLen = 0;
    *pRowLen = 0;
    return;
  }

  *pLen = pEntry->dataLen;
  *pRowLen = pEntry->rawLen;

  qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, pEntry->numOfRows);
}
317

318
/*
 * Sink callback (fGetData): hand the buffer staged in nextOutput to the caller.
 *
 * With a staged buffer, its payload is copied into pOutput->pData, the row,
 * column and compression fields are filled, the cached-size counters are
 * reduced, the staged buffer is freed and the buffer status is refreshed.
 * Without a staged buffer the call only succeeds once the query has ended; it
 * then reports DS_BUF_EMPTY together with the end-of-query information.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 * nothing is staged and the query is still running.
 */
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
  SDataDispatchHandle* pSelf = (SDataDispatchHandle*)pHandle;
  SDataCacheEntry*     pStaged = (SDataCacheEntry*)(pSelf->nextOutput.pData);

  if (NULL != pStaged) {
    TAOS_MEMCPY(pOutput->pData, pStaged->data, pStaged->dataLen);
    pOutput->numOfRows = pStaged->numOfRows;
    pOutput->numOfCols = pStaged->numOfCols;
    pOutput->compressed = pStaged->compressed;

    (void)atomic_sub_fetch_64(&pSelf->cachedSize, pStaged->dataLen);
    (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pStaged->dataLen);

    taosMemoryFreeClear(pSelf->nextOutput.pData);  // todo persistent
    pOutput->bufStatus = updateStatus(pSelf);

    (void)taosThreadMutexLock(&pSelf->mutex);
    pOutput->queryEnd = pSelf->queryEnd;
    pOutput->useconds = pSelf->useconds;
    pOutput->precision = pSelf->pSchema->precision;
    (void)taosThreadMutexUnlock(&pSelf->mutex);

    return TSDB_CODE_SUCCESS;
  }

  // Nothing staged: only legal after the producer signalled the end of the query.
  if (!pSelf->queryEnd) {
    qError("empty res while query not end in data dispatcher");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pOutput->useconds = pSelf->useconds;
  pOutput->precision = pSelf->pSchema->precision;
  pOutput->bufStatus = DS_BUF_EMPTY;
  pOutput->queryEnd = pSelf->queryEnd;
  return TSDB_CODE_SUCCESS;
}
353

354
/*
 * Sink callback (fDestroy): release everything the dispatcher owns.
 *
 * The sink's remaining cachedSize is subtracted from the global counter, the
 * staged buffer and every queued buffer are freed, the queue is closed, the
 * compression scratch buffer is released, the mutex is destroyed and the
 * manager is freed. The handle structure itself is not freed here.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pSelf = (SDataDispatchHandle*)pHandle;

  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSelf->cachedSize);
  taosMemoryFreeClear(pSelf->nextOutput.pData);

  // Drain whatever the consumer never fetched.
  for (;;) {
    if (taosQueueEmpty(pSelf->pDataBlocks)) {
      break;
    }

    SDataDispatchBuf* pPending = NULL;
    taosReadQitem(pSelf->pDataBlocks, (void**)&pPending);
    if (NULL == pPending) {
      continue;
    }
    taosMemoryFreeClear(pPending->pData);
    taosFreeQitem(pPending);
  }

  taosCloseQueue(pSelf->pDataBlocks);

  taosMemoryFreeClear(pSelf->pCompressBuf);
  pSelf->bufSize = 0;

  (void)taosThreadMutexDestroy(&pSelf->mutex);
  taosMemoryFree(pSelf->pManager);
  return TSDB_CODE_SUCCESS;
}
376

377
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
378
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
×
379

380
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
381
  return TSDB_CODE_SUCCESS;
×
382
}
383

384
/*
 * Verify that the declared outputRowSize of a block description equals the
 * summed byte width of its leading output slots.
 *
 * The check is skipped (success is returned) when tsSafetyCheckLevel is
 * TSDB_SAFETY_CHECK_LEVELL_NEVER.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_INVALID_INPUT for a NULL
 * description or a size mismatch.
 */
static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pInputDataBlockDesc) {
    qError("invalid schema");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t summedBytes = 0;
  SNode*  pIter;
  FOREACH(pIter, pInputDataBlockDesc->pSlots) {
    SSlotDescNode* pSlot = (SSlotDescNode*)pIter;
    // Slots must be sorted, and slots with 'output' set to true must come first
    if (!pSlot->output) {
      break;
    }
    summedBytes += pSlot->dataType.bytes;
  }

  if (summedBytes == pInputDataBlockDesc->outputRowSize) {
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", summedBytes, pInputDataBlockDesc->outputRowSize);
  return TSDB_CODE_QRY_INVALID_INPUT;
}
411

412
/*
 * Count the leading slots of a block description that are flagged as output.
 * Counting stops at the first slot that is not an output slot.
 *
 * Returns the count, or 0 (after logging an error) when the description is NULL.
 */
int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) {
  if (NULL == pInputDataBlockDesc) {
    qError("invalid schema");
    return 0;
  }

  int32_t outputSlots = 0;
  SNode*  pIter;
  FOREACH(pIter, pInputDataBlockDesc->pSlots) {
    SSlotDescNode* pSlot = (SSlotDescNode*)pIter;
    // Slots must be sorted, and slots with 'output' set to true must come first
    if (!pSlot->output) {
      break;
    }
    ++outputSlots;
  }
  return outputSlots;
}
430

431
/*
 * Create a data dispatcher sink for the given sink node.
 *
 *   pManager  - sink manager; ownership passes to this function. It is stored
 *               in the new handle on success and released on every failure path.
 *   pDataSink - sink node whose pInputDataBlockDesc becomes the dispatcher schema.
 *   pHandle   - receives the new handle on success; left untouched on failure.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code (also stored in terrno) when the
 * schema check, the handle allocation, the queue creation or the mutex
 * initialisation fails.
 */
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
  // Declared before the first goto so the cleanup code can always test it.
  SDataDispatchHandle* dispatcher = NULL;

  int32_t code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
  if (code) {
    qError("failed to check input data block desc, code:%d", code);
    // Go through the common cleanup so pManager is released here as well,
    // matching the other failure paths.
    terrno = code;
    goto _return;
  }

  dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
  if (NULL == dispatcher) {
    goto _return;
  }

  dispatcher->sink.fPut = putDataBlock;
  dispatcher->sink.fEndPut = endPut;
  dispatcher->sink.fReset = resetDispatcher;
  dispatcher->sink.fGetLen = getDataLength;
  dispatcher->sink.fGetData = getDataBlock;
  dispatcher->sink.fDestroy = destroyDataSinker;
  dispatcher->sink.fGetCacheSize = getCacheSize;

  // From here on the handle owns the manager; clear the local pointer so the
  // cleanup code below does not free it a second time.
  dispatcher->pManager = pManager;
  pManager = NULL;
  dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
  dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema);
  dispatcher->status = DS_BUF_EMPTY;
  dispatcher->queryEnd = false;
  code = taosOpenQueue(&dispatcher->pDataBlocks);
  if (code) {
    terrno = code;
    goto _return;
  }
  code = taosThreadMutexInit(&dispatcher->mutex, NULL);
  if (code) {
    terrno = code;
    goto _return;
  }

  *pHandle = dispatcher;
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFree(pManager);

  if (dispatcher) {
    // NOTE(review): this presumably ends up in destroyDataSinker(), which touches
    // the queue and the mutex even when their initialisation failed - confirm
    // that dsDestroyDataSinker() tolerates a partially initialised handle.
    dsDestroyDataSinker(dispatcher);
  }
  return terrno;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc