• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3633

11 Mar 2025 12:59PM UTC coverage: 0.0% (-60.7%) from 60.719%
#3633

push

travis-ci

web-flow
Merge pull request #30118 from taosdata/wl30

udpate ci workflow

0 of 280412 branches covered (0.0%)

Branch coverage included in aggregate %.

0 of 275582 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/streamcountwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "function.h"
17
#include "functionMgt.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "streamexecutorInt.h"
21
#include "tchecksum.h"
22
#include "tcommon.h"
23
#include "tdatablock.h"
24
#include "tglobal.h"
25
#include "tlog.h"
26
#include "ttime.h"
27

28
#define STREAM_COUNT_OP_STATE_NAME      "StreamCountHistoryState"
29
#define STREAM_COUNT_OP_CHECKPOINT_NAME "StreamCountOperator_Checkpoint"
30

31
typedef struct SCountWindowInfo {
32
  SResultWindowInfo winInfo;
33
  COUNT_TYPE*       pWindowCount;
34
} SCountWindowInfo;
35

36
typedef enum {
37
  NONE_WINDOW = 0,
38
  CREATE_NEW_WINDOW,
39
  MOVE_NEXT_WINDOW,
40
} BuffOp;
41
typedef struct SBuffInfo {
42
  bool             rebuildWindow;
43
  BuffOp           winBuffOp;
44
  SStreamStateCur* pCur;
45
} SBuffInfo;
46

47
void destroyStreamCountAggOperatorInfo(void* param) {
×
48
  if (param == NULL) {
×
49
    return;
×
50
  }
51
  SStreamCountAggOperatorInfo* pInfo = (SStreamCountAggOperatorInfo*)param;
×
52
  cleanupBasicInfo(&pInfo->binfo);
×
53
  if (pInfo->pOperator) {
×
54
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
×
55
                              &pInfo->groupResInfo);
56
    pInfo->pOperator = NULL;
×
57
  }
58

59
  destroyStreamBasicInfo(&pInfo->basic);
×
60

61
  cleanupExprSupp(&pInfo->scalarSupp);
×
62
  clearGroupResInfo(&pInfo->groupResInfo);
×
63
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
×
64
  pInfo->pUpdated = NULL;
×
65
  destroyStreamAggSupporter(&pInfo->streamAggSup);
×
66

67
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
68
  blockDataDestroy(pInfo->pDelRes);
×
69
  tSimpleHashCleanup(pInfo->pStUpdated);
×
70
  tSimpleHashCleanup(pInfo->pStDeleted);
×
71
  cleanupGroupResInfo(&pInfo->groupResInfo);
×
72

73
  taosArrayDestroy(pInfo->historyWins);
×
74
  blockDataDestroy(pInfo->pCheckpointRes);
×
75

76
  tSimpleHashCleanup(pInfo->pPkDeleted);
×
77

78
  taosMemoryFreeClear(param);
×
79
}
80

81
bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) { return pAggSup->windowCount != pAggSup->windowSliding; }
×
82

83
int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, SCountWindowInfo* pCurWin,
×
84
                          SBuffInfo* pBuffInfo, int32_t* pWinCode) {
85
  int32_t code = TSDB_CODE_SUCCESS;
×
86
  int32_t lino = 0;
×
87
  int32_t size = pAggSup->resultRowSize;
×
88
  pCurWin->winInfo.sessionWin.groupId = groupId;
×
89
  pCurWin->winInfo.sessionWin.win.skey = ts;
×
90
  pCurWin->winInfo.sessionWin.win.ekey = ts;
×
91

92
  if (isSlidingCountWindow(pAggSup)) {
×
93
    if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) {
×
94
      code =
95
          pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
×
96
                                                     pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size);
×
97
      QUERY_CHECK_CODE(code, lino, _end);
×
98

99
      *pWinCode = TSDB_CODE_FAILED;
×
100
    } else if (pBuffInfo->winBuffOp == MOVE_NEXT_WINDOW) {
×
101
      QUERY_CHECK_NULL(pBuffInfo->pCur, code, lino, _end, terrno);
×
102
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pBuffInfo->pCur);
×
103
      *pWinCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
×
104
                                                                   (void**)&pCurWin->winInfo.pStatePos, &size);
×
105
      if (*pWinCode == TSDB_CODE_FAILED) {
×
106
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
×
107
                                                          pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos,
×
108
                                                          &size);
109
        QUERY_CHECK_CODE(code, lino, _end);
×
110
      } else {
111
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
×
112
      }
113
    } else {
114
      pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
×
115
                                                                        pAggSup->windowCount);
×
116
      *pWinCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
×
117
                                                                   (void**)&pCurWin->winInfo.pStatePos, &size);
×
118
      if (*pWinCode == TSDB_CODE_FAILED) {
×
119
        code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
×
120
                                                          pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos,
×
121
                                                          &size);
122
        QUERY_CHECK_CODE(code, lino, _end);
×
123
      } else {
124
        reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
×
125
      }
126
    }
127
    if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
×
128
      pBuffInfo->rebuildWindow = true;
×
129
    }
130
  } else {
131
    code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
×
132
                                                                pAggSup->windowCount,
×
133
                                                                (void**)&pCurWin->winInfo.pStatePos, &size, pWinCode);
×
134
    QUERY_CHECK_CODE(code, lino, _end);
×
135
  }
136

137
  if (*pWinCode == TSDB_CODE_SUCCESS) {
×
138
    pCurWin->winInfo.isOutput = true;
×
139
  }
140
  pCurWin->pWindowCount =
×
141
      (COUNT_TYPE*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - sizeof(COUNT_TYPE)));
×
142

143
  if (*pCurWin->pWindowCount == pAggSup->windowCount) {
×
144
    pBuffInfo->rebuildWindow = true;
×
145
  }
146

147
_end:
×
148
  if (code != TSDB_CODE_SUCCESS) {
×
149
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
150
  }
151
  return code;
×
152
}
153

154
static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
×
155
  SSessionKey key = {0};
×
156
  getSessionHashKey(pKey, &key);
×
157
  int32_t code = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
×
158
  if (code != TSDB_CODE_SUCCESS) {
×
159
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
160
  }
161

162
  code = tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
×
163
  if (code != TSDB_CODE_SUCCESS) {
×
164
    qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
165
  }
166
}
×
167

168
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
×
169
                                     int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
170
                                     SSHashObj* pStDeleted, bool* pRebuild, int32_t* pWinRows) {
171
  int32_t     code = TSDB_CODE_SUCCESS;
×
172
  int32_t     lino = 0;
×
173
  SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
×
174
  int32_t     num = 0;
×
175
  for (int32_t i = start; i < rows; i++) {
×
176
    if (pTs[i] < pWinInfo->winInfo.sessionWin.win.ekey) {
×
177
      num++;
×
178
    } else {
179
      break;
×
180
    }
181
  }
182
  int32_t maxNum = TMIN(maxRows - *pWinInfo->pWindowCount, rows - start);
×
183
  if (num > maxNum) {
×
184
    *pRebuild = true;
×
185
  }
186
  *pWinInfo->pWindowCount += maxNum;
×
187
  bool needDelState = false;
×
188
  if (pWinInfo->winInfo.sessionWin.win.skey > pTs[start]) {
×
189
    needDelState = true;
×
190
    if (pStDeleted && pWinInfo->winInfo.isOutput) {
×
191
      code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin);
×
192
      QUERY_CHECK_CODE(code, lino, _end);
×
193
    }
194

195
    pWinInfo->winInfo.sessionWin.win.skey = pTs[start];
×
196
  }
197

198
  if (pWinInfo->winInfo.sessionWin.win.ekey < pTs[maxNum + start - 1]) {
×
199
    needDelState = true;
×
200
    pWinInfo->winInfo.sessionWin.win.ekey = pTs[maxNum + start - 1];
×
201
  }
202

203
  if (needDelState) {
×
204
    memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
×
205
    removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey);
×
206
    if (pWinInfo->winInfo.pStatePos->needFree) {
×
207
      pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
×
208
    }
209
  }
210

211
  (*pWinRows) = maxNum;
×
212

213
_end:
×
214
  if (code != TSDB_CODE_SUCCESS) {
×
215
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
216
  }
217
  return code;
×
218
}
219

220
void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, EStreamType mode, SSessionKey* pDelRange) {
×
221
  *pDelRange = *pKey;
×
222
  SStreamStateCur* pCur = NULL;
×
223
  if (isSlidingCountWindow(pAggSup)) {
×
224
    pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, pKey, pAggSup->windowCount);
×
225
  } else {
226
    pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);
×
227
  }
228
  SSessionKey tmpKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN};
×
229
  int32_t     code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
×
230
  if (code != TSDB_CODE_SUCCESS) {
×
231
    pAggSup->stateStore.streamStateFreeCur(pCur);
×
232
    return;
×
233
  }
234
  pDelRange->win = tmpKey.win;
×
235
  while (mode == STREAM_DELETE_DATA || mode == STREAM_PARTITION_DELETE_DATA) {
×
236
    pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
×
237
    code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
×
238
    if (code != TSDB_CODE_SUCCESS) {
×
239
      break;
×
240
    }
241
    pDelRange->win.ekey = TMAX(pDelRange->win.ekey, tmpKey.win.ekey);
×
242
  }
243
  pAggSup->stateStore.streamStateFreeCur(pCur);
×
244
}
245

246
static void destroySBuffInfo(SStreamAggSupporter* pAggSup, SBuffInfo* pBuffInfo) {
×
247
  pAggSup->stateStore.streamStateFreeCur(pBuffInfo->pCur);
×
248
}
×
249

250
bool inCountCalSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, TSKEY sKey, TSKEY eKey) {
×
251
  if (pAggSup->windowCount == pAggSup->windowSliding) {
×
252
    return true;
×
253
  }
254
  if (sKey <= pWin->skey && pWin->ekey <= eKey) {
×
255
    return true;
×
256
  }
257
  return false;
×
258
}
259

260
bool inCountSlidingWindow(SStreamAggSupporter* pAggSup, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
×
261
  return inCountCalSlidingWindow(pAggSup, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
×
262
}
263

264
static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
×
265
                                 SSHashObj* pStDeleted) {
266
  int32_t                      code = TSDB_CODE_SUCCESS;
×
267
  int32_t                      lino = 0;
×
268
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
×
269
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
×
270
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
×
271
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
×
272
  SResultRow*                  pResult = NULL;
×
273
  int32_t                      rows = pSDataBlock->info.rows;
×
274
  int32_t                      winRows = 0;
×
275
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
×
276
  SBuffInfo                    buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
×
277

278
  pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
×
279
  pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
×
280
  if (pAggSup->winRange.ekey <= 0) {
×
281
    pAggSup->winRange.ekey = INT64_MAX;
×
282
  }
283

284
  SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
×
285
  if (!pStartTsCol) {
×
286
    code = TSDB_CODE_FAILED;
×
287
    QUERY_CHECK_CODE(code, lino, _end);
×
288
  }
289
  TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
×
290
  code = blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2);
×
291
  QUERY_CHECK_CODE(code, lino, _end);
×
292

293
  SStreamStateCur* pCur = NULL;
×
294
  COUNT_TYPE       slidingRows = 0;
×
295

296
  for (int32_t i = 0; i < rows;) {
×
297
    if (pInfo->ignoreExpiredData &&
×
298
        checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup,
×
299
                         pSDataBlock->info.id.uid, startTsCols[i], NULL, 0)) {
×
300
      i++;
×
301
      continue;
×
302
    }
303
    SCountWindowInfo curWin = {0};
×
304
    int32_t          winCode = TSDB_CODE_SUCCESS;
×
305
    buffInfo.rebuildWindow = false;
×
306
    code = setCountOutputBuf(pAggSup, startTsCols[i], groupId, &curWin, &buffInfo, &winCode);
×
307
    QUERY_CHECK_CODE(code, lino, _end);
×
308

309
    if (winCode != TSDB_CODE_SUCCESS &&
×
310
        BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN)) {
×
311
      code = addCountAggNotifyEvent(SNOTIFY_EVENT_WINDOW_OPEN, &curWin.winInfo.sessionWin, &pInfo->basic.notifyEventSup,
×
312
                                    pTaskInfo->streamInfo.pNotifyEventStat);
313
      QUERY_CHECK_CODE(code, lino, _end);
×
314
    }
315

316
    if (!inCountSlidingWindow(pAggSup, &curWin.winInfo.sessionWin.win, &pSDataBlock->info)) {
×
317
      buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
×
318
      continue;
×
319
    }
320
    setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
×
321
    slidingRows = *curWin.pWindowCount;
×
322
    if (!buffInfo.rebuildWindow) {
×
323
      code = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated, pStDeleted,
×
324
                                   &buffInfo.rebuildWindow, &winRows);
325
      QUERY_CHECK_CODE(code, lino, _end);
×
326
    }
327
    if (buffInfo.rebuildWindow) {
×
328
      SSessionKey range = {0};
×
329
      if (isSlidingCountWindow(pAggSup)) {
×
330
        curWin.winInfo.sessionWin.win.skey = startTsCols[i];
×
331
        curWin.winInfo.sessionWin.win.ekey = startTsCols[i];
×
332
      }
333
      getCountWinRange(pAggSup, &curWin.winInfo.sessionWin, STREAM_DELETE_DATA, &range);
×
334
      range.win.skey = TMIN(startTsCols[i], range.win.skey);
×
335
      range.win.ekey = TMAX(startTsCols[rows - 1], range.win.ekey);
×
336
      uint64_t uid = 0;
×
337
      code =
338
          appendDataToSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL);
×
339
      QUERY_CHECK_CODE(code, lino, _end);
×
340
      break;
×
341
    }
342
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
×
343
                              pOperator, 0);
344
    QUERY_CHECK_CODE(code, lino, _end);
×
345

346
    code = saveSessionOutputBuf(pAggSup, &curWin.winInfo);
×
347
    QUERY_CHECK_CODE(code, lino, _end);
×
348

349
    if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_COUNT_OP(pOperator)) {
×
350
      code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
×
351
      QUERY_CHECK_CODE(code, lino, _end);
×
352
    }
353

354
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
×
355
      code = saveResult(curWin.winInfo, pStUpdated);
×
356
      QUERY_CHECK_CODE(code, lino, _end);
×
357
    }
358
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
×
359
      curWin.winInfo.pStatePos->beUpdated = true;
×
360
      SSessionKey key = {0};
×
361
      getSessionHashKey(&curWin.winInfo.sessionWin, &key);
×
362
      code =
363
          tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
×
364
      QUERY_CHECK_CODE(code, lino, _end);
×
365
    }
366

367
    if (isSlidingCountWindow(pAggSup)) {
×
368
      if (slidingRows <= pAggSup->windowSliding) {
×
369
        if (slidingRows + winRows > pAggSup->windowSliding) {
×
370
          buffInfo.winBuffOp = CREATE_NEW_WINDOW;
×
371
          winRows = pAggSup->windowSliding - slidingRows;
×
372
        }
373
      } else {
374
        buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
×
375
        winRows = 0;
×
376
      }
377
    }
378
    i += winRows;
×
379
  }
380

381
_end:
×
382
  if (code != TSDB_CODE_SUCCESS) {
×
383
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
384
  }
385
  destroySBuffInfo(pAggSup, &buffInfo);
×
386
}
×
387

388
static int32_t buildCountResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
389
  int32_t                      code = TSDB_CODE_SUCCESS;
×
390
  int32_t                      lino = 0;
×
391
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
×
392
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
×
393
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
×
394
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
×
395
  SStreamNotifyEventSupp*      pNotifySup = &pInfo->basic.notifyEventSup;
×
396
  STaskNotifyEventStat*        pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat;
×
397
  bool                         addNotifyEvent = false;
×
398
  addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
×
399
  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo);
×
400
  if (pInfo->pDelRes->info.rows > 0) {
×
401
    printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
402
    if (addNotifyEvent) {
×
403
      code = addAggDeleteNotifyEvent(pInfo->pDelRes, pNotifySup, pNotifyEventStat);
×
404
      QUERY_CHECK_CODE(code, lino, _end);
×
405
    }
406
    (*ppRes) = pInfo->pDelRes;
×
407
    return code;
×
408
  }
409

410
  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes,
×
411
                       addNotifyEvent ? pNotifySup->pSessionKeys : NULL);
412
  if (pBInfo->pRes->info.rows > 0) {
×
413
    printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
414
    if (addNotifyEvent) {
×
415
      code = addAggResultNotifyEvent(pBInfo->pRes, pNotifySup->pSessionKeys, pTaskInfo->streamInfo.notifyResultSchema,
×
416
                                     pNotifySup, pNotifyEventStat);
417
      QUERY_CHECK_CODE(code, lino, _end);
×
418
    }
419
    (*ppRes) = pBInfo->pRes;
×
420
    return code;
×
421
  }
422

423
  code = buildNotifyEventBlock(pTaskInfo, pNotifySup, pNotifyEventStat);
×
424
  QUERY_CHECK_CODE(code, lino, _end);
×
425
  if (pNotifySup->pEventBlock && pNotifySup->pEventBlock->info.rows > 0) {
×
426
    printDataBlock(pNotifySup->pEventBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
427
    (*ppRes) = pNotifySup->pEventBlock;
×
428
    return code;
×
429
  }
430

431
  code = removeOutdatedNotifyEvents(&pInfo->twAggSup, pNotifySup, pNotifyEventStat);
×
432
  QUERY_CHECK_CODE(code, lino, _end);
×
433

434
_end:
×
435
  if (code != TSDB_CODE_SUCCESS) {
×
436
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
437
  }
438
  (*ppRes) = NULL;
×
439
  return code;
×
440
}
441

442
int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
×
443
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
×
444
  if (!pInfo) {
×
445
    return 0;
×
446
  }
447

448
  void* pData = (buf == NULL) ? NULL : *buf;
×
449

450
  // 1.streamAggSup.pResultRows
451
  int32_t tlen = 0;
×
452
  int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
×
453
  tlen += taosEncodeFixedI32(buf, mapSize);
×
454
  void*   pIte = NULL;
×
455
  size_t  keyLen = 0;
×
456
  int32_t iter = 0;
×
457
  while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
×
458
    void* key = tSimpleHashGetKey(pIte, &keyLen);
×
459
    tlen += encodeSSessionKey(buf, key);
×
460
    tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
×
461
  }
462

463
  // 2.twAggSup
464
  tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
×
465

466
  // 3.dataVersion
467
  tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
×
468

469
  // 4.basicInfo
470
  tlen += encodeStreamBasicInfo(buf, &pInfo->basic);
×
471

472
  // 5.checksum
473
  if (isParent) {
×
474
    if (buf) {
×
475
      uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
×
476
      tlen += taosEncodeFixedU32(buf, cksum);
×
477
    } else {
478
      tlen += sizeof(uint32_t);
×
479
    }
480
  }
481

482
  return tlen;
×
483
}
484

485
int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
×
486
  int32_t                      code = TSDB_CODE_SUCCESS;
×
487
  int32_t                      lino = 0;
×
488
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
×
489
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
×
490
  void*                        pDataEnd = POINTER_SHIFT(buf, len);
×
491
  if (!pInfo) {
×
492
    code = TSDB_CODE_FAILED;
×
493
    QUERY_CHECK_CODE(code, lino, _end);
×
494
  }
495

496
  // 5.checksum
497
  if (isParent) {
×
498
    int32_t dataLen = len - sizeof(uint32_t);
×
499
    void*   pCksum = POINTER_SHIFT(buf, dataLen);
×
500
    if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
×
501
      code = TSDB_CODE_FAILED;
×
502
      QUERY_CHECK_CODE(code, lino, _end);
×
503
    }
504
    pDataEnd = pCksum;
×
505
  }
506

507
  // 1.streamAggSup.pResultRows
508
  int32_t mapSize = 0;
×
509
  buf = taosDecodeFixedI32(buf, &mapSize);
×
510
  for (int32_t i = 0; i < mapSize; i++) {
×
511
    SSessionKey      key = {0};
×
512
    SCountWindowInfo curWin = {0};
×
513
    int32_t          winCode = TSDB_CODE_SUCCESS;
×
514
    buf = decodeSSessionKey(buf, &key);
×
515
    SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
×
516
    code = setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo, &winCode);
×
517
    QUERY_CHECK_CODE(code, lino, _end);
×
518

519
    buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
×
520
    code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo,
×
521
                          sizeof(SResultWindowInfo));
522
    QUERY_CHECK_CODE(code, lino, _end);
×
523
  }
524

525
  // 2.twAggSup
526
  buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
×
527

528
  // 3.dataVersion
529
  buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);
×
530

531
  // 4.basicInfo
532
  if (buf < pDataEnd) {
×
533
    code = decodeStreamBasicInfo(&buf, &pInfo->basic);
×
534
    QUERY_CHECK_CODE(code, lino, _end);
×
535
  }
536

537
_end:
×
538
  if (code != TSDB_CODE_SUCCESS) {
×
539
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
540
  }
541
  return code;
×
542
}
543

544
void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
×
545
  int32_t                      code = TSDB_CODE_SUCCESS;
×
546
  int32_t                      lino = 0;
×
547
  void*                        pBuf = NULL;
×
548
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
×
549
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
×
550
  if (needSaveStreamOperatorInfo(&pInfo->basic)) {
×
551
    int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
×
552
    pBuf = taosMemoryCalloc(1, len);
×
553
    if (!pBuf) {
×
554
      code = terrno;
×
555
      QUERY_CHECK_CODE(code, lino, _end);
×
556
    }
557
    void* pTmpBuf = pBuf;
×
558
    len = doStreamCountEncodeOpState(&pTmpBuf, len, pOperator, true);
×
559
    pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
×
560
                                                       strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), pBuf, len);
561
    saveStreamOperatorStateComplete(&pInfo->basic);
×
562
  }
563

564
_end:
×
565
  taosMemoryFreeClear(pBuf);
×
566
  if (code != TSDB_CODE_SUCCESS) {
×
567
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
568
  }
569
}
×
570

571
void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) {
×
572
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
×
573
  TSKEY*           startDatas = (TSKEY*)pStartTsCol->pData;
×
574
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
×
575
  TSKEY*           endDatas = (TSKEY*)pEndTsCol->pData;
×
576
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
×
577
  TSKEY*           calStartDatas = (TSKEY*)pStartTsCol->pData;
×
578
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
×
579
  TSKEY*           calEndDatas = (TSKEY*)pEndTsCol->pData;
×
580
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
×
581
  uint64_t*        gpDatas = (uint64_t*)pGroupCol->pData;
×
582

583
  SRowBuffPos* pPos = NULL;
×
584
  int32_t      size = 0;
×
585
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
×
586
    SSessionKey      key = {.groupId = gpDatas[i], .win.skey = startDatas[i], .win.ekey = endDatas[i]};
×
587
    SStreamStateCur* pCur = NULL;
×
588
    if (isSlidingCountWindow(pAggSup)) {
×
589
      pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &key, pAggSup->windowCount);
×
590
    } else {
591
      pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, &key);
×
592
    }
593
    while (1) {
×
594
      SSessionKey tmpKey = {.groupId = gpDatas[i], .win.skey = INT64_MIN, .win.ekey = INT64_MIN};
×
595
      int32_t     code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, (void**)&pPos, &size);
×
596
      if (code != TSDB_CODE_SUCCESS || tmpKey.win.skey > endDatas[i]) {
×
597
        pAggSup->stateStore.streamStateFreeCur(pCur);
×
598
        break;
×
599
      }
600
      if (!inCountCalSlidingWindow(pAggSup, &tmpKey.win, calStartDatas[i], calEndDatas[i])) {
×
601
        pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
×
602
        continue;
×
603
      }
604
      pAggSup->stateStore.streamStateSessionReset(pAggSup->pState, pPos->pRowBuff);
×
605
      pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
×
606
    }
607
  }
608
}
×
609

610
int32_t doDeleteCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
×
611
  int32_t          code = TSDB_CODE_SUCCESS;
×
612
  int32_t          lino = 0;
×
613
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
×
614
  TSKEY*           startDatas = (TSKEY*)pStartTsCol->pData;
×
615
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
×
616
  TSKEY*           endDatas = (TSKEY*)pEndTsCol->pData;
×
617
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
×
618
  TSKEY*           calStartDatas = (TSKEY*)pStartTsCol->pData;
×
619
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
×
620
  TSKEY*           calEndDatas = (TSKEY*)pEndTsCol->pData;
×
621
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
×
622
  uint64_t*        gpDatas = (uint64_t*)pGroupCol->pData;
×
623
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
×
624
    SSessionKey key = {.win.skey = startDatas[i], .win.ekey = endDatas[i], .groupId = gpDatas[i]};
×
625
    while (1) {
×
626
      SSessionKey curWin = {0};
×
627
      int32_t     winCode = pAggSup->stateStore.streamStateCountGetKeyByRange(pAggSup->pState, &key, &curWin);
×
628
      if (winCode != TSDB_CODE_SUCCESS) {
×
629
        break;
×
630
      }
631
      doDeleteSessionWindow(pAggSup, &curWin);
×
632
      if (result) {
×
633
        code = saveDeleteInfo(result, curWin);
×
634
        QUERY_CHECK_CODE(code, lino, _end);
×
635
      }
636
    }
637
  }
638

639
_end:
×
640
  if (code != TSDB_CODE_SUCCESS) {
×
641
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
642
  }
643
  return code;
×
644
}
645

646
int32_t deleteCountWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
×
647
                            SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd) {
648
  int32_t code = TSDB_CODE_SUCCESS;
×
649
  int32_t lino = 0;
×
650
  SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
×
651
  if (!pWins) {
×
652
    code = terrno;
×
653
    QUERY_CHECK_CODE(code, lino, _end);
×
654
  }
655

656
  if (isSlidingCountWindow(pAggSup)) {
×
657
    code = doDeleteCountWindows(pAggSup, pBlock, pWins);
×
658
    QUERY_CHECK_CODE(code, lino, _end);
×
659
  } else {
660
    code = doDeleteTimeWindows(pAggSup, pBlock, pWins);
×
661
    QUERY_CHECK_CODE(code, lino, _end);
×
662
  }
663
  removeSessionResults(pAggSup, pMapUpdate, pWins);
×
664
  code = copyDeleteWindowInfo(pWins, pMapDelete);
×
665
  QUERY_CHECK_CODE(code, lino, _end);
×
666
  if (needAdd) {
×
667
    code = copyDeleteWindowInfo(pWins, pPkDelete);
×
668
    QUERY_CHECK_CODE(code, lino, _end);
×
669
  }
670
  taosArrayDestroy(pWins);
×
671

672
_end:
×
673
  if (code != TSDB_CODE_SUCCESS) {
×
674
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
675
  }
676
  return code;
×
677
}
678

679
static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
680
  int32_t                      code = TSDB_CODE_SUCCESS;
×
681
  int32_t                      lino = 0;
×
682
  SExprSupp*                   pSup = &pOperator->exprSupp;
×
683
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
×
684
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
×
685
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
×
686
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
×
687
  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
×
688
  if (pOperator->status == OP_EXEC_DONE) {
×
689
    (*ppRes) = NULL;
×
690
    return code;
×
691
  } else if (pOperator->status == OP_RES_TO_RETURN) {
×
692
    SSDataBlock* opRes = NULL;
×
693
    code = buildCountResult(pOperator, &opRes);
×
694
    QUERY_CHECK_CODE(code, lino, _end);
×
695
    if (opRes) {
×
696
      (*ppRes) = opRes;
×
697
      return code;
×
698
    }
699

700
    if (pInfo->recvGetAll) {
×
701
      pInfo->recvGetAll = false;
×
702
      resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
×
703
    }
704

705
    if (pInfo->reCkBlock) {
×
706
      pInfo->reCkBlock = false;
×
707
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
708
      (*ppRes) = pInfo->pCheckpointRes;
×
709
      return code;
×
710
    }
711

712
    setStreamOperatorCompleted(pOperator);
×
713
    (*ppRes) = NULL;
×
714
    return code;
×
715
  }
716

717
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
718
  if (!pInfo->pUpdated) {
×
719
    pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
×
720
    QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
×
721
  }
722
  if (!pInfo->pStUpdated) {
×
723
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
724
    pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
×
725
    QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
×
726
  }
727
  while (1) {
×
728
    SSDataBlock* pBlock = NULL;
×
729
    code = downstream->fpSet.getNextFn(downstream, &pBlock);
×
730
    QUERY_CHECK_CODE(code, lino, _end);
×
731

732
    if (pBlock == NULL) {
×
733
      break;
×
734
    }
735

736
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
×
737
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);
×
738

739
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
×
740
      bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);
×
741
      code = deleteCountWinState(&pInfo->streamAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, pInfo->pPkDeleted,
×
742
                                 add);
743
      QUERY_CHECK_CODE(code, lino, _end);
×
744
      continue;
×
745
    } else if (pBlock->info.type == STREAM_CLEAR) {
×
746
      doResetCountWindows(&pInfo->streamAggSup, pBlock);
×
747
      continue;
×
748
    } else if (pBlock->info.type == STREAM_GET_ALL) {
×
749
      pInfo->recvGetAll = true;
×
750
      code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
×
751
      QUERY_CHECK_CODE(code, lino, _end);
×
752
      continue;
×
753
    } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
×
754
      (*ppRes) = pBlock;
×
755
      return code;
×
756
    } else if (pBlock->info.type == STREAM_CHECKPOINT) {
×
757
      pAggSup->stateStore.streamStateCommit(pAggSup->pState);
×
758
      doStreamCountSaveCheckpoint(pOperator);
×
759
      code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
×
760
      QUERY_CHECK_CODE(code, lino, _end);
×
761
      continue;
×
762
    } else {
763
      if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
×
764
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
765
        QUERY_CHECK_CODE(code, lino, _end);
×
766
      }
767
    }
768

769
    if (pInfo->scalarSupp.pExprInfo != NULL) {
×
770
      SExprSupp* pExprSup = &pInfo->scalarSupp;
×
771
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
×
772
      QUERY_CHECK_CODE(code, lino, _end);
×
773
    }
774
    // the pDataBlock are always the same one, no need to call this again
775
    code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
×
776
    QUERY_CHECK_CODE(code, lino, _end);
×
777
    doStreamCountAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted);
×
778
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
×
779
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
×
780
  }
781
  // restore the value
782
  pOperator->status = OP_RES_TO_RETURN;
×
783

784
  code = closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
×
785
  QUERY_CHECK_CODE(code, lino, _end);
×
786

787
  code = copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
×
788
  QUERY_CHECK_CODE(code, lino, _end);
×
789

790
  removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
×
791
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
×
792
  pInfo->pUpdated = NULL;
×
793
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
×
794
  QUERY_CHECK_CODE(code, lino, _end);
×
795

796
  if (pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator)) {
×
797
    code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
×
798
    QUERY_CHECK_CODE(code, lino, _end);
×
799
  }
800

801
  SSDataBlock* opRes = NULL;
×
802
  code = buildCountResult(pOperator, &opRes);
×
803
  QUERY_CHECK_CODE(code, lino, _end);
×
804
  if (opRes) {
×
805
    (*ppRes) = opRes;
×
806
    return code;
×
807
  }
808

809
_end:
×
810
  if (code != TSDB_CODE_SUCCESS) {
×
811
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
812
    pTaskInfo->code = code;
×
813
    T_LONG_JMP(pTaskInfo->env, code);
×
814
  }
815
  setStreamOperatorCompleted(pOperator);
×
816
  (*ppRes) = NULL;
×
817
  return code;
×
818
}
819

820
void streamCountReleaseState(SOperatorInfo* pOperator) {
×
821
  int32_t                      code = TSDB_CODE_SUCCESS;
×
822
  int32_t                      lino = 0;
×
823
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
×
824
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
×
825
  int32_t                      resSize = sizeof(TSKEY);
×
826
  char*                        pBuff = taosMemoryCalloc(1, resSize);
×
827
  QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
×
828

829
  memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
×
830
  qDebug("===stream=== count window operator relase state. ");
×
831
  pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_STATE_NAME,
×
832
                                                     strlen(STREAM_COUNT_OP_STATE_NAME), pBuff, resSize);
833
  pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
×
834
  taosMemoryFreeClear(pBuff);
×
835
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
836
  if (downstream->fpSet.releaseStreamStateFn) {
×
837
    downstream->fpSet.releaseStreamStateFn(downstream);
×
838
  }
839
_end:
×
840
  if (code != TSDB_CODE_SUCCESS) {
×
841
    terrno = code;
×
842
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
843
  }
844
}
×
845

846
void streamCountReloadState(SOperatorInfo* pOperator) {
×
847
  int32_t                      code = TSDB_CODE_SUCCESS;
×
848
  int32_t                      lino = 0;
×
849
  SStreamCountAggOperatorInfo* pInfo = pOperator->info;
×
850
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
×
851
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
×
852
  int32_t                      size = 0;
×
853
  void*                        pBuf = NULL;
×
854

855
  code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_COUNT_OP_STATE_NAME,
×
856
                                                strlen(STREAM_COUNT_OP_STATE_NAME), &pBuf, &size);
857
  QUERY_CHECK_CODE(code, lino, _end);
×
858

859
  TSKEY ts = *(TSKEY*)pBuf;
×
860
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
×
861
  taosMemoryFree(pBuf);
×
862

863
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
864
  if (downstream->fpSet.reloadStreamStateFn) {
×
865
    downstream->fpSet.reloadStreamStateFn(downstream);
×
866
  }
867
  reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
×
868

869
_end:
×
870
  if (code != TSDB_CODE_SUCCESS) {
×
871
    terrno = code;
×
872
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
873
  }
874
}
×
875

876
int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
×
877
                                         SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
878
  QRY_PARAM_CHECK(pOptrInfo);
×
879

880
  SCountWinodwPhysiNode*       pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
×
881
  int32_t                      numOfCols = 0;
×
882
  int32_t                      code = TSDB_CODE_SUCCESS;
×
883
  int32_t                      lino = 0;
×
884
  SStreamCountAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamCountAggOperatorInfo));
×
885
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
886
  if (pInfo == NULL || pOperator == NULL) {
×
887
    code = terrno;
×
888
    QUERY_CHECK_CODE(code, lino, _error);
×
889
  }
890

891
  pOperator->pTaskInfo = pTaskInfo;
×
892

893
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
894
  if (pCountNode->window.pExprs != NULL) {
×
895
    int32_t    numOfScalar = 0;
×
896
    SExprInfo* pScalarExprInfo = NULL;
×
897
    code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
898
    QUERY_CHECK_CODE(code, lino, _error);
×
899

900
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
901
    QUERY_CHECK_CODE(code, lino, _error);
×
902
  }
903
  SExprSupp* pExpSup = &pOperator->exprSupp;
×
904

905
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
×
906
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
907
  pInfo->binfo.pRes = pResBlock;
×
908

909
  SExprInfo* pExprInfo = NULL;
×
910
  code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
×
911
  QUERY_CHECK_CODE(code, lino, _error);
×
912

913
  code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
×
914
  QUERY_CHECK_CODE(code, lino, _error);
×
915

916
  pInfo->twAggSup = (STimeWindowAggSupp){
×
917
      .waterMark = pCountNode->window.watermark,
×
918
      .calTrigger = pCountNode->window.triggerType,
×
919
      .maxTs = INT64_MIN,
920
      .minTs = INT64_MAX,
921
      .deleteMark = getDeleteMark(&pCountNode->window, 0),
×
922
  };
923

924
  pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
×
925
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
×
926
                                sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
927
                                GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex,
×
928
                                STREAM_STATE_BUFF_SORT, 1);
929
  QUERY_CHECK_CODE(code, lino, _error);
×
930

931
  pInfo->streamAggSup.windowCount = pCountNode->windowCount;
×
932
  pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
×
933

934
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
935
  QUERY_CHECK_CODE(code, lino, _error);
×
936

937
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
938
  pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
×
939
  QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
×
940
  pInfo->pDelIterator = NULL;
×
941

942
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
×
943
  QUERY_CHECK_CODE(code, lino, _error);
×
944

945
  pInfo->ignoreExpiredData = pCountNode->window.igExpired;
×
946
  pInfo->ignoreExpiredDataSaved = false;
×
947
  pInfo->pUpdated = NULL;
×
948
  pInfo->pStUpdated = NULL;
×
949
  pInfo->dataVersion = 0;
×
950
  pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
×
951
  if (!pInfo->historyWins) {
×
952
    code = terrno;
×
953
    QUERY_CHECK_CODE(code, lino, _error);
×
954
  }
955

956
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
×
957
  QUERY_CHECK_CODE(code, lino, _error);
×
958

959
  pInfo->recvGetAll = false;
×
960
  pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
×
961
  QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
×
962
  pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimaryKey;
×
963

964
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
×
965
  setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
×
966
                  OP_NOT_OPENED, pInfo, pTaskInfo);
967
  // for stream
968
  void*   buff = NULL;
×
969
  int32_t len = 0;
×
970
  int32_t res =
971
      pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
×
972
                                                        strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), &buff, &len);
973
  if (res == TSDB_CODE_SUCCESS) {
×
974
    code = doStreamCountDecodeOpState(buff, len, pOperator, true);
×
975
    QUERY_CHECK_CODE(code, lino, _error);
×
976
    taosMemoryFree(buff);
×
977
  }
978
  pInfo->pOperator = pOperator;
×
979
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo,
×
980
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
981
  setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
×
982

983
  code = initStreamBasicInfo(&pInfo->basic, pOperator);
×
984
  QUERY_CHECK_CODE(code, lino, _error);
×
985

986
  if (downstream) {
×
987
    code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
×
988
                          &pInfo->twAggSup, &pInfo->basic);
×
989
    QUERY_CHECK_CODE(code, lino, _error);
×
990

991
    code = appendDownstream(pOperator, &downstream, 1);
×
992
    QUERY_CHECK_CODE(code, lino, _error);
×
993
  }
994

995
  *pOptrInfo = pOperator;
×
996
  return TSDB_CODE_SUCCESS;
×
997

998
_error:
×
999
  if (pInfo != NULL) {
×
1000
    destroyStreamCountAggOperatorInfo(pInfo);
×
1001
  }
1002

1003
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1004
  pTaskInfo->code = code;
×
1005
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1006
  return code;
×
1007
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc