• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4523

17 Jul 2025 02:02AM UTC coverage: 56.768% (+0.3%) from 56.447%
#4523

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

140094 of 313745 branches covered (44.65%)

Branch coverage included in aggregate %.

212455 of 307292 relevant lines covered (69.14%)

18276193.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.26
/source/libs/executor/src/groupoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "query.h"
20
#include "tname.h"
21
#include "tutil.h"
22

23
#include "tdatablock.h"
24
#include "tmsg.h"
25

26
#include "executorInt.h"
27
#include "operator.h"
28
#include "querytask.h"
29
#include "tcompare.h"
30
#include "thash.h"
31
#include "ttypes.h"
32

33
typedef struct SGroupbyOperatorInfo {
34
  SOptrBasicInfo binfo;
35
  SAggSupporter  aggSup;
36
  SArray*        pGroupCols;     // group by columns, SArray<SColumn>
37
  SArray*        pGroupColVals;  // current group column values, SArray<SGroupKeys>
38
  bool           isInit;         // denote if current val is initialized or not
39
  char*          keyBuf;         // group by keys for hash
40
  int32_t        groupKeyLen;    // total group by column width
41
  SGroupResInfo  groupResInfo;
42
  SExprSupp      scalarSup;
43
  SOperatorInfo  *pOperator;
44
} SGroupbyOperatorInfo;
45

46
// The sort in partition may be needed later.
47
typedef struct SPartitionOperatorInfo {
48
  SOptrBasicInfo binfo;
49
  SArray*        pGroupCols;
50
  SArray*        pGroupColVals;  // current group column values, SArray<SGroupKeys>
51
  char*          keyBuf;         // group by keys for hash
52
  int32_t        groupKeyLen;    // total group by column width
53
  SHashObj*      pGroupSet;      // quick locate the window object for each result
54

55
  SDiskbasedBuf* pBuf;              // query result buffer based on blocked-wised disk file
56
  int32_t        rowCapacity;       // maximum number of rows for each buffer page
57
  int32_t*       columnOffset;      // start position for each column data
58
  SArray*        sortedGroupArray;  // SDataGroupInfo sorted by group id
59
  int32_t        groupIndex;        // group index
60
  int32_t        pageIndex;         // page index of current group
61
  SExprSupp      scalarSup;
62

63
  int32_t remainRows;
64
  int32_t orderedRows;
65
  SArray* pOrderInfoArr;
66
} SPartitionOperatorInfo;
67

68
static void*    getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
69
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
70
static int32_t  setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
71
                                        int32_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
72
static int32_t  extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes);
73

74
static void freeGroupKey(void* param) {
683,562✔
75
  SGroupKeys* pKey = (SGroupKeys*)param;
683,562✔
76
  taosMemoryFree(pKey->pData);
683,562!
77
}
683,660✔
78

79
static void destroyGroupOperatorInfo(void* param) {
413,676✔
80
  if (param == NULL) {
413,676!
81
    return;
×
82
  }
83
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
413,676✔
84

85
  cleanupBasicInfo(&pInfo->binfo);
413,676✔
86
  taosMemoryFreeClear(pInfo->keyBuf);
413,765!
87
  taosArrayDestroy(pInfo->pGroupCols);
413,761✔
88
  taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
413,758✔
89
  cleanupExprSupp(&pInfo->scalarSup);
413,756✔
90

91
  if (pInfo->pOperator != NULL) {
413,704!
92
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
413,704✔
93
                      false);
94
    pInfo->pOperator = NULL;
413,686✔
95
  }
96

97
  cleanupGroupResInfo(&pInfo->groupResInfo);
413,686✔
98
  cleanupAggSup(&pInfo->aggSup);
413,695✔
99
  taosMemoryFreeClear(param);
413,756!
100
}
101

102
// Prepares the per-group key storage shared by the group-by and partition operators.
//
// pGroupColVals: out, receives a new SArray<SGroupKeys> with one zeroed value buffer per group column.
// keyLen:        in/out, incremented by the summed column widths plus one null-flag byte per column.
//                The value is accumulated, so the caller must pass it in initialized.
// keyBuf:        out, receives a zeroed buffer of *keyLen bytes used to serialize a key.
// pGroupColList: the group columns, read as an array of SColumn.
//
// Returns TSDB_CODE_SUCCESS, or the terrno of the failing call. On failure the objects already stored
// in the out parameters are left in place for the caller to release.
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return terrno;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
    if (!pCol) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
    (*keyLen) += pCol->bytes;  // actual data; the null flags are added after the loop

    SGroupKeys key = {0};
    key.bytes = pCol->bytes;
    key.type = pCol->type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      return terrno;
    }

    void* tmp = taosArrayPush((*pGroupColVals), &key);
    if (!tmp) {
      // The buffer is not owned by the array yet, so nobody else can release it.
      // Save the error code first in case the free call touches terrno.
      int32_t code = terrno;
      taosMemoryFree(key.pData);
      return code;
    }
  }

  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
142

143
// Tells whether row `rowIndex` of pBlock carries the same group key as the one stored in pGroupColVals.
//
// pGroupCols:     group columns, SArray<SColumn>
// pGroupColVals:  stored key values, SArray<SGroupKeys>, parallel to pGroupCols
// numOfGroupCols: number of leading entries of both arrays to compare
//
// Two NULLs are treated as equal; NULL never equals a non-NULL value.
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex,
                            int32_t numOfGroupCols) {
  SColumnDataAgg* pAgg = NULL;

  for (int32_t colIdx = 0; colIdx < numOfGroupCols; ++colIdx) {
    SColumn*         pColumn = taosArrayGet(pGroupCols, colIdx);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pColumn->slotId);
    if (pBlock->pBlockAgg != NULL) {
      pAgg = &pBlock->pBlockAgg[pColumn->slotId];  // TODO is agg data matched?
    }

    bool        rowIsNull = colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg);
    SGroupKeys* pSaved = taosArrayGet(pGroupColVals, colIdx);

    if (rowIsNull || pSaved->isNull) {
      if (rowIsNull && pSaved->isNull) {
        continue;  // NULL on both sides: this column matches
      }
      return false;  // NULL on exactly one side
    }

    char* pRowVal = colDataGetData(pColData, rowIndex);
    bool  matched = false;

    if (pSaved->type == TSDB_DATA_TYPE_JSON) {
      // the length of the row value decides how many bytes are compared
      int32_t jsonLen = getJsonValueLen(pRowVal);
      matched = (memcmp(pSaved->pData, pRowVal, jsonLen) == 0);
    } else if (IS_VAR_DATA_TYPE(pSaved->type)) {
      // variable-length values: the lengths must agree before the payloads are compared
      int32_t varLen = varDataLen(pRowVal);
      matched = (varLen == varDataLen(pSaved->pData)) &&
                (memcmp(varDataVal(pSaved->pData), varDataVal(pRowVal), varLen) == 0);
    } else {
      matched = (memcmp(pSaved->pData, pRowVal, pSaved->bytes) == 0);
    }

    if (!matched) {
      return false;
    }
  }

  return true;
}
190

191
// Copies the group-column values of row `rowIndex` of pBlock into pGroupColVals, making that row the
// current group key.
//
// pGroupCols:    group columns, SArray<SColumn>
// pGroupColVals: destination key slots, SArray<SGroupKeys>, parallel to pGroupCols
//
// Errors are reported through terrno only: when a JSON-typed key holds a whole JSON tag value, terrno is
// set to TSDB_CODE_QRY_JSON_IN_GROUP_ERROR and the function returns without recording the remaining
// columns. Callers must check terrno after the call.
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  int32_t numOfGroupCols = (int32_t)taosArrayGetSize(pGroupCols);
  size_t  numOfBlockCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupCols, i);

    // valid range check. todo: return error code.
    // The check has to run before the slot is used, and a slot equal to the column count is out of
    // range as well.
    int64_t slotId = pCol->slotId;
    if (slotId < 0 || (size_t)slotId >= numOfBlockCols) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = &pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
    } else {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (pkey->type == TSDB_DATA_TYPE_JSON) {
        if (tTagIsJson(val)) {
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
          return;
        }
        int32_t dataLen = getJsonValueLen(val);
        memcpy(pkey->pData, val, dataLen);
      } else if (IS_VAR_DATA_TYPE(pkey->type)) {
        // copy the length header together with the payload
        memcpy(pkey->pData, val, varDataTLen(val));
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }
    }
  }
}
230

231
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
84,285,438✔
232
  size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
84,285,438✔
233

234
  char* isNull = (char*)pKey;
84,293,987✔
235
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
84,293,987✔
236
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
244,841,725✔
237
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
160,558,620✔
238
    if (pkey->isNull) {
160,547,738✔
239
      isNull[i] = 1;
17,512,093✔
240
      continue;
17,512,093✔
241
    }
242

243
    isNull[i] = 0;
143,035,645✔
244
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
143,035,645✔
245
      int32_t dataLen = getJsonValueLen(pkey->pData);
180✔
246
      memcpy(pStart, (pkey->pData), dataLen);
180✔
247
      pStart += dataLen;
180✔
248
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
143,035,465!
249
      varDataCopy(pStart, pkey->pData);
87,667,221✔
250
      pStart += varDataTLen(pkey->pData);
87,667,221✔
251
    } else {
252
      memcpy(pStart, pkey->pData, pkey->bytes);
55,368,244✔
253
      pStart += pkey->bytes;
55,368,244✔
254
    }
255
  }
256

257
  return (int32_t)(pStart - (char*)pKey);
84,283,105✔
258
}
259

260
// assign the group keys or user input constant values if required
261
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
25,756,176✔
262
  for (int32_t i = 0; i < numOfOutput; ++i) {
122,208,323✔
263
    if (pCtx[i].functionId == -1) {  // select count(*),key from t group by key.
96,452,147✔
264
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
60,461,904✔
265

266
      SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
60,461,904✔
267
      // todo OPT all/all not NULL
268
      if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) {
120,923,808✔
269
        char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
48,275,872✔
270
        char* data = colDataGetData(pColInfoData, rowIndex);
48,275,872!
271

272
        if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
48,275,872✔
273
          int32_t dataLen = getJsonValueLen(data);
52✔
274
          memcpy(dest, data, dataLen);
52✔
275
        } else if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
48,275,820!
276
          varDataCopy(dest, data);
11,413,806✔
277
        } else {
278
          memcpy(dest, data, pColInfoData->info.bytes);
36,862,014✔
279
        }
280
      } else {  // it is a NULL value
281
        pEntryInfo->isNullRes = 1;
12,186,032✔
282
      }
283

284
      pEntryInfo->numOfRes = 1;
60,461,904✔
285
    }
286
  }
287
}
25,756,176✔
288

289
// Feeds one input block into the hash GROUP BY aggregation.
//
// Rows are scanned in order and consecutive rows with the same group key are collected into a run of
// `num` rows. When the key changes inside the block, the finished run is aggregated into the result row
// of the previous key and the new key is recorded. The run still open at the end of the block is
// aggregated after the loop. The recorded key and pInfo->isInit persist across calls.
//
// Failures do not return: the function leaves through T_LONG_JMP on pTaskInfo->env with the error code.
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t         numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);

  int32_t len = 0;
  terrno = TSDB_CODE_SUCCESS;  // recordNewGroupKeys() reports errors through terrno only

  int32_t num = 0;  // number of rows in the run that shares the recorded key
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      pInfo->isInit = true;
      num++;
      continue;
    }

    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
    if (equal) {
      num++;
      continue;
    }

    // The first row of a new block does not belong to the previously existing group; there is no pending
    // run in this block yet, so only the key has to be replaced.
    if (j == 0) {
      num++;
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      continue;
    }

    // The key changed inside the block: aggregate the finished run [j - num, j) under the old key.
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
                                          len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    int32_t rowIndex = j - num;
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
                                          pOperator->exprSupp.numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // assign the group keys or user input constant values if required
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);

    // Start the next run with row j. The calls above may have left a stale value in terrno, so clear it
    // before relying on it as the only error channel of recordNewGroupKeys().
    terrno = TSDB_CODE_SUCCESS;
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    num = 1;
  }

  // Aggregate the run that is still open at the end of the block.
  if (num > 0) {
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
                                          len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    int32_t rowIndex = pBlock->info.rows - num;
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
                                          pOperator->exprSupp.numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
  }
}
369

370
// Returns true while the result cursor has not yet reached the number of entries in the
// result-row hash table, i.e. while there are grouped results left to hand out.
bool hasRemainResultByHash(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pGroupby = pOperator->info;
  return pGroupby->groupResInfo.index < tSimpleHashGetSize(pGroupby->aggSup.pResultRowHashTable);
}
375

376
void doBuildResultDatablockByHash(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
6,672,703✔
377
                                  SDiskbasedBuf* pBuf) {
378
  SGroupbyOperatorInfo* pInfo = pOperator->info;
6,672,703✔
379
  SSHashObj*            pHashmap = pInfo->aggSup.pResultRowHashTable;
6,672,703✔
380
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
6,672,703✔
381

382
  SSDataBlock* pBlock = pInfo->binfo.pRes;
6,672,703✔
383

384
  // set output datablock version
385
  pBlock->info.version = pTaskInfo->version;
6,672,703✔
386

387
  blockDataCleanup(pBlock);
6,672,703✔
388
  if (!hasRemainResultByHash(pOperator)) {
6,671,126✔
389
    return;
61,118✔
390
  }
391

392
  pBlock->info.id.groupId = 0;
6,610,896✔
393
  if (!pInfo->binfo.mergeResultBlock) {
6,610,896✔
394
    doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
6,313,920✔
395
                             pHashmap, pOperator->resultInfo.threshold, false);
396
  } else {
397
    while (hasRemainResultByHash(pOperator)) {
593,745✔
398
      doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
296,889✔
399
                               pHashmap, pOperator->resultInfo.threshold, true);
400
      if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
296,889✔
401
        break;
120✔
402
      }
403
      pBlock->info.id.groupId = 0;
296,769✔
404
    }
405

406
    // clear the group id info in SSDataBlock, since the client does not need it
407
    pBlock->info.id.groupId = 0;
296,928✔
408
  }
409
}
410

411
// Produces the next non-empty, filtered result block of the GROUP BY operator.
//
// Result batches are built and filtered until one keeps at least one row or the results are
// exhausted. On exhaustion the operator is marked completed and the result-row hash table is released.
// Returns the result block, or NULL when it holds no rows. A filter failure does not return: it is
// logged and raised through T_LONG_JMP.
static SSDataBlock* buildGroupResultDataBlockByHash(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pGroupby = pOperator->info;
  SExecTaskInfo*        pTask = pOperator->pTaskInfo;
  SSDataBlock*          pOut = pGroupby->binfo.pRes;
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;

  // after filter, if result block turn to null, get next from whole set
  for (;;) {
    doBuildResultDatablockByHash(pOperator, &pGroupby->binfo, &pGroupby->groupResInfo, pGroupby->aggSup.pResultBuf);

    code = doFilter(pOut, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResultByHash(pOperator)) {
      setOperatorCompleted(pOperator);
      // clean hash after completed
      tSimpleHashCleanup(pGroupby->aggSup.pResultRowHashTable);
      pGroupby->aggSup.pResultRowHashTable = NULL;
      break;
    }

    if (pOut->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pOut->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTask->env, code);
  }
  return (pOut->info.rows == 0) ? NULL : pOut;
}
446

447
// "Get next block" callback of the GROUP BY operator.
//
// First call: drains the downstream operator, aggregating every block into the group hash, then
// switches to OP_RES_TO_RETURN and returns the first result block.
// Later calls: return the next result block; once the operator is done, return success without
// touching *ppRes after the parameter check.
//
// ppRes: out, receives the result block or NULL when there is nothing to return.
// Returns TSDB_CODE_SUCCESS. Failures while consuming the input are logged, stored in pTaskInfo->code
// and raised through T_LONG_JMP instead of being returned.
static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SGroupResInfo*        pGroupResInfo = &pInfo->groupResInfo;
  int32_t               order = pInfo->binfo.inputTsOrder;
  int64_t               st = taosGetTimestampUs();

  QRY_PARAM_CHECK(ppRes);
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    (*ppRes) = buildGroupResultDataBlockByHash(pOperator);
    return code;
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(&pOperator->exprSupp, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doHashGroupbyAgg(pOperator, pBlock);
  }

  pOperator->status = OP_RES_TO_RETURN;

  // Reset the result cursor by hand so the results are read straight from the hash table.
  if (pGroupResInfo->pRows != NULL) {
    taosArrayDestroy(pGroupResInfo->pRows);
    // Clear the pointer like pBuf below, so later cleanup of groupResInfo cannot free the array twice.
    pGroupResInfo->pRows = NULL;
  }

  if (pGroupResInfo->pBuf) {
    taosMemoryFree(pGroupResInfo->pBuf);
    pGroupResInfo->pBuf = NULL;
  }

  pGroupResInfo->index = 0;
  pGroupResInfo->iter = 0;
  pGroupResInfo->dataPos = NULL;

  // st is in microseconds; the cost is stored in milliseconds
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  } else {
    (*ppRes) = buildGroupResultDataBlockByHash(pOperator);
  }

  return code;
}
517

518
// Reset-state callback of the GROUP BY operator: puts the operator back into OP_NOT_OPENED, drops the
// accumulated results and rebuilds the aggregate and scalar expression support from the physical plan
// node. Returns the code of the first rebuild step that fails, or 0 on success. The "key initialized"
// flag is cleared in either case.
static int32_t resetGroupOperState(SOperatorInfo* pOper) {
  SGroupbyOperatorInfo* pGroupby = pOper->info;
  SExecTaskInfo*        pTask = pOper->pTaskInfo;
  SAggPhysiNode*        pAggNode = (SAggPhysiNode*)pOper->pPhyNode;

  resetBasicOperatorState(&pGroupby->binfo);
  pOper->status = OP_NOT_OPENED;

  SOperatorInfo* pOwner = pGroupby->pOperator;
  cleanupResultInfo(pOwner->pTaskInfo, &pOwner->exprSupp, &pGroupby->groupResInfo, &pGroupby->aggSup, false);
  cleanupGroupResInfo(&pGroupby->groupResInfo);

  qInfo("[group key] len use:%d", pGroupby->groupKeyLen);

  int32_t code = resetAggSup(&pOper->exprSupp, &pGroupby->aggSup, pTask, pAggNode->pAggFuncs, pAggNode->pGroupKeys,
                             pGroupby->groupKeyLen + POINTER_BYTES, pTask->id.str, pTask->streamInfo.pState,
                             &pTask->storageAPI.functionStore);
  if (code == 0) {
    code = resetExprSupp(&pGroupby->scalarSup, pTask, pAggNode->pExprs, NULL, &pTask->storageAPI.functionStore);
  }

  pGroupby->isInit = false;
  return code;
}
544

545
// Builds the GROUP BY aggregate operator for the physical plan node pAggNode.
//
// downstream: the input operator; it is attached to the new operator on success and handed to
//             destroyOperatorAndDownstreams() on failure.
// pOptrInfo:  out, receives the new operator on success.
//
// Returns TSDB_CODE_SUCCESS or the code of the step that failed. On failure everything created so far
// is released and the code is also stored in pTaskInfo->code.
int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
                                SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = (SNode*)pAggNode;
  pOperator->exprSupp.hasWindowOrGroup = true;

  // result block
  SSDataBlock* pOutBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  if (pOutBlock == NULL) {
    code = terrno;
    goto _error;
  }
  initBasicInfo(&pInfo->binfo, pOutBlock);

  // group columns
  pInfo->pGroupCols = NULL;
  code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols);
  QUERY_CHECK_CODE(code, lino, _error);

  // scalar expressions evaluated on the input before grouping (optional)
  SExprInfo* pScalarExprs = NULL;
  int32_t    scalarExprCount = 0;
  if (pAggNode->pExprs != NULL) {
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprs, &scalarExprCount);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = initExprSupp(&pInfo->scalarSup, pScalarExprs, scalarExprCount, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  // key storage: value slots, total key width and the serialization buffer
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  QUERY_CHECK_CODE(code, lino, _error);

  // aggregate functions together with the group keys
  SExprInfo* pAggExprs = NULL;
  int32_t    aggExprCount = 0;
  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pAggExprs, &aggExprCount);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pAggExprs, aggExprCount, pInfo->groupKeyLen,
                    pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  // filter applied to the result blocks
  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;

  // back pointer read by the destroy and reset callbacks
  pInfo->pOperator = pOperator;

  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregateNext, NULL, destroyGroupOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetGroupOperState);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyGroupOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
627

628
// Creates a new block that mirrors pDataBlock for the columns referenced by the operator's expressions.
//
// The block info is copied, with the block id replaced by the operator's result block id and with
// capacity and row size reset to 0. For each expression, a column with the source column's info is
// appended. If the source block has aggregate data for that column (colId != -1), the aggregate entry
// is copied; otherwise the column data of all rows is copied.
//
// Returns the new block, or NULL when pDataBlock is NULL or any step fails. The partially built block
// is destroyed on failure.
SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBlock* pDataBlock) {
  if (pDataBlock == NULL) {
    return NULL;
  }

  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pCopy = NULL;

  code = createDataBlock(&pCopy);
  QUERY_CHECK_CODE(code, lino, _end);

  pCopy->info = pDataBlock->info;
  pCopy->info.id.blockId = pOperator->resultDataBlockId;
  pCopy->info.capacity = 0;
  pCopy->info.rowSize = 0;

  size_t exprCount = pOperator->exprSupp.numOfExprs;
  if (pDataBlock->pBlockAgg) {
    pCopy->pBlockAgg = taosMemoryCalloc(exprCount, sizeof(SColumnDataAgg));
    if (pCopy->pBlockAgg == NULL) {
      blockDataDestroy(pCopy);
      return NULL;
    }
    // start with every entry marked as "no aggregate data"
    for (int k = 0; k < exprCount; ++k) {
      pCopy->pBlockAgg[k].colId = -1;
    }
  }

  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SExprInfo*       pExprItem = &pOperator->exprSupp.pExprInfo[k];
    int32_t          srcSlot = pExprItem->base.pParam[0].pCol->slotId;
    SColumnInfoData* pSrcCol = taosArrayGet(pDataBlock->pDataBlock, srcSlot);
    SColumnInfoData  newCol = {.hasNull = true, .info = pSrcCol->info};

    code = blockDataAppendColInfo(pCopy, &newCol);
    QUERY_CHECK_CODE(code, lino, _end);

    SColumnInfoData* pDstCol = taosArrayGet(pCopy->pDataBlock, k);
    if (pDataBlock->pBlockAgg && pDataBlock->pBlockAgg[srcSlot].colId != -1) {
      pCopy->pBlockAgg[k] = pDataBlock->pBlockAgg[srcSlot];
    } else {
      code = doEnsureCapacity(pDstCol, &pCopy->info, pDataBlock->info.rows, false);
      QUERY_CHECK_CODE(code, lino, _end);

      code = colDataAssign(pDstCol, pSrcCol, pDataBlock->info.rows, &pDataBlock->info);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    blockDataDestroy(pCopy);
    return NULL;
  }
  return pCopy;
}
684

685
/**
 * Distribute the rows of one input block into per-group buffer pages.
 *
 * For every row the group key is rebuilt, the matching group (and its current page) is looked up
 * through getCurrentDataGroupInfo(), and the projected columns of that row are appended to the page.
 * The page starts with an int32_t row counter; each column region starts at pInfo->columnOffset[i]:
 *   - var-length column: int32_t offset per row | int32_t used length | payload
 *   - fixed-length column: null bitmap | int32_t used length | payload
 *
 * When the block carries no loaded data (pBlock->info.dataLoad is false), a copy produced by
 * createBlockDataNotLoaded() is attached to the group of the first row and the remaining rows are
 * not visited.
 *
 * Errors are reported with T_LONG_JMP on pTaskInfo->env; the function does not return an error code.
 *
 * @param pOperator  partition operator; pOperator->info is a SPartitionOperatorInfo
 * @param pBlock     input block to be partitioned
 */
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);

    SDataGroupInfo* pGroupInfo = NULL;
    void*           pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
    if (pPage == NULL) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    pGroupInfo->numOfRows += 1;

    // group id: 0 means "not computed yet" for this group
    if (pGroupInfo->groupId == 0) {
      pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
    }

    if (pBlock->info.dataLoad) {
      // the first int32_t of the page is the number of rows stored in it
      int32_t* rows = (int32_t*)pPage;

      size_t numOfCols = pOperator->exprSupp.numOfExprs;
      for (int32_t i = 0; i < numOfCols; ++i) {
        SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
        int32_t    slotId = pExpr->base.pParam[0].pCol->slotId;

        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);

        int32_t bytes = pColInfoData->info.bytes;
        int32_t startOffset = pInfo->columnOffset[i];

        int32_t* columnLen = NULL;
        int32_t  contentLen = 0;

        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          // layout: offset[rowCapacity] | columnLen | payload
          int32_t* offset = (int32_t*)((char*)pPage + startOffset);
          columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
          char* data = (char*)((char*)columnLen + sizeof(int32_t));

          if (colDataIsNull_s(pColInfoData, j)) {
            offset[(*rows)] = -1;  // -1 marks a NULL value, nothing is appended to the payload
            contentLen = 0;
          } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
            offset[*rows] = (*columnLen);
            char*   src = colDataGetData(pColInfoData, j);
            int32_t dataLen = getJsonValueLen(src);

            memcpy(data + (*columnLen), src, dataLen);
            int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
            QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

            contentLen = dataLen;
          } else {
            offset[*rows] = (*columnLen);
            char* src = colDataGetData(pColInfoData, j);
            memcpy(data + (*columnLen), src, varDataTLen(src));
            int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
            QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

            contentLen = varDataTLen(src);
          }
        } else {
          // layout: bitmap | columnLen | payload
          char* bitmap = (char*)pPage + startOffset;
          columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
          char* data = (char*)columnLen + sizeof(int32_t);

          bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
          if (isNull) {
            colDataSetNull_f(bitmap, (*rows));
          } else {
            memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
            QUERY_CHECK_CONDITION(((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)), code,
                                  lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
          }
          // a fixed-length slot is consumed even for NULL values
          contentLen = bytes;
        }

        (*columnLen) += contentLen;
      }

      (*rows) += 1;

      setBufPageDirty(pPage, true);
      releaseBufPage(pInfo->pBuf, pPage);
    } else {
      SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pOperator, pBlock);
      if (dataNotLoadBlock == NULL) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      if (pGroupInfo->blockForNotLoaded == NULL) {
        pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*));
        if (pGroupInfo->blockForNotLoaded == NULL) {
          // the copy is not owned by the group yet: free it here, otherwise it leaks on the error path
          code = terrno;
          lino = __LINE__;
          blockDataDestroy(dataNotLoadBlock);
          goto _end;
        }
        pGroupInfo->offsetForNotLoaded = 0;
      }
      dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
      dataNotLoadBlock->info.dataLoad = 0;
      void* tmp = taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock);
      if (tmp == NULL) {
        // push failed, so the array does not reference the copy: release it before bailing out
        code = terrno;
        lino = __LINE__;
        blockDataDestroy(dataNotLoadBlock);
        goto _end;
      }
      // the whole block has been attached to this group; no per-row processing is needed
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
217,248✔
799

800
/**
 * Locate (or create) the group addressed by pInfo->keyBuf[0..len) and return the page that the
 * next row of this group must be written to.
 *
 * - New group: a SDataGroupInfo with an empty page-id list is inserted into pInfo->pGroupSet and a
 *   fresh page (row counter set to 0) is appended to that list.
 * - Existing group: the last page of the group is fetched; when it already holds
 *   pInfo->rowCapacity rows it is released and a new zero-filled page is appended instead.
 *
 * @param pInfo       partition operator state (group hash set, key buffer, paged buffer)
 * @param pGroupInfo  [out] set to the group entry stored in the hash set on success
 * @param len         length in bytes of the group key in pInfo->keyBuf
 * @return the page obtained from the paged buffer (the caller in this file hands it back with
 *         releaseBufPage()), or NULL on failure. On the failures detected here terrno carries the
 *         error code, because the caller long-jumps with terrno.
 */
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  void* pPage = NULL;
  if (p == NULL) {  // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    QUERY_CHECK_NULL(gi.pPageList, code, lino, _end, terrno);

    code = taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
    if (code != TSDB_CODE_SUCCESS) {
      // gi was not stored, so nothing else references its page list: free it to avoid a leak
      taosArrayDestroy(gi.pPageList);
      gi.pPageList = NULL;
      if (code == TSDB_CODE_DUP_KEY) {
        // an entry with this key already exists; keep using it
        code = TSDB_CODE_SUCCESS;
      }
    }
    QUERY_CHECK_CODE(code, lino, _end);

    // fetch the copy that lives inside the hash set; it is the one handed back to the caller
    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
    QUERY_CHECK_NULL(p, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    int32_t pageId = 0;
    pPage = getNewBufPage(pInfo->pBuf, &pageId);
    if (pPage == NULL) {
      qError("failed to get new buffer, code:%s", tstrerror(terrno));
      return pPage;
    }

    void* tmp = taosArrayPush(p->pPageList, &pageId);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    // a brand-new page holds no rows yet
    *(int32_t*)pPage = 0;
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return pPage;
    }

    int32_t* rows = (int32_t*)pPage;
    if (*rows >= pInfo->rowCapacity) {
      // the current page is full: release it
      releaseBufPage(pInfo->pBuf, pPage);

      // add a new page for current group
      int32_t pageId = 0;
      pPage = getNewBufPage(pInfo->pBuf, &pageId);
      if (pPage == NULL) {
        qError("failed to get new buffer, code:%s", tstrerror(terrno));
        return NULL;
      }

      void* tmp = taosArrayPush(p->pPageList, &pageId);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      memset(pPage, 0, getBufPageSize(pInfo->pBuf));
    }
  }

  *pGroupInfo = p;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // the caller reports the failure through terrno, so make sure it reflects this error
    terrno = code;
    return NULL;
  }

  return pPage;
}
867

868
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
17,915✔
869
  size_t   numOfCols = taosArrayGetSize(pBlock->pDataBlock);
17,915✔
870
  int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
17,929✔
871
  if (!offset) {
17,933!
872
    return NULL;
×
873
  }
874

875
  offset[0] = sizeof(int32_t) +
17,933✔
876
              sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format
877

878
  for (int32_t i = 0; i < numOfCols - 1; ++i) {
101,648✔
879
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
83,722✔
880

881
    int32_t bytes = pColInfoData->info.bytes;
83,715✔
882
    int32_t payloadLen = bytes * rowCapacity;
83,715✔
883

884
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
83,715!
885
      // offset segment + content length + payload
886
      offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
13,499✔
887
    } else {
888
      // bitmap + content length + payload
889
      offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
70,216✔
890
    }
891
  }
892

893
  return offset;
17,926✔
894
}
895

896
/**
 * Drop all per-group state collected in pInfo->sortedGroupArray and reset the paged buffer.
 *
 * For every group this frees the not-loaded block copies together with the array holding them,
 * and the page-id list. The group array itself is emptied (not destroyed), then
 * clearDiskbasedBuf() is applied to pInfo->pBuf.
 *
 * @param pInfo  partition operator state
 */
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
  int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
  for (int32_t i = 0; i < size; i++) {
    SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
    if (pGp == NULL) {
      continue;
    }

    if (pGp->blockForNotLoaded) {
      int32_t numOfBlocks = taosArrayGetSize(pGp->blockForNotLoaded);
      for (int32_t j = 0; j < numOfBlocks; j++) {
        SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, j);
        if (pBlock) blockDataDestroy(*pBlock);
      }
      // the group entry is discarded below, so the array must be destroyed here, not just cleared
      taosArrayDestroy(pGp->blockForNotLoaded);
      pGp->blockForNotLoaded = NULL;
      pGp->offsetForNotLoaded = 0;
    }

    taosArrayDestroy(pGp->pPageList);
    pGp->pPageList = NULL;
  }
  taosArrayClear(pInfo->sortedGroupArray);
  clearDiskbasedBuf(pInfo->pBuf);
}
17,741✔
913

914
static int compareDataGroupInfo(const void* group1, const void* group2) {
675,478✔
915
  const SDataGroupInfo* pGroupInfo1 = group1;
675,478✔
916
  const SDataGroupInfo* pGroupInfo2 = group2;
675,478✔
917

918
  if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
675,478!
919
    return 0;
×
920
  }
921

922
  return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
675,478✔
923
}
924

925
/**
 * Hand out the next not-loaded block of a group, if any is left.
 *
 * The read cursor (offsetForNotLoaded) advances by one for each block returned; the block stays
 * referenced by the group's blockForNotLoaded array.
 *
 * @param pGroupInfo  group to read from
 * @return the next block, or NULL when the group has no (more) not-loaded blocks
 */
static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) {
  SArray* pPending = pGroupInfo->blockForNotLoaded;

  // nothing recorded for this group, or every recorded block was already returned
  if (pPending == NULL || pGroupInfo->offsetForNotLoaded >= pPending->size) {
    return NULL;
  }

  SSDataBlock** ppNext = taosArrayGet(pPending, pGroupInfo->offsetForNotLoaded);
  if (ppNext == NULL) {
    return NULL;
  }

  pGroupInfo->offsetForNotLoaded++;
  return *ppNext;
}
936

937
/**
 * Produce the next result block of the partition operator.
 *
 * When no rows are left over from the previous call (pInfo->remainRows == 0) the next page of the
 * current group is loaded into pInfo->binfo.pRes; when the pages of a group are exhausted its
 * not-loaded blocks are returned one by one before moving on to the next group. After the last
 * group the operator is marked completed, its state is cleared and NULL is returned.
 *
 * When pInfo->pOrderInfoArr is set, only the leading sorted run of the block (as reported by
 * blockDataGetSortedRows()) is returned per call; the rest is kept as remainRows for the next call.
 *
 * Failures are reported with T_LONG_JMP on pTaskInfo->env.
 *
 * @param pOperator  partition operator
 * @return the result block (pInfo->binfo.pRes or a not-loaded block), or NULL when finished
 */
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;

  if (pInfo->remainRows == 0) {
    blockDataCleanup(pInfo->binfo.pRes);
    SDataGroupInfo* pGroupInfo =
        (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
    if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
      if (pGroupInfo != NULL) {
        // all pages of the current group were emitted: drain its not-loaded blocks first
        SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
        if (ret != NULL) return ret;
      }
      // try next group data
      if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) {
        setOperatorCompleted(pOperator);
        clearPartitionOperator(pInfo);
        return NULL;
      }
      ++pInfo->groupIndex;

      pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
      if (pGroupInfo == NULL) {
        qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      pInfo->pageIndex = 0;
    }

    int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
    if (pageId == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    void* page = getBufPage(pInfo->pBuf, *pageId);
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    if (*(int32_t*)page == 0) {
      // the page holds no rows: the group may only have not-loaded blocks
      releaseBufPage(pInfo->pBuf, page);
      SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
      if (ret != NULL) return ret;
      if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) {
        pInfo->groupIndex++;
        pInfo->pageIndex = 0;
      } else {
        setOperatorCompleted(pOperator);
        clearPartitionOperator(pInfo);
        return NULL;
      }
      return buildPartitionResult(pOperator);
    }

    code = blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
    if (code == TSDB_CODE_SUCCESS) {
      code = blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // hand the page back before leaving through the error path, as the success path does
      releaseBufPage(pInfo->pBuf, page);
    }
    QUERY_CHECK_CODE(code, lino, _end);

    pInfo->pageIndex += 1;
    releaseBufPage(pInfo->pBuf, page);
    pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;
    pInfo->binfo.pRes->info.dataLoad = 1;
    pInfo->orderedRows = 0;
  } else if (pInfo->pOrderInfoArr == NULL) {
    qError("Exception, remainRows not zero, but pOrderInfoArr is NULL");
  }

  if (pInfo->pOrderInfoArr) {
    // re-expose the rows held back last time, drop the ones already returned, then cut the block
    // at the end of the next sorted run
    pInfo->binfo.pRes->info.rows += pInfo->remainRows;
    code = blockDataTrimFirstRows(pInfo->binfo.pRes, pInfo->orderedRows);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->orderedRows = blockDataGetSortedRows(pInfo->binfo.pRes, pInfo->pOrderInfoArr);
    pInfo->remainRows = pInfo->binfo.pRes->info.rows - pInfo->orderedRows;
    pInfo->binfo.pRes->info.rows = pInfo->orderedRows;
  }

  code = blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows;
  return pInfo->binfo.pRes;
}
1029

1030
/**
 * "Get next block" entry point of the partition operator.
 *
 * First call: consumes every block of the downstream operator, applies the optional scalar
 * expressions, partitions the rows with doHashPartition(), then moves all groups from the hash
 * set into an array sorted by group id and switches the operator to OP_RES_TO_RETURN.
 * Every call (including the first) then returns the next block from buildPartitionResult().
 *
 * @param pOperator  partition operator
 * @param ppRes      [out] next result block, or NULL when the operator is done
 * @return TSDB_CODE_SUCCESS; failures detected here leave through T_LONG_JMP on pTaskInfo->env
 */
static int32_t hashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;

  if (pOperator->status == OP_RES_TO_RETURN) {
    (*ppRes) = buildPartitionResult(pOperator);
    return code;
  }

  int64_t st = taosGetTimestampUs();

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code =
          projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // doHashPartition() has no return value; terrno is reset so it can be inspected afterwards
    terrno = TSDB_CODE_SUCCESS;
    doHashPartition(pOperator, pBlock);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
  QUERY_CHECK_NULL(groupArray, code, lino, _end, terrno);

  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    void*           tmp = taosArrayPush(groupArray, pGroupInfo);
    if (tmp == NULL) {
      // groupArray is still a local: destroy it here or it leaks. The group entries (and the
      // lists they point to) remain owned by the hash set, which has not been cleared yet.
      code = terrno;
      lino = __LINE__;
      taosArrayDestroy(groupArray);
      goto _end;
    }
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }

  taosArraySort(groupArray, compareDataGroupInfo);
  pInfo->sortedGroupArray = groupArray;
  pInfo->groupIndex = -1;
  // the entries were copied into the array above; the array is now their only holder
  taosHashClear(pInfo->pGroupSet);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  pOperator->status = OP_RES_TO_RETURN;
  code = blockDataEnsureCapacity(pRes, 4096);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = buildPartitionResult(pOperator);
  return code;
}
1105

1106
static void destroyPartitionOperatorInfo(void* param) {
17,928✔
1107
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
17,928✔
1108
  cleanupBasicInfo(&pInfo->binfo);
17,928✔
1109
  taosArrayDestroy(pInfo->pGroupCols);
17,933✔
1110

1111
  for (int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++) {
42,525✔
1112
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
24,591✔
1113
    taosMemoryFree(key.pData);
24,589!
1114
  }
1115

1116
  taosArrayDestroy(pInfo->pGroupColVals);
17,934✔
1117
  taosMemoryFree(pInfo->keyBuf);
17,934!
1118

1119
  int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
17,934✔
1120
  for (int32_t i = 0; i < size; i++) {
45,632✔
1121
    SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
27,698✔
1122
    if (pGp) {
27,698!
1123
      taosArrayDestroy(pGp->pPageList);
27,698✔
1124
    }
1125
  }
1126
  taosArrayDestroy(pInfo->sortedGroupArray);
17,934✔
1127

1128
  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
17,934✔
1129
  while (pGroupIter != NULL) {
17,933!
1130
    SDataGroupInfo* pGroupInfo = pGroupIter;
×
1131
    taosArrayDestroy(pGroupInfo->pPageList);
×
1132
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
×
1133
  }
1134

1135
  taosHashCleanup(pInfo->pGroupSet);
17,933✔
1136
  taosMemoryFree(pInfo->columnOffset);
17,934!
1137

1138
  cleanupExprSupp(&pInfo->scalarSup);
17,934✔
1139
  destroyDiskbasedBuf(pInfo->pBuf);
17,934✔
1140
  taosArrayDestroy(pInfo->pOrderInfoArr);
17,934✔
1141
  taosMemoryFreeClear(param);
17,931!
1142
}
17,931✔
1143

1144
/**
 * Bring the partition operator back to its pre-execution state so it can run again.
 *
 * Resets the basic operator info and the scalar expression support, drops all groups (from the
 * sorted array and from the hash set), destroys the sorted array and zeroes the iteration cursors.
 * The cleanup is always carried out completely, even when resetting the expressions failed.
 *
 * @param pOper  partition operator
 * @return the result of resetExprSupp(); TSDB_CODE_SUCCESS when it succeeded
 */
static int32_t resetPartitionOperState(SOperatorInfo* pOper) {
  SPartitionOperatorInfo* pInfo = pOper->info;
  SExecTaskInfo*          pTaskInfo = pOper->pTaskInfo;
  SPartitionPhysiNode*    pPhynode = (SPartitionPhysiNode*)pOper->pPhyNode;
  resetBasicOperatorState(&pInfo->binfo);

  // remember the outcome; it is reported to the caller after the cleanup below
  int32_t code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
                               &pTaskInfo->storageAPI.functionStore);

  clearPartitionOperator(pInfo);

  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    taosArrayDestroy(pGroupInfo->pPageList);
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }
  taosHashClear(pInfo->pGroupSet);

  int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
  for (int32_t i = 0; i < size; i++) {
    SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
    if (pGp) {
      taosArrayDestroy(pGp->pPageList);
    }
  }
  taosArrayDestroy(pInfo->sortedGroupArray);
  pInfo->sortedGroupArray = NULL;

  pInfo->groupIndex = 0;
  pInfo->pageIndex = 0;
  pInfo->remainRows = 0;
  pInfo->orderedRows = 0;
  return code;
}
1179

1180
int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
17,915✔
1181
                                           SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1182
  QRY_PARAM_CHECK(pOptrInfo);
17,915!
1183

1184
  int32_t                 code = TSDB_CODE_SUCCESS;
17,915✔
1185
  int32_t                 lino = 0;
17,915✔
1186
  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
17,915!
1187
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
17,924!
1188
  if (pInfo == NULL || pOperator == NULL) {
17,920!
1189
    pTaskInfo->code = code = terrno;
×
1190
    goto _error;
×
1191
  }
1192

1193
  pOperator->pPhyNode = pPartNode;
17,920✔
1194
  int32_t    numOfCols = 0;
17,920✔
1195
  SExprInfo* pExprInfo = NULL;
17,920✔
1196
  code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols);
17,920✔
1197
  QUERY_CHECK_CODE(code, lino, _error);
17,930!
1198
  pOperator->exprSupp.numOfExprs = numOfCols;
17,930✔
1199
  pOperator->exprSupp.pExprInfo = pExprInfo;
17,930✔
1200

1201
  pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);
17,930✔
1202

1203
  if (pPartNode->needBlockOutputTsOrder) {
17,929✔
1204
    SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId};
923✔
1205
    pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo));
923✔
1206
    if (!pInfo->pOrderInfoArr) {
924!
1207
      pTaskInfo->code = terrno;
×
1208
      goto _error;
×
1209
    }
1210

1211
    void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order);
924✔
1212
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
924!
1213
  }
1214

1215
  if (pPartNode->pExprs != NULL) {
17,930✔
1216
    int32_t    num = 0;
979✔
1217
    SExprInfo* pExprInfo1 = NULL;
979✔
1218
    code = createExprInfo(pPartNode->pExprs, NULL, &pExprInfo1, &num);
979✔
1219
    QUERY_CHECK_CODE(code, lino, _error);
981!
1220

1221
    code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num, &pTaskInfo->storageAPI.functionStore);
981✔
1222
    QUERY_CHECK_CODE(code, lino, _error);
979!
1223
  }
1224

1225
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
17,930✔
1226
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
17,919✔
1227
  if (pInfo->pGroupSet == NULL) {
17,926!
1228
    goto _error;
×
1229
  }
1230

1231
  uint32_t defaultPgsz = 0;
17,926✔
1232
  int64_t defaultBufsz = 0;
17,926✔
1233

1234
  pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
17,926✔
1235
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
17,933!
1236
  code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
17,933✔
1237
  if (code != TSDB_CODE_SUCCESS) {
17,930!
1238
    goto _error;
×
1239
  }
1240

1241
  if (!osTempSpaceAvailable()) {
17,930!
1242
    terrno = TSDB_CODE_NO_DISKSPACE;
×
1243
    qError("Create partition operator info failed since %s, tempDir:%s", terrstr(), tsTempDir);
×
1244
    goto _error;
×
1245
  }
1246

1247
  code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
17,922✔
1248
  if (code != TSDB_CODE_SUCCESS) {
17,925!
1249
    goto _error;
×
1250
  }
1251

1252
  pInfo->rowCapacity =
17,932✔
1253
      blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf),
17,924✔
1254
                                blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock)));
17,925✔
1255
  if (pInfo->rowCapacity < 0) {
17,932!
1256
    code = terrno;
×
1257
    goto _error;
×
1258
  }
1259
  
1260
  pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
17,932✔
1261
  QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno);
17,934!
1262

1263
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
17,934✔
1264
  if (code != TSDB_CODE_SUCCESS) {
17,929!
1265
    goto _error;
×
1266
  }
1267

1268
  setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
17,929✔
1269
                  pTaskInfo);
1270

1271
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartitionNext, NULL, destroyPartitionOperatorInfo,
17,924✔
1272
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1273

1274
  setOperatorResetStateFn(pOperator, resetPartitionOperState);
17,920✔
1275
  code = appendDownstream(pOperator, &downstream, 1);
17,922✔
1276
  if (code != TSDB_CODE_SUCCESS) {
17,922!
1277
    goto _error;
×
1278
  }
1279

1280
  *pOptrInfo = pOperator;
17,922✔
1281
  return TSDB_CODE_SUCCESS;
17,922✔
1282

1283
_error:
×
1284
  if (pInfo != NULL) {
×
1285
    destroyPartitionOperatorInfo(pInfo);
×
1286
  }
1287
  pTaskInfo->code = code;
×
1288
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1289
  TAOS_RETURN(code);
×
1290
}
1291

1292
/**
 * Bind the function contexts of an operator to the result row of the group identified by
 * (pData, bytes, groupId).
 *
 * The result row is looked up through doSetResultOutBufByKey(); when a row is obtained and the
 * task carries no error, the contexts are initialised on it with setResultRowInitCtx().
 *
 * @return the result of setResultRowInitCtx(), or pTaskInfo->code when no row was obtained or the
 *         task already carries an error code
 */
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                int32_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;

  SResultRow* pRow = doSetResultOutBufByKey(pBuf, &binfo->resultRowInfo, (char*)pData, bytes, true, groupId, pTask,
                                            false, pAggSup, false);

  if (pRow != NULL && pTask->code == 0) {
    return setResultRowInitCtx(pRow, pFuncCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
  }

  return pTask->code;
}
1306

1307
/**
 * Allocate an empty block of type STREAM_CREATE_CHILD_TABLE and describe its columns.
 *
 * Column order: sub table name (VARCHAR), group id (UBIGINT, 8 bytes), then one column per tag
 * expression. Only the column descriptions are appended; info.rowSize is the sum of the column
 * widths.
 *
 * @param tbName  table-name expressions; the width of the first one is used for the name column,
 *                or 1 when there is none
 * @param tag     tag expressions, one column is added for each
 * @return the new block, or NULL on failure
 */
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  void*           pushed = NULL;
  SColumnInfoData nameCol = {0};
  SColumnInfoData groupIdCol = {0};

  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    return NULL;
  }

  pBlock->info.type = STREAM_CREATE_CHILD_TABLE;
  pBlock->info.watermark = INT64_MIN;
  pBlock->info.hasVarCol = false;
  pBlock->info.id.groupId = 0;
  pBlock->info.rows = 0;

  pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  QUERY_CHECK_NULL(pBlock->pDataBlock, code, lino, _end, terrno);

  // sub table name
  nameCol.info.type = TSDB_DATA_TYPE_VARCHAR;
  nameCol.info.bytes = (tbName->numOfExprs > 0) ? tbName->pExprInfo->base.resSchema.bytes : 1;
  pBlock->info.rowSize += nameCol.info.bytes;
  pushed = taosArrayPush(pBlock->pDataBlock, &nameCol);
  QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);

  // group id
  groupIdCol.info.type = TSDB_DATA_TYPE_UBIGINT;
  groupIdCol.info.bytes = 8;
  pBlock->info.rowSize += groupIdCol.info.bytes;
  pushed = taosArrayPush(pBlock->pDataBlock, &groupIdCol);
  QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);

  // tag info
  for (int32_t idx = 0; idx < tag->numOfExprs; idx++) {
    SColumnInfoData tagCol = {0};
    tagCol.info.type = tag->pExprInfo[idx].base.resSchema.type;
    tagCol.info.bytes = tag->pExprInfo[idx].base.resSchema.bytes;
    tagCol.info.precision = tag->pExprInfo[idx].base.resSchema.precision;

    pushed = taosArrayPush(pBlock->pDataBlock, &tagCol);
    QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
    pBlock->info.rowSize += tagCol.info.bytes;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    blockDataDestroy(pBlock);
    return NULL;
  }
  return pBlock;
}
1361

1362
void freePartItem(void* ptr) {
×
1363
  SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr;
×
1364
  taosArrayDestroy(pPart->rowIds);
×
1365
}
×
1366

1367
// Builds an SArray<SColumn> with one entry per supported target in pNodeList
// and returns it through pArrayRes.
//
// Every list entry is read as an STargetNode:
//   - QUERY_NODE_COLUMN expressions are converted with
//     extractColumnFromColumnNode();
//   - QUERY_NODE_VALUE expressions produce an SColumn whose slotId/colId are the
//     target's slot id and whose bytes/scale/precision come from the value
//     node's resType;
//   - targets with any other expression type are skipped (no entry is added).
//
// Returns TSDB_CODE_SUCCESS with the new array stored in *pArrayRes. On failure
// the error code is returned, the partially built array is destroyed, and
// *pArrayRes is set to NULL so the caller never sees a half-filled list.
int32_t extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    code = terrno;
    (*pArrayRes) = NULL;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  for (int32_t i = 0; (size_t)i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColumn c = extractColumnFromColumnNode(pColNode);
      void*   tmp = taosArrayPush(pList, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // NOTE(review): type is taken from node.type while bytes/scale/precision
      // are taken from node.resType -- confirm node.type really is the data type
      // intended here.
      c.type = pValNode->node.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      void* tmp = taosArrayPush(pList, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

  // Success: ownership of pList moves to the caller.
  (*pArrayRes) = pList;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // A failure inside the loop leaves pList allocated but never handed out;
    // release it here so the error path does not leak.
    if (pList != NULL) {
      taosArrayDestroy(pList);
    }
    (*pArrayRes) = NULL;
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc