• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.74
/source/libs/executor/src/streameventwindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "streamexecutorInt.h"
22
#include "tchecksum.h"
23
#include "tcommon.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "tfill.h"
27
#include "tglobal.h"
28
#include "tlog.h"
29
#include "ttime.h"
30

31
#define IS_NORMAL_EVENT_OP(op)          ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT)
32
#define STREAM_EVENT_OP_STATE_NAME      "StreamEventHistoryState"
33
#define STREAM_EVENT_OP_CHECKPOINT_NAME "StreamEventOperator_Checkpoint"
34

35
typedef struct SEventWinfowFlag {
36
  bool startFlag;
37
  bool endFlag;
38
} SEventWinfowFlag;
39

40
typedef struct SEventWindowInfo {
41
  SResultWindowInfo winInfo;
42
  SEventWinfowFlag* pWinFlag;
43
} SEventWindowInfo;
44

45
void destroyStreamEventOperatorInfo(void* param) {
16✔
46
  if (param == NULL) {
16!
47
    return;
×
48
  }
49
  SStreamEventAggOperatorInfo* pInfo = (SStreamEventAggOperatorInfo*)param;
16✔
50
  cleanupBasicInfo(&pInfo->binfo);
16✔
51
  if (pInfo->pOperator) {
16!
52
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
16✔
53
                              &pInfo->groupResInfo);
54
    pInfo->pOperator = NULL;
16✔
55
  }
56
  destroyStreamAggSupporter(&pInfo->streamAggSup);
16✔
57
  clearGroupResInfo(&pInfo->groupResInfo);
16✔
58
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
16✔
59
  pInfo->pUpdated = NULL;
16✔
60

61
  cleanupExprSupp(&pInfo->scalarSupp);
16✔
62
  if (pInfo->pChildren != NULL) {
16!
63
    int32_t size = taosArrayGetSize(pInfo->pChildren);
×
64
    for (int32_t i = 0; i < size; i++) {
×
65
      SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
×
66
      destroyOperator(pChild);
×
67
    }
68
    taosArrayDestroy(pInfo->pChildren);
×
69
  }
70
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
16✔
71
  blockDataDestroy(pInfo->pDelRes);
16✔
72
  tSimpleHashCleanup(pInfo->pSeUpdated);
16✔
73
  tSimpleHashCleanup(pInfo->pAllUpdated);
16✔
74
  tSimpleHashCleanup(pInfo->pSeDeleted);
16✔
75
  cleanupGroupResInfo(&pInfo->groupResInfo);
16✔
76

77
  taosArrayDestroy(pInfo->historyWins);
16✔
78
  blockDataDestroy(pInfo->pCheckpointRes);
16✔
79

80
  tSimpleHashCleanup(pInfo->pPkDeleted);
16✔
81

82
  if (pInfo->pStartCondInfo != NULL) {
16!
83
    filterFreeInfo(pInfo->pStartCondInfo);
16✔
84
    pInfo->pStartCondInfo = NULL;
16✔
85
  }
86

87
  if (pInfo->pEndCondInfo != NULL) {
16!
88
    filterFreeInfo(pInfo->pEndCondInfo);
16✔
89
    pInfo->pEndCondInfo = NULL;
16✔
90
  }
91

92
  taosMemoryFreeClear(param);
16!
93
}
94

95
void setEventWindowFlag(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo) {
98✔
96
  char* pFlagInfo = (char*)pWinInfo->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize);
98✔
97
  pWinInfo->pWinFlag = (SEventWinfowFlag*)pFlagInfo;
98✔
98
}
98✔
99

100
void setEventWindowInfo(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SRowBuffPos* pPos,
90✔
101
                        SEventWindowInfo* pWinInfo) {
102
  pWinInfo->winInfo.sessionWin = *pKey;
90✔
103
  pWinInfo->winInfo.pStatePos = pPos;
90✔
104
  setEventWindowFlag(pAggSup, pWinInfo);
90✔
105
}
90✔
106

107
int32_t getEndCondIndex(bool* pEnd, int32_t start, int32_t rows) {
6✔
108
  for (int32_t i = start; i < rows; i++) {
12✔
109
    if (pEnd[i]) {
8✔
110
      return i;
2✔
111
    }
112
  }
113
  return -1;
4✔
114
}
115

116
static bool isWindowIncomplete(SEventWindowInfo* pWinInfo) {
101✔
117
  return !(pWinInfo->pWinFlag->startFlag && pWinInfo->pWinFlag->endFlag);
101✔
118
}
119
void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
130✔
120
  pAPI->streamStateReleaseBuf(pState, pPos, true);
130✔
121
}
130✔
122

123
int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd,
72✔
124
                          int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) {
125
  int32_t code = TSDB_CODE_SUCCESS;
72✔
126
  int32_t lino = 0;
72✔
127
  int32_t winCode = TSDB_CODE_SUCCESS;
72✔
128
  int32_t size = pAggSup->resultRowSize;
72✔
129
  TSKEY   ts = pTs[index];
72✔
130
  bool    start = pStart[index];
72✔
131
  bool    end = pEnd[index];
72✔
132
  pCurWin->winInfo.sessionWin.groupId = groupId;
72✔
133
  pCurWin->winInfo.sessionWin.win.skey = ts;
72✔
134
  pCurWin->winInfo.sessionWin.win.ekey = ts;
72✔
135
  SStreamStateCur* pCur =
136
      pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin);
72✔
137
  SSessionKey leftWinKey = {.groupId = groupId};
72✔
138
  void*       pVal = NULL;
72✔
139
  int32_t     len = 0;
72✔
140
  winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &leftWinKey, &pVal, &len);
72✔
141
  if (winCode == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &leftWinKey.win)) {
72!
142
    bool inWin = isInTimeWindow(&leftWinKey.win, ts, 0);
44✔
143
    setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin);
44✔
144
    if (inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag)) {
44✔
145
      pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
26✔
146
      goto _end;
26✔
147
    }
148
  }
149
  pAggSup->stateStore.streamStateFreeCur(pCur);
46✔
150
  pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
46✔
151
  SSessionKey rightWinKey = {.groupId = groupId};
46✔
152
  winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len);
46✔
153
  bool inWin = isInTimeWindow(&rightWinKey.win, ts, 0);
46✔
154
  if (winCode == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && (inWin || (start && !end))) {
46!
155
    int32_t endi = getEndCondIndex(pEnd, index, rows);
6✔
156
    if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) {
6!
157
      setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin);
4✔
158
      pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
4✔
159
      goto _end;
4✔
160
    }
161
  }
162

163
  SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId};
42✔
164
  code = pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len);
42✔
165
  QUERY_CHECK_CODE(code, lino, _error);
42!
166

167
  setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin);
42✔
168
  pCurWin->pWinFlag->startFlag = start;
42✔
169
  pCurWin->pWinFlag->endFlag = end;
42✔
170
  pCurWin->winInfo.isOutput = false;
42✔
171

172
_end:
72✔
173
  reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
72✔
174
  pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
72✔
175
  pNextWinKey->groupId = groupId;
72✔
176
  winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0);
72✔
177
  if (winCode != TSDB_CODE_SUCCESS) {
72✔
178
    SET_SESSION_WIN_KEY_INVALID(pNextWinKey);
61✔
179
  }
180
  if (pCurWin->winInfo.pStatePos->needFree) {
72✔
181
    pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
5✔
182
  }
183
  pAggSup->stateStore.streamStateFreeCur(pCur);
72✔
184
  qDebug("===stream===set event cur win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey,
72!
185
         pCurWin->winInfo.sessionWin.win.ekey);
186

187
_error:
×
188
  if (code != TSDB_CODE_SUCCESS) {
72!
189
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
190
  }
191
  return code;
72✔
192
}
193

194
int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo, SSessionKey* pNextWinKey,
72✔
195
                              TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start,
196
                              SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild,
197
                              int32_t* pWinRow) {
198
  int32_t code = TSDB_CODE_SUCCESS;
72✔
199
  int32_t lino = 0;
72✔
200
  *pRebuild = false;
72✔
201
  if (!pWinInfo->pWinFlag->startFlag && !(starts[start])) {
72✔
202
    (*pWinRow) = 1;
12✔
203
    goto _end;
12✔
204
  }
205

206
  TSKEY        maxTs = INT64_MAX;
60✔
207
  STimeWindow* pWin = &pWinInfo->winInfo.sessionWin.win;
60✔
208
  if (pWinInfo->pWinFlag->endFlag) {
60✔
209
    maxTs = pWin->ekey + 1;
17✔
210
  }
211
  if (!IS_INVALID_SESSION_WIN_KEY(*pNextWinKey)) {
60✔
212
    maxTs = TMIN(maxTs, pNextWinKey->win.skey);
9✔
213
  }
214

215
  for (int32_t i = start; i < rows; ++i) {
106✔
216
    if (pTsData[i] >= maxTs) {
79!
217
      (*pWinRow) = i - start;
×
218
      goto _end;
×
219
    }
220

221
    if (pWin->skey > pTsData[i]) {
79✔
222
      if (pStDeleted && pWinInfo->winInfo.isOutput) {
4!
223
        code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin);
1✔
224
        QUERY_CHECK_CODE(code, lino, _end);
1!
225
      }
226
      removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->winInfo.sessionWin);
4✔
227
      pWin->skey = pTsData[i];
4✔
228
      pWinInfo->pWinFlag->startFlag = starts[i];
4✔
229
    } else if (pWin->skey == pTsData[i]) {
75✔
230
      pWinInfo->pWinFlag->startFlag |= starts[i];
31✔
231
    }
232

233
    if (pWin->ekey < pTsData[i]) {
79✔
234
      pWin->ekey = pTsData[i];
39✔
235
      pWinInfo->pWinFlag->endFlag = ends[i];
39✔
236
    } else if (pWin->ekey == pTsData[i]) {
40✔
237
      pWinInfo->pWinFlag->endFlag |= ends[i];
33✔
238
    } else if (ends[i] && !pWinInfo->pWinFlag->endFlag) {
7!
239
      *pRebuild = true;
×
240
      pWinInfo->pWinFlag->endFlag |= ends[i];
×
241
      (*pWinRow) = i + 1 - start;
×
242
      goto _end;
×
243
    }
244

245
    memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
79✔
246

247
    if (ends[i]) {
79✔
248
      if (pWinInfo->pWinFlag->endFlag && pWin->skey <= pTsData[i] && pTsData[i] < pWin->ekey) {
33!
249
        *pRebuild = true;
1✔
250
      }
251
      (*pWinRow) = i + 1 - start;
33✔
252
      goto _end;
33✔
253
    }
254
  }
255
  (*pWinRow) = rows - start;
27✔
256

257
_end:
72✔
258
  if (code != TSDB_CODE_SUCCESS) {
72!
259
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
260
  }
261
  return code;
72✔
262
}
263

264
static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pCurWin, SSHashObj* pStUpdated,
73✔
265
                                  SSHashObj* pStDeleted, bool addGap) {
266
  int32_t                      code = TSDB_CODE_SUCCESS;
73✔
267
  int32_t                      lino = 0;
73✔
268
  SExprSupp*                   pSup = &pOperator->exprSupp;
73✔
269
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
73✔
270
  SStorageAPI*                 pAPI = &pOperator->pTaskInfo->storageAPI;
73✔
271
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
73✔
272
  SResultRow*                  pCurResult = NULL;
73✔
273
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
73✔
274
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
73✔
275
  while (1) {
4✔
276
    if (!pCurWin->pWinFlag->startFlag || pCurWin->pWinFlag->endFlag) {
77✔
277
      break;
278
    }
279
    SEventWindowInfo nextWinInfo = {0};
26✔
280
    getNextSessionWinInfo(pAggSup, pStUpdated, &pCurWin->winInfo, &nextWinInfo.winInfo);
26✔
281
    if (!IS_VALID_SESSION_WIN(nextWinInfo.winInfo) ||
26✔
282
        !inWinRange(&pAggSup->winRange, &nextWinInfo.winInfo.sessionWin.win)) {
4!
283
      releaseOutputBuf(pAggSup->pState, nextWinInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
22✔
284
      break;
22✔
285
    }
286
    setEventWindowFlag(pAggSup, &nextWinInfo);
4✔
287
    code = compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo,
4✔
288
                             pStUpdated, pStDeleted, false);
289
    QUERY_CHECK_CODE(code, lino, _end);
4!
290

291
    pCurWin->pWinFlag->endFlag = nextWinInfo.pWinFlag->endFlag;
4✔
292
  }
293

294
_end:
73✔
295
  if (code != TSDB_CODE_SUCCESS) {
73!
296
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
297
  }
298
  return code;
73✔
299
}
300

301
void doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SSessionKey* pKey) {
1✔
302
  pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, pKey);
1✔
303
  removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
1✔
304
}
1✔
305

306
static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
67✔
307
                                 SSHashObj* pStDeleted) {
308
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
67✔
309
  SStorageAPI*                 pAPI = &pOperator->pTaskInfo->storageAPI;
67✔
310
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
67✔
311
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
67✔
312
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
67✔
313
  int32_t                      code = TSDB_CODE_SUCCESS;
67✔
314
  int32_t                      lino = 0;
67✔
315
  TSKEY*                       tsCols = NULL;
67✔
316
  SResultRow*                  pResult = NULL;
67✔
317
  int32_t                      winRows = 0;
67✔
318
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
67✔
319
  SColumnInfoData*             pColStart = NULL;
67✔
320
  SColumnInfoData*             pColEnd = NULL;
67✔
321

322
  pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
67✔
323
  pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
67✔
324
  if (pAggSup->winRange.ekey <= 0) {
67!
325
    pAggSup->winRange.ekey = INT64_MAX;
×
326
  }
327

328
  if (pSDataBlock->pDataBlock != NULL) {
67!
329
    SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
67✔
330
    if (!pColDataInfo) {
67!
331
      code = TSDB_CODE_FAILED;
×
332
      QUERY_CHECK_CODE(code, lino, _end);
×
333
    }
334
    tsCols = (int64_t*)pColDataInfo->pData;
67✔
335
  } else {
336
    return;
×
337
  }
338

339
  SFilterColumnParam paramStart = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock),
67✔
340
                                   .pDataBlock = pSDataBlock->pDataBlock};
67✔
341
  code = filterSetDataFromSlotId(pInfo->pStartCondInfo, &paramStart);
67✔
342
  if (code != TSDB_CODE_SUCCESS) {
67!
343
    qError("set data from start slotId error.");
×
344
    goto _end;
×
345
  }
346
  int32_t statusStart = 0;
67✔
347
  code = filterExecute(pInfo->pStartCondInfo, pSDataBlock, &pColStart, NULL, paramStart.numOfCols, &statusStart);
67✔
348
  QUERY_CHECK_CODE(code, lino, _end);
67!
349

350
  SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock),
67✔
351
                                 .pDataBlock = pSDataBlock->pDataBlock};
67✔
352
  code = filterSetDataFromSlotId(pInfo->pEndCondInfo, &paramEnd);
67✔
353
  if (code != TSDB_CODE_SUCCESS) {
67!
354
    qError("set data from end slotId error.");
×
355
    goto _end;
×
356
  }
357

358
  int32_t statusEnd = 0;
67✔
359
  code = filterExecute(pInfo->pEndCondInfo, pSDataBlock, &pColEnd, NULL, paramEnd.numOfCols, &statusEnd);
67✔
360
  QUERY_CHECK_CODE(code, lino, _end);
67!
361

362
  int32_t rows = pSDataBlock->info.rows;
67✔
363
  code = blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
67✔
364
  QUERY_CHECK_CODE(code, lino, _end);
67!
365

366
  for (int32_t i = 0; i < rows; i += winRows) {
139✔
367
    if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo,
76!
368
                                                     &pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i], NULL, 0)) {
4✔
369
      i++;
×
370
      continue;
33✔
371
    }
372
    int32_t          winIndex = 0;
72✔
373
    bool             allEqual = true;
72✔
374
    SEventWindowInfo curWin = {0};
72✔
375
    SSessionKey      nextWinKey = {0};
72✔
376
    code = setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin,
72✔
377
                             &nextWinKey);
378
    QUERY_CHECK_CODE(code, lino, _end);
72!
379

380
    setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
72✔
381
    bool rebuild = false;
72✔
382
    code = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData,
72✔
383
                                 rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild, &winRows);
384
    QUERY_CHECK_CODE(code, lino, _end);
72!
385

386
    if (rebuild) {
72✔
387
      uint64_t uid = 0;
1✔
388
      code = appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
1✔
389
                                      &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
390
      QUERY_CHECK_CODE(code, lino, _end);
1!
391

392
      int32_t tmpRes = tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
1✔
393
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
1!
394

395
      doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin);
1✔
396
      if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) &&
1!
397
          !isWindowIncomplete(&curWin)) {
×
398
        code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
×
399
        QUERY_CHECK_CODE(code, lino, _end);
×
400
      }
401
      releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore);
1✔
402
      SSessionKey tmpSeInfo = {0};
1✔
403
      getSessionHashKey(&curWin.winInfo.sessionWin, &tmpSeInfo);
1✔
404
      code = tSimpleHashPut(pStDeleted, &tmpSeInfo, sizeof(SSessionKey), NULL, 0);
1✔
405
      QUERY_CHECK_CODE(code, lino, _end);
1!
406
      continue;
1✔
407
    }
408
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
71✔
409
                              pOperator, 0);
410
    QUERY_CHECK_CODE(code, lino, _end);
71!
411

412
    code = compactEventWindow(pOperator, &curWin, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
71✔
413
    QUERY_CHECK_CODE(code, lino, _end);
71!
414

415
    code = saveSessionOutputBuf(pAggSup, &curWin.winInfo);
71✔
416
    QUERY_CHECK_CODE(code, lino, _end);
71!
417

418
    if (pInfo->isHistoryOp) {
71✔
419
      code = saveResult(curWin.winInfo, pInfo->pAllUpdated);
7✔
420
      QUERY_CHECK_CODE(code, lino, _end);
7!
421
    }
422

423
    if (isWindowIncomplete(&curWin)) {
71✔
424
      releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore);
32✔
425
      continue;
32✔
426
    }
427

428
    if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator)) {
39!
429
      code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
×
430
      QUERY_CHECK_CODE(code, lino, _end);
×
431
    }
432

433
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
39✔
434
      code = saveResult(curWin.winInfo, pSeUpdated);
37✔
435
      QUERY_CHECK_CODE(code, lino, _end);
37!
436
    }
437

438
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
39✔
439
      curWin.winInfo.pStatePos->beUpdated = true;
2✔
440
      SSessionKey key = {0};
2✔
441
      getSessionHashKey(&curWin.winInfo.sessionWin, &key);
2✔
442
      code =
443
          tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
2✔
444
      QUERY_CHECK_CODE(code, lino, _end);
2!
445
    }
446
  }
447

448
_end:
67✔
449
  colDataDestroy(pColStart);
67✔
450
  taosMemoryFree(pColStart);
67✔
451
  colDataDestroy(pColEnd);
67✔
452
  taosMemoryFree(pColEnd);
67✔
453
  if (code != TSDB_CODE_SUCCESS) {
67!
454
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
455
  }
456
}
457

458
int32_t doStreamEventEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) {
×
459
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
×
460
  if (!pInfo) {
×
461
    return 0;
×
462
  }
463

464
  void* pData = (buf == NULL) ? NULL : *buf;
×
465

466
  // 1.streamAggSup.pResultRows
467
  int32_t tlen = 0;
×
468
  int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
×
469
  tlen += taosEncodeFixedI32(buf, mapSize);
×
470
  void*   pIte = NULL;
×
471
  size_t  keyLen = 0;
×
472
  int32_t iter = 0;
×
473
  while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
×
474
    void* key = tSimpleHashGetKey(pIte, &keyLen);
×
475
    tlen += encodeSSessionKey(buf, key);
×
476
    tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
×
477
  }
478

479
  // 2.twAggSup
480
  tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
×
481

482
  // 3.dataVersion
483
  tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
×
484

485
  // 4.checksum
486
  if (buf) {
×
487
    uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
×
488
    tlen += taosEncodeFixedU32(buf, cksum);
×
489
  } else {
490
    tlen += sizeof(uint32_t);
×
491
  }
492

493
  return tlen;
×
494
}
495

496
int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
×
497
  int32_t                      code = TSDB_CODE_SUCCESS;
×
498
  int32_t                      lino = 0;
×
499
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
×
500
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
×
501
  if (!pInfo) {
×
502
    code = TSDB_CODE_FAILED;
×
503
    QUERY_CHECK_CODE(code, lino, _end);
×
504
  }
505
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
×
506

507
  // 4.checksum
508
  int32_t dataLen = len - sizeof(uint32_t);
×
509
  void*   pCksum = POINTER_SHIFT(buf, dataLen);
×
510
  if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
×
511
    qError("stream event state is invalid");
×
512
    code = TSDB_CODE_FAILED;
×
513
    QUERY_CHECK_CODE(code, lino, _end);
×
514
  }
515

516
  // 1.streamAggSup.pResultRows
517
  int32_t mapSize = 0;
×
518
  buf = taosDecodeFixedI32(buf, &mapSize);
×
519
  for (int32_t i = 0; i < mapSize; i++) {
×
520
    SResultWindowInfo winfo = {0};
×
521
    buf = decodeSSessionKey(buf, &winfo.sessionWin);
×
522
    int32_t winCode = TSDB_CODE_SUCCESS;
×
523
    code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
×
524
        pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
525
    QUERY_CHECK_CODE(code, lino, _end);
×
526

527
    buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
×
528
    code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo,
×
529
                          sizeof(SResultWindowInfo));
530
    QUERY_CHECK_CODE(code, lino, _end);
×
531
  }
532

533
  // 2.twAggSup
534
  buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
×
535

536
  // 3.dataVersion
537
  buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);
×
538

539
_end:
×
540
  if (code != TSDB_CODE_SUCCESS) {
×
541
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
542
  }
543
  return code;
×
544
}
545

546
void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
1✔
547
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
1✔
548
  if (needSaveStreamOperatorInfo(&pInfo->basic)) {
1!
549
    int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator);
×
550
    void*   buf = taosMemoryCalloc(1, len);
×
551
    if (!buf) {
×
552
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
553
      return;
×
554
    }
555
    void* pBuf = buf;
×
556
    len = doStreamEventEncodeOpState(&pBuf, len, pOperator);
×
557
    pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,
×
558
                                                       strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len);
559
    taosMemoryFree(buf);
×
560
    saveStreamOperatorStateComplete(&pInfo->basic);
×
561
  }
562
}
563

564
static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
105✔
565
  int32_t                      code = TSDB_CODE_SUCCESS;
105✔
566
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
105✔
567
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
105✔
568
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
105✔
569

570
  doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
105✔
571
  if (pInfo->pDelRes->info.rows > 0) {
105✔
572
    printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
10✔
573
    (*ppRes) = pInfo->pDelRes;
10✔
574
    return code;
10✔
575
  }
576

577
  doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
95✔
578
  if (pBInfo->pRes->info.rows > 0) {
95✔
579
    printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
27✔
580
    (*ppRes) = pBInfo->pRes;
27✔
581
    return code;
27✔
582
  }
583
  (*ppRes) = NULL;
68✔
584
  return code;
68✔
585
}
586

587
static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
108✔
588
  if (pOperator->status == OP_EXEC_DONE) {
108!
589
    (*ppRes) = NULL;
×
590
    return TSDB_CODE_SUCCESS;
×
591
  }
592

593
  int32_t                      code = TSDB_CODE_SUCCESS;
108✔
594
  int32_t                      lino = 0;
108✔
595
  SExprSupp*                   pSup = &pOperator->exprSupp;
108✔
596
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
108✔
597
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
108✔
598
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
108✔
599
  qDebug("===stream=== stream event agg. history task:%d, taskId:%s", pInfo->isHistoryOp, GET_TASKID(pTaskInfo));
108!
600
  if (pOperator->status == OP_RES_TO_RETURN) {
108✔
601
    SSDataBlock* resBlock = NULL;
38✔
602
    code = buildEventResult(pOperator, &resBlock);
38✔
603
    QUERY_CHECK_CODE(code, lino, _end);
38!
604
    if (resBlock != NULL) {
38✔
605
      (*ppRes) = resBlock;
7✔
606
      return code;
38✔
607
    }
608

609
    if (pInfo->recvGetAll) {
31✔
610
      pInfo->recvGetAll = false;
1✔
611
      resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
1✔
612
    }
613

614
    if (pInfo->reCkBlock) {
31✔
615
      pInfo->reCkBlock = false;
1✔
616
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
1✔
617
      (*ppRes) = pInfo->pCheckpointRes;
1✔
618
      return code;
1✔
619
    }
620

621
    setStreamOperatorCompleted(pOperator);
30✔
622
    (*ppRes) = NULL;
30✔
623
    return code;
30✔
624
  }
625

626
  SOperatorInfo* downstream = pOperator->pDownstream[0];
70✔
627
  if (!pInfo->pUpdated) {
70✔
628
    pInfo->pUpdated = taosArrayInit(16, sizeof(SEventWindowInfo));
67✔
629
    QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
67!
630
  }
631
  if (!pInfo->pSeUpdated) {
70✔
632
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
66✔
633
    pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
66✔
634
    QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
66!
635
  }
636
  while (1) {
82✔
637
    SSDataBlock* pBlock = NULL;
152✔
638

639
    code = downstream->fpSet.getNextFn(downstream, &pBlock);
152✔
640
    QUERY_CHECK_CODE(code, lino, _end);
152!
641

642
    if (pBlock == NULL) {
152✔
643
      break;
67✔
644
    }
645

646
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
85✔
647
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);
85✔
648

649
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
85✔
650
        pBlock->info.type == STREAM_CLEAR) {
81✔
651
      bool add = pInfo->destHasPrimaryKey && IS_NORMAL_EVENT_OP(pOperator);
12!
652
      code = deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted,
12✔
653
                                   pInfo->pPkDeleted, add);
654
      QUERY_CHECK_CODE(code, lino, _end);
12!
655
      continue;
15✔
656
    } else if (pBlock->info.type == STREAM_GET_ALL) {
73✔
657
      pInfo->recvGetAll = true;
2✔
658
      code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
2✔
659
      QUERY_CHECK_CODE(code, lino, _end);
2!
660
      continue;
2✔
661
    } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
71✔
662
      (*ppRes) = pBlock;
3✔
663
      return code;
3✔
664
    } else if (pBlock->info.type == STREAM_CHECKPOINT) {
68✔
665
      pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
1✔
666
      doStreamEventSaveCheckpoint(pOperator);
1✔
667
      pInfo->reCkBlock = true;
1✔
668
      code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
1✔
669
      QUERY_CHECK_CODE(code, lino, _end);
1!
670
      continue;
1✔
671
    } else {
672
      if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
67!
673
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
674
        QUERY_CHECK_CODE(code, lino, _end);
×
675
      }
676
    }
677

678
    if (pInfo->scalarSupp.pExprInfo != NULL) {
67!
679
      SExprSupp* pExprSup = &pInfo->scalarSupp;
×
680
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
×
681
      QUERY_CHECK_CODE(code, lino, _end);
×
682
    }
683
    // the pDataBlock are always the same one, no need to call this again
684
    code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
67✔
685
    QUERY_CHECK_CODE(code, lino, _end);
67!
686
    doStreamEventAggImpl(pOperator, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
67✔
687
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
67✔
688
  }
689
  // restore the value
690
  pOperator->status = OP_RES_TO_RETURN;
67✔
691

692
  code = closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);
67✔
693
  QUERY_CHECK_CODE(code, lino, _end);
67!
694

695
  code = copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
67✔
696
  QUERY_CHECK_CODE(code, lino, _end);
67!
697

698
  removeSessionDeleteResults(pInfo->pSeDeleted, pInfo->pUpdated);
67✔
699

700
  if (pInfo->isHistoryOp) {
67✔
701
    SArray* pHisWins = taosArrayInit(16, sizeof(SEventWindowInfo));
1✔
702
    if (!pHisWins) {
1!
703
      code = terrno;
×
704
      QUERY_CHECK_CODE(code, lino, _end);
×
705
    }
706

707
    code = copyUpdateResult(&pInfo->pAllUpdated, pHisWins, sessionKeyCompareAsc);
1✔
708
    QUERY_CHECK_CODE(code, lino, _end);
1!
709

710
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
1✔
711
    pInfo->pAllUpdated = tSimpleHashInit(64, hashFn);
1✔
712
    QUERY_CHECK_NULL(pInfo->pAllUpdated, code, lino, _end, terrno);
1!
713

714
    code = getMaxTsWins(pHisWins, pInfo->historyWins);
1✔
715
    QUERY_CHECK_CODE(code, lino, _end);
1!
716

717
    taosArrayDestroy(pHisWins);
1✔
718
  }
719
  if (pInfo->destHasPrimaryKey && IS_NORMAL_EVENT_OP(pOperator)) {
67!
720
    code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted);
×
721
    QUERY_CHECK_CODE(code, lino, _end);
×
722
  }
723

724
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
67✔
725
  pInfo->pUpdated = NULL;
67✔
726
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
67✔
727
  QUERY_CHECK_CODE(code, lino, _end);
67!
728

729
  SSDataBlock* resBlock = NULL;
67✔
730
  code = buildEventResult(pOperator, &resBlock);
67✔
731
  QUERY_CHECK_CODE(code, lino, _end);
67!
732
  if (resBlock != NULL) {
67✔
733
    (*ppRes) = resBlock;
30✔
734
    return code;
30✔
735
  }
736

737
_end:
37✔
738
  if (code != TSDB_CODE_SUCCESS) {
37!
739
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
740
    pTaskInfo->code = code;
×
741
    T_LONG_JMP(pTaskInfo->env, code);
×
742
  }
743
  setStreamOperatorCompleted(pOperator);
37✔
744
  (*ppRes) = NULL;
37✔
745
  return code;
37✔
746
}
747

748
void streamEventReleaseState(SOperatorInfo* pOperator) {
1✔
749
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
1✔
750
  int32_t                      winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
1✔
751
  int32_t                      resSize = winSize + sizeof(TSKEY);
1✔
752
  char*                        pBuff = taosMemoryCalloc(1, resSize);
1✔
753
  if (!pBuff) {
1!
754
    return ;
×
755
  }
756
  memcpy(pBuff, pInfo->historyWins->pData, winSize);
1✔
757
  memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
1✔
758
  qDebug("===stream=== event window operator relase state. save result count:%d",
1!
759
         (int32_t)taosArrayGetSize(pInfo->historyWins));
760
  pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_STATE_NAME,
1✔
761
                                                     strlen(STREAM_EVENT_OP_STATE_NAME), pBuff, resSize);
762
  pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
1✔
763
  taosMemoryFreeClear(pBuff);
1!
764

765
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1✔
766
  if (downstream->fpSet.releaseStreamStateFn) {
1!
767
    downstream->fpSet.releaseStreamStateFn(downstream);
1✔
768
  }
769
}
770

771
void streamEventReloadState(SOperatorInfo* pOperator) {
1✔
772
  int32_t                      code = TSDB_CODE_SUCCESS;
1✔
773
  int32_t                      lino = 0;
1✔
774
  SStreamEventAggOperatorInfo* pInfo = pOperator->info;
1✔
775
  SStreamAggSupporter*         pAggSup = &pInfo->streamAggSup;
1✔
776
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
1✔
777
  resetWinRange(&pAggSup->winRange);
1✔
778

779
  SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
1✔
780
  int32_t     size = 0;
1✔
781
  void*       pBuf = NULL;
1✔
782
  code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_EVENT_OP_STATE_NAME,
1✔
783
                                                strlen(STREAM_EVENT_OP_STATE_NAME), &pBuf, &size);
784
  QUERY_CHECK_CODE(code, lino, _end);
1!
785

786
  int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
1✔
787
  qDebug("===stream=== event window operator reload state. get result count:%d", num);
1!
788
  SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
1✔
789

790
  TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
1✔
791
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
1✔
792
  pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts);
1✔
793

794
  if (!pInfo->pSeUpdated && num > 0) {
1!
795
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
1✔
796
    pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
1✔
797
    QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
1!
798
  }
799
  if (!pInfo->pSeDeleted && num > 0) {
1!
800
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
801
    pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
×
802
    QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _end, terrno);
×
803
  }
804
  for (int32_t i = 0; i < num; i++) {
5✔
805
    SEventWindowInfo curInfo = {0};
4✔
806
    qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey,
4!
807
           pSeKeyBuf[i].groupId, i);
808
    code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo);
4✔
809
    QUERY_CHECK_CODE(code, lino, _end);
4!
810

811
    // event window has been deleted
812
    if (!IS_VALID_SESSION_WIN(curInfo.winInfo)) {
4!
813
      continue;
4✔
814
    }
815
    setEventWindowFlag(pAggSup, &curInfo);
4✔
816
    if (!curInfo.pWinFlag->startFlag || curInfo.pWinFlag->endFlag) {
4✔
817
      code = saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
2✔
818
      QUERY_CHECK_CODE(code, lino, _end);
2!
819
      continue;
2✔
820
    }
821

822
    code = compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
2✔
823
    qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey,
2!
824
           curInfo.winInfo.sessionWin.groupId);
825
    QUERY_CHECK_CODE(code, lino, _end);
2!
826

827
    if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
2!
828
      code = saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
2✔
829
      QUERY_CHECK_CODE(code, lino, _end);
2!
830
    }
831

832
    if (!curInfo.pWinFlag->endFlag) {
2!
833
      continue;
2✔
834
    }
835

836
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
×
837
      code = saveResult(curInfo.winInfo, pInfo->pSeUpdated);
×
838
      QUERY_CHECK_CODE(code, lino, _end);
×
839
    } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
×
840
      if (!isCloseWindow(&curInfo.winInfo.sessionWin.win, &pInfo->twAggSup)) {
×
841
        code = saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin);
×
842
        QUERY_CHECK_CODE(code, lino, _end);
×
843
      }
844
      SSessionKey key = {0};
×
845
      getSessionHashKey(&curInfo.winInfo.sessionWin, &key);
×
846
      code =
847
          tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo));
×
848
      QUERY_CHECK_CODE(code, lino, _end);
×
849
    }
850
  }
851
  taosMemoryFree(pBuf);
1✔
852

853
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1✔
854
  if (downstream->fpSet.reloadStreamStateFn) {
1!
855
    downstream->fpSet.reloadStreamStateFn(downstream);
1✔
856
  }
857
  reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
1✔
858

859
_end:
1✔
860
  if (code != TSDB_CODE_SUCCESS) {
1!
861
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
862
  }
863
}
1✔
864

865
int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
16✔
866
                                         SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
867
  QRY_PARAM_CHECK(pOptrInfo);
16!
868

869
  SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode;
16✔
870
  int32_t                      tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId;
16✔
871
  int32_t                      code = TSDB_CODE_SUCCESS;
16✔
872
  int32_t                      lino = 0;
16✔
873
  SStreamEventAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamEventAggOperatorInfo));
16✔
874
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
16✔
875
  if (pInfo == NULL || pOperator == NULL) {
16!
876
    code = terrno;
×
877
    goto _error;
×
878
  }
879

880
  initResultSizeInfo(&pOperator->resultInfo, 4096);
16✔
881
  if (pEventNode->window.pExprs != NULL) {
16!
882
    int32_t    numOfScalar = 0;
×
883
    SExprInfo* pScalarExprInfo = NULL;
×
884
    code = createExprInfo(pEventNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
885
    QUERY_CHECK_CODE(code, lino, _error);
×
886

887
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
888
    if (code != TSDB_CODE_SUCCESS) {
×
889
      goto _error;
×
890
    }
891
  }
892

893
  pInfo->twAggSup = (STimeWindowAggSupp){
16✔
894
      .waterMark = pEventNode->window.watermark,
16✔
895
      .calTrigger = pEventNode->window.triggerType,
16✔
896
      .maxTs = INT64_MIN,
897
      .minTs = INT64_MAX,
898
      .deleteMark = getDeleteMark(&pEventNode->window, 0),
16✔
899
  };
900

901
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
16✔
902
  QUERY_CHECK_CODE(code, lino, _error);
16!
903

904
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
16✔
905
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
16!
906
  pInfo->binfo.pRes = pResBlock;
16✔
907

908
  SExprSupp*   pExpSup = &pOperator->exprSupp;
16✔
909
  int32_t      numOfCols = 0;
16✔
910
  SExprInfo*   pExprInfo = NULL;
16✔
911
  code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
16✔
912
  QUERY_CHECK_CODE(code, lino, _error);
16!
913

914
  code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
16✔
915
  QUERY_CHECK_CODE(code, lino, _error);
16!
916

917
  pInfo->primaryTsIndex = tsSlotId;
16✔
918
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
16✔
919
                                sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
920
                                &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex,
16✔
921
                                STREAM_STATE_BUFF_SORT, 1);
922
  QUERY_CHECK_CODE(code, lino, _error);
16!
923

924
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
16✔
925
  pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
16✔
926
  QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _error, terrno);
16!
927
  pInfo->pDelIterator = NULL;
16✔
928
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
16✔
929
  QUERY_CHECK_CODE(code, lino, _error);
16!
930

931
  pInfo->pChildren = NULL;
16✔
932
  pInfo->ignoreExpiredData = pEventNode->window.igExpired;
16✔
933
  pInfo->ignoreExpiredDataSaved = false;
16✔
934
  pInfo->pUpdated = NULL;
16✔
935
  pInfo->pSeUpdated = NULL;
16✔
936
  pInfo->dataVersion = 0;
16✔
937
  pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
16✔
938
  if (!pInfo->historyWins) {
16!
939
    goto _error;
×
940
  }
941
  if (pHandle) {
16!
942
    pInfo->isHistoryOp = pHandle->fillHistory;
16✔
943
  }
944

945
  if (pInfo->isHistoryOp) {
16✔
946
    pInfo->pAllUpdated = tSimpleHashInit(64, hashFn);
1✔
947
    QUERY_CHECK_NULL(pInfo->pAllUpdated, code, lino, _error, terrno);
1!
948
  } else {
949
    pInfo->pAllUpdated = NULL;
15✔
950
  }
951

952
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
16✔
953
  QUERY_CHECK_CODE(code, lino, _error);
16!
954

955
  pInfo->reCkBlock = false;
16✔
956
  pInfo->recvGetAll = false;
16✔
957
  pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
16✔
958
  QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
16!
959
  pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimaryKey;
16✔
960

961
  pInfo->pOperator = pOperator;
16✔
962
  setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
16✔
963
                  pInfo, pTaskInfo);
964
  // for stream
965
  void*   buff = NULL;
16✔
966
  int32_t len = 0;
16✔
967
  int32_t res =
968
      pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,
16✔
969
                                                        strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), &buff, &len);
970
  if (res == TSDB_CODE_SUCCESS) {
16!
971
    code = doStreamEventDecodeOpState(buff, len, pOperator);
×
972
    taosMemoryFree(buff);
×
973
    QUERY_CHECK_CODE(code, lino, _error);
×
974
  }
975

976
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAggNext, NULL, destroyStreamEventOperatorInfo,
16✔
977
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
978
  setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
16✔
979
  code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
16✔
980
                        &pInfo->twAggSup, &pInfo->basic);
16✔
981
  QUERY_CHECK_CODE(code, lino, _error);
16!
982

983
  code = appendDownstream(pOperator, &downstream, 1);
16✔
984
  QUERY_CHECK_CODE(code, lino, _error);
16!
985

986
  code = filterInitFromNode((SNode*)pEventNode->pStartCond, &pInfo->pStartCondInfo, 0);
16✔
987
  QUERY_CHECK_CODE(code, lino, _error);
15!
988

989
  code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0);
15✔
990
  QUERY_CHECK_CODE(code, lino, _error);
16!
991

992
  *pOptrInfo = pOperator;
16✔
993
  return TSDB_CODE_SUCCESS;
16✔
994

995
_error:
×
996
  if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo);
×
997
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
998
  pTaskInfo->code = code;
×
999
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1000
  return code;
×
1001
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc