• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5056

17 May 2026 01:15AM UTC coverage: 73.384% (+0.03%) from 73.355%
#5056

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281643 of 383795 relevant lines covered (73.38%)

135942701.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.18
/source/libs/executor/src/dataDispatcher.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dataSinkInt.h"
17
#include "dataSinkMgt.h"
18
#include "executorInt.h"
19
#include "planner.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24

25
extern SDataSinkStat gDataSinkStat;
26

27
// One queued result buffer. pData is a heap block that starts with an SDataCacheEntry header
// followed by the encoded (optionally compressed) block payload.
typedef struct SDataDispatchBuf {
28
  int32_t useSize;    // bytes in use: sizeof(SDataCacheEntry) + entry dataLen (set in toDataCacheEntry)
29
  int32_t allocSize;  // bytes allocated for pData (set in allocBuf)
30
  char*   pData;      // buffer allocated in allocBuf; read as SDataCacheEntry*
31
} SDataDispatchBuf;
32

33
// Header stored at the front of every SDataDispatchBuf::pData buffer; the payload follows in data[].
typedef struct SDataCacheEntry {
34
  int32_t rawLen;      // length of the encoded block before compression
35
  int32_t dataLen;     // length of the bytes actually stored in data[]
36
  int32_t numOfRows;   // copied from the input block's info.rows
37
  int32_t numOfCols;   // number of columns that were encoded
38
  int8_t  compressed;  // 1 when data[] holds compressed bytes, 0 when it holds the raw encoding
39
  char    data[];      // flexible array member: encoded block payload
40
} SDataCacheEntry;
41

42
// State of one data-dispatcher sink: a queue of encoded result buffers plus the bookkeeping
// needed to hand them out one at a time.
typedef struct SDataDispatchHandle {
43
  SDataSinkHandle     sink;             // callback table; filled in by createDataDispatcher
44
  SDataSinkManager*   pManager;         // taken over from the creator; freed in destroyDataSinker
45
  SDataBlockDescNode* pSchema;          // the sink node's pInputDataBlockDesc
46
  SDataSinkNode*      pSinkNode;        // taken over from the creator; destroyed in destroyDataSinker
47
  STaosQueue*         pDataBlocks;      // queue of SDataDispatchBuf items
48
  SDataDispatchBuf    nextOutput;       // copy of the item dequeued by getDataLength, consumed by getDataBlock
49
  int32_t             outPutColCounts;  // result of getOutputColCounts(pSchema)
50
  int32_t             status;           // DS_BUF_* value last computed by updateStatus
51
  bool                queryEnd;         // set by endPut, cleared by resetDispatcher
52
  uint64_t            useconds;         // value passed to endPut
53
  uint64_t            cachedSize;       // sum of dataLen of entries encoded but not yet fetched
54
  uint64_t            flags;            // DS_FLAG_* bits
55
  void*               pCompressBuf;     // scratch buffer holding the raw encoding before compression
56
  int32_t             bufSize;          // size recorded for pCompressBuf
57
  bool                dynamicSchema;    // copied from SDataDispatcherNode::dynamicSchema
58
  TdThreadMutex       mutex;            // taken around status / queryEnd / useconds updates
59
} SDataDispatchHandle;
60

61
/**
 * Validate an input block against the dispatcher's fixed output schema before encoding.
 *
 * Returns TSDB_CODE_SUCCESS immediately when tsSafetyCheckLevel is
 * TSDB_SAFETY_CHECK_LEVELL_NEVER. Otherwise it checks, in order: the input is non-empty,
 * its row size matches the schema, it has at least outPutColCounts columns, and every
 * output slot's input column has a valid type and a byte width consistent with both the
 * type table and the slot description.
 *
 * @param pHandle dispatcher whose pSchema / outPutColCounts describe the expected layout
 * @param pInput  block about to be encoded; NULL is reported as invalid input
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT or TSDB_CODE_TSC_INTERNAL_ERROR
 */
static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* pInput)  {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }
  if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) {
    qError("invalid input data");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }
  SDataBlockDescNode* pSchema = pHandle->pSchema;
  if (pSchema == NULL || pSchema->totalRowSize != pInput->pData->info.rowSize) {
    qError("invalid schema");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  if (pHandle->outPutColCounts > taosArrayGetSize(pInput->pData->pDataBlock)) {
    qError("invalid column number, schema:%d, input:%zu", pHandle->outPutColCounts, taosArrayGetSize(pInput->pData->pDataBlock));
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SNode*  pNode;
  int32_t colNum = 0;  // index of the next input column; advances only for output slots
  FOREACH(pNode, pSchema->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
    if (pSlotDesc->output) {
      SColumnInfoData* pColInfoData = taosArrayGet(pInput->pData->pDataBlock, colNum);
      if (pColInfoData == NULL) {
        // Report a real error code rather than a bare -1 so callers can propagate it.
        qError("invalid input data");
        return TSDB_CODE_QRY_INVALID_INPUT;
      }
      if (pColInfoData->info.bytes < 0) {
        qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      // The type must be validated before it is used as an index into TYPE_BYTES below.
      if (IS_INVALID_TYPE(pColInfoData->info.type)) {
        qError("invalid column type, type:%d", pColInfoData->info.type);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      if (!IS_VAR_DATA_TYPE(pColInfoData->info.type) &&
          TYPE_BYTES[pColInfoData->info.type] != pColInfoData->info.bytes) {
        qError("invalid column bytes, schema:%d, input:%d", TYPE_BYTES[pColInfoData->info.type],
               pColInfoData->info.bytes);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      if (pColInfoData->info.type != pSlotDesc->dataType.type) {
        qError("invalid column type, schema:%d, input:%d", pSlotDesc->dataType.type, pColInfoData->info.type);
        return TSDB_CODE_QRY_INVALID_INPUT;
      }
      if (pColInfoData->info.bytes != pSlotDesc->dataType.bytes) {
        qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
        return TSDB_CODE_QRY_INVALID_INPUT;
      }

      ++colNum;
    }
  }

  return TSDB_CODE_SUCCESS;
}
119

120
static int32_t inputSafetyCheckForDynamicSchema(const SInputData* pInput)  {
34,407,634✔
121
  if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
34,407,634✔
122
    return TSDB_CODE_SUCCESS;
×
123
  }
124
  if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) {
34,407,634✔
125
    qError("invalid input data");
×
126
    return TSDB_CODE_QRY_INVALID_INPUT;
×
127
  }
128

129
  for (int32_t colNum = 0; colNum < taosArrayGetSize(pInput->pData->pDataBlock); colNum++) {
128,779,154✔
130
    SColumnInfoData* pColInfoData = taosArrayGet(pInput->pData->pDataBlock, colNum);
94,371,520✔
131
    if (pColInfoData == NULL) {
94,371,520✔
132
      return -1;
×
133
    }
134
    if (pColInfoData->info.bytes < 0) {
94,371,520✔
135
      qError("invalid column bytes, input:%d", pColInfoData->info.bytes);
×
136
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
137
    }
138
    if (!IS_VAR_DATA_TYPE(pColInfoData->info.type) &&
94,371,520✔
139
        TYPE_BYTES[pColInfoData->info.type] != pColInfoData->info.bytes) {
78,350,120✔
140
      qError("invalid column bytes, schema:%d, input:%d", TYPE_BYTES[pColInfoData->info.type],
×
141
             pColInfoData->info.bytes);
142
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
143
    }
144

145
    if (IS_INVALID_TYPE(pColInfoData->info.type)) {
94,371,520✔
146
      qError("invalid column type, type:%d", pColInfoData->info.type);
×
147
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
148
    }
149
  }
150

151
  return TSDB_CODE_SUCCESS;
34,407,634✔
152
}
153

154
// clang-format off
155
// data format:
156
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
157
// |SDataCacheEntry |  version         | total length | numOfRows    |     group id     | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data |
158
// |                |  sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols        | actual size |           |                         |
159
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
160
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
161
// recorded in the first segment, next to the struct header
162
// clang-format on
163
/**
 * Encode one input block into pBuf as an SDataCacheEntry header followed by the payload.
 *
 * The input is validated first (against the fixed schema, or generically when
 * dynamicSchema is set). When pBuf->allocSize exceeds a positive tsCompressMsgSize and the
 * manager config enables compression, the block is encoded into the dispatcher's scratch
 * buffer and then compressed into the entry; if compression does not shrink the data, the
 * raw encoding is stored instead. Otherwise the block is encoded straight into the entry.
 * On success pBuf->useSize is the header size plus the stored payload length, and that
 * payload length is added to both the per-dispatcher and the global cachedSize counters.
 *
 * @param pHandle dispatcher owning the schema, the scratch buffer and the counters
 * @param pInput  block to encode
 * @param pBuf    destination; pData / allocSize must already be set up by the caller
 * @return TSDB_CODE_SUCCESS, a validation error code, or terrno when allocation or
 *         encoding fails
 */
static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  int32_t numOfCols = 0;
  SNode*  pNode;
  int32_t code = TSDB_CODE_SUCCESS;

  if (pHandle->dynamicSchema) {
    code = inputSafetyCheckForDynamicSchema(pInput);
    if (code) {
      qError("failed to check input data for dynamic schema, code:%d", code);
      return code;
    }
    numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
  } else {
    code = inputSafetyCheck(pHandle, pInput);
    if (code) {
      qError("failed to check input data, code:%d", code);
      return code;
    }

    // Only slots flagged as output are encoded.
    FOREACH(pNode, pHandle->pSchema->pSlots) {
      SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
      if (pSlotDesc->output) {
        ++numOfCols;
      }
    }
  }

  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
  pEntry->compressed = 0;
  pEntry->numOfRows = pInput->pData->info.rows;
  pEntry->numOfCols = numOfCols;
  pEntry->dataLen = 0;
  pEntry->rawLen = 0;

  pBuf->useSize = sizeof(SDataCacheEntry);

  // NOTE(review): pBuf->allocSize is passed below as the capacity of pEntry->data, although
  // pEntry->data starts sizeof(SDataCacheEntry) bytes into the pBuf->pData allocation.
  // Confirm the encoders never rely on those extra bytes.

  // allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
  size_t dataEncodeBufSize = pBuf->allocSize + 8;
  if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
    if (pHandle->pCompressBuf == NULL) {
      pHandle->pCompressBuf = taosMemoryMalloc(dataEncodeBufSize);
      if (NULL == pHandle->pCompressBuf) {
        QRY_RET(terrno);
      }
      pHandle->bufSize = dataEncodeBufSize;
    } else if ((size_t)pHandle->bufSize < dataEncodeBufSize) {
      void* p = taosMemoryRealloc(pHandle->pCompressBuf, dataEncodeBufSize);
      if (p == NULL) {
        // Leave pCompressBuf / bufSize untouched so they still describe the old buffer.
        qError("failed to prepare compress buf:%d, code: %x", (int32_t)dataEncodeBufSize, terrno);
        return terrno;
      }
      // Record the new size only once the larger buffer really exists.
      pHandle->pCompressBuf = p;
      pHandle->bufSize = dataEncodeBufSize;
    }

    int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, dataEncodeBufSize, numOfCols);
    if (dataLen < 0) {
      qError("failed to encode data block, code: %d", dataLen);
      return terrno;
    }
    int32_t len =
        tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
    if (len > 0 && len < dataLen) {
      pEntry->compressed = 1;
      pEntry->dataLen = len;
      pEntry->rawLen = dataLen;
    } else {
      // No gain from compression, or a non-positive result (presumably a compressor
      // failure -- TODO confirm): store the raw encoding instead.
      pEntry->compressed = 0;
      pEntry->dataLen = dataLen;
      pEntry->rawLen = dataLen;
      TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen);
    }
  } else {
    pEntry->dataLen =
        pHandle->dynamicSchema
            ? blockEncodeInternal(pInput->pData, pEntry->data,  pBuf->allocSize, numOfCols)
            : blockEncode(pInput->pData, pEntry->data,  pBuf->allocSize, numOfCols);
    if (pEntry->dataLen < 0) {
      qError("failed to encode data block, code: %d", pEntry->dataLen);
      return terrno;
    }
    pEntry->rawLen = pEntry->dataLen;
  }

  pBuf->useSize += pEntry->dataLen;

  (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
  (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);

  return TSDB_CODE_SUCCESS;
}
259

260
/**
 * Allocate the payload buffer for one queue item, sized for the encoded input block.
 *
 * allocSize is the SDataCacheEntry header plus the encode size reported for the block
 * (the "internal" variant is used when the dispatcher has a dynamic schema).
 *
 * @param pDispatcher dispatcher; only dynamicSchema is read here
 * @param pInput      block that will be encoded into the buffer
 * @param pBuf        queue item whose allocSize / pData are filled in
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_QRY_INVALID_INPUT when there is no input block;
 *         terrno when the allocation fails. On failure pBuf->pData is NULL.
 */
static int32_t allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  // This runs before the input safety checks, so reject a missing block here instead of
  // dereferencing it in the size computation.
  if (pInput == NULL || pInput->pData == NULL) {
    pBuf->allocSize = 0;
    pBuf->pData = NULL;
    qError("invalid input data");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  pBuf->allocSize = sizeof(SDataCacheEntry) + (pDispatcher->dynamicSchema
                                                   ? blockGetInternalEncodeSize(pInput->pData)
                                                   : blockGetEncodeSize(pInput->pData));

  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
  if (pBuf->pData == NULL) {
    qError("SinkNode failed to malloc memory, size:%d, code:%x", pBuf->allocSize, terrno);
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
282

283
/**
 * Recompute the buffer status from the current queue length and store it in the handle.
 *
 * An empty queue maps to DS_BUF_EMPTY, fewer items than cfg.maxDataBlockNumPerQuery to
 * DS_BUF_LOW, anything else to DS_BUF_FULL. The computation and the store happen while
 * holding the dispatcher mutex.
 *
 * @return the status that was stored
 */
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
  int32_t newStatus;

  (void)taosThreadMutexLock(&pDispatcher->mutex);

  int32_t queued = taosQueueItemSize(pDispatcher->pDataBlocks);
  if (0 == queued) {
    newStatus = DS_BUF_EMPTY;
  } else if (queued < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
    newStatus = DS_BUF_LOW;
  } else {
    newStatus = DS_BUF_FULL;
  }
  pDispatcher->status = newStatus;

  (void)taosThreadMutexUnlock(&pDispatcher->mutex);

  return newStatus;
}
293

294
/**
 * Read the last stored buffer status while holding the dispatcher mutex.
 *
 * @return the DS_BUF_* value currently stored in pDispatcher->status
 */
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
  int32_t snapshot;

  (void)taosThreadMutexLock(&pDispatcher->mutex);
  snapshot = pDispatcher->status;
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);

  return snapshot;
}
300

301
/**
 * Sink callback: encode one input block and append it to the dispatcher's queue.
 *
 * A queue item is allocated, given a payload buffer, filled by toDataCacheEntry and then
 * written to pDataBlocks. After a successful write the buffer status is refreshed and
 * *pContinue tells the producer whether it may keep sending: true only while the status
 * is DS_BUF_LOW or DS_BUF_EMPTY and DS_FLAG_PROCESS_ONE_BLOCK is not set.
 *
 * @param pHandle   the dispatcher, passed as its generic sink handle
 * @param pInput    block to enqueue
 * @param pContinue out: written only on success
 * @return TSDB_CODE_SUCCESS, or the first error; on error nothing is enqueued and the
 *         item and its payload are released
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  SDataDispatchBuf*    pBuf = NULL;
  int32_t              code = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0, (void**)&pBuf);
  if (code != 0) {
    return code;
  }

  code = allocBuf(pDispatcher, pInput, pBuf);
  if (code != 0) {
    // No payload was attached, so only the queue item itself is released.
    taosFreeQitem(pBuf);
    return code;
  }

  QRY_ERR_JRET(toDataCacheEntry(pDispatcher, pInput, pBuf));
  QRY_ERR_JRET(taosWriteQitem(pDispatcher->pDataBlocks, pBuf));

  int32_t bufState = updateStatus(pDispatcher);
  bool    hasRoom = (bufState == DS_BUF_LOW) || (bufState == DS_BUF_EMPTY);
  *pContinue = hasRoom && !(pDispatcher->flags & DS_FLAG_PROCESS_ONE_BLOCK);
  return TSDB_CODE_SUCCESS;

_return:

  // Reached only before the item was handed to the queue: release payload and item.
  taosMemoryFreeClear(pBuf->pData);
  taosFreeQitem(pBuf);
  return code;
}
330

331
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
402,625,417✔
332
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
402,625,417✔
333
  (void)taosThreadMutexLock(&pDispatcher->mutex);
402,625,417✔
334
  pDispatcher->queryEnd = true;
402,643,221✔
335
  pDispatcher->useconds = useconds;
402,643,435✔
336
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
402,627,957✔
337
}
402,621,351✔
338

339
static void resetDispatcher(struct SDataSinkHandle* pHandle) {
53,753,714✔
340
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
53,753,714✔
341
  (void)taosThreadMutexLock(&pDispatcher->mutex);
53,753,714✔
342
  pDispatcher->queryEnd = false;
53,751,195✔
343
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);
53,751,653✔
344
}
53,753,943✔
345

346
/**
 * Sink callback: dequeue the next buffer (if any) into nextOutput and report its sizes.
 *
 * When the queue is empty, or no entry could be obtained, both lengths are reported as 0.
 * Otherwise *pLen is the stored payload length and *pRowLen the length before
 * compression. *pQueryEnd always receives the current queryEnd flag.
 *
 * @param pHandle   the dispatcher, passed as its generic sink handle
 * @param pLen      out: payload length of the staged entry, or 0
 * @param pRowLen   out: uncompressed length of the staged entry, or 0
 * @param pQueryEnd out: whether endPut has been called since the last reset
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;

  // Give both outputs a defined value on every path, including the empty ones.
  *pLen = 0;
  *pRowLen = 0;

  if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
    *pQueryEnd = pDispatcher->queryEnd;
    return;
  }

  SDataDispatchBuf* pBuf = NULL;
  taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
  if (pBuf != NULL) {
    // Keep a copy of the item (including ownership of pData) and release the queue node.
    TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
    taosFreeQitem(pBuf);
  }

  SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
  if (pEntry == NULL) {
    // The read returned nothing and no entry is staged: treat it like an empty queue
    // instead of dereferencing NULL.
    *pQueryEnd = pDispatcher->queryEnd;
    return;
  }

  *pLen = pEntry->dataLen;
  *pRowLen = pEntry->rawLen;

  *pQueryEnd = pDispatcher->queryEnd;
  qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, pEntry->numOfRows);
}
369

370
/**
 * Sink callback: copy the staged entry (nextOutput) into pOutput and release it.
 *
 * With no staged entry the call succeeds only if the query has ended; it then reports an
 * empty buffer together with useconds, precision and queryEnd. With a staged entry the
 * payload and its row / column / compression info are copied out, the cached-size
 * counters are reduced by the payload length, the staged buffer is freed, and the
 * refreshed buffer status plus queryEnd / useconds / precision are filled in.
 *
 * @param pHandle the dispatcher, passed as its generic sink handle
 * @param pOutput destination; pOutput->pData is written with dataLen bytes
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when nothing is
 *         staged although the query has not ended
 */
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  SDataCacheEntry*     pStaged = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);

  if (NULL == pStaged) {
    if (!pDispatcher->queryEnd) {
      qError("empty res while query not end in data dispatcher");
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    pOutput->useconds = pDispatcher->useconds;
    pOutput->precision = pDispatcher->pSchema->precision;
    pOutput->bufStatus = DS_BUF_EMPTY;
    pOutput->queryEnd = pDispatcher->queryEnd;
    return TSDB_CODE_SUCCESS;
  }

  // Hand the payload and its description over to the caller.
  TAOS_MEMCPY(pOutput->pData, pStaged->data, pStaged->dataLen);
  pOutput->numOfRows = pStaged->numOfRows;
  pOutput->numOfCols = pStaged->numOfCols;
  pOutput->compressed = pStaged->compressed;

  // The entry is no longer cached: undo the accounting done when it was encoded.
  (void)atomic_sub_fetch_64(&pDispatcher->cachedSize, pStaged->dataLen);
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pStaged->dataLen);

  taosMemoryFreeClear(pDispatcher->nextOutput.pData);  // todo persistent
  pOutput->bufStatus = updateStatus(pDispatcher);

  (void)taosThreadMutexLock(&pDispatcher->mutex);
  pOutput->queryEnd = pDispatcher->queryEnd;
  pOutput->useconds = pDispatcher->useconds;
  pOutput->precision = pDispatcher->pSchema->precision;
  (void)taosThreadMutexUnlock(&pDispatcher->mutex);

  return TSDB_CODE_SUCCESS;
}
405

406
/**
 * Sink callback: release everything the dispatcher holds.
 *
 * Subtracts this dispatcher's cachedSize from the global counter, frees the staged
 * output, destroys the sink node, drains and closes the queue (freeing each item and its
 * payload), frees the compression scratch buffer, destroys the mutex and frees the
 * manager. The handle structure itself is not freed here.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pSelf = (SDataDispatchHandle*)pHandle;

  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSelf->cachedSize);
  taosMemoryFreeClear(pSelf->nextOutput.pData);
  nodesDestroyNode((SNode*)pSelf->pSinkNode);

  // Drain whatever was queued but never fetched.
  for (;;) {
    if (taosQueueEmpty(pSelf->pDataBlocks)) {
      break;
    }

    SDataDispatchBuf* pItem = NULL;
    taosReadQitem(pSelf->pDataBlocks, (void**)&pItem);
    if (NULL == pItem) {
      continue;
    }
    taosMemoryFreeClear(pItem->pData);
    taosFreeQitem(pItem);
  }
  taosCloseQueue(pSelf->pDataBlocks);

  taosMemoryFreeClear(pSelf->pCompressBuf);
  pSelf->bufSize = 0;

  (void)taosThreadMutexDestroy(&pSelf->mutex);
  taosMemoryFree(pSelf->pManager);
  return TSDB_CODE_SUCCESS;
}
429

430
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
431
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
×
432

433
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
434
  return TSDB_CODE_SUCCESS;
×
435
}
436

437
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
476,621,668✔
438
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
476,621,668✔
439

440
  *pFlags = atomic_load_64(&pDispatcher->flags);
476,621,668✔
441
  return TSDB_CODE_SUCCESS;
476,574,767✔
442
}
443

444
/**
 * Check that a block description's outputRowSize equals the summed byte widths of its
 * leading output slots.
 *
 * Returns TSDB_CODE_SUCCESS immediately when tsSafetyCheckLevel is
 * TSDB_SAFETY_CHECK_LEVELL_NEVER. Summation stops at the first non-output slot.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_INVALID_INPUT for a NULL description or a
 *         size mismatch
 */
static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc)  {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pInputDataBlockDesc) {
    qError("invalid schema");
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  int32_t summedBytes = 0;
  SNode*  pCur;
  FOREACH(pCur, pInputDataBlockDesc->pSlots) {
    SSlotDescNode* pSlot = (SSlotDescNode*)pCur;
    if (!pSlot->output) {
      // Slots must be sorted, and slots with 'output' set to true must come first
      break;
    }
    summedBytes += pSlot->dataType.bytes;
  }

  if (summedBytes == pInputDataBlockDesc->outputRowSize) {
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", summedBytes, pInputDataBlockDesc->outputRowSize);
  return TSDB_CODE_QRY_INVALID_INPUT;
}
471

472
/**
 * Count the leading slots of a block description that are flagged as output.
 *
 * Counting stops at the first non-output slot.
 *
 * @return the number of leading output slots, or 0 (after logging) for a NULL description
 */
int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) {
  if (NULL == pInputDataBlockDesc) {
    qError("invalid schema");
    return 0;
  }

  int32_t outputCols = 0;
  SNode*  pCur;
  FOREACH(pCur, pInputDataBlockDesc->pSlots) {
    SSlotDescNode* pSlot = (SSlotDescNode*)pCur;
    if (!pSlot->output) {
      // Slots must be sorted, and slots with 'output' set to true must come first
      break;
    }
    ++outputCols;
  }
  return outputCols;
}
490

491
/**
 * Create a data-dispatcher sink for the given sink node.
 *
 * On success the new handle is stored in *pHandle and it takes over both pManager and
 * *ppDataSink (which is set to NULL). On failure everything passed in is released:
 * pManager is freed (directly, or through the partially built dispatcher once it has
 * taken it over), the sink node is destroyed, and *ppDataSink is set to NULL.
 *
 * @param pManager        sink manager; consumed in all cases
 * @param ppDataSink      in/out: sink node to wrap; always NULL on return
 * @param pHandle         out: the created dispatcher; written only on success
 * @param processOneBlock when true, DS_FLAG_PROCESS_ONE_BLOCK is set on the dispatcher
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed (also left in terrno)
 */
int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, bool processOneBlock) {
  int32_t              code = TSDB_CODE_SUCCESS;
  // Initialised up front: the cleanup label tests this pointer, and the first failure
  // path jumps there before the dispatcher is allocated.
  SDataDispatchHandle* dispatcher = NULL;
  SDataSinkNode*       pDataSink = *ppDataSink;
  SDataDispatcherNode* pDispatherNode = (SDataDispatcherNode*)pDataSink;

  code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
  if (code) {
    qError("failed to check input data block desc, code:%d", code);
    goto _return;
  }

  dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
  if (NULL == dispatcher) {
    code = terrno;
    goto _return;
  }

  dispatcher->sink.fPut = putDataBlock;
  dispatcher->sink.fEndPut = endPut;
  dispatcher->sink.fReset = resetDispatcher;
  dispatcher->sink.fGetLen = getDataLength;
  dispatcher->sink.fGetData = getDataBlock;
  dispatcher->sink.fDestroy = destroyDataSinker;
  dispatcher->sink.fGetCacheSize = getCacheSize;
  dispatcher->sink.fGetFlags = getSinkFlags;
  // From here on the dispatcher holds the manager; clear the local pointer so the
  // cleanup path does not free it a second time.
  dispatcher->pManager = pManager;
  pManager = NULL;
  dispatcher->dynamicSchema = pDispatherNode->dynamicSchema;
  dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
  // Likewise the sink node now belongs to the dispatcher.
  dispatcher->pSinkNode = pDataSink;
  *ppDataSink = NULL;
  dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema);
  dispatcher->status = DS_BUF_EMPTY;
  dispatcher->queryEnd = false;
  code = taosOpenQueue(&dispatcher->pDataBlocks);
  if (code) {
    goto _return;
  }
  code = taosThreadMutexInit(&dispatcher->mutex, NULL);
  if (code) {
    goto _return;
  }

  dispatcher->flags = DS_FLAG_USE_MEMPOOL;
  if (processOneBlock) {
    dispatcher->flags |= DS_FLAG_PROCESS_ONE_BLOCK;
  }

  *pHandle = dispatcher;
  return TSDB_CODE_SUCCESS;

_return:

  taosMemoryFree(pManager);

  // NOTE(review): this may run with pDataBlocks still NULL or the mutex never
  // initialised; confirm the destroy path tolerates a partially built dispatcher.
  if (dispatcher) {
    dsDestroyDataSinker(dispatcher);
  }

  nodesDestroyNode((SNode *)*ppDataSink);
  *ppDataSink = NULL;

  // Publish the saved code only after cleanup, which may itself have modified terrno.
  terrno = code;
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc