• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.91
/source/libs/executor/src/groupoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "tname.h"
20
#include "tutil.h"
21

22
#include "tdatablock.h"
23
#include "tmsg.h"
24

25
#include "executorInt.h"
26
#include "operator.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "thash.h"
30
#include "ttypes.h"
31

32
// Private state of the hash group-by aggregate operator; allocated in createGroupOperatorInfo() and released by
// destroyGroupOperatorInfo().
typedef struct SGroupbyOperatorInfo {
33
  SOptrBasicInfo binfo;          // result block (pRes), result row info, merge flag and input/output ts order
34
  SAggSupporter  aggSup;         // aggregate support: result buffer (pResultBuf) and result-row hash table
35
  SArray*        pGroupCols;     // group by columns, SArray<SColumn>
36
  SArray*        pGroupColVals;  // current group column values, SArray<SGroupKeys>
37
  bool           isInit;         // denote if current val is initialized or not
38
  char*          keyBuf;         // group by keys for hash
39
  int32_t        groupKeyLen;    // total group by column width plus one null-flag byte per group column
40
  SGroupResInfo  groupResInfo;   // iteration state (index/iter/dataPos) used while returning the grouped results
41
  SExprSupp      scalarSup;      // scalar expressions applied to each input block before the aggregation
42
  SOperatorInfo  *pOperator;     // owning operator; the destructor uses it to clean up the result info
43
} SGroupbyOperatorInfo;
44

45
// The sort in partition may be needed later.
46
// Private state of the partition operator: rows are routed to per-group pages keyed by the partition column values.
typedef struct SPartitionOperatorInfo {
47
  SOptrBasicInfo binfo;
48
  SArray*        pGroupCols;     // partition columns; consumed by recordNewGroupKeys() in doHashPartition()
49
  SArray*        pGroupColVals;  // current group column values, SArray<SGroupKeys>
50
  char*          keyBuf;         // group by keys for hash
51
  int32_t        groupKeyLen;    // total group by column width
52
  SHashObj*      pGroupSet;      // quick locate the window object for each result
53

54
  SDiskbasedBuf* pBuf;              // query result buffer based on block-wise disk file
55
  int32_t        rowCapacity;       // maximum number of rows for each buffer page
56
  int32_t*       columnOffset;      // start position for each column data
57
  SArray*        sortedGroupArray;  // SDataGroupInfo sorted by group id
58
  int32_t        groupIndex;        // group index
59
  int32_t        pageIndex;         // page index of current group
60
  SExprSupp      scalarSup;         // NOTE(review): presumably pre-partition scalar expressions, as in the group-by operator -- confirm
61

62
  int32_t remainRows;     // NOTE(review): usage is outside the visible part of this file -- TODO document
63
  int32_t orderedRows;    // NOTE(review): usage is outside the visible part of this file -- TODO document
64
  SArray* pOrderInfoArr;  // NOTE(review): usage is outside the visible part of this file -- TODO document
65
} SPartitionOperatorInfo;
66

67
static void*    getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
68
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
69
static int32_t  setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
70
                                        int32_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
71
static int32_t  extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes);
72

73
static void freeGroupKey(void* param) {
480,634✔
74
  SGroupKeys* pKey = (SGroupKeys*)param;
480,634✔
75
  taosMemoryFree(pKey->pData);
480,634!
76
}
480,687✔
77

78
static void destroyGroupOperatorInfo(void* param) {
364,553✔
79
  if (param == NULL) {
364,553!
80
    return;
×
81
  }
82
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
364,553✔
83

84
  cleanupBasicInfo(&pInfo->binfo);
364,553✔
85
  taosMemoryFreeClear(pInfo->keyBuf);
364,616!
86
  taosArrayDestroy(pInfo->pGroupCols);
364,620✔
87
  taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
364,616✔
88
  cleanupExprSupp(&pInfo->scalarSup);
364,606✔
89

90
  if (pInfo->pOperator != NULL) {
364,592!
91
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
364,593✔
92
                      false);
93
    pInfo->pOperator = NULL;
364,521✔
94
  }
95

96
  cleanupGroupResInfo(&pInfo->groupResInfo);
364,520✔
97
  cleanupAggSup(&pInfo->aggSup);
364,532✔
98
  taosMemoryFreeClear(param);
364,621!
99
}
100

101
/*
 * Prepare the per-column group key holders and the flat key buffer used for hashing.
 *
 * pGroupColVals  [out] receives a new SArray<SGroupKeys>, one entry per group column, each with a zeroed value
 *                      buffer of the column's byte width.
 * keyLen         [in/out] incremented by the sum of the column widths plus one null-flag byte per column;
 *                      the caller is expected to pass in a zero-initialized value.
 * keyBuf         [out] receives a zeroed buffer of (*keyLen) bytes.
 * pGroupColList  SArray<SColumn> describing the group columns.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno on allocation/lookup failure. On failure the partially built outputs are
 * left in place for the caller to release.
 */
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return terrno;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
    if (!pCol) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
    (*keyLen) += pCol->bytes;  // actual data; the null flags are added once after the loop

    SGroupKeys key = {0};
    key.bytes = pCol->bytes;
    key.type = pCol->type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      return terrno;
    }

    void* tmp = taosArrayPush((*pGroupColVals), &key);
    if (!tmp) {
      // the array did not take the entry, so nobody else will ever free this buffer
      int32_t code = terrno;
      taosMemoryFree(key.pData);
      return code;
    }
  }

  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
141

142
/*
 * Check whether row `rowIndex` of pBlock carries the same group key as the values saved in pGroupColVals.
 * Two NULLs in the same column count as equal; a NULL against a non-NULL value does not.
 * Returns true only if all `numOfGroupCols` columns match.
 */
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex,
                            int32_t numOfGroupCols) {
  SColumnDataAgg* pColAgg = NULL;

  for (int32_t colIdx = 0; colIdx < numOfGroupCols; ++colIdx) {
    SColumn*         pCol = taosArrayGet(pGroupCols, colIdx);
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    if (pBlock->pBlockAgg != NULL) {
      pColAgg = &pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    bool        rowIsNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
    SGroupKeys* pSaved = taosArrayGet(pGroupColVals, colIdx);

    if (rowIsNull || pSaved->isNull) {
      if (rowIsNull && pSaved->isNull) {
        continue;  // NULL == NULL for grouping purposes
      }
      return false;
    }

    char* val = colDataGetData(pColInfoData, rowIndex);
    bool  same = false;

    if (pSaved->type == TSDB_DATA_TYPE_JSON) {
      // json values are compared over the encoded length of the incoming value
      same = (memcmp(pSaved->pData, val, getJsonValueLen(val)) == 0);
    } else if (IS_VAR_DATA_TYPE(pSaved->type)) {
      int32_t len = varDataLen(val);
      same = (len == varDataLen(pSaved->pData)) && (memcmp(varDataVal(pSaved->pData), varDataVal(val), len) == 0);
    } else {
      same = (memcmp(pSaved->pData, val, pSaved->bytes) == 0);
    }

    if (!same) {
      return false;
    }
  }

  return true;
}
189

190
/*
 * Copy the group column values of row `rowIndex` in pBlock into the saved key holders (pGroupColVals).
 *
 * pGroupCols     SArray<SColumn>, the group columns.
 * pGroupColVals  SArray<SGroupKeys>, updated in place (isNull flag and value bytes).
 *
 * Errors are reported through terrno only: grouping by a whole json value sets
 * TSDB_CODE_QRY_JSON_IN_GROUP_ERROR and returns early. Columns whose slot id does not address a column of
 * pBlock are skipped.
 */
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  size_t numOfGroupCols = taosArrayGetSize(pGroupCols);
  size_t numOfBlockCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupCols, i);

    // valid range check, done BEFORE the column is fetched; slot ids are 0-based so `== size` is out of range
    // too (a negative id becomes a huge value after the cast and is rejected as well). todo: return error code.
    if ((size_t)pCol->slotId >= numOfBlockCols) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    if (pColInfoData == NULL) {
      continue;
    }

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = &pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
    } else {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (pkey->type == TSDB_DATA_TYPE_JSON) {
        if (tTagIsJson(val)) {
          // a complete json object cannot be used as a group key
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
          return;
        }
        int32_t dataLen = getJsonValueLen(val);
        memcpy(pkey->pData, val, dataLen);
      } else if (IS_VAR_DATA_TYPE(pkey->type)) {
        memcpy(pkey->pData, val, varDataTLen(val));
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }
    }
  }
}
229

230
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
66,454,571✔
231
  size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
66,454,571✔
232

233
  char* isNull = (char*)pKey;
66,425,238✔
234
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
66,425,238✔
235
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
187,152,357✔
236
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
120,775,780✔
237
    if (pkey->isNull) {
120,727,119✔
238
      isNull[i] = 1;
8,262,078✔
239
      continue;
8,262,078✔
240
    }
241

242
    isNull[i] = 0;
112,465,041✔
243
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
112,465,041✔
244
      int32_t dataLen = getJsonValueLen(pkey->pData);
153✔
245
      memcpy(pStart, (pkey->pData), dataLen);
153✔
246
      pStart += dataLen;
153✔
247
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
112,464,888!
248
      varDataCopy(pStart, pkey->pData);
70,169,875✔
249
      pStart += varDataTLen(pkey->pData);
70,169,875✔
250
    } else {
251
      memcpy(pStart, pkey->pData, pkey->bytes);
42,295,013✔
252
      pStart += pkey->bytes;
42,295,013✔
253
    }
254
  }
255

256
  return (int32_t)(pStart - (char*)pKey);
66,376,577✔
257
}
258

259
// assign the group keys or user input constant values if required
260
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
16,339,472✔
261
  for (int32_t i = 0; i < numOfOutput; ++i) {
72,607,006✔
262
    if (pCtx[i].functionId == -1) {  // select count(*),key from t group by key.
56,267,534✔
263
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
34,647,884✔
264

265
      SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
34,647,884✔
266
      // todo OPT all/all not NULL
267
      if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) {
69,295,768✔
268
        char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
30,303,347✔
269
        char* data = colDataGetData(pColInfoData, rowIndex);
30,303,347!
270

271
        if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
30,303,347✔
272
          int32_t dataLen = getJsonValueLen(data);
89✔
273
          memcpy(dest, data, dataLen);
89✔
274
        } else if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
30,303,258!
275
          varDataCopy(dest, data);
6,141,133✔
276
        } else {
277
          memcpy(dest, data, pColInfoData->info.bytes);
24,162,125✔
278
        }
279
      } else {  // it is a NULL value
280
        pEntryInfo->isNullRes = 1;
4,344,537✔
281
      }
282

283
      pEntryInfo->numOfRes = 1;
34,647,884✔
284
    }
285
  }
286
}
16,339,472✔
287

288
// Aggregate `numOfRows` consecutive rows of pBlock, starting at `startRow`, into the result row of the group whose
// key values are currently saved in the operator's pGroupColVals. Long-jumps out of the task on failure.
static void hashGroupbyApplyRowRange(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t startRow,
                                     int32_t numOfRows) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SqlFunctionCtx*       pCtx = pOperator->exprSupp.pCtx;

  int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
  int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
                                        len, pBlock->info.id.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, startRow, numOfRows, pBlock->info.rows,
                                        pOperator->exprSupp.numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  // assign the group keys or user input constant values if required
  doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, startRow);
}

/*
 * Feed one input block into the hash group-by aggregation.
 *
 * Rows are scanned in order; consecutive rows with an identical group key are collected into a run and
 * aggregated with a single call. The key of the current run is kept in pInfo->pGroupColVals across blocks.
 * Any failure (including a json value used as group key, reported by recordNewGroupKeys() via terrno)
 * long-jumps out of the task.
 */
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

  int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);

  terrno = TSDB_CODE_SUCCESS;

  int32_t num = 0;  // number of rows of this block collected for the current group so far
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      pInfo->isInit = true;
      num++;
      continue;
    }

    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
    if (equal) {
      num++;
      continue;
    }

    // The first row of a new block does not belong to the previous group; nothing of this block has been
    // collected yet, so only the saved key needs to be replaced.
    if (j == 0) {
      num++;
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      continue;
    }

    // the key changed: aggregate the finished run [j - num, j) before switching to the new key
    hashGroupbyApplyRowRange(pOperator, pBlock, j - num, num);

    // Reset terrno first so that only an error raised by recordNewGroupKeys() itself is acted upon; the other
    // two call sites already run with a fresh terrno.
    terrno = TSDB_CODE_SUCCESS;
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    num = 1;
  }

  // aggregate the trailing run of the block
  if (num > 0) {
    hashGroupbyApplyRowRange(pOperator, pBlock, pBlock->info.rows - num, num);
  }
}
3,706,386✔
368

369
// Tell whether the group-by operator still has result rows that were not copied out yet, i.e. whether the
// result cursor is still below the number of entries in the result-row hash table.
bool hasRemainResultByHash(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pGroupbyInfo = pOperator->info;
  return pGroupbyInfo->groupResInfo.index < tSimpleHashGetSize(pGroupbyInfo->aggSup.pResultRowHashTable);
}
374

375
void doBuildResultDatablockByHash(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
3,377,359✔
376
                                  SDiskbasedBuf* pBuf) {
377
  SGroupbyOperatorInfo* pInfo = pOperator->info;
3,377,359✔
378
  SSHashObj*            pHashmap = pInfo->aggSup.pResultRowHashTable;
3,377,359✔
379
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
3,377,359✔
380

381
  SSDataBlock* pBlock = pInfo->binfo.pRes;
3,377,359✔
382

383
  // set output datablock version
384
  pBlock->info.version = pTaskInfo->version;
3,377,359✔
385

386
  blockDataCleanup(pBlock);
3,377,359✔
387
  if (!hasRemainResultByHash(pOperator)) {
3,375,250✔
388
    return;
58,853✔
389
  }
390

391
  pBlock->info.id.groupId = 0;
3,316,967✔
392
  if (!pInfo->binfo.mergeResultBlock) {
3,316,967✔
393
    doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
3,052,689✔
394
                             pHashmap, pOperator->resultInfo.threshold, false);
395
  } else {
396
    while (hasRemainResultByHash(pOperator)) {
528,451✔
397
      doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
264,253✔
398
                               pHashmap, pOperator->resultInfo.threshold, true);
399
      if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
264,277✔
400
        break;
104✔
401
      }
402
      pBlock->info.id.groupId = 0;
264,173✔
403
    }
404

405
    // clear the group id info in SSDataBlock, since the client does not need it
406
    pBlock->info.id.groupId = 0;
264,292✔
407
  }
408
}
409

410
/*
 * Produce the next non-empty, filtered result block of the group-by operator.
 * Blocks are rebuilt until the filter leaves at least one row or the results are exhausted; in the latter case
 * the operator is marked completed and the result-row hash table is released.
 * Returns the result block, or NULL when it holds no rows. A filter failure long-jumps out of the task.
 */
static SSDataBlock* buildGroupResultDataBlockByHash(SOperatorInfo* pOperator) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*          pRes = pInfo->binfo.pRes;

  // after filter, if result block turn to null, get next from whole set
  for (;;) {
    doBuildResultDatablockByHash(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (hasRemainResultByHash(pOperator)) {
      if (pRes->info.rows > 0) {
        break;
      }
      continue;  // everything was filtered out, try the next batch
    }

    setOperatorCompleted(pOperator);
    // clean hash after completed
    tSimpleHashCleanup(pInfo->aggSup.pResultRowHashTable);
    pInfo->aggSup.pResultRowHashTable = NULL;
    break;
  }

  pOperator->resultInfo.totalRows += pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pRes;
}
445

446
/*
 * "Get next" entry point of the hash group-by aggregate operator.
 *
 * On the first call all blocks of downstream 0 are consumed and aggregated, the result cursor is reset and the
 * operator switches to OP_RES_TO_RETURN; that call and every following one hand out one result block through
 * *ppRes (NULL when there is nothing left). Once the operator is OP_EXEC_DONE the call returns immediately.
 *
 * Returns TSDB_CODE_SUCCESS; a failure while consuming the input is stored in pTaskInfo->code and long-jumps
 * out of the task.
 */
static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SGroupResInfo*        pGroupResInfo = &pInfo->groupResInfo;
  int32_t               order = pInfo->binfo.inputTsOrder;
  int64_t               st = taosGetTimestampUs();

  QRY_PARAM_CHECK(ppRes);
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    (*ppRes) = buildGroupResultDataBlockByHash(pOperator);
    return code;
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(&pOperator->exprSupp, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is a scalar expression that needs to be calculated right before applying the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doHashGroupbyAgg(pOperator, pBlock);
  }

  pOperator->status = OP_RES_TO_RETURN;

  // Reset the result cursor by hand. pRows must be cleared after it is destroyed: the operator destructor
  // passes the same SGroupResInfo to cleanupGroupResInfo() later, and a stale pointer there risks a second free.
  if (pGroupResInfo->pRows != NULL) {
    taosArrayDestroy(pGroupResInfo->pRows);
    pGroupResInfo->pRows = NULL;
  }

  if (pGroupResInfo->pBuf) {
    taosMemoryFree(pGroupResInfo->pBuf);
    pGroupResInfo->pBuf = NULL;
  }

  pGroupResInfo->index = 0;
  pGroupResInfo->iter = 0;
  pGroupResInfo->dataPos = NULL;

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  } else {
    (*ppRes) = buildGroupResultDataBlockByHash(pOperator);
  }

  return code;
}
516

517
/*
 * Create the hash group-by aggregate operator described by pAggNode on top of `downstream`.
 *
 * On success the new operator is stored in *pOptrInfo and TSDB_CODE_SUCCESS is returned. On failure everything
 * built so far is released together with the downstream operator, the error is recorded in pTaskInfo->code and
 * returned.
 */
int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
                                SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (!pInfo || !pOperator) {
    code = terrno;
    goto _error;
  }

  pOperator->exprSupp.hasWindowOrGroup = true;

  // output block
  SSDataBlock* pOutBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  if (!pOutBlock) {
    code = terrno;
    goto _error;
  }
  initBasicInfo(&pInfo->binfo, pOutBlock);

  // group columns
  pInfo->pGroupCols = NULL;
  code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols);
  QUERY_CHECK_CODE(code, lino, _error);

  // optional scalar expressions evaluated on the input before grouping
  int32_t    numOfScalars = 0;
  SExprInfo* pScalarExprs = NULL;
  if (pAggNode->pExprs != NULL) {
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprs, &numOfScalars);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = initExprSupp(&pInfo->scalarSup, pScalarExprs, numOfScalars, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  // key holders and hash key buffer
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  QUERY_CHECK_CODE(code, lino, _error);

  // aggregate functions and group keys of the output
  int32_t    numOfAggExprs = 0;
  SExprInfo* pAggExprs = NULL;

  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pAggExprs, &numOfAggExprs);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pAggExprs, numOfAggExprs, pInfo->groupKeyLen,
                    pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;

  // back reference used by destroyGroupOperatorInfo()
  pInfo->pOperator = pOperator;

  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregateNext, NULL, destroyGroupOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyGroupOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
596

597
/*
 * Build a new block that mirrors pDataBlock for the columns referenced by the operator's expressions.
 *
 * For every expression the column addressed by its first parameter's slot id is appended to the new block.
 * If the source block carries a block aggregate for that slot (colId != -1) only the aggregate entry is
 * copied; otherwise the column data itself is copied.
 *
 * Returns the new block (owned by the caller), or NULL if pDataBlock is NULL or any step fails.
 */
SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBlock* pDataBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pDataBlock == NULL) {
    return NULL;
  }

  SSDataBlock* pDstBlock = NULL;
  code = createDataBlock(&pDstBlock);
  QUERY_CHECK_CODE(code, lino, _end);

  pDstBlock->info = pDataBlock->info;
  pDstBlock->info.id.blockId = pOperator->resultDataBlockId;
  pDstBlock->info.capacity = 0;
  pDstBlock->info.rowSize = 0;

  size_t numOfCols = pOperator->exprSupp.numOfExprs;
  if (pDataBlock->pBlockAgg) {
    pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg));
    if (pDstBlock->pBlockAgg == NULL) {
      blockDataDestroy(pDstBlock);
      return NULL;
    }
    // -1 marks "no aggregate available" until a real entry is copied below
    for (size_t i = 0; i < numOfCols; ++i) {
      pDstBlock->pBlockAgg[i].colId = -1;
    }
  }

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SExprInfo*       pExpr = &pOperator->exprSupp.pExprInfo[i];
    int32_t          slotId = pExpr->base.pParam[0].pCol->slotId;
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId);
    if (pSrc == NULL) {
      // the slot does not address a column of the source block; do not dereference it
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      blockDataDestroy(pDstBlock);
      return NULL;
    }

    SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info};
    code = blockDataAppendColInfo(pDstBlock, &colInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
    if (pDst == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      blockDataDestroy(pDstBlock);
      return NULL;
    }

    if (pDataBlock->pBlockAgg && pDataBlock->pBlockAgg[slotId].colId != -1) {
      pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId];
    } else {
      code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false);
      QUERY_CHECK_CODE(code, lino, _end);

      code = colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    blockDataDestroy(pDstBlock);
    return NULL;
  }
  return pDstBlock;
}
653

654
// Scatter the rows of pBlock into the per-group buffer pages of the partition operator.
//
// For every row the group key is rebuilt (recordNewGroupKeys + buildGroupKeys) and the page currently
// used by that group is fetched through getCurrentDataGroupInfo().
//   - pBlock->info.dataLoad set: the row is appended column by column to the page. The start of each
//     column area comes from pInfo->columnOffset[i]; a variable-length column area is
//     [int32 offset * rowCapacity][int32 used length][payload], a fixed-length column area is
//     [null bitmap of BitmapLen(rowCapacity)][int32 used length][payload]. The first int32 of the page is
//     the number of rows stored in it.
//   - otherwise: a block is built with createBlockDataNotLoaded(), queued on the group of the current
//     row (pGroupInfo->blockForNotLoaded) and the scan of pBlock stops.
//
// The function does not report errors through a return value: every failure long-jumps to
// pTaskInfo->env.
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  // Owned by this function until it has been pushed into a group's blockForNotLoaded list.
  SSDataBlock*            dataNotLoadBlock = NULL;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);

    SDataGroupInfo* pGroupInfo = NULL;
    void*           pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
    if (pPage == NULL) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    pGroupInfo->numOfRows += 1;

    // group id (0 means "not calculated yet")
    if (pGroupInfo->groupId == 0) {
      pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
    }

    if (pBlock->info.dataLoad) {
      // number of rows already stored in this page
      int32_t* rows = (int32_t*)pPage;

      size_t numOfCols = pOperator->exprSupp.numOfExprs;
      for (int32_t i = 0; i < numOfCols; ++i) {
        SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
        int32_t    slotId = pExpr->base.pParam[0].pCol->slotId;

        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);

        int32_t bytes = pColInfoData->info.bytes;
        int32_t startOffset = pInfo->columnOffset[i];

        int32_t* columnLen = NULL;
        int32_t  contentLen = 0;

        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          int32_t* offset = (int32_t*)((char*)pPage + startOffset);
          columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
          char* data = (char*)((char*)columnLen + sizeof(int32_t));

          // NOTE(review): in both non-null branches below the sanity check runs after the memcpy and
          // only verifies that the end position is positive - confirm whether a page-size bound
          // (as used for fixed-length columns) is intended here as well.
          if (colDataIsNull_s(pColInfoData, j)) {
            offset[(*rows)] = -1;
            contentLen = 0;
          } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
            offset[*rows] = (*columnLen);
            char*   src = colDataGetData(pColInfoData, j);
            int32_t dataLen = getJsonValueLen(src);

            memcpy(data + (*columnLen), src, dataLen);
            int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
            QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

            contentLen = dataLen;
          } else {
            offset[*rows] = (*columnLen);
            char* src = colDataGetData(pColInfoData, j);
            memcpy(data + (*columnLen), src, varDataTLen(src));
            int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
            QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

            contentLen = varDataTLen(src);
          }
        } else {
          char* bitmap = (char*)pPage + startOffset;
          columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
          char* data = (char*)columnLen + sizeof(int32_t);

          bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
          if (isNull) {
            colDataSetNull_f(bitmap, (*rows));
          } else {
            memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
            QUERY_CHECK_CONDITION(((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)), code,
                                  lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
          }
          // a fixed-length slot is consumed even when the value is NULL
          contentLen = bytes;
        }

        (*columnLen) += contentLen;
      }

      (*rows) += 1;

      setBufPageDirty(pPage, true);
      releaseBufPage(pInfo->pBuf, pPage);
    } else {
      // NOTE(review): pPage is not released on this path - confirm this is intended.
      dataNotLoadBlock = createBlockDataNotLoaded(pOperator, pBlock);
      if (dataNotLoadBlock == NULL) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      if (pGroupInfo->blockForNotLoaded == NULL) {
        pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*));
        QUERY_CHECK_NULL(pGroupInfo->blockForNotLoaded, code, lino, _end, terrno);
        pGroupInfo->offsetForNotLoaded = 0;
      }
      dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
      dataNotLoadBlock->info.dataLoad = 0;
      void* tmp = taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      dataNotLoadBlock = NULL;  // the group's list owns the block from now on
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // A block that never reached a group's list would otherwise be lost by the long jump.
    if (dataNotLoadBlock != NULL) {
      blockDataDestroy(dataNotLoadBlock);
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
205,377✔
768

769
// Look up (or create) the group whose key is the first `len` bytes of pInfo->keyBuf and return the
// buffer page new rows of that group must be appended to.
//
//   pInfo      partition operator state (group hash, key buffer, paged buffer, row capacity)
//   pGroupInfo out: the group entry stored in pInfo->pGroupSet; only written on success
//   len        length of the group key in pInfo->keyBuf
//
// Returns the page obtained from getNewBufPage()/getBufPage(), or NULL on failure. Callers use
// terrno as the error code after a NULL return, so terrno is set on the `_end` failure path.
//
// A new group gets an empty page-id list and a first page. For an existing group the last page is
// reused unless it already holds pInfo->rowCapacity rows, in which case it is released and a new,
// zero-filled page is appended to the group's page list.
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  void*           pPage = NULL;
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  if (p == NULL) {  // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    QUERY_CHECK_NULL(gi.pPageList, code, lino, _end, terrno);

    code = taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
    if (code != TSDB_CODE_SUCCESS) {
      // gi was not stored, so its page list would be unreachable after this point
      taosArrayDestroy(gi.pPageList);
      if (code == TSDB_CODE_DUP_KEY) {
        code = TSDB_CODE_SUCCESS;
      }
    }
    QUERY_CHECK_CODE(code, lino, _end);

    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
    QUERY_CHECK_NULL(p, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    int32_t pageId = 0;
    pPage = getNewBufPage(pInfo->pBuf, &pageId);
    if (pPage == NULL) {
      return pPage;
    }

    void* tmp = taosArrayPush(p->pPageList, &pageId);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    // the first int32 of a page is its row counter
    *(int32_t*)pPage = 0;
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return pPage;
    }

    int32_t* rows = (int32_t*)pPage;
    if (*rows >= pInfo->rowCapacity) {
      // the current page is full: release it
      releaseBufPage(pInfo->pBuf, pPage);

      // add a new page for current group
      int32_t pageId = 0;
      pPage = getNewBufPage(pInfo->pBuf, &pageId);
      if (pPage == NULL) {
        qError("failed to get new buffer, code:%s", tstrerror(terrno));
        return NULL;
      }

      void* tmp = taosArrayPush(p->pPageList, &pageId);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      memset(pPage, 0, getBufPageSize(pInfo->pBuf));
    }
  }

  *pGroupInfo = p;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    terrno = code;  // callers long-jump with terrno when NULL is returned
    return NULL;
  }

  return pPage;
}
836

837
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
25,787✔
838
  size_t   numOfCols = taosArrayGetSize(pBlock->pDataBlock);
25,787✔
839
  int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
25,824!
840
  if (!offset) {
25,842!
841
    return NULL;
×
842
  }
843

844
  offset[0] = sizeof(int32_t) +
25,842✔
845
              sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format
846

847
  for (int32_t i = 0; i < numOfCols - 1; ++i) {
221,693✔
848
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
195,786✔
849

850
    int32_t bytes = pColInfoData->info.bytes;
195,851✔
851
    int32_t payloadLen = bytes * rowCapacity;
195,851✔
852

853
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
195,851!
854
      // offset segment + content length + payload
855
      offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
18,078✔
856
    } else {
857
      // bitmap + content length + payload
858
      offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
177,773✔
859
    }
860
  }
861

862
  return offset;
25,907✔
863
}
864

865
// Release everything held by the groups in pInfo->sortedGroupArray, then empty that array and reset
// the paged buffer with clearDiskbasedBuf().
//
// For each group this destroys the queued "not loaded" blocks together with the array that holds
// them, and the group's page-id list. The group entries live inside sortedGroupArray, so they are
// gone once the array is cleared; nothing they own may be left allocated.
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
  int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
  for (int32_t i = 0; i < size; i++) {
    SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
    if (pGp == NULL) {
      continue;
    }

    if (pGp->blockForNotLoaded) {
      for (int32_t k = 0; k < pGp->blockForNotLoaded->size; k++) {
        SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, k);
        if (pBlock) blockDataDestroy(*pBlock);
      }
      // destroy (not just clear) the container: the entry owning it is dropped below
      taosArrayDestroy(pGp->blockForNotLoaded);
      pGp->blockForNotLoaded = NULL;
      pGp->offsetForNotLoaded = 0;
    }

    taosArrayDestroy(pGp->pPageList);
    pGp->pPageList = NULL;
  }
  taosArrayClear(pInfo->sortedGroupArray);
  clearDiskbasedBuf(pInfo->pBuf);
}
25,697✔
882

883
static int compareDataGroupInfo(const void* group1, const void* group2) {
586,820✔
884
  const SDataGroupInfo* pGroupInfo1 = group1;
586,820✔
885
  const SDataGroupInfo* pGroupInfo2 = group2;
586,820✔
886

887
  if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
586,820!
888
    return 0;
×
889
  }
890

891
  return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
586,820✔
892
}
893

894
// Hand out the next queued "not loaded" block of a group.
// Returns the block at pGroupInfo->offsetForNotLoaded and advances that offset, or NULL when the
// group has no such list, the list is exhausted, or the element cannot be fetched.
static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) {
  if (pGroupInfo->blockForNotLoaded == NULL) {
    return NULL;
  }
  if (!(pGroupInfo->offsetForNotLoaded < pGroupInfo->blockForNotLoaded->size)) {
    return NULL;
  }

  SSDataBlock** ppNext = taosArrayGet(pGroupInfo->blockForNotLoaded, pGroupInfo->offsetForNotLoaded);
  if (ppNext == NULL) {
    return NULL;
  }

  pGroupInfo->offsetForNotLoaded++;
  return *ppNext;
}
905

906
// Produce the next output block of the partition operator.
//
// When no rows remain from the previous call (pInfo->remainRows == 0) the next page of the current
// group is decoded into pInfo->binfo.pRes. Once a group's pages are exhausted its queued
// "not loaded" blocks are returned one per call, then the next group of pInfo->sortedGroupArray is
// started. After the last group the operator is marked completed, its state is cleared and NULL is
// returned.
//
// When pInfo->pOrderInfoArr is set, only the leading run of rows reported by
// blockDataGetSortedRows() is exposed per call; the rest is kept as remainRows for the next call.
//
// Errors long-jump to pTaskInfo->env.
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;

  if (pInfo->remainRows == 0) {
    blockDataCleanup(pInfo->binfo.pRes);
    SDataGroupInfo* pGroupInfo =
        (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
    if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
      if (pGroupInfo != NULL) {
        SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
        if (ret != NULL) return ret;
      }
      // try next group data
      if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) {
        setOperatorCompleted(pOperator);
        clearPartitionOperator(pInfo);
        return NULL;
      }
      ++pInfo->groupIndex;

      pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
      if (pGroupInfo == NULL) {
        qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      pInfo->pageIndex = 0;
    }

    int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
    if (pageId == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    void* page = getBufPage(pInfo->pBuf, *pageId);
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    if (*(int32_t*)page == 0) {
      // The page holds no rows: serve queued not-loaded blocks, then move on to the next group.
      releaseBufPage(pInfo->pBuf, page);
      SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
      if (ret != NULL) return ret;
      if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) {
        pInfo->groupIndex++;
        pInfo->pageIndex = 0;
      } else {
        setOperatorCompleted(pOperator);
        clearPartitionOperator(pInfo);
        return NULL;
      }
      return buildPartitionResult(pOperator);
    }

    code = blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
    if (code != TSDB_CODE_SUCCESS) {
      // do not leave the page pinned when bailing out through the long jump below
      releaseBufPage(pInfo->pBuf, page);
    }
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
    if (code != TSDB_CODE_SUCCESS) {
      releaseBufPage(pInfo->pBuf, page);
    }
    QUERY_CHECK_CODE(code, lino, _end);

    pInfo->pageIndex += 1;
    releaseBufPage(pInfo->pBuf, page);
    pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;
    pInfo->binfo.pRes->info.dataLoad = 1;
    pInfo->orderedRows = 0;
  } else if (pInfo->pOrderInfoArr == NULL) {
    qError("Exception, remainRows not zero, but pOrderInfoArr is NULL");
  }

  if (pInfo->pOrderInfoArr) {
    // re-expose the rows held back by the previous call, drop the ones already returned, then
    // return only the next leading run reported by blockDataGetSortedRows()
    pInfo->binfo.pRes->info.rows += pInfo->remainRows;
    code = blockDataTrimFirstRows(pInfo->binfo.pRes, pInfo->orderedRows);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->orderedRows = blockDataGetSortedRows(pInfo->binfo.pRes, pInfo->pOrderInfoArr);
    pInfo->remainRows = pInfo->binfo.pRes->info.rows - pInfo->orderedRows;
    pInfo->binfo.pRes->info.rows = pInfo->orderedRows;
  }

  code = blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows;
  return pInfo->binfo.pRes;
}
998

999
// "get next" entry of the partition operator.
//
// First call: drains the downstream operator, applies the optional scalar expressions to each
// block, partitions every block with doHashPartition(), then copies all group entries of
// pInfo->pGroupSet into an array sorted by group id (pInfo->sortedGroupArray) and switches the
// operator to OP_RES_TO_RETURN. Every call in that state (including the tail of the first call)
// returns the result of buildPartitionResult() through ppRes.
//
//   ppRes  out: next result block, or NULL when the operator has finished
//
// Returns TSDB_CODE_SUCCESS; failures long-jump to pTaskInfo->env after recording the code in
// pTaskInfo->code.
static int32_t hashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;
  // Declared up front (and NULL) because `goto _end` may be taken before it is created.
  SArray*                 groupArray = NULL;

  if (pOperator->status == OP_RES_TO_RETURN) {
    (*ppRes) = buildPartitionResult(pOperator);
    return code;
  }

  int64_t st = taosGetTimestampUs();

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    terrno = TSDB_CODE_SUCCESS;
    doHashPartition(pOperator, pBlock);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
  QUERY_CHECK_NULL(groupArray, code, lino, _end, terrno);

  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    void*           tmp = taosArrayPush(groupArray, pGroupInfo);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }

  taosArraySort(groupArray, compareDataGroupInfo);
  pInfo->sortedGroupArray = groupArray;
  groupArray = NULL;  // ownership moved to pInfo
  pInfo->groupIndex = -1;
  taosHashClear(pInfo->pGroupSet);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  pOperator->status = OP_RES_TO_RETURN;
  code = blockDataEnsureCapacity(pRes, 4096);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // Only the container is released here: the entries copied so far still share their arrays
    // with the entries left in pInfo->pGroupSet, which has not been cleared on this path.
    taosArrayDestroy(groupArray);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = buildPartitionResult(pOperator);
  return code;
}
1073

1074
static void destroyPartitionOperatorInfo(void* param) {
25,846✔
1075
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
25,846✔
1076
  cleanupBasicInfo(&pInfo->binfo);
25,846✔
1077
  taosArrayDestroy(pInfo->pGroupCols);
25,851✔
1078

1079
  for (int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++) {
98,109✔
1080
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
72,253✔
1081
    taosMemoryFree(key.pData);
72,250✔
1082
  }
1083

1084
  taosArrayDestroy(pInfo->pGroupColVals);
25,848✔
1085
  taosMemoryFree(pInfo->keyBuf);
25,851!
1086

1087
  int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
25,852✔
1088
  for (int32_t i = 0; i < size; i++) {
27,280✔
1089
    SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
1,428✔
1090
    if (pGp) {
1,428!
1091
      taosArrayDestroy(pGp->pPageList);
1,428✔
1092
    }
1093
  }
1094
  taosArrayDestroy(pInfo->sortedGroupArray);
25,852✔
1095

1096
  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
25,852✔
1097
  while (pGroupIter != NULL) {
25,848!
1098
    SDataGroupInfo* pGroupInfo = pGroupIter;
×
1099
    taosArrayDestroy(pGroupInfo->pPageList);
×
1100
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
×
1101
  }
1102

1103
  taosHashCleanup(pInfo->pGroupSet);
25,848✔
1104
  taosMemoryFree(pInfo->columnOffset);
25,852!
1105

1106
  cleanupExprSupp(&pInfo->scalarSup);
25,852✔
1107
  destroyDiskbasedBuf(pInfo->pBuf);
25,851✔
1108
  taosArrayDestroy(pInfo->pOrderInfoArr);
25,849✔
1109
  taosMemoryFreeClear(param);
25,846!
1110
}
25,850✔
1111

1112
int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
25,794✔
1113
                                           SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1114
  QRY_PARAM_CHECK(pOptrInfo);
25,794!
1115

1116
  int32_t                 code = TSDB_CODE_SUCCESS;
25,794✔
1117
  int32_t                 lino = 0;
25,794✔
1118
  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
25,794!
1119
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
25,798!
1120
  if (pInfo == NULL || pOperator == NULL) {
25,817!
1121
    pTaskInfo->code = code = terrno;
1✔
1122
    goto _error;
×
1123
  }
1124

1125
  int32_t    numOfCols = 0;
25,816✔
1126
  SExprInfo* pExprInfo = NULL;
25,816✔
1127
  code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols);
25,816✔
1128
  QUERY_CHECK_CODE(code, lino, _error);
25,843!
1129
  pOperator->exprSupp.numOfExprs = numOfCols;
25,843✔
1130
  pOperator->exprSupp.pExprInfo = pExprInfo;
25,843✔
1131

1132
  pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);
25,843✔
1133

1134
  if (pPartNode->needBlockOutputTsOrder) {
25,845✔
1135
    SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId};
2,545✔
1136
    pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo));
2,545✔
1137
    if (!pInfo->pOrderInfoArr) {
2,544!
1138
      pTaskInfo->code = terrno;
×
1139
      goto _error;
×
1140
    }
1141

1142
    void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order);
2,544✔
1143
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
2,545!
1144
  }
1145

1146
  if (pPartNode->pExprs != NULL) {
25,845✔
1147
    int32_t    num = 0;
3,844✔
1148
    SExprInfo* pExprInfo1 = NULL;
3,844✔
1149
    code = createExprInfo(pPartNode->pExprs, NULL, &pExprInfo1, &num);
3,844✔
1150
    QUERY_CHECK_CODE(code, lino, _error);
3,844!
1151

1152
    code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num, &pTaskInfo->storageAPI.functionStore);
3,844✔
1153
    QUERY_CHECK_CODE(code, lino, _error);
3,844!
1154
  }
1155

1156
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
25,845✔
1157
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
25,804✔
1158
  if (pInfo->pGroupSet == NULL) {
25,843!
1159
    goto _error;
×
1160
  }
1161

1162
  uint32_t defaultPgsz = 0;
25,843✔
1163
  int64_t defaultBufsz = 0;
25,843✔
1164

1165
  pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
25,843✔
1166
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
25,852!
1167
  code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
25,852✔
1168
  if (code != TSDB_CODE_SUCCESS) {
25,843!
1169
    goto _error;
×
1170
  }
1171

1172
  if (!osTempSpaceAvailable()) {
25,843!
1173
    terrno = TSDB_CODE_NO_DISKSPACE;
×
1174
    qError("Create partition operator info failed since %s, tempDir:%s", terrstr(), tsTempDir);
×
1175
    goto _error;
×
1176
  }
1177

1178
  code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
25,829✔
1179
  if (code != TSDB_CODE_SUCCESS) {
25,826!
1180
    goto _error;
×
1181
  }
1182

1183
  pInfo->rowCapacity =
25,818✔
1184
      blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf),
25,831✔
1185
                                blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock)));
25,826✔
1186
  if (pInfo->rowCapacity < 0) {
25,818!
1187
    code = terrno;
×
1188
    goto _error;
×
1189
  }
1190
  
1191
  pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
25,818✔
1192
  QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno);
25,830!
1193

1194
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
25,830✔
1195
  if (code != TSDB_CODE_SUCCESS) {
25,826!
1196
    goto _error;
×
1197
  }
1198

1199
  setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
25,826✔
1200
                  pTaskInfo);
1201

1202
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartitionNext, NULL, destroyPartitionOperatorInfo,
25,813✔
1203
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1204

1205
  code = appendDownstream(pOperator, &downstream, 1);
25,816✔
1206
  if (code != TSDB_CODE_SUCCESS) {
25,838!
1207
    goto _error;
×
1208
  }
1209

1210
  *pOptrInfo = pOperator;
25,838✔
1211
  return TSDB_CODE_SUCCESS;
25,838✔
1212

1213
_error:
×
1214
  if (pInfo != NULL) {
×
1215
    destroyPartitionOperatorInfo(pInfo);
×
1216
  }
1217
  pTaskInfo->code = code;
×
1218
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1219
  TAOS_RETURN(code);
×
1220
}
1221

1222
// Bind the function contexts of pOperator to the result row of one group.
//
// The row is looked up (or created) with doSetResultOutBufByKey() using the key pData/bytes and
// groupId; on success the contexts in pOperator->exprSupp.pCtx are initialised for it through
// setResultRowInitCtx(), whose return value is passed on.
//
// When no row is returned or pTaskInfo->code is non-zero, pTaskInfo->code is returned instead.
// NOTE(review): a NULL row combined with pTaskInfo->code == 0 therefore yields 0 - confirm that
// this combination cannot occur.
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                int32_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;

  SResultRow* pRow = doSetResultOutBufByKey(pBuf, &binfo->resultRowInfo, (char*)pData, bytes, true, groupId, pTask,
                                            false, pAggSup, false);
  if (pRow != NULL && pTask->code == 0) {
    return setResultRowInitCtx(pRow, pFuncCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
  }

  return pTask->code;
}
1236

1237
// Compute the group id of row `rowId` of pBlock.
//
// If pExprSup carries expressions they are first applied to pBlock in place (a failure is only
// logged). The group key of the row is then assembled in pParSup->keyBuf and hashed with
// calcGroupId().
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {
  if (pExprSup->pExprInfo != NULL) {
    int32_t rc = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs,
                                       NULL);
    if (rc != TSDB_CODE_SUCCESS) {
      qError("calaculate group id error, code:%d", rc);
    }
  }

  recordNewGroupKeys(pParSup->pGroupCols, pParSup->pGroupColVals, pBlock, rowId);
  int32_t keyLen = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
  return calcGroupId(pParSup->keyBuf, keyLen);
}
1250

1251
// True while the partition iterator (pInfo->parIte) is non-NULL.
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) {
  if (pInfo->parIte == NULL) {
    return false;
  }
  return true;
}
1,474,711✔
1252
// True while the table-name iterator (pInfo->pTbNameIte) is non-NULL.
static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) {
  if (pInfo->pTbNameIte == NULL) {
    return false;
  }
  return true;
}
744,385✔
1253

1254
// Build the output block for the partition the iterator pInfo->parIte currently points at.
//
// The rows listed in the partition's rowIds are copied from pInfo->pInputDataBlock into
// pInfo->binfo.pRes, one destination column per expression of pOperator->exprSupp. When table-name
// expressions exist, the stored partition table name (if any) is copied into the block's
// parTbName. Afterwards the rowIds array is destroyed, the block's ts window and group id are set
// and the iterator advances to the next partition.
//
// Always returns pInfo->binfo.pRes; on failure the block is cleaned and the error is logged.
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pOperator->pTaskInfo->storageAPI;

  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                  pDest = pInfo->binfo.pRes;
  QUERY_CHECK_CONDITION((hasRemainPartion(pInfo)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  SPartitionDataInfo* pPart = (SPartitionDataInfo*)pInfo->parIte;
  blockDataCleanup(pDest);

  int32_t numOfRows = taosArrayGetSize(pPart->rowIds);
  code = blockDataEnsureCapacity(pDest, numOfRows);
  QUERY_CHECK_CODE(code, lino, _end);

  SSDataBlock* pInput = pInfo->pInputDataBlock;
  for (int32_t r = 0; r < numOfRows; r++) {
    int32_t srcRow = *(int32_t*)taosArrayGet(pPart->rowIds, r);

    for (int32_t c = 0; c < pOperator->exprSupp.numOfExprs; c++) {
      int32_t slotId = pOperator->exprSupp.pExprInfo[c].base.pParam[0].pCol->slotId;

      SColumnInfoData* pFrom = taosArrayGet(pInput->pDataBlock, slotId);
      QUERY_CHECK_NULL(pFrom, code, lino, _end, terrno);
      SColumnInfoData* pTo = taosArrayGet(pDest->pDataBlock, c);
      QUERY_CHECK_NULL(pTo, code, lino, _end, terrno);

      bool  valIsNull = colDataIsNull(pFrom, pInput->info.rows, srcRow, NULL);
      char* pVal = NULL;
      if (!valIsNull) {
        pVal = colDataGetData(pFrom, srcRow);
      }

      code = colDataSetVal(pTo, pDest->info.rows, pVal, valIsNull);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    pDest->info.rows++;
  }

  pDest->info.parTbName[0] = 0;
  if (pInfo->tbnameCalSup.numOfExprs > 0) {
    void*   pName = NULL;
    int32_t nameCode = TSDB_CODE_SUCCESS;
    code = pAPI->stateStore.streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pPart->groupId, &pName,
                                                  false, &nameCode);
    QUERY_CHECK_CODE(code, lino, _end);

    if (nameCode == TSDB_CODE_SUCCESS) {
      memcpy(pDest->info.parTbName, pName, TSDB_TABLE_NAME_LEN);
      pAPI->stateStore.streamStateFreeVal(pName);
    }
  }

  // the row ids of this partition are consumed
  taosArrayDestroy(pPart->rowIds);
  pPart->rowIds = NULL;
  pDest->info.dataLoad = 1;

  code = blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
  QUERY_CHECK_CODE(code, lino, _end);

  pDest->info.id.groupId = pPart->groupId;
  pOperator->resultInfo.totalRows += pDest->info.rows;
  pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
  QUERY_CHECK_CONDITION((pDest->info.rows > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataCleanup(pDest);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  printDataBlock(pDest, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
  return pDest;
}
1319

1320
// Append a "create table" row for group `groupId` to pDestBlock unless a partition table name is
// already stored for that group.
//
//   pState      stream state handle passed to the pAPI callbacks
//   pTableSup   expressions producing the table name (may have numOfExprs == 0)
//   pTagSup     expressions producing the tag values (may have numOfExprs == 0)
//   groupId     group the row belongs to
//   pSrcBlock   input block; its info.parTbName is overwritten with the resolved table name
//   rowId       row of pSrcBlock used as input of the expressions
//   pDestBlock  block receiving the table-name, tag and group-id columns of the new row
//   pAPI        state store callbacks
//
// If streamStateGetParName() reports a stored name (winCode == TSDB_CODE_SUCCESS), that name is
// copied into pSrcBlock->info.parTbName and nothing is appended. Otherwise a one-row copy of the
// source row is evaluated by the table-name and tag expressions, the computed name is stored with
// streamStatePutParName(), the group id column is filled and pDestBlock->info.rows is increased
// by one.
//
// Returns TSDB_CODE_SUCCESS or the first error code met. The temporary one-row block and the value
// returned by the state store are released on every path.
int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
                             SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock, SStateStore* pAPI) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  void*        pValue = NULL;
  SSDataBlock* pTmpBlock = NULL;  // function scope so that _end can release it after any failure
  int32_t      winCode = TSDB_CODE_SUCCESS;

  code = pAPI->streamStateGetParName(pState, groupId, &pValue, true, &winCode);
  QUERY_CHECK_CODE(code, lino, _end);

  if (winCode != TSDB_CODE_SUCCESS) {
    code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
    pTmpBlock->info.id.groupId = groupId;
    char* tbName = pSrcBlock->info.parTbName;
    if (pTableSup->numOfExprs > 0) {
      code = projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs,
                                   NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
      QUERY_CHECK_NULL(pTbCol, code, lino, _end, terrno);
      memset(tbName, 0, TSDB_TABLE_NAME_LEN);
      int32_t len = 0;
      if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
        len = 1;
        tbName[0] = 0;
      } else {
        void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
        len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
        memcpy(tbName, varDataVal(pData), len);
        code = pAPI->streamStatePutParName(pState, groupId, tbName);
        QUERY_CHECK_CODE(code, lino, _end);
      }
      memcpy(pTmpBlock->info.parTbName, tbName, len);
      // step back onto the row just produced: the remaining columns of the same row follow
      pDestBlock->info.rows--;
    } else {
      void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
      QUERY_CHECK_NULL(pTbNameCol, code, lino, _end, terrno);
      colDataSetNULL(pTbNameCol, pDestBlock->info.rows);
      tbName[0] = 0;
    }

    if (pTagSup->numOfExprs > 0) {
      code = projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
      pDestBlock->info.rows--;
    } else {
      memcpy(pDestBlock->info.parTbName, pTmpBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
    }

    void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
    QUERY_CHECK_NULL(pGpIdCol, code, lino, _end, terrno);
    code = colDataSetVal(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
    QUERY_CHECK_CODE(code, lino, _end);
    pDestBlock->info.rows++;
  } else {
    memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN);
  }

_end:
  // Shared cleanup for the success path and for every early exit above.
  if (pTmpBlock != NULL) {
    blockDataDestroy(pTmpBlock);
  }
  if (pValue != NULL) {
    pAPI->streamStateFreeVal(pValue);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1389

1390
/**
 * Build the "create child table" result for the partition that the table-name
 * iterator (pInfo->pTbNameIte) currently points at, then advance that iterator.
 *
 * Nothing is built when the operator has neither a sub-table-name expression
 * nor tag expressions (the partition name in the stream state is marked
 * invalid instead), or when the partition hash is empty.
 *
 * @param pOperator  stream partition operator; pOperator->info must be a
 *                   SStreamPartitionOperatorInfo.
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step (also logged).
 */
static int32_t buildStreamCreateTableResult(SOperatorInfo* pOperator) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExecTaskInfo*                pTask = pOperator->pTaskInfo;
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                  pSrc = pInfo->pInputDataBlock;

  // No table-name and no tag expressions: there is no create-table row to produce.
  if ((pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0)) {
    pTask->storageAPI.stateStore.streamStateSetParNameInvalid(pTask->streamInfo.pState);
    goto _end;
  }
  if (taosHashGetSize(pInfo->pPartitions) == 0) {
    goto _end;
  }

  // Reset the output block and make room for one row per partition.
  blockDataCleanup(pInfo->pCreateTbRes);
  code = blockDataEnsureCapacity(pInfo->pCreateTbRes, taosHashGetSize(pInfo->pPartitions));
  QUERY_CHECK_CODE(code, lino, _end);

  if (pInfo->pTbNameIte != NULL) {
    SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte;

    // The first row id of the partition is the representative row; guard the
    // lookup instead of dereferencing a possibly-NULL result.
    void* pFirstRowId = taosArrayGet(pParInfo->rowIds, 0);
    QUERY_CHECK_NULL(pFirstRowId, code, lino, _end, terrno);
    int32_t rowId = *(int32_t*)pFirstRowId;

    code = appendCreateTableRow(pTask->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, pParInfo->groupId,
                                pSrc, rowId, pInfo->pCreateTbRes, &pTask->storageAPI.stateStore);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1423

1424
/**
 * Group the rows of pBlock by their partition key.
 *
 * For every row the group key is built into pInfo->partitionSup.keyBuf and
 * looked up in pInfo->pPartitions. The row index is appended to the matching
 * partition's rowIds array, or a new partition entry (group id + rowIds) is
 * inserted. pBlock is remembered as pInfo->pInputDataBlock.
 *
 * Failures are logged and stop the scan early; the function has no return
 * value, so callers are not told about them.
 */
static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  // Row-id array that was allocated but is not (yet) stored in the hash table.
  // Released at _end so that no failure path leaks it.
  SArray* pUnownedRowIds = NULL;

  pInfo->pInputDataBlock = pBlock;
  for (int32_t i = 0; i < pBlock->info.rows; ++i) {
    recordNewGroupKeys(pInfo->partitionSup.pGroupCols, pInfo->partitionSup.pGroupColVals, pBlock, i);
    int32_t             keyLen = buildGroupKeys(pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupColVals);
    SPartitionDataInfo* pParData =
        (SPartitionDataInfo*)taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen);
    if (pParData) {
      void* tmp = taosArrayPush(pParData->rowIds, &i);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    } else {
      SPartitionDataInfo newParData = {0};
      newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
      pUnownedRowIds = taosArrayInit(64, sizeof(int32_t));
      QUERY_CHECK_NULL(pUnownedRowIds, code, lino, _end, terrno);
      newParData.rowIds = pUnownedRowIds;

      void* tmp = taosArrayPush(newParData.rowIds, &i);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      code =
          taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo));
      if (code == TSDB_CODE_DUP_KEY) {
        // NOTE(review): a duplicate-key result is taken to mean the table kept
        // its existing entry and did not store newParData, so the fresh array
        // is still ours to release. Confirm against the hash table contract.
        code = TSDB_CODE_SUCCESS;
        taosArrayDestroy(pUnownedRowIds);
        pUnownedRowIds = NULL;
      }
      QUERY_CHECK_CODE(code, lino, _end);
      // Stored successfully: the hash table's free callback owns the array now.
      pUnownedRowIds = NULL;
    }
  }

_end:
  if (pUnownedRowIds != NULL) {
    taosArrayDestroy(pUnownedRowIds);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
5,456✔
1458

1459
/**
 * Produce the next output block of the stream partition operator.
 *
 * Order of work on each call:
 *   1. If the operator is done, return NULL.
 *   2. If create-table rows are still pending for the current input block,
 *      build one and return it when it is non-empty.
 *   3. If partitions of the current input block are still pending, return the
 *      next partition result.
 *   4. Otherwise fetch a new block from downstream. Delete blocks are copied
 *      into pDelRes and returned as STREAM_DELETE_RESULT; the control block
 *      types listed in the switch are passed through unchanged; data blocks
 *      are (optionally) run through the scalar expressions, re-partitioned,
 *      and the first create-table / partition result is returned.
 *
 * @param pOperator  stream partition operator.
 * @param ppRes      receives the result block, or NULL when there is none.
 * @return TSDB_CODE_SUCCESS on the normal paths. On failure the code is stored
 *         in pTaskInfo->code and T_LONG_JMP is invoked.
 */
static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExecTaskInfo*                pTaskInfo = pOperator->pTaskInfo;
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  // Pending create-table rows of the current input block come first.
  if (hasRemainTbName(pInfo)) {
    code = buildStreamCreateTableResult(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
    if (pInfo->pCreateTbRes && pInfo->pCreateTbRes->info.rows > 0) {
      (*ppRes) = pInfo->pCreateTbRes;
      return code;
    }
  }

  // Then the remaining partitions of the current input block.
  if (hasRemainPartion(pInfo)) {
    (*ppRes) = buildStreamPartitionResult(pOperator);
    return code;
  }

  int64_t st = taosGetTimestampUs();
  {
    pInfo->pInputDataBlock = NULL;
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      setOperatorCompleted(pOperator);
      (*ppRes) = NULL;
      return code;
    }
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_PULL_DATA:
      case STREAM_INVALID:
        // Data blocks: the result block inherits the input block type.
        pInfo->binfo.pRes->info.type = pBlock->info.type;
        break;
      case STREAM_DELETE_DATA: {
        code = copyDataBlock(pInfo->pDelRes, pBlock);
        QUERY_CHECK_CODE(code, lino, _end);

        pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
        printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
        (*ppRes) = pInfo->pDelRes;
        return code;
      }
      case STREAM_RECALCULATE_DATA:
      case STREAM_RECALCULATE_DELETE:
      case STREAM_RECALCULATE_START:
      case STREAM_RECALCULATE_END:
      case STREAM_CREATE_CHILD_TABLE:
      case STREAM_RETRIEVE:
      case STREAM_CHECKPOINT:
      case STREAM_GET_RESULT:
      case STREAM_GET_ALL: {
        // These block types are forwarded to the caller untouched.
        (*ppRes) = pBlock;
        return code;
      }
      default:
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        QUERY_CHECK_CODE(code, lino, _end);
    }

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    // Drop the partitions of the previous block before grouping the new one.
    taosHashClear(pInfo->pPartitions);
    doStreamHashPartitionImpl(pInfo, pBlock);
  }
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  // Restart both iterators over the freshly built partition table.
  pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
  pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, NULL);
  code = buildStreamCreateTableResult(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);
  if (pInfo->pCreateTbRes && pInfo->pCreateTbRes->info.rows > 0) {
    (*ppRes) = pInfo->pCreateTbRes;
    return code;
  }
  (*ppRes) = buildStreamPartitionResult(pOperator);
  return code;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = NULL;
  return code;
}
1558

1559
static void destroyStreamPartitionOperatorInfo(void* param) {
1,222✔
1560
  SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param;
1,222✔
1561
  cleanupBasicInfo(&pInfo->binfo);
1,222✔
1562
  taosArrayDestroy(pInfo->partitionSup.pGroupCols);
1,222✔
1563

1564
  for (int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++) {
5,582✔
1565
    void* tmp = taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
4,359✔
1566
    if (!tmp) {
4,359!
UNCOV
1567
      continue;
×
1568
    }
1569
    SGroupKeys key = *(SGroupKeys*)tmp;
4,359✔
1570
    taosMemoryFree(key.pData);
4,359!
1571
  }
1572
  taosArrayDestroy(pInfo->partitionSup.pGroupColVals);
1,222✔
1573

1574
  taosMemoryFree(pInfo->partitionSup.keyBuf);
1,222!
1575
  cleanupExprSupp(&pInfo->scalarSup);
1,222✔
1576
  cleanupExprSupp(&pInfo->tbnameCalSup);
1,222✔
1577
  cleanupExprSupp(&pInfo->tagCalSup);
1,222✔
1578
  blockDataDestroy(pInfo->pDelRes);
1,222✔
1579
  taosHashCleanup(pInfo->pPartitions);
1,222✔
1580
  blockDataDestroy(pInfo->pCreateTbRes);
1,222✔
1581
  taosMemoryFreeClear(param);
1,222!
1582
}
1,222✔
1583

1584
/**
 * Hand the partition configuration down to a stream scan operator.
 *
 * Does nothing (and returns success) unless `downstream` is a stream scan.
 * Otherwise the scan receives a copy of *pParSup plus the scalar and
 * table-name expression supporters, and is flagged as partitioned. The index
 * of the result expression whose first parameter's column slot equals the
 * scan's primaryKeyIndex is written to *pPkColIndex (the last match wins;
 * untouched when nothing matches). An update-info object is created when the
 * scan has none and its trigger is not STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE.
 *
 * @return TSDB_CODE_SUCCESS or the error returned by updateInfoInit.
 */
int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr,
                          SExprSupp* pTbnameExpr, SExprSupp* pResExprSupp, int32_t* pPkColIndex) {
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t          ret = TSDB_CODE_SUCCESS;
  SStorageAPI*     pStoreApi = &downstream->pTaskInfo->storageAPI;
  SStreamScanInfo* pScan = downstream->info;

  pScan->partitionSup = *pParSup;
  pScan->pPartScalarSup = pExpr;
  pScan->pPartTbnameSup = pTbnameExpr;
  pScan->hasPart = true;

  // Locate the output expression that reads the scan's primary-key slot.
  int32_t exprIdx = 0;
  while (exprIdx < pResExprSupp->numOfExprs) {
    if (pScan->primaryKeyIndex == pResExprSupp->pExprInfo[exprIdx].base.pParam[0].pCol->slotId) {
      *pPkColIndex = exprIdx;
    }
    exprIdx++;
  }

  bool needUpdateInfo = (pScan->pUpdateInfo == NULL) && (pScan->twAggSup.calTrigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE);
  if (needUpdateInfo) {
    ret = pStoreApi->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScan->igCheckUpdate,
                                               pScan->pkColType, pScan->pkColLen, &pScan->pUpdateInfo);
  }
  return ret;
}
1610

1611
/**
 * Allocate an empty STREAM_CREATE_CHILD_TABLE data block.
 *
 * Column layout: a VARCHAR sub-table-name column (width taken from the
 * table-name expression's result schema, or 1 byte when there is no such
 * expression), an 8-byte UBIGINT group-id column, then one column per tag
 * expression using that expression's result type, width and precision.
 *
 * @param tbName  sub-table-name expression supporter.
 * @param tag     tag expression supporter.
 * @return the new block, or NULL on allocation failure.
 */
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SSDataBlock* pRes = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pRes == NULL) {
    return NULL;
  }

  pRes->info.hasVarCol = false;
  pRes->info.id.groupId = 0;
  pRes->info.rows = 0;
  pRes->info.type = STREAM_CREATE_CHILD_TABLE;
  pRes->info.watermark = INT64_MIN;

  pRes->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  QUERY_CHECK_NULL(pRes->pDataBlock, code, lino, _end, terrno);

  // sub table name
  SColumnInfoData nameCol = {0};
  nameCol.info.type = TSDB_DATA_TYPE_VARCHAR;
  nameCol.info.bytes = (tbName->numOfExprs > 0) ? tbName->pExprInfo->base.resSchema.bytes : 1;
  pRes->info.rowSize += nameCol.info.bytes;
  void* pPushed = taosArrayPush(pRes->pDataBlock, &nameCol);
  QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);

  // group id
  SColumnInfoData groupIdCol = {0};
  groupIdCol.info.type = TSDB_DATA_TYPE_UBIGINT;
  groupIdCol.info.bytes = 8;
  pRes->info.rowSize += groupIdCol.info.bytes;
  pPushed = taosArrayPush(pRes->pDataBlock, &groupIdCol);
  QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);

  // tag info
  for (int32_t tagIdx = 0; tagIdx < tag->numOfExprs; tagIdx++) {
    SColumnInfoData tagCol = {0};
    tagCol.info.type = tag->pExprInfo[tagIdx].base.resSchema.type;
    tagCol.info.bytes = tag->pExprInfo[tagIdx].base.resSchema.bytes;
    tagCol.info.precision = tag->pExprInfo[tagIdx].base.resSchema.precision;

    pPushed = taosArrayPush(pRes->pDataBlock, &tagCol);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
    pRes->info.rowSize += tagCol.info.bytes;
  }

_end:
  if (code == TSDB_CODE_SUCCESS) {
    return pRes;
  }

  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  blockDataDestroy(pRes);
  return NULL;
}
1665

1666
void freePartItem(void* ptr) {
730,942✔
1667
  SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr;
730,942✔
1668
  taosArrayDestroy(pPart->rowIds);
730,942✔
1669
}
730,942✔
1670

1671
int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
1,222✔
1672
                                          SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1673
  QRY_PARAM_CHECK(pOptrInfo);
1,222!
1674

1675
  int32_t                       code = TSDB_CODE_SUCCESS;
1,222✔
1676
  int32_t                       lino = 0;
1,222✔
1677
  SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
1,222!
1678
  SOperatorInfo*                pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,221!
1679
  if (pInfo == NULL || pOperator == NULL) {
1,220!
UNCOV
1680
    code = terrno;
×
UNCOV
1681
    goto _error;
×
1682
  }
1683

1684
  pInfo->partitionSup.pGroupCols = makeColumnArrayFromList(pPartNode->part.pPartitionKeys);
1,220✔
1685

1686
  if (pPartNode->part.pExprs != NULL) {
1,222✔
1687
    int32_t    num = 0;
255✔
1688
    SExprInfo* pCalExprInfo = NULL;
255✔
1689
    code = createExprInfo(pPartNode->part.pExprs, NULL, &pCalExprInfo, &num);
255✔
1690
    QUERY_CHECK_CODE(code, lino, _error);
255!
1691

1692
    code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num, &pTaskInfo->storageAPI.functionStore);
255✔
1693
    QUERY_CHECK_CODE(code, lino, _error);
255!
1694
  }
1695

1696
  pInfo->tbnameCalSup.numOfExprs = 0;
1,222✔
1697
  if (pPartNode->pSubtable != NULL) {
1,222✔
1698
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
777!
1699
    QUERY_CHECK_NULL(pSubTableExpr, code, lino, _error, terrno);
778!
1700

1701
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
778✔
1702
    code = createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0);
778✔
1703
    QUERY_CHECK_CODE(code, lino, _error);
777!
1704

1705
    code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore);
777✔
1706
    QUERY_CHECK_CODE(code, lino, _error);
778!
1707
  }
1708

1709
  pInfo->tagCalSup.numOfExprs = 0;
1,223✔
1710
  if (pPartNode->pTags != NULL) {
1,223✔
1711
    int32_t    numOfTags;
1712
    SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags);
404✔
1713
    QUERY_CHECK_NULL(pTagExpr, code, lino, _error, terrno);
404!
1714

1715
    code = initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore);
404✔
1716
    QUERY_CHECK_CODE(code, lino, _error);
404!
1717
  }
1718

1719
  if (pInfo->tbnameCalSup.numOfExprs != 0 || pInfo->tagCalSup.numOfExprs != 0) {
1,223✔
1720
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
883✔
1721
    QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, terrno);
881!
1722
  } else {
1723
    pInfo->pCreateTbRes = NULL;
340✔
1724
  }
1725

1726
  int32_t keyLen = 0;
1,221✔
1727
  code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
1,221✔
1728
                           pInfo->partitionSup.pGroupCols);
1,221✔
1729
  QUERY_CHECK_CODE(code, lino, _error);
1,222!
1730

1731
  pInfo->partitionSup.needCalc = true;
1,222✔
1732

1733
  pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc);
1,222✔
1734
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
1,222!
1735

1736
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, 4096);
1,222✔
1737
  QUERY_CHECK_CODE(code, lino, _error);
1,222!
1738

1739
  pInfo->parIte = NULL;
1,222✔
1740
  pInfo->pTbNameIte = NULL;
1,222✔
1741
  pInfo->pInputDataBlock = NULL;
1,222✔
1742

1743
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
1,222✔
1744
  pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
1,222✔
1745
  taosHashSetFreeFp(pInfo->pPartitions, freePartItem);
1,222✔
1746
  pInfo->tsColIndex = 0;
1,222✔
1747

1748
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
1,222✔
1749
  QUERY_CHECK_CODE(code, lino, _error);
1,221!
1750

1751
  int32_t    numOfCols = 0;
1,221✔
1752
  SExprInfo* pExprInfo = NULL;
1,221✔
1753
  code = createExprInfo(pPartNode->part.pTargets, NULL, &pExprInfo, &numOfCols);
1,221✔
1754
  QUERY_CHECK_CODE(code, lino, _error);
1,222!
1755

1756
  setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED,
1,222✔
1757
                  pInfo, pTaskInfo);
1758
  pOperator->exprSupp.numOfExprs = numOfCols;
1,222✔
1759
  pOperator->exprSupp.pExprInfo = pExprInfo;
1,222✔
1760
  pOperator->fpSet =
1761
      createOperatorFpSet(optrDummyOpenFn, doStreamHashPartitionNext, NULL, destroyStreamPartitionOperatorInfo,
1,222✔
1762
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1763
  setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
1,222✔
1764

1765
  pInfo->basic.primaryPkIndex = -1;
1,222✔
1766
  code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup, &pOperator->exprSupp, &pInfo->basic.primaryPkIndex);
1,222✔
1767
  QUERY_CHECK_CODE(code, lino, _error);
1,222!
1768

1769
  code = appendDownstream(pOperator, &downstream, 1);
1,222✔
1770
  QUERY_CHECK_CODE(code, lino, _error);
1,222!
1771

1772
  *pOptrInfo = pOperator;
1,222✔
1773
  return TSDB_CODE_SUCCESS;
1,222✔
1774

UNCOV
1775
_error:
×
UNCOV
1776
  pTaskInfo->code = code;
×
UNCOV
1777
  if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo);
×
UNCOV
1778
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
1779
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1780
  return code;
×
1781
}
1782

1783
/**
 * Convert a list of target nodes into an array of SColumn descriptors.
 *
 * Column targets are converted with extractColumnFromColumnNode(); value
 * targets get a descriptor built from the target slot id and the value's
 * result type. Targets of any other node type are skipped.
 *
 * @param pNodeList  list of STargetNode entries.
 * @param pArrayRes  receives the new array on success; set to NULL on failure.
 * @return TSDB_CODE_SUCCESS or an error code (also logged).
 */
int32_t extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  QUERY_CHECK_NULL(pList, code, lino, _end, terrno);

  for (int32_t i = 0; i < (int32_t)numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColumn c = extractColumnFromColumnNode(pColNode);
      void*   tmp = taosArrayPush(pList, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // Take the data type from resType like bytes/scale/precision below.
      // NOTE(review): node.type is presumably the node kind, not a data type - confirm.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      void* tmp = taosArrayPush(pList, &c);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // Do not leak a partially filled array, and never hand it to the caller.
    if (pList != NULL) {
      taosArrayDestroy(pList);
      pList = NULL;
    }
  }
  (*pArrayRes) = pList;
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc