• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5014

03 Apr 2026 03:59PM UTC coverage: 72.256% (-0.06%) from 72.317%
#5014

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4054 of 5985 new or added lines in 68 files covered. (67.74%)

13285 existing lines in 168 files now uncovered.

257272 of 356056 relevant lines covered (72.26%)

133154720.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.56
/source/libs/executor/src/groupoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "query.h"
20
#include "tname.h"
21
#include "tutil.h"
22

23
#include "tdatablock.h"
24
#include "tmsg.h"
25

26
#include "executorInt.h"
27
#include "operator.h"
28
#include "querytask.h"
29
#include "tcompare.h"
30
#include "thash.h"
31
#include "ttypes.h"
32

33
typedef struct SGroupbyOperatorInfo {
34
  SOptrBasicInfo binfo;
35
  SAggSupporter  aggSup;
36
  SArray*        pGroupCols;     // group by columns, SArray<SColumn>
37
  SArray*        pGroupColVals;  // current group column values, SArray<SGroupKeys>
38
  bool           isInit;         // denote if current val is initialized or not
39
  char*          keyBuf;         // group by keys for hash
40
  int32_t        groupKeyLen;    // total group by column width
41
  SGroupResInfo  groupResInfo;
42
  SExprSupp      scalarSup;
43
  SOperatorInfo  *pOperator;
44
  SLimitInfo     limitInfo;
45
} SGroupbyOperatorInfo;
46

47
// The sort in partition may be needed later.
48
typedef struct SPartitionOperatorInfo {
49
  SOptrBasicInfo binfo;
50
  SArray*        pGroupCols;
51
  SArray*        pGroupColVals;  // current group column values, SArray<SGroupKeys>
52
  char*          keyBuf;         // group by keys for hash
53
  int32_t        groupKeyLen;    // total group by column width
54
  SHashObj*      pGroupSet;      // quick locate the window object for each result
55

56
  SDiskbasedBuf* pBuf;              // query result buffer based on blocked-wised disk file
57
  int32_t        rowCapacity;       // maximum number of rows for each buffer page
58
  int32_t*       columnOffset;      // start position for each column data
59
  SArray*        sortedGroupArray;  // SDataGroupInfo sorted by group id
60
  int32_t        groupIndex;        // group index
61
  int32_t        pageIndex;         // page index of current group
62
  SExprSupp      scalarSup;
63

64
  int32_t remainRows;
65
  int32_t orderedRows;
66
  SArray* pOrderInfoArr;
67
} SPartitionOperatorInfo;
68

69
static void*    getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
70
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
71
static int32_t  setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
72
                                        int32_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
73
static int32_t  extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes);
74
static int32_t  getPartitionPageRowCapacity(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize);
75

76
static void freeGroupKey(void* param) {
55,901,605✔
77
  SGroupKeys* pKey = (SGroupKeys*)param;
55,901,605✔
78
  taosMemoryFree(pKey->pData);
55,901,605✔
79
}
55,902,644✔
80

81
static void destroyGroupOperatorInfo(void* param) {
31,778,179✔
82
  if (param == NULL) {
31,778,179✔
UNCOV
83
    return;
×
84
  }
85
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
31,778,179✔
86

87
  cleanupBasicInfo(&pInfo->binfo);
31,778,179✔
88
  taosMemoryFreeClear(pInfo->keyBuf);
31,778,712✔
89
  taosArrayDestroy(pInfo->pGroupCols);
31,777,163✔
90
  taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
31,780,816✔
91
  cleanupExprSupp(&pInfo->scalarSup);
31,780,407✔
92

93
  if (pInfo->pOperator != NULL) {
31,780,487✔
94
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
31,689,995✔
95
                      false);
96
    pInfo->pOperator = NULL;
31,684,278✔
97
  }
98

99
  cleanupGroupResInfo(&pInfo->groupResInfo);
31,776,431✔
100
  cleanupAggSup(&pInfo->aggSup);
31,772,796✔
101
  taosMemoryFreeClear(param);
31,777,778✔
102
}
103

104
static int32_t getPartitionPageRowCapacity(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize) {
3,321,228✔
105
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
3,321,228✔
106
  int32_t payloadSize = pageSize - extraSize;
3,319,649✔
107
  int32_t payloadRowSize = 0;
3,319,649✔
108
  int32_t numVarCols = 0;
3,319,649✔
109
  int32_t numFixCols = 0;
3,319,649✔
110

111
  for (int32_t i = 0; i < numOfCols; ++i) {
14,209,212✔
112
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
10,885,741✔
113
    if (pCol == NULL) {
10,877,687✔
NEW
114
      terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
NEW
115
      return -1;
×
116
    }
117

118
    payloadRowSize += blockDataGetPagedColumnReservedBytes(pCol);
10,877,687✔
119
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
10,882,509✔
120
      ++numVarCols;
1,786,634✔
121
    } else {
122
      ++numFixCols;
9,102,929✔
123
    }
124
  }
125

126
  int32_t nRows = payloadSize / payloadRowSize;
3,323,471✔
127
  if (nRows < 1) {
3,323,471✔
NEW
128
    uError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, payloadRowSize);
×
NEW
129
    terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
NEW
130
    return -1;
×
131
  }
132

133
  int32_t result = -1;
3,323,471✔
134
  int32_t start = 1;
3,323,471✔
135
  int32_t end = nRows;
3,323,471✔
136
  while (start <= end) {
27,918,212✔
137
    int32_t mid = start + (end - start) / 2;
24,594,741✔
138
    int32_t midSize = payloadRowSize * mid + numVarCols * sizeof(int32_t) * mid + numFixCols * BitmapLen(mid);
24,594,741✔
139
    if (midSize > payloadSize) {
24,594,741✔
140
      result = mid;
4,858,256✔
141
      end = mid - 1;
4,858,256✔
142
    } else {
143
      start = mid + 1;
19,736,485✔
144
    }
145
  }
146

147
  return (result != -1) ? result - 1 : nRows;
3,323,471✔
148
}
149

150
/*
 * Allocates the per-column key slots and the serialized-key buffer used by group-by style operators.
 *
 * pGroupColVals: out; receives a new SArray<SGroupKeys> with one zero-filled value buffer per group column.
 * keyLen:        in/out; increased by the sum of the column widths plus one null-flag byte per column.
 * keyBuf:        out; receives a zeroed buffer of *keyLen bytes.
 * pGroupColList: SArray<SColumn> describing the group columns.
 *
 * Returns TSDB_CODE_SUCCESS or the failing code taken from terrno. On failure, keys already pushed into
 * *pGroupColVals stay there for the caller to release (presumably via
 * taosArrayDestroyEx(..., freeGroupKey), as destroyGroupOperatorInfo does).
 */
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return terrno;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
    if (!pCol) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
    (*keyLen) += pCol->bytes;  // column data only; the null flags are added after the loop

    SGroupKeys key = {0};
    key.bytes = pCol->bytes;
    key.type = pCol->type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      return terrno;
    }

    if (taosArrayPush((*pGroupColVals), &key) == NULL) {
      // The array never took ownership of key.pData, so release it here instead of leaking it.
      int32_t code = terrno;
      taosMemoryFree(key.pData);
      return code;
    }
  }

  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
190

191
/*
 * Checks whether row rowIndex of pBlock has the same group-by values as the recorded key in pGroupColVals.
 * Two NULLs are considered equal; a NULL never equals a non-NULL value.
 *
 * Comparison per type:
 *   - JSON: compares getJsonValueLen(row value) bytes;
 *   - BLOB: compares the blob payloads;
 *   - other variable-length types: compares the var-data payloads;
 *   - fixed-length types: compares the key's full width in bytes.
 */
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex,
                            int32_t numOfGroupCols) {
  for (int32_t col = 0; col < numOfGroupCols; ++col) {
    SColumn*         pGroupCol = taosArrayGet(pGroupCols, col);
    SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, pGroupCol->slotId);
    // TODO is agg data matched?
    SColumnDataAgg*  pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pGroupCol->slotId] : NULL;

    bool        rowIsNull = colDataIsNull(pData, pBlock->info.rows, rowIndex, pAgg);
    SGroupKeys* pKey = taosArrayGet(pGroupColVals, col);

    if (rowIsNull || pKey->isNull) {
      if (rowIsNull && pKey->isNull) {
        continue;  // NULL matches NULL
      }
      return false;
    }

    char* pVal = colDataGetData(pData, rowIndex);
    bool  same = false;

    if (pKey->type == TSDB_DATA_TYPE_JSON) {
      int32_t jsonLen = getJsonValueLen(pVal);
      same = (memcmp(pKey->pData, pVal, jsonLen) == 0);
    } else if (!IS_VAR_DATA_TYPE(pKey->type)) {
      same = (memcmp(pKey->pData, pVal, pKey->bytes) == 0);
    } else if (IS_STR_DATA_BLOB(pKey->type)) {
      int32_t n = blobDataLen(pVal);
      same = (n == blobDataLen(pKey->pData)) && (memcmp(blobDataVal(pKey->pData), blobDataVal(pVal), n) == 0);
    } else {
      int32_t n = varDataLen(pVal);
      same = (n == varDataLen(pKey->pData)) && (memcmp(varDataVal(pKey->pData), varDataVal(pVal), n) == 0);
    }

    if (!same) {
      return false;
    }
  }

  return true;
}
247

248
/*
 * Stores the group-by values of row rowIndex of pBlock into pGroupColVals, making that row's values the
 * current group key.
 *
 * A NULL value only sets isNull. A column whose slotId is outside pBlock->pDataBlock is skipped and keeps
 * its previous value (todo: return an error code instead).
 */
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  size_t numOfGroupCols = taosArrayGetSize(pGroupCols);
  size_t numOfBlockCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupCols, i);

    // Valid range check, done before the column is fetched and used. Valid slots are [0, numOfBlockCols),
    // so slotId == numOfBlockCols is already out of range.
    if (pCol->slotId < 0 || (size_t)pCol->slotId >= numOfBlockCols) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    if (pColInfoData == NULL) {
      continue;
    }

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = &pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
    } else {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (pkey->type == TSDB_DATA_TYPE_JSON) {
        int32_t dataLen = getJsonValueLen(val);
        memcpy(pkey->pData, val, dataLen);
      } else if (IS_VAR_DATA_TYPE(pkey->type)) {
        if (IS_STR_DATA_BLOB(pkey->type)) {
          memcpy(pkey->pData, val, blobDataTLen(val));
        } else {
          memcpy(pkey->pData, val, varDataTLen(val));
        }
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }
    }
  }
}
291

292
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
2,147,483,647✔
293
  size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
2,147,483,647✔
294

295
  char* isNull = (char*)pKey;
2,147,483,647✔
296
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
2,147,483,647✔
297
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
2,147,483,647✔
298
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
2,147,483,647✔
299
    if (pkey->isNull) {
2,147,483,647✔
300
      isNull[i] = 1;
2,147,483,647✔
301
      continue;
2,147,483,647✔
302
    }
303

304
    isNull[i] = 0;
2,147,483,647✔
305
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
306
      int32_t dataLen = getJsonValueLen(pkey->pData);
41,368✔
307
      memcpy(pStart, (pkey->pData), dataLen);
40,966✔
308
      pStart += dataLen;
40,966✔
309
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
2,147,483,647✔
310
      if (IS_STR_DATA_BLOB(pkey->type)) {
2,147,483,647✔
311
        blobDataCopy(pStart, pkey->pData);
319,248✔
UNCOV
312
        pStart += blobDataTLen(pkey->pData);
×
313
      } else {
314
        varDataCopy(pStart, pkey->pData);
2,147,483,647✔
315
        pStart += varDataTLen(pkey->pData);
2,147,483,647✔
316
      }
317
    } else {
318
      memcpy(pStart, pkey->pData, pkey->bytes);
2,147,483,647✔
319
      pStart += pkey->bytes;
2,147,483,647✔
320
    }
321
  }
322

323
  return (int32_t)(pStart - (char*)pKey);
2,147,483,647✔
324
}
325

326
// assign the group keys or user input constant values if required
327
/*
 * Handles output expressions that carry no aggregate function (functionId == -1), e.g. `key` in
 * `select count(*), key from t group by key`. For each such expression:
 *   - copies the input value at rowIndex into the expression's result cell, or marks the result NULL;
 *   - sets numOfRes to 1.
 */
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFnCtx = &pCtx[k];
    if (pFnCtx->functionId != -1) {
      continue;  // real aggregate functions are handled elsewhere
    }

    SResultRowEntryInfo* pEntry = GET_RES_INFO(pFnCtx);
    SColumnInfoData*     pInput = pFnCtx->input.pData[0];

    // todo OPT all/all not NULL
    if (colDataIsNull(pInput, totalRows, rowIndex, NULL)) {
      pEntry->isNullRes = 1;
    } else {
      char* pDest = GET_ROWCELL_INTERBUF(pEntry);
      char* pSrc = colDataGetData(pInput, rowIndex);

      if (pInput->info.type == TSDB_DATA_TYPE_JSON) {
        int32_t jsonLen = getJsonValueLen(pSrc);
        memcpy(pDest, pSrc, jsonLen);
      } else if (!IS_VAR_DATA_TYPE(pInput->info.type)) {
        memcpy(pDest, pSrc, pInput->info.bytes);
      } else if (IS_STR_DATA_BLOB(pInput->info.type)) {
        blobDataCopy(pDest, pSrc);
      } else {
        varDataCopy(pDest, pSrc);
      }
    }

    pEntry->numOfRes = 1;
  }
}
358

359
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
270,603,694✔
360
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
270,603,694✔
361
  SGroupbyOperatorInfo* pInfo = pOperator->info;
270,611,393✔
362

363
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
270,607,285✔
364
  int32_t         numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
270,612,075✔
365
  //  if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
366
  //  qError("QInfo:0x%" PRIx64 ", group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
367
  //    return;
368
  //  }
369

370
  int32_t len = 0;
270,605,999✔
371
  terrno = TSDB_CODE_SUCCESS;
270,605,999✔
372

373
  int32_t num = 0;
270,603,823✔
374
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
2,147,483,647✔
375
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
376
    if (!pInfo->isInit) {
2,147,483,647✔
377
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
23,750,923✔
378
      pInfo->isInit = true;
23,752,899✔
379
      num++;
23,753,375✔
380
      continue;
23,753,375✔
381
    }
382

383
    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
2,147,483,647✔
384
    if (equal) {
2,147,483,647✔
385
      num++;
2,147,483,647✔
386
      continue;
2,147,483,647✔
387
    }
388

389
    // The first row of a new block does not belongs to the previous existed group
390
    if (j == 0) {
2,147,483,647✔
391
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
239,858,985✔
392
      num = 1;
239,857,657✔
393
      continue;
239,857,657✔
394
    }
395

396
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
2,147,483,647✔
397
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
2,147,483,647✔
398
                                          len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
399
    if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
2,147,483,647✔
UNCOV
400
      T_LONG_JMP(pTaskInfo->env, ret);
×
401
    }
402

403
    int32_t rowIndex = j - num;
2,147,483,647✔
404
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
2,147,483,647✔
405
                                          pOperator->exprSupp.numOfExprs);
406
    if (ret != TSDB_CODE_SUCCESS) {
2,147,483,647✔
UNCOV
407
      T_LONG_JMP(pTaskInfo->env, ret);
×
408
    }
409

410
    // assign the group keys or user input constant values if required
411
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
2,147,483,647✔
412
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
2,147,483,647✔
413
    num = 1;
2,147,483,647✔
414
  }
415

416
  // The data of the last group is processed here, and if there is only one group, it is also processed here.
417
  if (num > 0) {
270,608,735✔
418
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
270,610,527✔
419
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
270,607,162✔
420
                                          len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
421
    if (ret != TSDB_CODE_SUCCESS) {
270,604,971✔
UNCOV
422
      T_LONG_JMP(pTaskInfo->env, ret);
×
423
    }
424

425
    int32_t rowIndex = pBlock->info.rows - num;
270,604,971✔
426
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
270,602,487✔
427
                                          pOperator->exprSupp.numOfExprs);
428
    if (ret != TSDB_CODE_SUCCESS) {
270,597,448✔
UNCOV
429
      T_LONG_JMP(pTaskInfo->env, ret);
×
430
    }
431
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
270,597,448✔
432
  }
433
}
270,595,111✔
434

435
/*
 * Returns true while the result cursor (groupResInfo.index) has not passed every entry of the result-row
 * hash table.
 */
bool hasRemainResultByHash(SOperatorInfo* pOperator) {
  const SGroupbyOperatorInfo* pGroupInfo = pOperator->info;
  return pGroupInfo->groupResInfo.index < tSimpleHashGetSize(pGroupInfo->aggSup.pResultRowHashTable);
}
440

441
void doBuildResultDatablockByHash(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
2,147,483,647✔
442
                                  SDiskbasedBuf* pBuf) {
443
  SGroupbyOperatorInfo* pInfo = pOperator->info;
2,147,483,647✔
444
  SSHashObj*            pHashmap = pInfo->aggSup.pResultRowHashTable;
2,147,483,647✔
445
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
2,147,483,647✔
446

447
  SSDataBlock* pBlock = pInfo->binfo.pRes;
2,147,483,647✔
448

449
  // set output datablock version
450
  pBlock->info.version = pTaskInfo->version;
2,147,483,647✔
451

452
  blockDataCleanup(pBlock);
2,147,483,647✔
453
  if (!hasRemainResultByHash(pOperator)) {
2,147,483,647✔
454
    return;
7,889,529✔
455
  }
456

457
  pBlock->info.id.groupId = 0;
2,147,483,647✔
458
  pBlock->info.id.baseGId = 0;
2,147,483,647✔
459
  if (!pInfo->binfo.mergeResultBlock) {
2,147,483,647✔
460
    doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
2,134,252,270✔
461
                             pHashmap, pOperator->resultInfo.threshold, false);
462
  } else {
463
    while (hasRemainResultByHash(pOperator)) {
34,988,440✔
464
      doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
17,502,693✔
465
                               pHashmap, pOperator->resultInfo.threshold, true);
466
      if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
17,502,695✔
467
        break;
16,576✔
468
      }
469
      pBlock->info.id.groupId = 0;
17,486,119✔
470
      pBlock->info.id.baseGId = 0;
17,486,119✔
471
    }
472

473
    // clear the group id info in SSDataBlock, since the client does not need it
474
    pBlock->info.id.groupId = 0;
17,502,695✔
475
    pBlock->info.id.baseGId = 0;
17,502,695✔
476
  }
477
}
478

479
/*
 * Returns true when a group limit is configured (slimit.limit >= 0) and that many groups have already been
 * output, i.e. no further rows need processing.
 */
static bool slimitReached(SLimitInfo* pLimitInfo) {
  if (pLimitInfo == NULL || pLimitInfo->slimit.limit < 0) {
    return false;  // no group limit in effect
  }
  return pLimitInfo->numOfOutputGroups >= pLimitInfo->slimit.limit;
}
486

487
/*
 * Applies the group-level offset and limit to a result block, treating every row of pRes as one output
 * group.
 *
 * - Offset: rows are dropped from the front while pLimitInfo->remainGroupOffset is positive.
 * - Limit: when slimit.limit >= 0, the block is truncated to the groups still allowed, and
 *   numOfOutputGroups is advanced by the rows that remain.
 *
 * Returns TSDB_CODE_SUCCESS or the error from blockDataTrimFirstRows().
 */
static int32_t doGroupResultSlimit(SSDataBlock* pRes, SLimitInfo* pLimitInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pRes == NULL || pRes->info.rows == 0 || !pLimitInfo) {
    return TSDB_CODE_SUCCESS;
  }

  if (pLimitInfo->remainGroupOffset > 0) {
    if (pRes->info.rows <= pLimitInfo->remainGroupOffset) {
      // the whole block is consumed by the offset
      pLimitInfo->remainGroupOffset -= pRes->info.rows;
      blockDataCleanup(pRes);
      return TSDB_CODE_SUCCESS;
    }

    code = blockDataTrimFirstRows(pRes, pLimitInfo->remainGroupOffset);
    QUERY_CHECK_CODE(code, lino, _end);
    pLimitInfo->remainGroupOffset = 0;
  }

  if (pLimitInfo->slimit.limit >= 0 && pRes->info.rows > 0) {
    // Keep this in 64 bits. The limit and the group counter are presumably int64_t (TODO confirm), and
    // narrowing their difference to int32_t truncates large limits, keeping the wrong number of rows.
    int64_t remainGroups = pLimitInfo->slimit.limit - pLimitInfo->numOfOutputGroups;
    if (remainGroups <= 0) {
      // the limit is already used up; never pass a non-positive count as a size_t row number
      blockDataCleanup(pRes);
    } else if (pRes->info.rows > remainGroups) {
      blockDataKeepFirstNRows(pRes, remainGroups);
    }
    pLimitInfo->numOfOutputGroups += pRes->info.rows;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
521

522
/*
 * Produces the next non-empty result block from the hash table. Each iteration:
 *   1. builds a block;
 *   2. applies the operator filter;
 *   3. applies the group offset/limit.
 *
 * When the table is exhausted or the group limit is reached, the operator is marked completed and the hash
 * table is released. Returns NULL when no rows are produced. Errors are logged and raised with T_LONG_JMP.
 */
static SSDataBlock* buildGroupResultDataBlockByHash(SOperatorInfo* pOperator) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*          pRes = pInfo->binfo.pRes;
  SLimitInfo*           pLimitInfo = &pInfo->limitInfo;
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;

  // A block may become empty after filtering; in that case keep pulling from the remaining result set.
  for (;;) {
    doBuildResultDatablockByHash(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    code = doGroupResultSlimit(pRes, pLimitInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    bool finished = !hasRemainResultByHash(pOperator) || slimitReached(pLimitInfo);
    if (finished) {
      setOperatorCompleted(pOperator);
      // the hash table is no longer needed once every result has been produced
      tSimpleHashCleanup(pInfo->aggSup.pResultRowHashTable);
      pInfo->aggSup.pResultRowHashTable = NULL;
      break;
    }

    if (pRes->info.rows > 0) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return (pRes->info.rows == 0) ? NULL : pRes;
}
560

561
/*
 * next() entry point of the hash group-by operator.
 *
 * First call: drains the downstream operator. For each input block it:
 *   1. sets the block as the expressions' input;
 *   2. evaluates the scalar expressions, if any;
 *   3. aggregates the block into the result-row hash table (doHashGroupbyAgg).
 * The operator then switches to OP_RES_TO_RETURN.
 *
 * Later calls: each returns the next result block built from the hash table; *ppRes is NULL when nothing
 * is left. Errors are stored in pTaskInfo->code and raised with T_LONG_JMP.
 */
static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SGroupResInfo*        pGroupResInfo = &pInfo->groupResInfo;
  int32_t               order = pInfo->binfo.inputTsOrder;

  QRY_PARAM_CHECK(ppRes);
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    (*ppRes) = buildGroupResultDataBlockByHash(pOperator);
    return code;
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;

    // set pBlock as the input of the aggregate expressions
    code = setInputDataBlock(&pOperator->exprSupp, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    // scalar expressions must be evaluated before the group aggregation is applied
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doHashGroupbyAgg(pOperator, pBlock);
  }

  pOperator->status = OP_RES_TO_RETURN;

  // Reset the result cursor before the hash table is iterated. Released pointers are set to NULL so that
  // a later cleanupGroupResInfo() never sees a dangling pointer and frees it a second time. That call
  // comes from destroyGroupOperatorInfo/resetGroupOperState and presumably releases pRows and pBuf.
  if (pGroupResInfo->pRows != NULL) {
    taosArrayDestroy(pGroupResInfo->pRows);
    pGroupResInfo->pRows = NULL;
  }

  if (pGroupResInfo->pBuf) {
    taosMemoryFree(pGroupResInfo->pBuf);
    pGroupResInfo->pBuf = NULL;
  }

  pGroupResInfo->index = 0;
  pGroupResInfo->iter = 0;
  pGroupResInfo->dataPos = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  } else {
    (*ppRes) = buildGroupResultDataBlockByHash(pOperator);
  }

  return code;
}
628

UNCOV
629
/*
 * Reset hook of the group-by aggregate operator (installed by createGroupOperatorInfo via
 * setOperatorResetStateFn).
 *
 * Puts the operator back into OP_NOT_OPENED, drops the buffered group results and rebuilds the
 * aggregate support (resetAggSup) and then, only if that succeeded, the scalar expression support
 * (resetExprSupp) from the physical plan node.
 *
 * Returns the first non-zero code produced by resetAggSup/resetExprSupp, otherwise 0.
 */
static int32_t resetGroupOperState(SOperatorInfo* pOper) {
  SGroupbyOperatorInfo* pGroupInfo = pOper->info;
  SExecTaskInfo*        pTask = pOper->pTaskInfo;
  SAggPhysiNode*        pAggNode = (SAggPhysiNode*)pOper->pPhyNode;
  SOperatorInfo*        pOwner = pGroupInfo->pOperator;

  resetBasicOperatorState(&pGroupInfo->binfo);
  pOper->status = OP_NOT_OPENED;

  // drop every result row accumulated so far, then the grouped-result cursor itself
  cleanupResultInfo(pOwner->pTaskInfo, &pOwner->exprSupp, &pGroupInfo->groupResInfo, &pGroupInfo->aggSup, false);
  cleanupGroupResInfo(&pGroupInfo->groupResInfo);

  qInfo("[group key] len use:%d", pGroupInfo->groupKeyLen);

  int32_t rc = resetAggSup(&pOper->exprSupp, &pGroupInfo->aggSup, pTask, pAggNode->pAggFuncs, pAggNode->pGroupKeys,
                           pGroupInfo->groupKeyLen + POINTER_BYTES, pTask->id.str, NULL,
                           &pTask->storageAPI.functionStore);
  if (rc == TSDB_CODE_SUCCESS) {
    rc = resetExprSupp(&pGroupInfo->scalarSup, pTask, pAggNode->pExprs, NULL, &pTask->storageAPI.functionStore);
  }

  // force the group-key tracking to be re-initialised on the next input block
  pGroupInfo->isInit = false;

  return rc;
}
655

656
/*
 * Create the hash group-by aggregate operator for pAggNode on top of `downstream`.
 *
 * On success *pOptrInfo receives the new operator and TSDB_CODE_SUCCESS is returned.
 * On failure the partially built operator and its downstream are destroyed, pTaskInfo->code is
 * set, the failing line is logged, and a non-zero error code is returned (never
 * TSDB_CODE_SUCCESS, so a caller can never observe "success" with *pOptrInfo unset).
 */
int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
                                SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }
  initOperatorCostInfo(pOperator);

  pOperator->pPhyNode = (SNode*)pAggNode;
  pOperator->exprSupp.hasWindowOrGroup = true;
  pOperator->exprSupp.hasWindow = false;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }
  initBasicInfo(&pInfo->binfo, pResBlock);

  initLimitInfo(pAggNode->node.pLimit, pAggNode->node.pSlimit, &pInfo->limitInfo);

  pInfo->pGroupCols = NULL;
  code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols);
  QUERY_CHECK_CODE(code, lino, _error);

  // optional scalar expressions evaluated on each input block before grouping
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;

  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str,
                    NULL, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;

  pInfo->pOperator = pOperator;

  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregateNext, NULL, destroyGroupOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetGroupOperState);
  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (code == TSDB_CODE_SUCCESS) {
    // reaching this label always means failure (e.g. an allocator that left terrno at 0);
    // never report success while *pOptrInfo is left unset
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pInfo != NULL) destroyGroupOperatorInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
742

UNCOV
743
/*
 * Build a copy of pDataBlock for a block whose column data was not loaded.
 *
 * The returned block has one column per output expression of pOperator. Column i is built from
 * the source slot referenced by expression i's first parameter:
 *   - if the source carries a block-level aggregate for that slot (pBlockAgg[slot].colId != -1),
 *     only that aggregate entry is copied;
 *   - otherwise the raw column data is copied with colDataAssign.
 *
 * Returns the new block, which the caller owns (free with blockDataDestroy). Returns NULL when
 * pDataBlock is NULL or on failure. On failure terrno is set to the error code, because callers
 * report the error through terrno.
 */
SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBlock* pDataBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pDataBlock == NULL) {
    return NULL;
  }

  SSDataBlock* pDstBlock = NULL;
  code = createDataBlock(&pDstBlock);
  QUERY_CHECK_CODE(code, lino, _end);

  pDstBlock->info = pDataBlock->info;
  pDstBlock->info.id.blockId = pOperator->resultDataBlockId;
  pDstBlock->info.capacity = 0;
  pDstBlock->info.rowSize = 0;

  size_t numOfCols = pOperator->exprSupp.numOfExprs;
  if (pDataBlock->pBlockAgg) {
    pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg));
    QUERY_CHECK_NULL(pDstBlock->pBlockAgg, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
    // -1 marks "no aggregate available" for the column
    for (size_t i = 0; i < numOfCols; ++i) {
      pDstBlock->pBlockAgg[i].colId = -1;
    }
  }

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SExprInfo*       pExpr = &pOperator->exprSupp.pExprInfo[i];
    int32_t          slotId = pExpr->base.pParam[0].pCol->slotId;
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId);
    QUERY_CHECK_NULL(pSrc, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info};
    code = blockDataAppendColInfo(pDstBlock, &colInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pDst, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    if (pDataBlock->pBlockAgg && pDataBlock->pBlockAgg[slotId].colId != -1) {
      pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId];
    } else {
      code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false);
      QUERY_CHECK_CODE(code, lino, _end);

      code = colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    blockDataDestroy(pDstBlock);
    // the caller (doHashPartition) long-jumps with terrno when NULL is returned
    terrno = code;
    return NULL;
  }
  return pDstBlock;
}
799

800
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
22,148,738✔
801
  int32_t                 code = TSDB_CODE_SUCCESS;
22,148,738✔
802
  int32_t                 lino = 0;
22,148,738✔
803
  SPartitionOperatorInfo* pInfo = pOperator->info;
22,148,738✔
804
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
22,152,070✔
805

806
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
2,147,483,647✔
807
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
2,147,483,647✔
808
    int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
2,147,483,647✔
809

810
    SDataGroupInfo* pGroupInfo = NULL;
2,147,483,647✔
811
    void*           pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
2,147,483,647✔
812
    if (pPage == NULL) {
2,147,483,647✔
813
      T_LONG_JMP(pTaskInfo->env, terrno);
×
814
    }
815

816
    pGroupInfo->numOfRows += 1;
2,147,483,647✔
817

818
    // group id
819
    if (pGroupInfo->groupId == 0) {
2,147,483,647✔
820
      pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
85,266,764✔
821
    }
822

823
    if (pBlock->info.dataLoad) {
2,147,483,647✔
824
      // number of rows
825
      int32_t* rows = (int32_t*)pPage;
2,147,483,647✔
826

827
      size_t numOfCols = pOperator->exprSupp.numOfExprs;
2,147,483,647✔
828
      for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
829
        SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
2,147,483,647✔
830
        int32_t    slotId = pExpr->base.pParam[0].pCol->slotId;
2,147,483,647✔
831
        SColumnInfoData* pSrcColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
2,147,483,647✔
832
        SColumnInfoData* pDstColInfoData = taosArrayGet(pInfo->binfo.pRes->pDataBlock, i);
2,147,483,647✔
833
        QUERY_CHECK_NULL(pSrcColInfoData, code, lino, _end, terrno);
2,147,483,647✔
834
        QUERY_CHECK_NULL(pDstColInfoData, code, lino, _end, terrno);
2,147,483,647✔
835

836
        int32_t bytes = pDstColInfoData->info.bytes;
2,147,483,647✔
837
        int32_t startOffset = pInfo->columnOffset[i];
2,147,483,647✔
838
        int32_t reservedBytes = blockDataGetPagedColumnReservedBytes(pDstColInfoData);
2,147,483,647✔
839

840
        int32_t* columnLen = NULL;
2,147,483,647✔
841
        int32_t  contentLen = 0;
2,147,483,647✔
842

843
        if (IS_VAR_DATA_TYPE(pDstColInfoData->info.type)) {
2,147,483,647✔
844
          int32_t* offset = (int32_t*)((char*)pPage + startOffset);
2,147,483,647✔
845
          columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
2,147,483,647✔
846
          char* data = (char*)((char*)columnLen + sizeof(int32_t));
2,147,483,647✔
847

848
          if (colDataIsNull_s(pSrcColInfoData, j)) {
2,147,483,647✔
849
            offset[(*rows)] = -1;
2,147,483,647✔
850
            contentLen = 0;
2,147,483,647✔
851
          } else if (pSrcColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
852
            offset[*rows] = (*columnLen);
25,134✔
853
            char*   src = colDataGetData(pSrcColInfoData, j);
25,134✔
854
            int32_t dataLen = getJsonValueLen(src);
25,134✔
855

856
            memcpy(data + (*columnLen), src, dataLen);
25,536✔
857
            int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
25,536✔
858
            QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
25,134✔
859

860
            contentLen = dataLen;
25,134✔
861
          } else {
862
            if (IS_STR_DATA_BLOB(pSrcColInfoData->info.type)) {
2,147,483,647✔
863
              offset[*rows] = (*columnLen);
1,116✔
NEW
864
              char* src = colDataGetData(pSrcColInfoData, j);
×
UNCOV
865
              memcpy(data + (*columnLen), src, blobDataTLen(src));
×
866
              int32_t v = (data + (*columnLen) + blobDataTLen(src) - (char*)pPage);
×
UNCOV
867
              QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
868

UNCOV
869
              contentLen = blobDataTLen(src);
×
870
            } else {
871
              offset[*rows] = (*columnLen);
2,147,483,647✔
872
              char* src = colDataGetData(pSrcColInfoData, j);
2,147,483,647✔
873
              memcpy(data + (*columnLen), src, varDataTLen(src));
2,147,483,647✔
874
              int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
2,147,483,647✔
875
              QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
2,147,483,647✔
876

877
              contentLen = varDataTLen(src);
2,147,483,647✔
878
            }
879
          }
880

881
          QUERY_CHECK_CONDITION((contentLen <= reservedBytes), code, lino, _end,
2,147,483,647✔
882
                                TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
883
          QUERY_CHECK_CONDITION(((*columnLen) + contentLen <= reservedBytes * pInfo->rowCapacity), code, lino, _end,
2,147,483,647✔
884
                                TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
885
        } else {
886
          char* bitmap = (char*)pPage + startOffset;
2,147,483,647✔
887
          columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
2,147,483,647✔
888
          char* data = (char*)columnLen + sizeof(int32_t);
2,147,483,647✔
889

890
          bool isNull = colDataIsNull_f(pSrcColInfoData, j);
2,147,483,647✔
891
          if (isNull) {
2,147,483,647✔
892
            colDataSetNull_f(bitmap, (*rows));
2,147,483,647✔
893
          } else {
894
            memcpy(data + (*columnLen), colDataGetData(pSrcColInfoData, j), bytes);
2,147,483,647✔
895
            QUERY_CHECK_CONDITION(((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)), code,
2,147,483,647✔
896
                                  lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
897
          }
898
          contentLen = bytes;
2,147,483,647✔
899
        }
900

901
        (*columnLen) += contentLen;
2,147,483,647✔
902
      }
903

904
      (*rows) += 1;
2,147,483,647✔
905

906
      setBufPageDirty(pPage, true);
2,147,483,647✔
907
      releaseBufPage(pInfo->pBuf, pPage);
2,147,483,647✔
908
    } else {
UNCOV
909
      SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pOperator, pBlock);
×
UNCOV
910
      if (dataNotLoadBlock == NULL) {
×
UNCOV
911
        T_LONG_JMP(pTaskInfo->env, terrno);
×
912
      }
UNCOV
913
      if (pGroupInfo->blockForNotLoaded == NULL) {
×
UNCOV
914
        pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*));
×
UNCOV
915
        QUERY_CHECK_NULL(pGroupInfo->blockForNotLoaded, code, lino, _end, terrno);
×
UNCOV
916
        pGroupInfo->offsetForNotLoaded = 0;
×
917
      }
UNCOV
918
      dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
×
UNCOV
919
      dataNotLoadBlock->info.dataLoad = 0;
×
UNCOV
920
      void* tmp = taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock);
×
UNCOV
921
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
922
      break;
×
923
    }
924
  }
925

926
_end:
24,340,696✔
927
  if (code != TSDB_CODE_SUCCESS) {
22,154,622✔
UNCOV
928
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
929
    T_LONG_JMP(pTaskInfo->env, code);
×
930
  }
931
}
22,154,622✔
932

933
/*
 * Look up, or create, the partition group keyed by pInfo->keyBuf[0..len).
 *
 * On success *pGroupInfo points to the group entry stored in pInfo->pGroupSet, and the return
 * value is the group's current buffer page, pinned. The page has room for at least one more row:
 * when the last page already holds rowCapacity rows, a fresh zeroed page is appended to the
 * group's page list. The caller must release the returned page with releaseBufPage.
 *
 * Returns NULL on failure. Any page pinned here is released first, and terrno carries the error
 * code, because the caller long-jumps with terrno.
 */
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  void*           pPage = NULL;
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  if (p == NULL) {  // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    QUERY_CHECK_NULL(gi.pPageList, code, lino, _end, terrno);

    code = taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
    if (code != TSDB_CODE_SUCCESS) {
      // the entry was not stored, so the hash table did not take ownership of the page list
      taosArrayDestroy(gi.pPageList);
      if (code == TSDB_CODE_DUP_KEY) {
        code = TSDB_CODE_SUCCESS;
      }
    }
    QUERY_CHECK_CODE(code, lino, _end);

    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
    QUERY_CHECK_NULL(p, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    int32_t pageId = 0;
    pPage = getNewBufPage(pInfo->pBuf, &pageId);
    if (pPage == NULL) {
      qError("failed to get new buffer, code:%s", tstrerror(terrno));
      return NULL;
    }

    void* tmp = taosArrayPush(p->pPageList, &pageId);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    // Zero the whole page, as the page-rollover path below does. The row counter, each column's
    // used-length field and the NULL bitmaps must all start at 0. A page may reuse memory of
    // an evicted page.
    memset(pPage, 0, getBufPageSize(pInfo->pBuf));
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    QUERY_CHECK_NULL(curId, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    pPage = getBufPage(pInfo->pBuf, *curId);
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return pPage;
    }

    int32_t* rows = (int32_t*)pPage;
    if (*rows >= pInfo->rowCapacity) {
      // release buffer
      releaseBufPage(pInfo->pBuf, pPage);
      pPage = NULL;

      // add a new page for current group
      int32_t pageId = 0;
      pPage = getNewBufPage(pInfo->pBuf, &pageId);
      if (pPage == NULL) {
        qError("failed to get new buffer, code:%s", tstrerror(terrno));
        return NULL;
      }

      void* tmp = taosArrayPush(p->pPageList, &pageId);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      memset(pPage, 0, getBufPageSize(pInfo->pBuf));
    }
  }

  *pGroupInfo = p;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (pPage != NULL) {
      // do not leave a pinned page behind when reporting failure
      releaseBufPage(pInfo->pBuf, pPage);
    }
    terrno = code;
    return NULL;
  }

  return pPage;
}
1000

1001
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
3,320,820✔
1002
  size_t   numOfCols = taosArrayGetSize(pBlock->pDataBlock);
3,320,820✔
1003
  int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
3,320,134✔
1004
  if (!offset) {
3,320,416✔
UNCOV
1005
    return NULL;
×
1006
  }
1007

1008
  offset[0] = sizeof(int32_t) +
3,320,416✔
1009
              sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format
1010

1011
  for (int32_t i = 0; i < numOfCols - 1; ++i) {
10,883,333✔
1012
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
7,558,273✔
1013

1014
    int32_t payloadLen = blockDataGetPagedColumnReservedBytes(pColInfoData) * rowCapacity;
7,552,532✔
1015

1016
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
7,553,340✔
1017
      // offset segment + content length + payload
1018
      offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
1,682,935✔
1019
    } else {
1020
      // bitmap + content length + payload
1021
      offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
5,882,677✔
1022
    }
1023
  }
1024

1025
  return offset;
3,325,060✔
1026
}
1027

1028
/*
 * Drop all per-group state held in pInfo->sortedGroupArray, then empty that array and reset the
 * paged buffer (clearDiskbasedBuf).
 *
 * For every group this destroys its page-id list and every cached not-loaded block. It also
 * frees the blockForNotLoaded array itself: the group entry is discarded when sortedGroupArray is
 * cleared, so merely clearing that array would leak it.
 */
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
  int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
  for (int32_t i = 0; i < size; i++) {
    SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
    if (pGp == NULL) {
      continue;
    }

    if (pGp->blockForNotLoaded != NULL) {
      int32_t numOfBlocks = taosArrayGetSize(pGp->blockForNotLoaded);
      for (int32_t k = 0; k < numOfBlocks; k++) {
        SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, k);
        if (pBlock) blockDataDestroy(*pBlock);
      }
      taosArrayDestroy(pGp->blockForNotLoaded);
      pGp->blockForNotLoaded = NULL;
      pGp->offsetForNotLoaded = 0;
    }

    taosArrayDestroy(pGp->pPageList);
    pGp->pPageList = NULL;
  }
  taosArrayClear(pInfo->sortedGroupArray);
  clearDiskbasedBuf(pInfo->pBuf);
}
3,186,798✔
1045

1046
static int compareDataGroupInfo(const void* group1, const void* group2) {
779,040,864✔
1047
  const SDataGroupInfo* pGroupInfo1 = group1;
779,040,864✔
1048
  const SDataGroupInfo* pGroupInfo2 = group2;
779,040,864✔
1049

1050
  if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
779,040,864✔
1051
    return 0;
×
1052
  }
1053

1054
  return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
779,042,460✔
1055
}
1056

1057
/*
 * Return the next cached not-loaded block of pGroupInfo and advance its read cursor
 * (offsetForNotLoaded). Returns NULL when the group has no such blocks, when all of them have
 * been handed out, or when the array lookup fails. The group keeps ownership of the block.
 */
static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) {
  SArray* pPending = pGroupInfo->blockForNotLoaded;
  if (pPending == NULL || pGroupInfo->offsetForNotLoaded >= pPending->size) {
    return NULL;
  }

  SSDataBlock** ppNext = taosArrayGet(pPending, pGroupInfo->offsetForNotLoaded);
  if (ppNext == NULL) {
    return NULL;
  }

  pGroupInfo->offsetForNotLoaded += 1;
  return *ppNext;
}
1068

1069
/*
 * Produce the next output block of the partition operator from the grouped pages.
 *
 * Groups in pInfo->sortedGroupArray are visited in order (groupIndex), and each group's pages in
 * order (pageIndex). A group whose pages are exhausted first emits its cached not-loaded blocks.
 * A page holding 0 rows means the group only received not-loaded blocks.
 * When pInfo->pOrderInfoArr is set, only the sorted prefix of a page is emitted per call.
 * The rest is kept in remainRows/orderedRows for the next call.
 *
 * Returns pInfo->binfo.pRes, a not-loaded block owned by the group, or NULL once every group is
 * consumed. The operator is then marked completed and its group state cleared.
 * Errors long-jump through pTaskInfo->env.
 */
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;

  if (pInfo->remainRows == 0) {
    blockDataCleanup(pInfo->binfo.pRes);
    SDataGroupInfo* pGroupInfo = NULL;
    if (pInfo->groupIndex != -1) {
      pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
      QUERY_CHECK_NULL(pGroupInfo, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
      if (pGroupInfo != NULL) {
        SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
        if (ret != NULL) return ret;
      }
      // try next group data
      if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) {
        setOperatorCompleted(pOperator);
        clearPartitionOperator(pInfo);
        return NULL;
      }
      ++pInfo->groupIndex;

      pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
      if (pGroupInfo == NULL) {
        qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      pInfo->pageIndex = 0;
    }

    int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
    if (pageId == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    void* page = getBufPage(pInfo->pBuf, *pageId);
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    if (*(int32_t*)page == 0) {
      // empty page: the group only has not-loaded blocks
      releaseBufPage(pInfo->pBuf, page);
      SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
      if (ret != NULL) return ret;
      if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) {
        pInfo->groupIndex++;
        pInfo->pageIndex = 0;
      } else {
        setOperatorCompleted(pOperator);
        clearPartitionOperator(pInfo);
        return NULL;
      }
      return buildPartitionResult(pOperator);
    }

    code = blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
    if (code == TSDB_CODE_SUCCESS) {
      code = blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
    }
    // unpin the page on success and failure alike so it can still be evicted
    releaseBufPage(pInfo->pBuf, page);
    QUERY_CHECK_CODE(code, lino, _end);

    pInfo->pageIndex += 1;
    pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;
    pInfo->binfo.pRes->info.dataLoad = 1;
    pInfo->orderedRows = 0;
  } else if (pInfo->pOrderInfoArr == NULL) {
    qError("Exception, remainRows not zero, but pOrderInfoArr is NULL");
  }

  if (pInfo->pOrderInfoArr) {
    // restore the rows withheld by the previous call, drop the prefix already emitted, and emit
    // only the leading run that is sorted according to pOrderInfoArr
    pInfo->binfo.pRes->info.rows += pInfo->remainRows;
    code = blockDataTrimFirstRows(pInfo->binfo.pRes, pInfo->orderedRows);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->orderedRows = blockDataGetSortedRows(pInfo->binfo.pRes, pInfo->pOrderInfoArr);
    pInfo->remainRows = pInfo->binfo.pRes->info.rows - pInfo->orderedRows;
    pInfo->binfo.pRes->info.rows = pInfo->orderedRows;
  }

  code = blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return pInfo->binfo.pRes;
}
1160

1161
/*
 * getNext implementation of the partition operator.
 *
 * On the first call it drains the downstream operator. The optional scalar expressions are
 * applied to every block, and each block's rows are distributed into groups (doHashPartition).
 * The groups are then moved from pInfo->pGroupSet into pInfo->sortedGroupArray, sorted by
 * groupId. Later calls only stream the grouped data via buildPartitionResult.
 *
 * *ppRes receives the next block, or NULL when finished. Errors long-jump through
 * pTaskInfo->env after setting pTaskInfo->code. Before jumping, the temporary group array is
 * freed and any live hash iterator is cancelled.
 */
static int32_t hashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;
  SArray*                 groupArray = NULL;  // owned here until handed over to pInfo->sortedGroupArray
  void*                   pGroupIter = NULL;  // non-NULL while a hash iteration is in progress

  if (pOperator->status == OP_RES_TO_RETURN) {
    (*ppRes) = buildPartitionResult(pOperator);
    return code;
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code =
          projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    terrno = TSDB_CODE_SUCCESS;
    doHashPartition(pOperator, pBlock);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
  QUERY_CHECK_NULL(groupArray, code, lino, _end, terrno);

  // Shallow-copy every group out of the hash table. Page lists and not-loaded blocks become owned
  // by groupArray once the hash table is cleared below.
  pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    void*           tmp = taosArrayPush(groupArray, pGroupInfo);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }

  taosArraySort(groupArray, compareDataGroupInfo);
  pInfo->sortedGroupArray = groupArray;
  groupArray = NULL;  // ownership transferred to pInfo
  pInfo->groupIndex = -1;
  taosHashClear(pInfo->pGroupSet);

  pOperator->status = OP_RES_TO_RETURN;
  code = blockDataEnsureCapacity(pRes, 4096);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (pGroupIter != NULL) {
      // leaving a hash iteration early must release the iterator
      taosHashCancelIterate(pInfo->pGroupSet, pGroupIter);
    }
    // The elements are shallow copies of entries still owned by pGroupSet, so free only the array.
    taosArrayDestroy(groupArray);
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = buildPartitionResult(pOperator);
  return code;
}
1231

1232
static void destroyPartitionOperatorInfo(void* param) {
3,344,897✔
1233
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
3,344,897✔
1234
  cleanupBasicInfo(&pInfo->binfo);
3,344,897✔
1235
  taosArrayDestroy(pInfo->pGroupCols);
3,344,499✔
1236

1237
  for (int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++) {
7,542,012✔
1238
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
4,197,513✔
1239
    taosMemoryFree(key.pData);
4,197,513✔
1240
  }
1241

1242
  taosArrayDestroy(pInfo->pGroupColVals);
3,344,897✔
1243
  taosMemoryFree(pInfo->keyBuf);
3,343,146✔
1244

1245
  int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
3,343,944✔
1246
  for (int32_t i = 0; i < size; i++) {
43,653,129✔
1247
    SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
40,308,628✔
1248
    if (pGp) {
40,308,628✔
1249
      taosArrayDestroy(pGp->pPageList);
40,308,628✔
1250
    }
1251
  }
1252
  taosArrayDestroy(pInfo->sortedGroupArray);
3,344,501✔
1253

1254
  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
3,343,950✔
1255
  while (pGroupIter != NULL) {
3,344,099✔
UNCOV
1256
    SDataGroupInfo* pGroupInfo = pGroupIter;
×
UNCOV
1257
    taosArrayDestroy(pGroupInfo->pPageList);
×
UNCOV
1258
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
×
1259
  }
1260

1261
  taosHashCleanup(pInfo->pGroupSet);
3,344,099✔
1262
  taosMemoryFree(pInfo->columnOffset);
3,343,947✔
1263

1264
  cleanupExprSupp(&pInfo->scalarSup);
3,343,396✔
1265
  destroyDiskbasedBuf(pInfo->pBuf);
3,344,897✔
1266
  taosArrayDestroy(pInfo->pOrderInfoArr);
3,343,542✔
1267
  taosMemoryFreeClear(param);
3,343,542✔
1268
}
3,343,941✔
1269

UNCOV
1270
/*
 * Reset the partition operator so it can be executed again from scratch.
 *
 * Rebuilds the scalar expression support from the physical plan node and drops all
 * per-group state: the group hash set, the sorted group array, their page lists and
 * the read cursors (groupIndex / pageIndex / remainRows / orderedRows).
 *
 * The state cleanup runs even when rebuilding the expression support fails, so the
 * operator never keeps stale group data around.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code reported by resetExprSupp.
 */
static int32_t resetPartitionOperState(SOperatorInfo* pOper) {
  SPartitionOperatorInfo* pInfo = pOper->info;
  SExecTaskInfo*          pTaskInfo = pOper->pTaskInfo;
  SPartitionPhysiNode*    pPhynode = (SPartitionPhysiNode*)pOper->pPhyNode;

  resetBasicOperatorState(&pInfo->binfo);

  int32_t code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
                               &pTaskInfo->storageAPI.functionStore);
  if (code != TSDB_CODE_SUCCESS) {
    // Keep going: the group state below must be dropped regardless; the error is
    // reported to the caller through the return value.
    qError("%s failed since %s", __func__, tstrerror(code));
  }

  clearPartitionOperator(pInfo);

  // Release the page lists of groups still stored in the hash set, then empty it.
  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    taosArrayDestroy(pGroupInfo->pPageList);
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }
  taosHashClear(pInfo->pGroupSet);

  // Release the page list of every entry of the sorted group array, then the array.
  int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
  for (int32_t i = 0; i < size; i++) {
    SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
    if (pGp) {
      taosArrayDestroy(pGp->pPageList);
    }
  }
  taosArrayDestroy(pInfo->sortedGroupArray);
  pInfo->sortedGroupArray = NULL;

  // Rewind all output cursors.
  pInfo->groupIndex = 0;
  pInfo->pageIndex = 0;
  pInfo->remainRows = 0;
  pInfo->orderedRows = 0;
  return code;
}
1305

1306
int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
3,342,509✔
1307
                                           SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1308
  QRY_PARAM_CHECK(pOptrInfo);
3,342,509✔
1309

1310
  int32_t                 code = TSDB_CODE_SUCCESS;
3,343,709✔
1311
  int32_t                 lino = 0;
3,343,709✔
1312
  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
3,343,709✔
1313
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,337,392✔
1314
  if (pInfo == NULL || pOperator == NULL) {
3,340,199✔
1315
    pTaskInfo->code = code = terrno;
×
UNCOV
1316
    goto _error;
×
1317
  }
1318
  initOperatorCostInfo(pOperator);
3,341,399✔
1319

1320
  pOperator->pPhyNode = pPartNode;
3,341,699✔
1321
  int32_t    numOfCols = 0;
3,342,503✔
1322
  SExprInfo* pExprInfo = NULL;
3,342,899✔
1323
  code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols);
3,341,699✔
1324
  QUERY_CHECK_CODE(code, lino, _error);
3,343,003✔
1325
  pOperator->exprSupp.numOfExprs = numOfCols;
3,343,003✔
1326
  pOperator->exprSupp.pExprInfo = pExprInfo;
3,342,297✔
1327

1328
  pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);
3,342,452✔
1329

1330
  if (pPartNode->needBlockOutputTsOrder) {
3,336,993✔
1331
    SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId};
278,337✔
1332
    pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo));
278,337✔
1333
    if (!pInfo->pOrderInfoArr) {
278,337✔
UNCOV
1334
      pTaskInfo->code = terrno;
×
UNCOV
1335
      goto _error;
×
1336
    }
1337

1338
    void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order);
278,337✔
1339
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
278,337✔
1340
  }
1341

1342
  if (pPartNode->pExprs != NULL) {
3,340,884✔
1343
    int32_t    num = 0;
137,334✔
1344
    SExprInfo* pExprInfo1 = NULL;
137,334✔
1345
    code = createExprInfo(pPartNode->pExprs, NULL, &pExprInfo1, &num);
137,334✔
1346
    QUERY_CHECK_CODE(code, lino, _error);
137,334✔
1347

1348
    code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num, &pTaskInfo->storageAPI.functionStore);
118,049✔
1349
    QUERY_CHECK_CODE(code, lino, _error);
118,049✔
1350
  }
1351

1352
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
3,323,210✔
1353
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
3,321,993✔
1354
  if (pInfo->pGroupSet == NULL) {
3,324,808✔
UNCOV
1355
    goto _error;
×
1356
  }
1357

1358
  uint32_t defaultPgsz = 0;
3,324,410✔
1359
  int64_t  defaultBufsz = 0;
3,323,859✔
1360

1361
  pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
3,324,812✔
1362
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
3,325,612✔
1363
  code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
3,324,410✔
1364
  if (code != TSDB_CODE_SUCCESS) {
3,322,671✔
UNCOV
1365
    goto _error;
×
1366
  }
1367

1368
  if (!osTempSpaceAvailable()) {
3,322,671✔
UNCOV
1369
    terrno = TSDB_CODE_NO_DISKSPACE;
×
UNCOV
1370
    qError("Create partition operator info failed since %s, tempDir:%s", terrstr(), tsTempDir);
×
UNCOV
1371
    goto _error;
×
1372
  }
1373

1374
  code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
3,323,612✔
1375
  if (code != TSDB_CODE_SUCCESS) {
3,318,285✔
UNCOV
1376
    goto _error;
×
1377
  }
1378

1379
  pInfo->rowCapacity =
3,320,295✔
1380
      getPartitionPageRowCapacity(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf),
3,316,804✔
1381
                                  blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock)));
3,318,285✔
1382
  if (pInfo->rowCapacity < 0) {
3,319,598✔
UNCOV
1383
    code = terrno;
×
UNCOV
1384
    goto _error;
×
1385
  }
1386

1387
  pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
3,317,502✔
1388
  QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno);
3,320,623✔
1389

1390
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
3,316,143✔
1391
  if (code != TSDB_CODE_SUCCESS) {
3,314,253✔
1392
    goto _error;
×
1393
  }
1394

1395
  setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
3,314,253✔
1396
                  pTaskInfo);
1397

1398
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartitionNext, NULL, destroyPartitionOperatorInfo,
3,319,863✔
1399
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1400

1401
  setOperatorResetStateFn(pOperator, resetPartitionOperState);
3,318,891✔
1402
  code = appendDownstream(pOperator, &downstream, 1);
3,317,166✔
1403
  if (code != TSDB_CODE_SUCCESS) {
3,311,913✔
UNCOV
1404
    goto _error;
×
1405
  }
1406

1407
  *pOptrInfo = pOperator;
3,311,913✔
1408
  return TSDB_CODE_SUCCESS;
3,311,511✔
1409

1410
_error:
19,285✔
1411
  if (pInfo != NULL) {
19,285✔
1412
    destroyPartitionOperatorInfo(pInfo);
19,285✔
1413
  }
1414
  pTaskInfo->code = code;
19,285✔
1415
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
19,285✔
1416
  TAOS_RETURN(code);
19,285✔
1417
}
1418

1419
/*
 * Look up the result row for the group key stored in pData (bytes long, with
 * groupId) via doSetResultOutBufByKey, and bind it to the operator's function
 * contexts with setResultRowInitCtx.
 *
 * Returns pTaskInfo->code when no row was obtained or the task already carries an
 * error; otherwise returns the result of setResultRowInitCtx.
 */
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                int32_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;

  SResultRow* pRow = doSetResultOutBufByKey(pBuf, &binfo->resultRowInfo, (char*)pData, bytes, true, groupId, pTask,
                                            false, pAggSup, false);
  if (pRow == NULL || pTask->code != TSDB_CODE_SUCCESS) {
    return pTask->code;
  }

  return setResultRowInitCtx(pRow, pFuncCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
}
1433

UNCOV
1434
void freePartItem(void* ptr) {
×
UNCOV
1435
  SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr;
×
UNCOV
1436
  taosArrayDestroy(pPart->rowIds);
×
UNCOV
1437
}
×
1438

1439
/*
 * Build an SArray<SColumn> describing the target nodes in pNodeList.
 *
 * - Column-reference targets are converted with extractColumnFromColumnNode.
 * - Constant (value) targets produce an SColumn whose slotId and colId are the
 *   target's slot id and whose type/bytes/scale/precision come from the value's
 *   result type.
 * - Targets of any other node type are skipped.
 *
 * On success *pArrayRes receives the new array (owned by the caller).
 * On failure any partially built array is released, *pArrayRes is set to NULL and
 * the error code is returned.
 */
int32_t extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _end;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColumn c = extractColumnFromColumnNode(pColNode);
      void*   tmp = taosArrayPush(pList, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // The data type lives in resType, like bytes/scale/precision below;
      // node.type is the node kind (QUERY_NODE_VALUE), not a column data type.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      void* tmp = taosArrayPush(pList, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // Do not leak the partially filled array on failure.
    taosArrayDestroy(pList);
    (*pArrayRes) = NULL;
  } else {
    (*pArrayRes) = pList;
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc