• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3599

08 Feb 2025 11:23AM UTC coverage: 1.77% (-61.6%) from 63.396%
#3599

push

travis-ci

web-flow
Merge pull request #29712 from taosdata/fix/TD-33652-3.0

fix: reduce write rows from 30w to 3w

3776 of 278949 branches covered (1.35%)

Branch coverage included in aggregate %.

6012 of 274147 relevant lines covered (2.19%)

1642.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/streamcountwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "function.h"
17
#include "functionMgt.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "streamexecutorInt.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tdatablock.h"
24
#include "tglobal.h"
25
#include "tlog.h"
26
#include "ttime.h"
27

28
#define IS_NORMAL_COUNT_OP(op)          ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT)
29
#define STREAM_COUNT_OP_STATE_NAME      "StreamCountHistoryState"
30
#define STREAM_COUNT_OP_CHECKPOINT_NAME "StreamCountOperator_Checkpoint"
31

32
typedef struct SCountWindowInfo {
33
  SResultWindowInfo winInfo;
34
  COUNT_TYPE*       pWindowCount;
35
} SCountWindowInfo;
36

37
/*
 * Tells setCountOutputBuf how to obtain the state buffer of the next window
 * when sliding count windows are in use.
 */
typedef enum {
  NONE_WINDOW = 0,       /* seek the window from scratch (cursor is re-created) */
  CREATE_NEW_WINDOW = 1, /* unconditionally add a new window                    */
  MOVE_NEXT_WINDOW = 2,  /* advance the existing cursor to the following window */
} BuffOp;
42
typedef struct SBuffInfo {
43
  bool             rebuildWindow;
44
  BuffOp           winBuffOp;
45
  SStreamStateCur* pCur;
46
} SBuffInfo;
47

48
void destroyStreamCountAggOperatorInfo(void* param) {
×
49
  if (param == NULL) {
×
50
    return;
×
51
  }
52
  SStreamCountAggOperatorInfo* pInfo = (SStreamCountAggOperatorInfo*)param;
×
53
  cleanupBasicInfo(&pInfo->binfo);
×
54
  if (pInfo->pOperator) {
×
55
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
×
56
                              &pInfo->groupResInfo);
57
    pInfo->pOperator = NULL;
×
58
  }
59
  destroyStreamAggSupporter(&pInfo->streamAggSup);
×
60
  cleanupExprSupp(&pInfo->scalarSupp);
×
61
  clearGroupResInfo(&pInfo->groupResInfo);
×
62
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
×
63
  pInfo->pUpdated = NULL;
×
64

65
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
66
  blockDataDestroy(pInfo->pDelRes);
×
67
  tSimpleHashCleanup(pInfo->pStUpdated);
×
68
  tSimpleHashCleanup(pInfo->pStDeleted);
×
69
  cleanupGroupResInfo(&pInfo->groupResInfo);
×
70

71
  taosArrayDestroy(pInfo->historyWins);
×
72
  blockDataDestroy(pInfo->pCheckpointRes);
×
73

74
  tSimpleHashCleanup(pInfo->pPkDeleted);
×
75

76
  taosMemoryFreeClear(param);
×
77
}
78

79
/* Returns true when the window step (windowSliding) differs from the window size (windowCount). */
bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) {
  const bool stepEqualsSize = (pAggSup->windowCount == pAggSup->windowSliding);
  return !stepEqualsSize;
}
×
80

81
int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, SCountWindowInfo* pCurWin,
×
82
                          SBuffInfo* pBuffInfo) {
83
  int32_t code = TSDB_CODE_SUCCESS;
×
84
  int32_t lino = 0;
×
85
  int32_t winCode = TSDB_CODE_SUCCESS;
×
86
  int32_t size = pAggSup->resultRowSize;
×
87
  pCurWin->winInfo.sessionWin.groupId = groupId;
×
88
  pCurWin->winInfo.sessionWin.win.skey = ts;
×
89
  pCurWin->winInfo.sessionWin.win.ekey = ts;
×
90

91
  if (isSlidingCountWindow(pAggSup)) {
×
92
    if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) {
×
93
      code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
×
94
                                                        (void**)&pCurWin->winInfo.pStatePos, &size);
×
95
      QUERY_CHECK_CODE(code, lino, _end);
×
96

97
      winCode = TSDB_CODE_FAILED;
×
98
    } else if (pBuffInfo->winBuffOp == MOVE_NEXT_WINDOW) {
×
99
      QUERY_CHECK_NULL(pBuffInfo->pCur, code, lino, _end, terrno);
×
100
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pBuffInfo->pCur);
×
101
      winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
×
102
                                                                 (void**)&pCurWin->winInfo.pStatePos, &size);
×
103
      if (winCode == TSDB_CODE_FAILED) {
×
104
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
×
105
                                                          (void**)&pCurWin->winInfo.pStatePos, &size);
×
106
        QUERY_CHECK_CODE(code, lino, _end);
×
107
      } else {
108
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
×
109
      }
110
    } else {
111
      pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
×
112
                                                                        pAggSup->windowCount);
×
113
      winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
×
114
                                                                 (void**)&pCurWin->winInfo.pStatePos, &size);
×
115
      if (winCode == TSDB_CODE_FAILED) {
×
116
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
×
117
                                                          (void**)&pCurWin->winInfo.pStatePos, &size);
×
118
        QUERY_CHECK_CODE(code, lino, _end);
×
119
      } else {
120
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
×
121
      }
122
    }
123
    if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
×
124
      pBuffInfo->rebuildWindow = true;
×
125
    }
126
  } else {
127
    code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
×
128
                                                                pAggSup->windowCount,
×
129
                                                                (void**)&pCurWin->winInfo.pStatePos, &size, &winCode);
×
130
    QUERY_CHECK_CODE(code, lino, _end);
×
131
  }
132

133
  if (winCode == TSDB_CODE_SUCCESS) {
×
134
    pCurWin->winInfo.isOutput = true;
×
135
  }
136
  pCurWin->pWindowCount =
×
137
      (COUNT_TYPE*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - sizeof(COUNT_TYPE)));
×
138

139
  if (*pCurWin->pWindowCount == pAggSup->windowCount) {
×
140
    pBuffInfo->rebuildWindow = true;
×
141
  }
142

143
_end:
×
144
  if (code != TSDB_CODE_SUCCESS) {
×
145
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
146
  }
147
  return code;
×
148
}
149

150
/*
 * Drop the entry of window `pKey` from both hash maps. Failures are only
 * logged (info level); nothing is reported to the caller.
 */
static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
  SSessionKey hashKey = {0};
  int32_t     rc = TSDB_CODE_SUCCESS;

  getSessionHashKey(pKey, &hashKey);

  rc = tSimpleHashRemove(pHashMap, &hashKey, sizeof(SSessionKey));
  if (rc != TSDB_CODE_SUCCESS) {
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
  }

  rc = tSimpleHashRemove(pResMap, &hashKey, sizeof(SSessionKey));
  if (rc != TSDB_CODE_SUCCESS) {
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
  }
}
×
163

164
/*
 * Account rows pTs[start .. rows) to the count window `pWinInfo`.
 *
 * At most (maxRows - current count) rows are taken; that number is returned
 * through `pWinRows` and added to the window counter. The window key is
 * widened to cover the taken rows. When the key changes, the entries stored
 * under the old key are removed from `pStUpdated` and pAggSup->pResultRows,
 * and the old state is deleted if the state position is marked needFree.
 * `*pRebuild` is set when more rows precede the current end key than the
 * window can still take.
 *
 * Returns TSDB_CODE_SUCCESS or the error from saveDeleteRes.
 */
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
                                     int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
                                     SSHashObj* pStDeleted, bool* pRebuild, int32_t* pWinRows) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SResultWindowInfo* pWin = &pWinInfo->winInfo;
  SSessionKey        oldKey = pWin->sessionWin; /* key before any adjustment */
  bool               keyChanged = false;

  /* Count the leading rows that fall strictly before the current end key. */
  int32_t numBefore = 0;
  while (start + numBefore < rows && pTs[start + numBefore] < pWin->sessionWin.win.ekey) {
    numBefore++;
  }

  int32_t maxNum = TMIN(maxRows - *pWinInfo->pWindowCount, rows - start);
  if (numBefore > maxNum) {
    *pRebuild = true;
  }
  *pWinInfo->pWindowCount += maxNum;

  /* Extend the window to the left. */
  if (pWin->sessionWin.win.skey > pTs[start]) {
    keyChanged = true;
    if (pStDeleted && pWin->isOutput) {
      code = saveDeleteRes(pStDeleted, pWin->sessionWin);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    pWin->sessionWin.win.skey = pTs[start];
  }

  /* Extend the window to the right. */
  TSKEY lastTs = pTs[maxNum + start - 1];
  if (pWin->sessionWin.win.ekey < lastTs) {
    keyChanged = true;
    pWin->sessionWin.win.ekey = lastTs;
  }

  if (keyChanged) {
    /* Re-key the state buffer and forget whatever was stored under the old key. */
    memcpy(pWin->pStatePos->pKey, &pWin->sessionWin, sizeof(SSessionKey));
    removeCountResult(pStUpdated, pAggSup->pResultRows, &oldKey);
    if (pWin->pStatePos->needFree) {
      pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &oldKey);
    }
  }

  (*pWinRows) = maxNum;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
215

216
/*
 * Compute the time range of the stored count window(s) related to `pKey`.
 *
 * `*pDelRange` starts as a copy of `*pKey`. When a window is found under the
 * cursor, its time window replaces pDelRange->win. For STREAM_DELETE_DATA and
 * STREAM_PARTITION_DELETE_DATA the cursor is then walked forward until the
 * store reports no more windows, and the end key is extended to the largest
 * end key seen.
 */
void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, EStreamType mode, SSessionKey* pDelRange) {
  SStreamStateCur* pCur = NULL;
  SSessionKey      found = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN};
  int32_t          code = TSDB_CODE_SUCCESS;

  *pDelRange = *pKey;

  if (isSlidingCountWindow(pAggSup)) {
    pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, pKey, pAggSup->windowCount);
  } else {
    pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);
  }

  code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &found, NULL, 0);
  if (code == TSDB_CODE_SUCCESS) {
    pDelRange->win = found.win;

    if (mode == STREAM_DELETE_DATA || mode == STREAM_PARTITION_DELETE_DATA) {
      /* Swallow every following window into the range. */
      for (;;) {
        pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
        code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &found, NULL, 0);
        if (code != TSDB_CODE_SUCCESS) {
          break;
        }
        pDelRange->win.ekey = TMAX(pDelRange->win.ekey, found.win.ekey);
      }
    }
  }

  pAggSup->stateStore.streamStateFreeCur(pCur);
}
241

242
/* Hands the cursor held by `pBuffInfo` back to the state store. */
static void destroySBuffInfo(SStreamAggSupporter* pAggSup, SBuffInfo* pBuffInfo) {
  SStreamStateCur* pCursor = pBuffInfo->pCur;
  pAggSup->stateStore.streamStateFreeCur(pCursor);
}
×
245

246
/*
 * Decide whether window `pWin` takes part in a calculation over [sKey, eKey].
 * Non-sliding windows (windowCount == windowSliding) always do; sliding
 * windows only when they lie completely inside the range.
 */
bool inCountCalSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, TSKEY sKey, TSKEY eKey) {
  const bool notSliding = (pAggSup->windowCount == pAggSup->windowSliding);
  const bool enclosed = (sKey <= pWin->skey) && (pWin->ekey <= eKey);
  return notSliding || enclosed;
}
255

256
/* Same test as inCountCalSlidingWindow, using the block's calWin as the range. */
bool inCountSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  TSKEY calStart = pBlockInfo->calWin.skey;
  TSKEY calEnd = pBlockInfo->calWin.ekey;
  return inCountCalSlidingWindow(pAggSup, pWin, calStart, calEnd);
}
259

260
/*
 * Feed one data block into the count windows of its group.
 *
 * Rows are consumed window by window: the window for the current row is
 * located with setCountOutputBuf, as many rows as fit are aggregated into it,
 * and the result is recorded according to the trigger mode. When a window
 * cannot be updated in place, a re-scan range is appended to
 * pAggSup->pScanBlock and processing of the block stops.
 *
 * Errors are logged only; the function returns nothing.
 */
static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
                                 SSHashObj* pStDeleted) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  SBuffInfo                    buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
  int32_t                      rows = pSDataBlock->info.rows;
  SResultRow*                  pResult = NULL;
  int32_t                      winRows = 0;
  COUNT_TYPE                   slidingRows = 0;
  SColumnInfoData*             pTsCol = NULL;
  TSKEY*                       tsCols = NULL;

  pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);

  /* A non-positive end key is replaced by "no upper bound". */
  pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
  if (pAggSup->winRange.ekey <= 0) {
    pAggSup->winRange.ekey = INT64_MAX;
  }

  pTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
  if (!pTsCol) {
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  tsCols = (int64_t*)pTsCol->pData;

  code = blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2);
  QUERY_CHECK_CODE(code, lino, _end);

  /* `i` is advanced inside the body by the number of rows actually consumed. */
  for (int32_t i = 0; i < rows;) {
    if (pInfo->ignoreExpiredData &&
        checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup,
                         pSDataBlock->info.id.uid, tsCols[i], NULL, 0)) {
      i++;
      continue;
    }

    SCountWindowInfo curWin = {0};
    buffInfo.rebuildWindow = false;
    code = setCountOutputBuf(pAggSup, tsCols[i], groupId, &curWin, &buffInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    /* Window outside the calculation range: retry the same row with the next window. */
    if (!inCountSlidingWindow(pAggSup, &curWin.winInfo.sessionWin.win, &pSDataBlock->info)) {
      buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
      continue;
    }

    setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
    slidingRows = *curWin.pWindowCount; /* row count before this block touches the window */

    if (!buffInfo.rebuildWindow) {
      code = updateCountWindowInfo(pAggSup, &curWin, tsCols, i, rows, pAggSup->windowCount, pStUpdated, pStDeleted,
                                   &buffInfo.rebuildWindow, &winRows);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (buffInfo.rebuildWindow) {
      /* Emit a re-scan range covering the affected windows and stop. */
      SSessionKey range = {0};
      uint64_t    uid = 0;
      if (isSlidingCountWindow(pAggSup)) {
        curWin.winInfo.sessionWin.win.skey = tsCols[i];
        curWin.winInfo.sessionWin.win.ekey = tsCols[i];
      }
      getCountWinRange(pAggSup, &curWin.winInfo.sessionWin, STREAM_DELETE_DATA, &range);
      range.win.skey = TMIN(tsCols[i], range.win.skey);
      range.win.ekey = TMAX(tsCols[rows - 1], range.win.ekey);
      code =
          appendDataToSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
      break;
    }

    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
                              pOperator, 0);
    QUERY_CHECK_CODE(code, lino, _end);

    code = saveSessionOutputBuf(pAggSup, &curWin.winInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_COUNT_OP(pOperator)) {
      code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
      code = saveResult(curWin.winInfo, pStUpdated);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
      SSessionKey hashKey = {0};
      curWin.winInfo.pStatePos->beUpdated = true;
      getSessionHashKey(&curWin.winInfo.sessionWin, &hashKey);
      code = tSimpleHashPut(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey), &curWin.winInfo,
                            sizeof(SResultWindowInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    /* For sliding windows only part of the consumed rows advances the input position. */
    if (isSlidingCountWindow(pAggSup)) {
      if (slidingRows > pAggSup->windowSliding) {
        buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
        winRows = 0;
      } else if (slidingRows + winRows > pAggSup->windowSliding) {
        buffInfo.winBuffOp = CREATE_NEW_WINDOW;
        winRows = pAggSup->windowSliding - slidingRows;
      }
    }
    i += winRows;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  destroySBuffInfo(pAggSup, &buffInfo);
}
×
375

376
/*
 * Produce the next output block of the operator.
 *
 * The delete-result block is built first and returned when it has rows;
 * otherwise the session result block is built and returned when it has rows.
 * `*ppRes` is NULL when neither block has rows. Always returns
 * TSDB_CODE_SUCCESS.
 */
static int32_t buildCountResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*                 pDelBlock = pInfo->pDelRes;
  SSDataBlock*                 pResBlock = pInfo->binfo.pRes;

  (*ppRes) = NULL;

  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pDelBlock, &pInfo->pDelIterator);
  if (pDelBlock->info.rows > 0) {
    printDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
    (*ppRes) = pDelBlock;
    return code;
  }

  doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pResBlock);
  if (pResBlock->info.rows > 0) {
    printDataBlock(pResBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
    (*ppRes) = pResBlock;
  }
  return code;
}
398

399
/*
 * Serialize the operator state for a checkpoint.
 *
 * Layout: [int32 number of windows] { session key, result window info }*
 *         [twAggSup] [int64 dataVersion] [uint32 checksum, parent only].
 *
 * Call once with buf == NULL to obtain the required length, then again with
 * a buffer of that length. `len` is only used for the checksum and must be
 * the total length returned by the sizing call.
 *
 * dataVersion is written as a 64-bit value so that it matches
 * doStreamCountDecodeOpState, which reads it with taosDecodeFixedI64. Writing
 * it as 32 bits truncated the version and made the decoder consume 4 bytes
 * that were never written for it.
 *
 * Returns the number of bytes encoded (0 when the operator has no info).
 */
int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  if (!pInfo) {
    return 0;
  }

  /* Remember the buffer start: the encode helpers advance *buf. */
  void* pData = (buf == NULL) ? NULL : *buf;

  // 1.streamAggSup.pResultRows
  int32_t tlen = 0;
  int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
  tlen += taosEncodeFixedI32(buf, mapSize);
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pIte, &keyLen);
    tlen += encodeSSessionKey(buf, key);
    tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
  }

  // 2.twAggSup
  tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);

  // 3.dataVersion (64-bit, symmetric with the decoder)
  tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);

  // 4.checksum
  if (isParent) {
    if (buf) {
      /* The checksum covers everything before the checksum field itself. */
      uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
      tlen += taosEncodeFixedU32(buf, cksum);
    } else {
      tlen += sizeof(uint32_t);
    }
  }

  return tlen;
}
438

439
/*
 * Restore the operator state from a checkpoint buffer produced by
 * doStreamCountEncodeOpState.
 *
 * For a parent operator the trailing uint32 checksum is verified first. Each
 * encoded window is re-attached to a state buffer via setCountOutputBuf and
 * inserted into streamAggSup.pResultRows; twAggSup and dataVersion follow.
 *
 * Changes over the previous version:
 *  - the cursor that setCountOutputBuf may leave in the per-window SBuffInfo
 *    is released on every iteration (it used to leak for sliding windows);
 *  - a buffer shorter than the checksum is rejected instead of computing a
 *    negative data length.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_FAILED for a missing operator info or
 * a bad checksum/length, or the error of the failing helper.
 */
int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  if (!pInfo) {
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // 4.checksum
  if (isParent) {
    if (len < (int32_t)sizeof(uint32_t)) {
      code = TSDB_CODE_FAILED;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    int32_t dataLen = len - (int32_t)sizeof(uint32_t);
    void*   pCksum = POINTER_SHIFT(buf, dataLen);
    if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_FAILED;
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  // 1.streamAggSup.pResultRows
  int32_t mapSize = 0;
  buf = taosDecodeFixedI32(buf, &mapSize);
  for (int32_t i = 0; i < mapSize; i++) {
    SSessionKey      key = {0};
    SCountWindowInfo curWin = {0};
    SBuffInfo        buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};

    buf = decodeSSessionKey(buf, &key);
    code = setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
    if (code == TSDB_CODE_SUCCESS) {
      buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
      code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo,
                            sizeof(SResultWindowInfo));
    }

    /* Release the cursor on success and failure alike, before any jump to _end. */
    destroySBuffInfo(&pInfo->streamAggSup, &buffInfo);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // 2.twAggSup
  buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);

  // 3.dataVersion
  buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
488

489
/*
 * Persist the operator state under STREAM_COUNT_OP_CHECKPOINT_NAME when
 * needSaveStreamOperatorInfo says a save is due. The state is sized with a
 * first encode pass, encoded into a temporary buffer, handed to the state
 * store and the save is then marked complete. An allocation failure is
 * logged; nothing is returned.
 */
void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  void*                        pBuf = NULL;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SStreamAggSupporter*         pSup = &pInfo->streamAggSup;

  if (!needSaveStreamOperatorInfo(&pInfo->basic)) {
    return;
  }

  /* Pass 1: size only. */
  int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
  pBuf = taosMemoryCalloc(1, len);
  if (!pBuf) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  /* Pass 2: encode through a scratch pointer so pBuf keeps the buffer start. */
  void* pCursor = pBuf;
  len = doStreamCountEncodeOpState(&pCursor, len, pOperator, true);
  pSup->stateStore.streamStateSaveInfo(pSup->pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
                                       strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len);
  saveStreamOperatorStateComplete(&pInfo->basic);

_end:
  taosMemoryFreeClear(pBuf);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
×
515

516
/*
 * Reset the state of every stored count window addressed by the rows of
 * `pBlock`.
 *
 * Each row supplies a group id, a [start, end] range used to locate windows
 * and a [calStart, calEnd] range used to decide (for sliding windows) whether
 * a window must be reset. Windows are walked from the seek position until the
 * store reports no more windows or a window starts after the row's end.
 *
 * Changes over the previous version:
 *  - calStartDatas/calEndDatas are now read from the CALCULATE_* columns.
 *    They used to alias the start/end columns while the fetched calculate
 *    columns went unused, which looks like a copy-paste slip.
 *    NOTE(review): confirm that every producer of such blocks fills the
 *    CALCULATE_* columns.
 *  - a missing column is logged and the call returns, instead of
 *    dereferencing NULL.
 */
void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) {
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  if (!pStartTsCol || !pEndTsCol || !pCalStartTsCol || !pCalEndTsCol || !pGroupCol) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_FAILED));
    return;
  }

  TSKEY*    startDatas = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endDatas = (TSKEY*)pEndTsCol->pData;
  TSKEY*    calStartDatas = (TSKEY*)pCalStartTsCol->pData;
  TSKEY*    calEndDatas = (TSKEY*)pCalEndTsCol->pData;
  uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;

  SRowBuffPos* pPos = NULL;
  int32_t      size = 0;
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    SSessionKey      key = {.groupId = gpDatas[i], .win.skey = startDatas[i], .win.ekey = endDatas[i]};
    SStreamStateCur* pCur = NULL;
    if (isSlidingCountWindow(pAggSup)) {
      pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &key, pAggSup->windowCount);
    } else {
      pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, &key);
    }
    while (1) {
      SSessionKey tmpKey = {.groupId = gpDatas[i], .win.skey = INT64_MIN, .win.ekey = INT64_MIN};
      int32_t     code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, (void**)&pPos, &size);
      if (code != TSDB_CODE_SUCCESS || tmpKey.win.skey > endDatas[i]) {
        /* No more windows for this row: the cursor is released exactly once here. */
        pAggSup->stateStore.streamStateFreeCur(pCur);
        break;
      }
      /* Reset only windows inside the calculation range; always move on. */
      if (inCountCalSlidingWindow(pAggSup, &tmpKey.win, calStartDatas[i], calEndDatas[i])) {
        pAggSup->stateStore.streamStateSessionReset(pAggSup->pState, pPos->pRowBuff);
      }
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
    }
  }
}
×
554

555
/*
 * Delete every stored count window that streamStateCountGetKeyByRange finds
 * for the rows of `pBlock`. Each row supplies a group id and a [start, end]
 * range.
 *
 * Every deleted window key is also appended to `result` when `result` is not
 * NULL.
 *
 * Changes over the previous version: the unused calculate-range locals are
 * gone, and missing columns are reported as TSDB_CODE_FAILED instead of
 * being dereferenced.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_FAILED for a missing column, or the
 * error of saveDeleteInfo.
 */
int32_t doDeleteCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  if (!pStartTsCol || !pEndTsCol || !pGroupCol) {
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  TSKEY*    startDatas = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endDatas = (TSKEY*)pEndTsCol->pData;
  uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;

  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    SSessionKey key = {.win.skey = startDatas[i], .win.ekey = endDatas[i], .groupId = gpDatas[i]};
    /* Keep deleting until the store finds no further window in the range. */
    while (1) {
      SSessionKey curWin = {0};
      int32_t     winCode = pAggSup->stateStore.streamStateCountGetKeyByRange(pAggSup->pState, &key, &curWin);
      if (winCode != TSDB_CODE_SUCCESS) {
        break;
      }
      doDeleteSessionWindow(pAggSup, &curWin);
      if (result) {
        code = saveDeleteInfo(result, curWin);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
590

591
/*
 * Handle a delete block: remove the affected windows from the state store,
 * drop them from `pMapUpdate`, and record them in `pMapDelete` (and in
 * `pPkDelete` when `needAdd` is set).
 *
 * Sliding count windows are deleted with doDeleteCountWindows, all others
 * with doDeleteTimeWindows.
 *
 * The temporary window array is now released on every path; previously
 * taosArrayDestroy sat before the `_end` label, so each error exit leaked it.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the failing step.
 */
int32_t deleteCountWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
                            SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
  QUERY_CHECK_NULL(pWins, code, lino, _end, terrno);

  if (isSlidingCountWindow(pAggSup)) {
    code = doDeleteCountWindows(pAggSup, pBlock, pWins);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    code = doDeleteTimeWindows(pAggSup, pBlock, pWins);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  removeSessionResults(pAggSup, pMapUpdate, pWins);
  code = copyDeleteWindowInfo(pWins, pMapDelete);
  QUERY_CHECK_CODE(code, lino, _end);
  if (needAdd) {
    code = copyDeleteWindowInfo(pWins, pPkDelete);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  /* Single release point, reached by success and error paths alike. */
  taosArrayDestroy(pWins);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
623

624
/*
 * getNext implementation of the stream count-window operator.
 *
 * - OP_EXEC_DONE: returns NULL.
 * - OP_RES_TO_RETURN: keeps returning pending result blocks, then the saved
 *   checkpoint block if one is pending, and finally marks the operator
 *   completed.
 * - otherwise: drains the downstream operator, dispatching each block on its
 *   type (delete, clear, get-all, create-child-table, checkpoint, data),
 *   closes the windows, and returns the first result block.
 *
 * On error the code is stored in the task and control leaves through
 * T_LONG_JMP.
 */
static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  SExprSupp*                   pSup = &pOperator->exprSupp;
  SOperatorInfo*               downstream = NULL;
  SSDataBlock*                 pOut = NULL;

  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    code = buildCountResult(pOperator, &pOut);
    QUERY_CHECK_CODE(code, lino, _end);
    if (pOut) {
      (*ppRes) = pOut;
      return code;
    }

    if (pInfo->recvGetAll) {
      pInfo->recvGetAll = false;
      resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
    }

    /* Forward the checkpoint block once all results have been returned. */
    if (pInfo->reCkBlock) {
      pInfo->reCkBlock = false;
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pCheckpointRes;
      return code;
    }

    setStreamOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

  downstream = pOperator->pDownstream[0];

  /* Containers collecting the windows touched in this round. */
  if (!pInfo->pUpdated) {
    pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
    QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
  }
  if (!pInfo->pStUpdated) {
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
    pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
    QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
  }

  for (;;) {
    SSDataBlock* pBlock = NULL;
    code = downstream->fpSet.getNextFn(downstream, &pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock == NULL) {
      break;
    }

    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);

    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
      bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);
      code = deleteCountWinState(&pInfo->streamAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, pInfo->pPkDeleted,
                                 add);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    } else if (pBlock->info.type == STREAM_CLEAR) {
      doResetCountWindows(&pInfo->streamAggSup, pBlock);
      continue;
    } else if (pBlock->info.type == STREAM_GET_ALL) {
      pInfo->recvGetAll = true;
      code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
      /* Passed through unchanged. */
      (*ppRes) = pBlock;
      return code;
    } else if (pBlock->info.type == STREAM_CHECKPOINT) {
      pAggSup->stateStore.streamStateCommit(pAggSup->pState);
      doStreamCountSaveCheckpoint(pOperator);
      code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    } else if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    /* Data block: apply the scalar expressions (if any) in place, then aggregate. */
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doStreamCountAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted);
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
  }

  // restore the value
  pOperator->status = OP_RES_TO_RETURN;

  code = closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
  QUERY_CHECK_CODE(code, lino, _end);

  code = copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
  QUERY_CHECK_CODE(code, lino, _end);

  removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  /* The pointer is dropped right after it was handed to groupResInfo. */
  pInfo->pUpdated = NULL;

  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator)) {
    code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pOut = NULL;
  code = buildCountResult(pOperator, &pOut);
  QUERY_CHECK_CODE(code, lino, _end);
  if (pOut) {
    (*ppRes) = pOut;
    return code;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  setStreamOperatorCompleted(pOperator);
  (*ppRes) = NULL;
  return code;
}
764

765
// Saves this operator's twAggSup.maxTs into the stream state store under
// STREAM_COUNT_OP_STATE_NAME, commits the state, and then forwards the release
// request to the first downstream operator if it registered a release callback.
//
// Nothing is returned: on failure the error code is stored in terrno and logged.
// If the temporary buffer cannot be allocated, neither the save nor the
// downstream release is performed.
void streamCountReleaseState(SOperatorInfo* pOperator) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  // The operator is created by createStreamCountAggOperatorInfo(), which installs an
  // SStreamCountAggOperatorInfo as pOperator->info, so that is the type to use here
  // (same as streamCountReloadState()).
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int32_t                      resSize = sizeof(TSKEY);
  char*                        pBuff = taosMemoryCalloc(1, resSize);
  QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);

  // The saved payload is exactly one TSKEY: the current max timestamp.
  memcpy(pBuff, &pInfo->twAggSup.maxTs, resSize);
  qDebug("===stream=== count window operator relase state. ");
  pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_STATE_NAME,
                                                     strlen(STREAM_COUNT_OP_STATE_NAME), pBuff, resSize);
  pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
  taosMemoryFreeClear(pBuff);

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->fpSet.releaseStreamStateFn) {
    downstream->fpSet.releaseStreamStateFn(downstream);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
×
790

791
// Reads back the timestamp stored under STREAM_COUNT_OP_STATE_NAME, raises
// twAggSup.maxTs to it when the stored value is larger, lets the first
// downstream operator reload its own state (if it registered a callback), and
// finally refreshes this operator's aggregate supporter from that downstream.
//
// Nothing is returned: when the state lookup fails, the remaining steps are
// skipped, the error code is stored in terrno, and the failure is logged.
void streamCountReloadState(SOperatorInfo* pOperator) {
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  void*                        pSaved = NULL;
  int32_t                      savedLen = 0;
  TSKEY                        savedTs = 0;
  SOperatorInfo*               pChild = NULL;

  code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_COUNT_OP_STATE_NAME,
                                                strlen(STREAM_COUNT_OP_STATE_NAME), &pSaved, &savedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  // Copy the value out first so the returned buffer can be released right away.
  savedTs = *(TSKEY*)pSaved;
  taosMemoryFree(pSaved);

  // Only ever move the max timestamp forward.
  if (savedTs > pInfo->twAggSup.maxTs) {
    pInfo->twAggSup.maxTs = savedTs;
  }

  pChild = pOperator->pDownstream[0];
  if (pChild->fpSet.reloadStreamStateFn) {
    pChild->fpSet.reloadStreamStateFn(pChild);
  }
  reloadAggSupFromDownStream(pChild, pAggSup);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
×
820

821
int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
×
822
                                         SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
823
  QRY_PARAM_CHECK(pOptrInfo);
×
824

825
  SCountWinodwPhysiNode*       pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
×
826
  int32_t                      numOfCols = 0;
×
827
  int32_t                      code = TSDB_CODE_SUCCESS;
×
828
  int32_t                      lino = 0;
×
829
  SStreamCountAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamCountAggOperatorInfo));
×
830
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
831
  if (pInfo == NULL || pOperator == NULL) {
×
832
    code = terrno;
×
833
    QUERY_CHECK_CODE(code, lino, _error);
×
834
  }
835

836
  pOperator->pTaskInfo = pTaskInfo;
×
837

838
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
839
  if (pCountNode->window.pExprs != NULL) {
×
840
    int32_t    numOfScalar = 0;
×
841
    SExprInfo* pScalarExprInfo = NULL;
×
842
    code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
843
    QUERY_CHECK_CODE(code, lino, _error);
×
844

845
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
846
    QUERY_CHECK_CODE(code, lino, _error);
×
847
  }
848
  SExprSupp* pExpSup = &pOperator->exprSupp;
×
849

850
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
×
851
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
852
  pInfo->binfo.pRes = pResBlock;
×
853

854
  SExprInfo*   pExprInfo = NULL;
×
855
  code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
×
856
  QUERY_CHECK_CODE(code, lino, _error);
×
857

858
  code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
×
859
  QUERY_CHECK_CODE(code, lino, _error);
×
860

861
  pInfo->twAggSup = (STimeWindowAggSupp){
×
862
      .waterMark = pCountNode->window.watermark,
×
863
      .calTrigger = pCountNode->window.triggerType,
×
864
      .maxTs = INT64_MIN,
865
      .minTs = INT64_MAX,
866
      .deleteMark = getDeleteMark(&pCountNode->window, 0),
×
867
  };
868

869
  pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
×
870
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
×
871
                                sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
872
                                GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex,
×
873
                                STREAM_STATE_BUFF_SORT, 1);
874
  QUERY_CHECK_CODE(code, lino, _error);
×
875

876
  pInfo->streamAggSup.windowCount = pCountNode->windowCount;
×
877
  pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
×
878

879
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
880
  QUERY_CHECK_CODE(code, lino, _error);
×
881

882
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
883
  pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
×
884
  QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
×
885
  pInfo->pDelIterator = NULL;
×
886

887
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
×
888
  QUERY_CHECK_CODE(code, lino, _error);
×
889

890
  pInfo->ignoreExpiredData = pCountNode->window.igExpired;
×
891
  pInfo->ignoreExpiredDataSaved = false;
×
892
  pInfo->pUpdated = NULL;
×
893
  pInfo->pStUpdated = NULL;
×
894
  pInfo->dataVersion = 0;
×
895
  pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
×
896
  if (!pInfo->historyWins) {
×
897
    code = terrno;
×
898
    QUERY_CHECK_CODE(code, lino, _error);
×
899
  }
900

901
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
×
902
  QUERY_CHECK_CODE(code, lino, _error);
×
903

904
  pInfo->recvGetAll = false;
×
905
  pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
×
906
  QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
×
907
  pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimaryKey;
×
908

909
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
×
910
  setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
×
911
                  OP_NOT_OPENED, pInfo, pTaskInfo);
912
  // for stream
913
  void*   buff = NULL;
×
914
  int32_t len = 0;
×
915
  int32_t res =
916
      pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
×
917
                                                        strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), &buff, &len);
918
  if (res == TSDB_CODE_SUCCESS) {
×
919
    code = doStreamCountDecodeOpState(buff, len, pOperator, true);
×
920
    QUERY_CHECK_CODE(code, lino, _error);
×
921
    taosMemoryFree(buff);
×
922
  }
923
  pInfo->pOperator = pOperator;
×
924
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo,
×
925
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
926
  setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
×
927

928
  if (downstream) {
×
929
    code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
×
930
                          &pInfo->twAggSup, &pInfo->basic);
×
931
    QUERY_CHECK_CODE(code, lino, _error);
×
932

933
    code = appendDownstream(pOperator, &downstream, 1);
×
934
    QUERY_CHECK_CODE(code, lino, _error);
×
935
  }
936

937
  *pOptrInfo = pOperator;
×
938
  return TSDB_CODE_SUCCESS;
×
939

940
_error:
×
941
  if (pInfo != NULL) {
×
942
    destroyStreamCountAggOperatorInfo(pInfo);
×
943
  }
944

945
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
946
  pTaskInfo->code = code;
×
947
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
948
  return code;
×
949
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc