• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/sortoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
// State used by the sort operator to assign a group id to each run of sorted rows sharing the same sort-key values.
typedef struct SSortOpGroupIdCalc {
23
  STupleHandle* pSavedTuple;   // first tuple of the next group, parked by nextTupleWithGroupId() until the current block is returned
24
  SArray*       pSortColsArr;  // columns built from the sort keys; the last (PK) column is popped when the plan asks to exclude it
25
  char*         keyBuf;        // key buffer filled by tsortCompAndBuildKeys(); allocated with the length from extractKeysLen()
26
  int32_t       lastKeysLen; // length of the key currently held in keyBuf; starts at 0
27
  uint64_t      lastGroupId;   // group id computed for the most recent group; reused for follow-up blocks of the same group
28
  bool          excludePKCol;  // NOTE(review): never written in the code visible here - confirm whether it is still needed
29
} SSortOpGroupIdCalc;
30

31
// Runtime state of the plain (non-grouped) sort operator.
typedef struct SSortOperatorInfo {
32
  SOptrBasicInfo      binfo;        // result block (binfo.pRes) plus input/output timestamp order copied from the plan node
33
  uint32_t            sortBufSize;  // max buffer size for in-memory sort
34
  SArray*             pSortInfo;    // sort-key description built by createSortInfo(); handed to tsortCreateSortHandle()
35
  SSortHandle*        pSortHandle;  // created in doOpenSortOperator(), released in destroySortOperatorInfo()
36
  SColMatchInfo       matchInfo;    // srcSlotId -> dstSlotId mapping used when copying sorted columns into binfo.pRes
37
  int32_t             bufPageSize;  // NOTE(review): not assigned in the code visible here
38
  int64_t             startTs;      // sort start time, taken from taosGetTimestampUs()
39
  uint64_t            sortElapsed;  // sort elapsed time, time to flush to disk not included.
40
  SLimitInfo          limitInfo;    // limit/offset state applied to every output block in doSort()
41
  uint64_t            maxTupleLength;  // sum of the byte widths of the matched columns and of the sort-key columns
42
  int64_t             maxRows;      // -1 when unbounded, otherwise limit + offset from the plan node
43
  SSortOpGroupIdCalc* pGroupIdCalc; // allocated only when the plan node sets calcGroupId; NULL otherwise
44
} SSortOperatorInfo;
45

46
static int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
47
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
48
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
49
static int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
50

51
static void destroySortOperatorInfo(void* param);
52
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
53

54
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
55

56
// todo add limit/offset impl
UNCOV
57
/**
 * @brief Create the sort operator described by a physical sort plan node.
 *
 * @param downstream  child operator; attached as the only downstream of the new operator
 * @param pSortNode   physical plan node supplying expressions, targets, sort keys, limit and conditions
 * @param pTaskInfo   owning task; receives the error code on failure
 * @param pOptrInfo   [out] set to the new operator on success
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed. On failure the
 *         partially built operator info is destroyed, the operator is handed to
 *         destroyOperatorAndDownstreams() together with `downstream`, and the failing
 *         line is logged in the same format the other functions of this file use.
 */
int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = 0;
  int32_t lino = 0;

  SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  pOperator->pTaskInfo = pTaskInfo;
  SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols);
  QUERY_CHECK_CODE(code, lino, _error);

  pOperator->exprSupp.numOfExprs = numOfCols;
  int32_t numOfOutputCols = 0;
  code =
      extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);

  // maxRows stays -1 (unbounded) unless the plan carries a positive limit
  pInfo->maxRows = -1;
  if (pSortNode->node.pLimit && ((SLimitNode*)pSortNode->node.pLimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
    if (pLimit->limit->datum.i > 0) {
      pInfo->maxRows = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
    }
  }

  pOperator->exprSupp.pCtx =
      createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
  initResultSizeInfo(&pOperator->resultInfo, 1024);
  code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);

  pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
  TSDB_CHECK_NULL(pInfo->pSortInfo, code, lino, _error, terrno);

  if (pSortNode->calcGroupId) {
    int32_t             keyLen = 0;
    SSortOpGroupIdCalc* pGroupIdCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
    if (!pGroupIdCalc) {
      code = terrno;
      lino = __LINE__;
      goto _error;
    }
    SNodeList* pSortColsNodeArr = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys);
    if (!pSortColsNodeArr) code = terrno;
    if (TSDB_CODE_SUCCESS == code) {
      pGroupIdCalc->pSortColsArr = makeColumnArrayFromList(pSortColsNodeArr);
      if (!pGroupIdCalc->pSortColsArr) code = terrno;
      // the temporary node list is released whether or not the array was built
      nodesClearList(pSortColsNodeArr);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // PK ts col should always be the last one, see partColOptCreateSort
      if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr);
      code = extractKeysLen(pGroupIdCalc->pSortColsArr, &keyLen);
      QUERY_CHECK_CODE(code, lino, _error);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pGroupIdCalc->lastKeysLen = 0;
      pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
      if (!pGroupIdCalc->keyBuf) {
        code = terrno;
      }
    }
  }
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
  initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);

  setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);

  // lazy evaluation for the following parameter since the input datablock is not known till now.
  //  pInfo->bufPageSize  = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
  //  there are headers, so pageSize = rowSize + header pInfo->sortBufSize  = pInfo->bufPageSize * 16;
  // TODO dynamic set the available sort buffer

  pOperator->fpSet =
      createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pInfo != NULL) {
    destroySortOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
170

UNCOV
171
/**
 * @brief Append the values of one sorted tuple as a new row at the end of pBlock.
 *
 * Column i of the block is filled from column i of the tuple; NULL values are
 * marked as NULL. After all columns are written the row counter is advanced and
 * the scan flag reported for the tuple's block is copied into pBlock.
 *
 * @return 0 on success, terrno if a column cannot be fetched from the block,
 *         or the error returned by colDataSetVal().
 */
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
  int32_t colIdx = 0;

  while (colIdx < taosArrayGetSize(pBlock->pDataBlock)) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    if (pCol == NULL) {
      return terrno;
    }

    if (tsortIsNullVal(pTupleHandle, colIdx)) {
      colDataSetNULL(pCol, pBlock->info.rows);
    } else {
      char* pVal = NULL;
      tsortGetValue(pTupleHandle, colIdx, (void**) &pVal);

      // a non-NULL column whose value pointer is missing is left untouched
      if (pVal != NULL) {
        int32_t rc = colDataSetVal(pCol, pBlock->info.rows, pVal, false);
        if (rc) {
          return rc;
        }
      }
    }

    ++colIdx;
  }

  pBlock->info.dataLoad = 1;

  SDataBlockInfo srcInfo = {0};
  tsortGetBlockInfo(pTupleHandle, &srcInfo);

  pBlock->info.scanFlag = srcInfo.scanFlag;
  pBlock->info.rows += 1;
  return 0;
}
204

205
/**
206
 * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
207
 * @param [in, out] pBlock the output block, the group id will be saved in it
208
 * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
209
 */
UNCOV
210
static int32_t nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock,
×
211
                                    STupleHandle** pTupleHandle) {
UNCOV
212
  QRY_PARAM_CHECK(pTupleHandle);
×
213

UNCOV
214
  int32_t       code = 0;
×
UNCOV
215
  STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
×
UNCOV
216
  if (!retTuple) {
×
UNCOV
217
    code = tsortNextTuple(pHandle, &retTuple);
×
UNCOV
218
    if (code) {
×
UNCOV
219
      qError("failed to get next tuple, code:%s", tstrerror(code));
×
220
      return code;
×
221
    }
222
  }
223

UNCOV
224
  if (retTuple) {
×
225
    int32_t newGroup;
UNCOV
226
    if (pInfo->pGroupIdCalc->pSavedTuple) {
×
UNCOV
227
      newGroup = true;
×
UNCOV
228
      pInfo->pGroupIdCalc->pSavedTuple = NULL;
×
229
    } else {
UNCOV
230
      newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
×
UNCOV
231
                                       &pInfo->pGroupIdCalc->lastKeysLen, retTuple);
×
232
    }
233

UNCOV
234
    bool emptyBlock = (pBlock->info.rows == 0);
×
UNCOV
235
    if (newGroup) {
×
UNCOV
236
      if (!emptyBlock) {
×
237
        // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
238
        // NULL. Note that the keyBuf and lastKeysLen has been updated to new value
UNCOV
239
        pInfo->pGroupIdCalc->pSavedTuple = retTuple;
×
UNCOV
240
        retTuple = NULL;
×
241
      } else {
242
        // new group with empty block
UNCOV
243
        pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId =
×
UNCOV
244
            calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen);
×
245
      }
246
    } else {
UNCOV
247
      if (emptyBlock) {
×
248
        // new block but not new group, assign last group id to it
UNCOV
249
        pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
×
250
      } else {
251
        // not new group and not empty block and ret NOT NULL, just return the tuple
252
      }
253
    }
254
  }
255

UNCOV
256
  *pTupleHandle = retTuple;
×
UNCOV
257
  return code;
×
258
}
259

UNCOV
260
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
×
261
                                SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
UNCOV
262
  QRY_PARAM_CHECK(pResBlock);
×
UNCOV
263
  blockDataCleanup(pDataBlock);
×
264

UNCOV
265
  int32_t       lino = 0;
×
UNCOV
266
  int32_t       code = 0;
×
UNCOV
267
  STupleHandle* pTupleHandle = NULL;
×
UNCOV
268
  SSDataBlock*  p = NULL;
×
269

UNCOV
270
  code = tsortGetSortedDataBlock(pHandle, &p);
×
UNCOV
271
  if (p == NULL || (code != 0)) {
×
UNCOV
272
    return code;
×
273
  }
274

UNCOV
275
  code = blockDataEnsureCapacity(p, capacity);
×
UNCOV
276
  QUERY_CHECK_CODE(code, lino, _error);
×
277

278
  while (1) {
UNCOV
279
    if (pInfo->pGroupIdCalc) {
×
UNCOV
280
      code = nextTupleWithGroupId(pHandle, pInfo, p, &pTupleHandle);
×
281
    } else {
UNCOV
282
      code = tsortNextTuple(pHandle, &pTupleHandle);
×
283
    }
284

UNCOV
285
    TSDB_CHECK_CODE(code, lino, _error);
×
UNCOV
286
    if (pTupleHandle == NULL) {
×
UNCOV
287
      break;
×
288
    }
289

UNCOV
290
    code = appendOneRowToDataBlock(p, pTupleHandle);
×
UNCOV
291
    QUERY_CHECK_CODE(code, lino, _error);
×
292

UNCOV
293
    if (p->info.rows >= capacity) {
×
UNCOV
294
      break;
×
295
    }
296
  }
297

UNCOV
298
  QUERY_CHECK_CODE(code, lino, _error);
×
299

UNCOV
300
  if (p->info.rows > 0) {
×
UNCOV
301
    code = blockDataEnsureCapacity(pDataBlock, capacity);
×
UNCOV
302
    QUERY_CHECK_CODE(code, lino, _error);
×
303

304
    // todo extract function to handle this
UNCOV
305
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
×
UNCOV
306
    for (int32_t i = 0; i < numOfCols; ++i) {
×
UNCOV
307
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
×
UNCOV
308
      QUERY_CHECK_NULL(pmInfo, code, lino, _error, terrno);
×
309

UNCOV
310
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
×
UNCOV
311
      QUERY_CHECK_NULL(pSrc, code, lino, _error, terrno);
×
312

UNCOV
313
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
×
UNCOV
314
      QUERY_CHECK_NULL(pDst, code, lino, _error, terrno);
×
315

UNCOV
316
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
×
UNCOV
317
      QUERY_CHECK_CODE(code, lino, _error);
×
318
    }
319

UNCOV
320
    pDataBlock->info.dataLoad = 1;
×
UNCOV
321
    pDataBlock->info.rows = p->info.rows;
×
UNCOV
322
    pDataBlock->info.scanFlag = p->info.scanFlag;
×
UNCOV
323
    pDataBlock->info.id.groupId = p->info.id.groupId;
×
324
  }
325

UNCOV
326
  blockDataDestroy(p);
×
UNCOV
327
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
×
UNCOV
328
  return code;
×
329

330
  _error:
×
331
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
332

333
  blockDataDestroy(p);
×
334
  return code;
×
335
}
336

UNCOV
337
/**
 * @brief Fetch callback for the sort handle: read the next block from the operator
 *        passed in `param` and validate it with blockDataCheck().
 *
 * @param param    the SOperatorInfo to read from
 * @param ppBlock  [out] receives the block produced by the operator
 * @return 0 on success, otherwise the error from the operator or from blockDataCheck()
 */
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
  SOperatorInfo* pUpstream = (SOperatorInfo*)param;

  int32_t code = pUpstream->fpSet.getNextFn(pUpstream, ppBlock);
  if (code != 0) {
    qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
    return code;
  }

  code = blockDataCheck(*ppBlock);
  if (code != 0) {
    qError("failed to check block data, %s code:%s", __func__, tstrerror(code));
  }
  return code;
}
350

351
// todo refactor: merged with fetch fp
UNCOV
352
/**
 * @brief Sort-handle callback that evaluates the operator's scalar expressions in place on pBlock.
 *
 * Does nothing when the operator has no expressions. On failure it does not return:
 * control is transferred to the task's jump environment via T_LONG_JMP.
 *
 * @param pBlock  block to transform (used as both input and output)
 * @param param   the SOperatorInfo that owns the expressions
 */
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
  SOperatorInfo* pOperator = param;
  SExprSupp*     pSup = &pOperator->exprSupp;

  if (pSup->pExprInfo == NULL || pSup->numOfExprs <= 0) {
    return;
  }

  int32_t code = projectApplyFunctions(pSup->pExprInfo, pBlock, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
}
×
363

UNCOV
364
/**
 * @brief Open the sort operator: create the sort handle, register the downstream
 *        operator as its single source and run tsortOpen().
 *
 * Returns immediately when the operator is already opened. On failure the error is
 * logged, stored in the task and control leaves through T_LONG_JMP.
 */
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  int32_t            lino = 0;
  SSortSource*       pSrc = NULL;
  SExecTaskInfo*     pTask = pOperator->pTaskInfo;
  SSortOperatorInfo* pSortOp = pOperator->info;

  pSortOp->startTs = taosGetTimestampUs();

  //  pSortOp->binfo.pRes is not the same block as the input datablock.
  pSortOp->pSortHandle = NULL;
  code = tsortCreateSortHandle(pSortOp->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTask->id.str,
                               pSortOp->maxRows, pSortOp->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024,
                               &pSortOp->pSortHandle);
  QUERY_CHECK_CODE(code, lino, _end);

  tsortSetFetchRawDataFp(pSortOp->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);

  pSrc = taosMemoryCalloc(1, sizeof(SSortSource));
  QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);

  pSrc->onlyRef = true;
  pSrc->param = pOperator->pDownstream[0];

  code = tsortAddSource(pSortOp->pSortHandle, pSrc);
  QUERY_CHECK_CODE(code, lino, _end);
  pSrc = NULL;  // successfully added: must no longer be freed at _end

  code = tsortOpen(pSortOp->pSortHandle);
  QUERY_CHECK_CODE(code, lino, _end);

  pOperator->cost.openCost = (taosGetTimestampUs() - pSortOp->startTs) / 1000.0;
  pOperator->status = OP_RES_TO_RETURN;
  OPTR_SET_OPENED(pOperator);

_end:
  if (pSrc != NULL) {
    taosMemoryFree(pSrc);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTask->code = code;
    T_LONG_JMP(pTask->env, code);
  }
  return code;
}
412

UNCOV
413
/**
 * @brief Produce the next non-empty block of sorted, filtered and limit-adjusted rows.
 *
 * @param pOperator  the sort operator
 * @param pResBlock  [out] the result block, or left as set by QRY_PARAM_CHECK when the
 *                   operator is done
 * @return TSDB_CODE_SUCCESS; on failure the error is logged, stored in the task and
 *         control leaves through T_LONG_JMP
 */
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  SSortOperatorInfo* pSortOp = pOperator->info;
  SExecTaskInfo*     pTask = pOperator->pTaskInfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  // multi-group case not handle here
  SSDataBlock* pOut = NULL;
  for (;;) {
    if (tsortIsClosed(pSortOp->pSortHandle)) {
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = getSortedBlockData(pSortOp->pSortHandle, pSortOp->binfo.pRes, pOperator->resultInfo.capacity,
                              pSortOp->matchInfo.pList, pSortOp, &pOut);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pOut == NULL) {
      // sort output exhausted
      setOperatorCompleted(pOperator);
      return code;
    }

    code = doFilter(pOut, pOperator->exprSupp.pFilterInfo, &pSortOp->matchInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    if (blockDataGetNumOfRows(pOut) == 0) {
      // everything was filtered out, try the next block
      continue;
    }

    // there are bugs?
    if (applyLimitOffset(&pSortOp->limitInfo, pOut, pTask)) {
      resetLimitInfoForNextGroup(&pSortOp->limitInfo);
    }

    pOperator->resultInfo.totalRows += pOut->info.rows;
    if (pOut->info.rows > 0) {
      break;
    }
  }

  *pResBlock = blockDataGetNumOfRows(pOut) > 0 ? pOut : NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTask->code = code;
    T_LONG_JMP(pTask->env, code);
  }
  return code;
}
471

UNCOV
472
void destroySortOperatorInfo(void* param) {
×
UNCOV
473
  SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
×
UNCOV
474
  blockDataDestroy(pInfo->binfo.pRes);
×
UNCOV
475
  pInfo->binfo.pRes = NULL;
×
476

UNCOV
477
  tsortDestroySortHandle(pInfo->pSortHandle);
×
UNCOV
478
  taosArrayDestroy(pInfo->pSortInfo);
×
UNCOV
479
  taosArrayDestroy(pInfo->matchInfo.pList);
×
UNCOV
480
  destroySortOpGroupIdCalc(pInfo->pGroupIdCalc);
×
UNCOV
481
  taosMemoryFreeClear(param);
×
UNCOV
482
}
×
483

UNCOV
484
/**
 * @brief Return a heap-allocated copy of the sort handle's execution statistics.
 *
 * @param pOptr         the sort operator
 * @param pOptrExplain  [out] newly allocated SSortExecInfo
 * @param len           [out] sizeof(SSortExecInfo)
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails
 */
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SSortExecInfo* pExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
  if (pExecInfo == NULL) {
    return terrno;
  }

  SSortOperatorInfo* pSortOp = (SSortOperatorInfo*)pOptr->info;
  *pExecInfo = tsortGetSortExecInfo(pSortOp->pSortHandle);

  *pOptrExplain = pExecInfo;
  *len = sizeof(SSortExecInfo);
  return TSDB_CODE_SUCCESS;
}
497

UNCOV
498
/**
 * @brief Add to pSortOperInfo->maxTupleLength the byte width of every matched output
 *        column and of every sort-key expression.
 *
 * The matched columns are read from pSortOperInfo->matchInfo, so that must be filled
 * before this is called.
 */
static void calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) {
  SArray* pMatchList = pSortOperInfo->matchInfo.pList;
  size_t  numOfMatches = taosArrayGetSize(pMatchList);

  for (size_t m = 0; m < numOfMatches; ++m) {
    SColMatchItem* pItem = taosArrayGet(pMatchList, m);
    if (pItem != NULL) {
      pSortOperInfo->maxTupleLength += pItem->dataType.bytes;
    }
  }

  size_t numOfKeys = LIST_LENGTH(pSortKeys);
  for (size_t k = 0; k < numOfKeys; ++k) {
    SOrderByExprNode* pKey = (SOrderByExprNode*)nodesListGetNode(pSortKeys, k);
    SColumnNode*      pKeyCol = (SColumnNode*)pKey->pExpr;
    pSortOperInfo->maxTupleLength += pKeyCol->node.resType.bytes;
  }
}
×
516

UNCOV
517
/**
 * @brief Free a SSortOpGroupIdCalc together with its column array and key buffer.
 *        A NULL pointer is ignored.
 */
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
  if (pCalc == NULL) {
    return;
  }

  taosArrayDestroy(pCalc->pSortColsArr);
  taosMemoryFree(pCalc->keyBuf);
  taosMemoryFree(pCalc);
}
×
524

525
//=====================================================================================
526
// Group Sort Operator
527
// What the last read from the downstream operator revealed (maintained by fetchNextGroupSortDataBlock()):
//   CHILD_OP_NEW_GROUP  - the block belongs to a different group than currGroupId and was stashed as prefetched input
//   CHILD_OP_SAME_GROUP - the block belongs to the group currently being sorted
//   CHILD_OP_FINISHED   - the downstream operator returned no more blocks
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
528

529
// Runtime state of the group sort operator, which sorts the downstream blocks one group id at a time.
typedef struct SGroupSortOperatorInfo {
530
  SOptrBasicInfo       binfo;                // result block (binfo.pRes) returned to the caller
531
  SArray*              pSortInfo;            // sort-key description handed to tsortCreateSortHandle() for every group
532
  SColMatchInfo        matchInfo;            // srcSlotId -> dstSlotId mapping used when copying sorted columns
533
  int64_t              startTs;              // NOTE(review): not used in the code visible here
534
  uint64_t             sortElapsed;          // NOTE(review): not used in the code visible here
535
  bool                 hasGroupId;           // set once the first downstream block has been fetched in doGroupSort()
536
  uint64_t             currGroupId;          // group id of the group currently being sorted
537
  SSDataBlock*         prefetchedSortInput;  // downstream block already read but not yet consumed by a sort handle
538
  SSortHandle*         pCurrSortHandle;      // sort handle of the current group; created by beginSortGroup(), destroyed by finishSortGroup()
539
  EChildOperatorStatus childOpStatus;        // outcome of the last downstream read
540
  SSortExecInfo        sortExecInfo;         // statistics accumulated over all finished groups
541
} SGroupSortOperatorInfo;
542

UNCOV
543
int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
×
544
                                SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
UNCOV
545
  QRY_PARAM_CHECK(pResBlock);
×
546

UNCOV
547
  blockDataCleanup(pDataBlock);
×
UNCOV
548
  int32_t code = blockDataEnsureCapacity(pDataBlock, capacity);
×
UNCOV
549
  if (code) {
×
550
    return code;
×
551
  }
552

UNCOV
553
  SSDataBlock* p = NULL;
×
UNCOV
554
  code = tsortGetSortedDataBlock(pHandle, &p);
×
UNCOV
555
  if (p == NULL || (code != 0)) {
×
556
    return code;
×
557
  }
558

UNCOV
559
  code = blockDataEnsureCapacity(p, capacity);
×
UNCOV
560
  if (code) {
×
561
    return code;
×
562
  }
563

UNCOV
564
  while (1) {
×
UNCOV
565
    STupleHandle* pTupleHandle = NULL;
×
UNCOV
566
    code = tsortNextTuple(pHandle, &pTupleHandle);
×
UNCOV
567
    if (pTupleHandle == NULL || code != 0) {
×
568
      break;
569
    }
570

UNCOV
571
    code = appendOneRowToDataBlock(p, pTupleHandle);
×
UNCOV
572
    if (code) {
×
573
      break;
×
574
    }
575

UNCOV
576
    if (p->info.rows >= capacity) {
×
577
      break;
×
578
    }
579
  }
580

UNCOV
581
  if (p->info.rows > 0) {
×
UNCOV
582
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
×
UNCOV
583
    for (int32_t i = 0; i < numOfCols; ++i) {
×
UNCOV
584
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
×
UNCOV
585
      if (pmInfo == NULL) {
×
586
        return terrno;
×
587
      }
588

UNCOV
589
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
×
UNCOV
590
      if (pSrc == NULL) {
×
591
        return terrno;
×
592
      }
593

UNCOV
594
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
×
UNCOV
595
      if (pDst == NULL) {
×
596
        return terrno;
×
597
      }
598

UNCOV
599
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
×
UNCOV
600
      if (code) {
×
601
        return code;
×
602
      }
603
    }
604

UNCOV
605
    pDataBlock->info.rows = p->info.rows;
×
UNCOV
606
    pDataBlock->info.capacity = p->info.rows;
×
UNCOV
607
    pDataBlock->info.scanFlag = p->info.scanFlag;
×
608
  }
609

UNCOV
610
  blockDataDestroy(p);
×
UNCOV
611
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
×
UNCOV
612
  return code;
×
613
}
614

615
// Parameter attached to the sort source of a group sort; consumed by fetchNextGroupSortDataBlock().
typedef struct SGroupSortSourceParam {
616
  SOperatorInfo*          childOpInfo;    // downstream operator the blocks are read from
617
  SGroupSortOperatorInfo* grpSortOpInfo;  // owning group sort operator state (current group id, prefetched block, status)
618
} SGroupSortSourceParam;
619

UNCOV
620
/**
 * @brief Fetch callback of the per-group sort handle: hand out the next block that
 *        belongs to the group currently being sorted.
 *
 * A previously prefetched block is returned first. Otherwise the child operator is
 * read; a block of the current group is returned, a block of another group is stashed
 * in prefetchedSortInput (and nothing is returned), and the child status is updated
 * accordingly.
 *
 * @param param    a SGroupSortSourceParam
 * @param ppBlock  [out] the block to sort, when one is available for the current group
 * @return 0 on success, otherwise the error from the child operator or blockDataCheck()
 */
int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SGroupSortSourceParam*  pSrcParam = param;
  SGroupSortOperatorInfo* pGrpSort = pSrcParam->grpSortOpInfo;
  SSDataBlock*            pNext = NULL;

  QRY_PARAM_CHECK(ppBlock);

  if (pGrpSort->prefetchedSortInput) {
    // consume the stashed block first
    pNext = pGrpSort->prefetchedSortInput;
    pGrpSort->prefetchedSortInput = NULL;
    *ppBlock = pNext;
    return code;
  }

  SOperatorInfo* pChild = pSrcParam->childOpInfo;
  code = pChild->fpSet.getNextFn(pChild, &pNext);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pNext == NULL) {
    pGrpSort->childOpStatus = CHILD_OP_FINISHED;
    return code;
  }

  code = blockDataCheck(pNext);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pNext->info.id.groupId != pGrpSort->currGroupId) {
    // first block of another group: keep it for the next sort round
    pGrpSort->childOpStatus = CHILD_OP_NEW_GROUP;
    pGrpSort->prefetchedSortInput = pNext;
  } else {
    pGrpSort->childOpStatus = CHILD_OP_SAME_GROUP;
    *ppBlock = pNext;
  }

_end:
  if (code != 0) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
660

UNCOV
661
/**
 * @brief Start sorting a new group: create a fresh sort handle, register the group
 *        source on it and open it.
 *
 * The source and its parameter are released here when they cannot be handed over to
 * the sort handle (allocation failure or tsortAddSource() failure), mirroring what
 * doOpenSortOperator() does for its own source.
 *
 * @param pOperator  the group sort operator
 * @return 0 on success, otherwise the error of the failing step. The sort handle, if
 *         created, stays in pInfo->pCurrSortHandle.
 */
int32_t beginSortGroup(SOperatorInfo* pOperator) {
  SGroupSortOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;

  //  pInfo->binfo.pRes is not the same block as the input datablock.
  pInfo->pCurrSortHandle = NULL;

  int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0,
                                       0, &pInfo->pCurrSortHandle);
  if (code) {
    return code;
  }

  tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);

  SSortSource*           ps = taosMemoryCalloc(1, sizeof(SSortSource));
  SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
  if (ps == NULL || param == NULL) {
    // read terrno before the frees below get a chance to touch it
    code = terrno;
    taosMemoryFree(ps);
    taosMemoryFree(param);
    return code;
  }

  param->childOpInfo = pOperator->pDownstream[0];
  param->grpSortOpInfo = pInfo;

  ps->param = param;
  ps->onlyRef = false;
  code = tsortAddSource(pInfo->pCurrSortHandle, ps);
  if (code != 0) {
    // the handle did not take the source, so it is still ours to release
    taosMemoryFree(param);
    taosMemoryFree(ps);
    return code;
  }

  return tsortOpen(pInfo->pCurrSortHandle);
}
697

UNCOV
698
/**
 * @brief Finish the current group: fold the sort handle's statistics into the
 *        operator-wide totals and destroy the handle.
 *
 * sortMethod and sortBuffer are overwritten with the values of the finished group;
 * loops, readBytes and writeBytes are accumulated.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t finishSortGroup(SOperatorInfo* pOperator) {
  SGroupSortOperatorInfo* pGrpSort = pOperator->info;
  SSortExecInfo*          pTotal = &pGrpSort->sortExecInfo;

  SSortExecInfo groupStat = tsortGetSortExecInfo(pGrpSort->pCurrSortHandle);

  pTotal->sortMethod = groupStat.sortMethod;
  pTotal->sortBuffer = groupStat.sortBuffer;
  pTotal->loops += groupStat.loops;
  pTotal->readBytes += groupStat.readBytes;
  pTotal->writeBytes += groupStat.writeBytes;

  tsortDestroySortHandle(pGrpSort->pCurrSortHandle);
  pGrpSort->pCurrSortHandle = NULL;

  return TSDB_CODE_SUCCESS;
}
714

UNCOV
715
/**
 * @brief Return the next block of rows sorted within their group.
 *
 * On the first call the first downstream block is prefetched to learn the initial
 * group id and a sort is started for that group. Each call then drains the current
 * group's sort handle; when it is exhausted the next group is started, or the operator
 * is marked completed once the downstream operator has finished.
 *
 * @param pOperator  the group sort operator
 * @param pResBlock  [out] the sorted block, tagged with the current group id
 * @return TSDB_CODE_SUCCESS; on failure the error is logged, stored in the task and
 *         control leaves through T_LONG_JMP
 */
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  SGroupSortOperatorInfo* pGrpSort = pOperator->info;
  SExecTaskInfo*          pTask = pOperator->pTaskInfo;
  int32_t                 lino = 0;
  int32_t                 code = TSDB_CODE_SUCCESS;

  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!pGrpSort->hasGroupId) {
    pGrpSort->hasGroupId = true;

    pGrpSort->prefetchedSortInput = getNextBlockFromDownstream(pOperator, 0);
    if (pGrpSort->prefetchedSortInput == NULL) {
      // nothing to sort at all
      setOperatorCompleted(pOperator);
      return code;
    }

    pGrpSort->currGroupId = pGrpSort->prefetchedSortInput->info.id.groupId;
    pGrpSort->childOpStatus = CHILD_OP_NEW_GROUP;
    code = beginSortGroup(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  SSDataBlock* pSorted = NULL;
  while (pGrpSort->pCurrSortHandle != NULL) {
    if (tsortIsClosed(pGrpSort->pCurrSortHandle)) {
      code = TSDB_CODE_TSC_QUERY_CANCELLED;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // beginSortGroup would fetch all child blocks of pGrpSort->currGroupId;
    if (pGrpSort->childOpStatus == CHILD_OP_SAME_GROUP) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = getGroupSortedBlockData(pGrpSort->pCurrSortHandle, pGrpSort->binfo.pRes, pOperator->resultInfo.capacity,
                                   pGrpSort->matchInfo.pList, pGrpSort, &pSorted);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pSorted != NULL) {
      pSorted->info.id.groupId = pGrpSort->currGroupId;
      pOperator->resultInfo.totalRows += pSorted->info.rows;
      *pResBlock = pSorted;
      return code;
    }

    // the current group is drained: move on to the next group or finish
    switch (pGrpSort->childOpStatus) {
      case CHILD_OP_NEW_GROUP:
        (void) finishSortGroup(pOperator);
        pGrpSort->currGroupId = pGrpSort->prefetchedSortInput->info.id.groupId;
        code = beginSortGroup(pOperator);
        QUERY_CHECK_CODE(code, lino, _end);
        break;
      case CHILD_OP_FINISHED:
        (void) finishSortGroup(pOperator);
        setOperatorCompleted(pOperator);
        return code;
      default:
        break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTask->code = code;
    T_LONG_JMP(pTask->env, code);
  }
  return code;
}
787

788
/**
 * @brief Expose the accumulated sort statistics of a group sort operator.
 *
 * @param pOptr         the group sort operator
 * @param pOptrExplain  [out] points at the SSortExecInfo embedded in the operator state
 * @param len           [out] sizeof(SSortExecInfo)
 * @return always TSDB_CODE_SUCCESS
 *
 * NOTE(review): unlike getExplainExecInfo(), which hands out a heap copy, this returns
 * a pointer into the operator's own state - confirm that callers never free it and do
 * not use it after the operator is destroyed.
 */
int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SGroupSortOperatorInfo* pGrpSort = (SGroupSortOperatorInfo*)pOptr->info;

  *len = sizeof(SSortExecInfo);
  *pOptrExplain = &pGrpSort->sortExecInfo;
  return TSDB_CODE_SUCCESS;
}
794

UNCOV
795
void destroyGroupSortOperatorInfo(void* param) {
×
UNCOV
796
  SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
×
UNCOV
797
  blockDataDestroy(pInfo->binfo.pRes);
×
UNCOV
798
  pInfo->binfo.pRes = NULL;
×
799

UNCOV
800
  taosArrayDestroy(pInfo->pSortInfo);
×
UNCOV
801
  taosArrayDestroy(pInfo->matchInfo.pList);
×
802

UNCOV
803
  tsortDestroySortHandle(pInfo->pCurrSortHandle);
×
UNCOV
804
  pInfo->pCurrSortHandle = NULL;
×
805

UNCOV
806
  taosMemoryFreeClear(param);
×
UNCOV
807
}
×
808

UNCOV
809
int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
×
810
                                    SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
UNCOV
811
  QRY_PARAM_CHECK(pOptrInfo);
×
UNCOV
812
  int32_t code = 0;
×
UNCOV
813
  int32_t lino = 0;
×
814

UNCOV
815
  SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo));
×
UNCOV
816
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
UNCOV
817
  if (pInfo == NULL || pOperator == NULL) {
×
818
    code = terrno;
×
819
    goto _error;
×
820
  }
821

UNCOV
822
  SExprSupp*          pSup = &pOperator->exprSupp;
×
UNCOV
823
  SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
×
824

UNCOV
825
  int32_t    numOfCols = 0;
×
UNCOV
826
  SExprInfo* pExprInfo = NULL;
×
UNCOV
827
  code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols);
×
UNCOV
828
  QUERY_CHECK_CODE(code, lino, _error);
×
829

UNCOV
830
  pSup->pExprInfo = pExprInfo;
×
UNCOV
831
  pSup->numOfExprs = numOfCols;
×
832

UNCOV
833
  initResultSizeInfo(&pOperator->resultInfo, 1024);
×
UNCOV
834
  pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset,
×
835
                                                  &pTaskInfo->storageAPI.functionStore);
UNCOV
836
  QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
×
837

UNCOV
838
  pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
×
UNCOV
839
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
×
840

UNCOV
841
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
×
UNCOV
842
  TSDB_CHECK_CODE(code, lino, _error);
×
843

UNCOV
844
  pInfo->binfo.inputTsOrder = pSortPhyNode->node.inputTsOrder;
×
UNCOV
845
  pInfo->binfo.outputTsOrder = pSortPhyNode->node.outputTsOrder;
×
846

UNCOV
847
  int32_t numOfOutputCols = 0;
×
UNCOV
848
  code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
×
849
                             &pInfo->matchInfo);
UNCOV
850
  TSDB_CHECK_CODE(code, lino, _error);
×
851

UNCOV
852
  pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
×
UNCOV
853
  setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo,
×
854
                  pTaskInfo);
UNCOV
855
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
×
856
                                         optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL);
857

UNCOV
858
  code = appendDownstream(pOperator, &downstream, 1);
×
UNCOV
859
  if (code != TSDB_CODE_SUCCESS) {
×
860
    goto _error;
×
861
  }
862

UNCOV
863
  *pOptrInfo = pOperator;
×
UNCOV
864
  return TSDB_CODE_SUCCESS;
×
865

866
_error:
×
867
  pTaskInfo->code = code;
×
868
  if (pInfo != NULL) {
×
869
    destroyGroupSortOperatorInfo(pInfo);
×
870
  }
871
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
872
  return code;
×
873
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc