• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4131

20 May 2025 07:22AM UTC coverage: 63.096% (+0.7%) from 62.384%
#4131

push

travis-ci

web-flow
docs(datain): add topic meta options docs in tmq (#31147)

157751 of 318088 branches covered (49.59%)

Branch coverage included in aggregate %.

243052 of 317143 relevant lines covered (76.64%)

18743283.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.86
/source/libs/stream/src/streamSessionState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "tcommon.h"
21
#include "tsimplehash.h"
22

23
#define MAX_SCAN_RANGE_SIZE 102400
24

25
int sessionStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
43,995✔
26
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
43,995✔
27
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
43,983✔
28
  return sessionWinKeyCmpr((SSessionKey*)pWin1, pWin2);
43,983✔
29
}
30

31
int sessionStateRangeKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
1,045✔
32
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
1,045✔
33
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
1,045✔
34
  return sessionRangeKeyCmpr(pWin1, pWin2);
1,045✔
35
}
36

37
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) {
32,617✔
38
  int firstPos = 0, lastPos = num - 1, midPos = -1;
32,617✔
39
  int numOfRows = 0;
32,617✔
40

41
  if (num <= 0) return -1;
32,617✔
42
  // find the first position which is smaller or equal than the key.
43
  // if all data is bigger than the key return -1
44
  while (1) {
45
    if (cmpFn(key, keyList, lastPos) >= 0) return lastPos;
33,668✔
46
    if (cmpFn(key, keyList, firstPos) == 0) return firstPos;
8,026✔
47
    if (cmpFn(key, keyList, firstPos) < 0) return firstPos - 1;
6,955✔
48

49
    numOfRows = lastPos - firstPos + 1;
3,706✔
50
    midPos = (numOfRows >> 1) + firstPos;
3,706✔
51

52
    if (cmpFn(key, keyList, midPos) < 0) {
3,706✔
53
      lastPos = midPos - 1;
1,918✔
54
    } else if (cmpFn(key, keyList, midPos) > 0) {
1,785✔
55
      firstPos = midPos + 1;
1,037✔
56
    } else {
57
      break;
756✔
58
    }
59
  }
60

61
  return midPos;
756✔
62
}
63

64
int64_t getSessionWindowEndkey(void* data, int32_t index) {
×
65
  SArray*       pWinInfos = (SArray*)data;
×
66
  SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
×
67
  if (ppos != NULL) {
×
68
    SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
×
69
    return pWin->win.ekey;
×
70
  } else {
71
    return 0;
×
72
  }
73
}
74

75
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
11,123✔
76
  if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) {
11,123✔
77
    return true;
2,350✔
78
  }
79
  return false;
8,773✔
80
}
81

82
SStreamStateCur* createStateCursor(SStreamFileState* pFileState) {
16,298✔
83
  SStreamStateCur* pCur = createStreamStateCursor();
16,298✔
84
  if (pCur == NULL) {
16,304!
85
    return NULL;
×
86
  }
87
  pCur->pStreamFileState = pFileState;
16,304✔
88
  return pCur;
16,304✔
89
}
90

91
static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
6,490✔
92
                                   SRowBuffPos** ppPos) {
93
  int32_t      code = TSDB_CODE_SUCCESS;
6,490✔
94
  int32_t      lino = 0;
6,490✔
95
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
6,490✔
96
  if (!pNewPos || !pNewPos->pRowBuff) {
6,489!
97
    code = TSDB_CODE_OUT_OF_MEMORY;
×
98
    QUERY_CHECK_CODE(code, lino, _end);
×
99
  }
100

101
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
6,489✔
102
  void* tmp = taosArrayPush(pWinInfos, &pNewPos);
6,489✔
103
  if (!tmp) {
6,489!
104
    code = terrno;
×
105
    QUERY_CHECK_CODE(code, lino, _end);
×
106
  }
107
  (*ppPos) = pNewPos;
6,489✔
108

109
_end:
6,489✔
110
  if (code != TSDB_CODE_SUCCESS) {
6,489!
111
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
112
  }
113
  return code;
6,489✔
114
}
115

116
static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
201✔
117
                                      int32_t index, SRowBuffPos** ppPos) {
118
  int32_t      code = TSDB_CODE_SUCCESS;
201✔
119
  int32_t      lino = 0;
201✔
120
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
201✔
121
  if (!pNewPos || !pNewPos->pRowBuff) {
201!
122
    code = TSDB_CODE_OUT_OF_MEMORY;
×
123
    QUERY_CHECK_CODE(code, lino, _end);
×
124
  }
125

126
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
201✔
127
  void* tmp = taosArrayInsert(pWinInfos, index, &pNewPos);
201✔
128
  if (!tmp) {
201!
129
    code = terrno;
×
130
    QUERY_CHECK_CODE(code, lino, _end);
×
131
  }
132

133
  *ppPos = pNewPos;
201✔
134

135
_end:
201✔
136
  if (code != TSDB_CODE_SUCCESS) {
201!
137
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
138
  }
139
  return code;
201✔
140
}
141

142
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
456✔
143
  int32_t      code = TSDB_CODE_SUCCESS;
456✔
144
  int32_t      lino = 0;
456✔
145
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
456✔
146
  if (!pNewPos || !pNewPos->pRowBuff) {
456!
147
    code = TSDB_CODE_OUT_OF_MEMORY;
×
148
    QUERY_CHECK_CODE(code, lino, _end);
×
149
  }
150

151
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
456✔
152
  pNewPos->needFree = true;
456✔
153
  pNewPos->beFlushed = true;
456✔
154
  int32_t len = getRowStateRowSize(pFileState);
456✔
155
  if (p) {
456✔
156
    if (*pVLen > len){
395!
157
      qError("[StreamInternal] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],session window buffer is too small, *pVLen:%d, len:%d", pKey->win.skey, pKey->win.ekey, pKey->groupId, *pVLen, len);
×
158
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
159
      QUERY_CHECK_CODE(code, lino, _end);
×
160
    }else{
161
      memcpy(pNewPos->pRowBuff, p, *pVLen);
395✔
162
    }
163
  } else {
164
    memset(pNewPos->pRowBuff, 0, len);
61✔
165
  }
166

167
_end:
456✔
168
  taosMemoryFree(p);
456!
169
  if (code != TSDB_CODE_SUCCESS) {
456!
170
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
171
    return NULL;
×
172
  }
173
  return pNewPos;
456✔
174
}
175

176
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen,
3,998✔
177
                                int32_t* pWinCode) {
178
  int32_t code = TSDB_CODE_SUCCESS;
3,998✔
179
  int32_t lino = 0;
3,998✔
180
  (*pWinCode) = TSDB_CODE_SUCCESS;
3,998✔
181
  SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
3,998✔
182
  SArray*    pWinStates = NULL;
3,999✔
183
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
3,999✔
184
  if (ppBuff) {
4,001✔
185
    pWinStates = (SArray*)(*ppBuff);
2,636✔
186
  } else {
187
    pWinStates = taosArrayInit(16, POINTER_BYTES);
1,365✔
188
    if (!pWinStates) {
1,365!
189
      code = terrno;
×
190
      QUERY_CHECK_CODE(code, lino, _end);
×
191
    }
192
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1,365✔
193
    QUERY_CHECK_CODE(code, lino, _end);
1,364!
194
  }
195

196
  TSKEY startTs = pKey->win.skey;
4,000✔
197
  TSKEY endTs = pKey->win.ekey;
4,000✔
198

199
  int32_t size = taosArrayGetSize(pWinStates);
4,000✔
200
  if (size == 0) {
3,999✔
201
    void*   pFileStore = getStateFileStore(pFileState);
1,454✔
202
    void*   p = NULL;
1,454✔
203
    int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
1,454✔
204
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
1,455✔
205
      (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
60✔
206
      if (!(*pVal)) {
60!
207
        code = TSDB_CODE_OUT_OF_MEMORY;
×
208
        QUERY_CHECK_CODE(code, lino, _end);
×
209
      }
210

211
      (*pWinCode) = code_file;
60✔
212
      qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
60✔
213
    } else {
214
      code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
1,395✔
215
      (*pWinCode) = TSDB_CODE_FAILED;
1,395✔
216
      taosMemoryFree(p);
1,395!
217
      QUERY_CHECK_CODE(code, lino, _end);
1,395!
218
    }
219
    goto _end;
1,455✔
220
  }
221

222
  // find the first position which is smaller than the pKey
223
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
2,545✔
224
  SRowBuffPos* pPos = NULL;
2,544✔
225

226
  if (index >= 0) {
2,544✔
227
    pPos = taosArrayGetP(pWinStates, index);
2,235✔
228
    if (inSessionWindow(pPos->pKey, startTs, gap)) {
2,234✔
229
      (*pVal) = pPos;
944✔
230
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
944✔
231
      pPos->beUsed = true;
944✔
232
      pPos->beFlushed = false;
944✔
233
      *pKey = *pDestWinKey;
944✔
234
      goto _end;
944✔
235
    }
236
  }
237

238
  if (index + 1 < size) {
1,601✔
239
    pPos = taosArrayGetP(pWinStates, index + 1);
447✔
240
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) {
447!
241
      (*pVal) = pPos;
92✔
242
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
92✔
243
      pPos->beUsed = true;
92✔
244
      pPos->beFlushed = false;
92✔
245
      *pKey = *pDestWinKey;
92✔
246
      goto _end;
92✔
247
    }
248
  }
249

250
  if (index + 1 == 0) {
1,509✔
251
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
287!
252
      void*   p = NULL;
245✔
253
      void*   pFileStore = getStateFileStore(pFileState);
245✔
254
      int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
246✔
255
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
246!
256
        (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
246✔
257
        if (!(*pVal)) {
246!
258
          code = TSDB_CODE_OUT_OF_MEMORY;
×
259
          QUERY_CHECK_CODE(code, lino, _end);
246!
260
        }
261

262
        (*pWinCode) = code_file;
246✔
263
        qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
246✔
264
        goto _end;
246✔
265
      } else {
266
        taosMemoryFree(p);
×
267
      }
268
    }
269
  }
270

271
  if (index == size - 1) {
1,264✔
272
    code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
1,155✔
273
    QUERY_CHECK_CODE(code, lino, _end);
1,154!
274

275
    (*pWinCode) = TSDB_CODE_FAILED;
1,154✔
276
    goto _end;
1,154✔
277
  }
278

279
  code = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1, (SRowBuffPos**)pVal);
109✔
280
  QUERY_CHECK_CODE(code, lino, _end);
109!
281

282
  (*pWinCode) = TSDB_CODE_FAILED;
109✔
283

284
_end:
4,000✔
285
  if (code != TSDB_CODE_SUCCESS) {
4,000!
286
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
287
  }
288
  return code;
3,999✔
289
}
290

291
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
55✔
292
                          int32_t* pWinCode) {
293
  SWinKey*    pTmpkey = pKey;
55✔
294
  SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
55✔
295
  return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode);
55✔
296
}
297

298
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
145✔
299
  int32_t      code = TSDB_CODE_SUCCESS;
145✔
300
  int32_t      lino = 0;
145✔
301
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
145✔
302
  SSessionKey* pKey = pPos->pKey;
145✔
303
  SArray*      pWinStates = NULL;
145✔
304
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
145✔
305
  if (ppBuff) {
145✔
306
    pWinStates = (SArray*)(*ppBuff);
116✔
307
  } else {
308
    pWinStates = taosArrayInit(16, POINTER_BYTES);
29✔
309
    if (!pWinStates) {
29!
310
      code = terrno;
×
311
      QUERY_CHECK_CODE(code, lino, _end);
×
312
    }
313

314
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
29✔
315
    QUERY_CHECK_CODE(code, lino, _end);
29!
316
  }
317

318
  int32_t size = taosArrayGetSize(pWinStates);
145✔
319
  if (size == 0) {
145✔
320
    void* tmp = taosArrayPush(pWinStates, &pPos);
131✔
321
    if (!tmp) {
131!
322
      code = terrno;
×
323
      QUERY_CHECK_CODE(code, lino, _end);
×
324
    }
325
    goto _end;
131✔
326
  }
327

328
  // find the first position which is smaller than the pKey
329
  int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
14✔
330
  if (index >= 0) {
14!
331
    void* tmp = taosArrayInsert(pWinStates, index, &pPos);
×
332
    if (!tmp) {
×
333
      code = terrno;
×
334
      QUERY_CHECK_CODE(code, lino, _end);
×
335
    }
336
  } else {
337
    void* tmp = taosArrayInsert(pWinStates, 0, &pPos);
14✔
338
    if (!tmp) {
14!
339
      code = terrno;
×
340
      QUERY_CHECK_CODE(code, lino, _end);
×
341
    }
342
  }
343

344
_end:
14✔
345
  pPos->needFree = false;
145✔
346
  if (code != TSDB_CODE_SUCCESS) {
145!
347
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
348
  }
349
  return code;
145✔
350
}
351

352
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
393✔
353
                              int32_t* pWinCode) {
354
  int32_t      code = TSDB_CODE_SUCCESS;
393✔
355
  int32_t      lino = 0;
393✔
356
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
393✔
357
  if (!pNewPos || !pNewPos->pRowBuff) {
393!
358
    code = TSDB_CODE_OUT_OF_MEMORY;
×
359
    QUERY_CHECK_CODE(code, lino, _end);
×
360
  }
361
  pNewPos->needFree = true;
393✔
362
  pNewPos->beFlushed = true;
393✔
363
  void* pBuff = NULL;
393✔
364
  (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
393✔
365
  if ((*pWinCode) != TSDB_CODE_SUCCESS) {
393!
366
    goto _end;
×
367
  }
368
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
393✔
369
  memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
393✔
370
  taosMemoryFreeClear(pBuff);
393!
371
  (*pVal) = pNewPos;
393✔
372

373
_end:
393✔
374
  if (code != TSDB_CODE_SUCCESS) {
393!
375
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
376
  }
377
  return code;
393✔
378
}
379

380
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
2,279✔
381
  SSHashObj*   pSessionBuff = (SSHashObj*)pBuff;
2,279✔
382
  SSessionKey* pWinKey = (SSessionKey*)key;
2,279✔
383
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
2,279✔
384
  if (!ppBuff) {
2,279✔
385
    return TSDB_CODE_SUCCESS;
611✔
386
  }
387
  SArray* pWinStates = (SArray*)(*ppBuff);
1,668✔
388
  int32_t size = taosArrayGetSize(pWinStates);
1,668✔
389
  TSKEY   gap = 0;
1,668✔
390
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
1,668✔
391
  if (index >= 0) {
1,667✔
392
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
1,248✔
393
    if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
1,248!
394
      pPos->beFlushed = true;
1,248✔
395
      taosArrayRemove(pWinStates, index);
1,248✔
396
    }
397
  }
398
  return TSDB_CODE_SUCCESS;
1,667✔
399
}
400

401
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
246✔
402
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
246✔
403
  SSessionKey* pWinKey = (SSessionKey*)pPos->pKey;
246✔
404
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
246✔
405
  if (!ppBuff) {
246!
406
    return;
×
407
  }
408
  SArray* pWinStates = (SArray*)(*ppBuff);
246✔
409
  int32_t size = taosArrayGetSize(pWinStates);
246✔
410
  TSKEY   gap = 0;
246✔
411
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
246✔
412
  if (index >= 0) {
246✔
413
    SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index);
48✔
414
    if (pItemPos == pPos) {
48!
415
      taosArrayRemove(pWinStates, index);
48✔
416
    }
417
  }
418
}
419

420
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
484✔
421
                                           const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
422
  int32_t      code = TSDB_CODE_SUCCESS;
484✔
423
  int32_t      lino = 0;
484✔
424
  SRowBuffPos* pNewPos = NULL;
484✔
425
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
484✔
426
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
484✔
427
  SArray*      pWinStates = NULL;
484✔
428
  if (!ppBuff) {
484✔
429
    pWinStates = taosArrayInit(16, POINTER_BYTES);
107✔
430
    if (!pWinStates) {
107!
431
      code = terrno;
×
432
      QUERY_CHECK_CODE(code, lino, _end);
×
433
    }
434

435
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
107✔
436
    QUERY_CHECK_CODE(code, lino, _end);
107!
437
  } else {
438
    pWinStates = (SArray*)(*ppBuff);
377✔
439
  }
440
  if (!pCur) {
484✔
441
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
109✔
442
    QUERY_CHECK_CODE(code, lino, _end);
109!
443

444
    goto _end;
109✔
445
  }
446

447
  int32_t size = taosArrayGetSize(pWinStates);
375✔
448
  if (pCur->buffIndex >= 0) {
375✔
449
    if (pCur->buffIndex >= size) {
373✔
450
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
307✔
451
      QUERY_CHECK_CODE(code, lino, _end);
307!
452

453
      goto _end;
307✔
454
    }
455
    code = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex, &pNewPos);
66✔
456
    QUERY_CHECK_CODE(code, lino, _end);
66!
457

458
    goto _end;
66✔
459
  } else {
460
    if (size > 0) {
2!
461
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
×
462
      if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) {
×
463
        // pCur is invalid
464
        SSessionKey pTmpKey = *pWinKey;
×
465
        int32_t     winCode = TSDB_CODE_SUCCESS;
×
466
        code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode);
×
467
        QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
468
        QUERY_CHECK_CODE(code, lino, _end);
×
469
        goto _end;
×
470
      }
471
    }
472
    pNewPos = getNewRowPosForWrite(pFileState);
2✔
473
    if (!pNewPos || !pNewPos->pRowBuff) {
2!
474
      code = TSDB_CODE_OUT_OF_MEMORY;
×
475
      QUERY_CHECK_CODE(code, lino, _end);
×
476
    }
477

478
    memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
2✔
479
    pNewPos->needFree = true;
2✔
480
    pNewPos->beFlushed = true;
2✔
481
  }
482

483
_end:
484✔
484
  (*ppVal) = pNewPos;
484✔
485
  if (code != TSDB_CODE_SUCCESS) {
484!
486
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
487
  }
488
  return code;
484✔
489
}
490

491
void sessionWinStateClear(SStreamFileState* pFileState) {
658✔
492
  int32_t buffSize = getRowStateRowSize(pFileState);
658✔
493
  void*   pIte = NULL;
658✔
494
  size_t  keyLen = 0;
658✔
495
  int32_t iter = 0;
658✔
496
  void*   pBuff = getRowStateBuff(pFileState);
658✔
497
  while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
2,657✔
498
    SArray* pWinStates = *((void**)pIte);
2,000✔
499
    int32_t size = taosArrayGetSize(pWinStates);
2,000✔
500
    for (int32_t i = 0; i < size; i++) {
4,172✔
501
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
2,172✔
502
      memset(pPos->pRowBuff, 0, buffSize);
2,172✔
503
    }
504
  }
505
}
659✔
506

507
void freeArrayPtr(void* ptr) {
3,927✔
508
  SArray* pArray = *(void**)ptr;
3,927✔
509
  taosArrayDestroy(pArray);
3,927✔
510
}
3,927✔
511

512
void sessionWinStateCleanup(void* pBuff) {
7,611✔
513
  if (pBuff == NULL) {
7,611✔
514
    return;
5,122✔
515
  }
516
  tSimpleHashSetFreeFp(pBuff, freeArrayPtr);
2,489✔
517
  tSimpleHashCleanup(pBuff);
2,490✔
518
}
519

520
static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey,
16,513✔
521
                                                SArray** pWins, int32_t* pIndex) {
522
  SStreamStateCur* pCur = NULL;
16,513✔
523
  SSHashObj*       pSessionBuff = getRowStateBuff(pFileState);
16,513✔
524
  void**           ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
16,513✔
525
  if (!ppBuff) {
16,523✔
526
    return NULL;
1,259✔
527
  }
528

529
  SArray* pWinStates = (SArray*)(*ppBuff);
15,264✔
530
  int32_t size = taosArrayGetSize(pWinStates);
15,264✔
531
  TSKEY   gap = 0;
15,264✔
532
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
15,264✔
533

534
  if (pWins) {
15,251✔
535
    (*pWins) = pWinStates;
9,755✔
536
  }
537

538
  if (size > 0 && index == -1) {
15,251✔
539
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
1,780✔
540
    SSessionKey* pWin = (SSessionKey*)pPos->pKey;
1,780✔
541
    if (pWinKey->win.skey == pWin->win.skey) {
1,780✔
542
      index = 0;
381✔
543
    }
544
  }
545

546
  if (index >= 0) {
15,251✔
547
    pCur = createStateCursor(pFileState);
12,382✔
548
    if (pCur == NULL) {
12,391!
549
      return NULL;
×
550
    }
551
    pCur->buffIndex = index;
12,391✔
552
    if (pIndex) {
12,391✔
553
      *pIndex = index;
8,411✔
554
    }
555
  }
556

557
  return pCur;
15,260✔
558
}
559

560
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
5,832✔
561
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL);
5,832✔
562
  if (pCur) {
5,836✔
563
    return pCur;
3,980✔
564
  }
565

566
  void* pFileStore = getStateFileStore(pFileState);
1,856✔
567
  pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pFileStore, pWinKey);
1,856✔
568
  if (!pCur) {
1,857✔
569
    return NULL;
869✔
570
  }
571
  pCur->buffIndex = -1;
988✔
572
  pCur->pStreamFileState = pFileState;
988✔
573
  return pCur;
988✔
574
}
575

576
SStreamStateCur* sessionWinStateSeekKeyPrev(SStreamFileState *pFileState, const SSessionKey *pWinKey) {
×
577
  SArray*          pWinStates = NULL;
×
578
  int32_t          index = -1;
×
579
  SStreamStateCur *pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
×
580
  if (pCur) {
×
581
    int32_t cmpRes= sessionStateRangeKeyCompare(pWinKey, pWinStates, index);
×
582
    if (cmpRes > 0) {
×
583
      return pCur;
×
584
    } else if (cmpRes == 0 && index > 0) {
×
585
      sessionWinStateMoveToPrev(pCur);
×
586
      return pCur;
×
587
    }
588
    streamStateFreeCur(pCur);
×
589
    pCur = NULL;
×
590
  }
591

592
  void* pFileStore = getStateFileStore(pFileState);
×
593
  pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
×
594
  if (!pCur) {
×
595
    return NULL;
×
596
  }
597
  pCur->buffIndex = -1;
×
598
  pCur->pStreamFileState = pFileState;
×
599
  return pCur;
×
600
}
601

602
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) {
514✔
603
  if (!pCur) {
514!
604
    return;
×
605
  }
606
  streamStateResetCur(pCur);
514✔
607
  pCur->buffIndex = 0;
514✔
608
  pCur->pStreamFileState = pFileState;
514✔
609
}
610

611
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
2,051✔
612
                                    SStreamStateCur** ppCur) {
613
  SSessionKey key = {.groupId = groupId};
2,051✔
614
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL);
2,051✔
615
  if (taosArrayGetSize(pWinStates) > 0 &&
2,051✔
616
      (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
99!
617
    if (!(*ppCur)) {
482✔
618
      (*ppCur) = createStateCursor(pFileState);
480✔
619
    }
620
    transformCursor(pFileState, *ppCur);
482✔
621
  } else if (*ppCur) {
1,569✔
622
    (*ppCur)->buffIndex = -1;
600✔
623
    (*ppCur)->pStreamFileState = pFileState;
600✔
624
  }
625
}
2,051✔
626

627
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
321✔
628
  SArray*          pWinStates = NULL;
321✔
629
  int32_t          index = -1;
321✔
630
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
321✔
631
  if (pCur) {
321✔
632
    if (sessionStateRangeKeyCompare(pWinKey, pWinStates, index) > 0) {
145✔
633
      sessionWinStateMoveToNext(pCur);
28✔
634
    }
635
    return pCur;
145✔
636
  }
637

638
  void* pFileStore = getStateFileStore(pFileState);
176✔
639
  pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pFileStore, (SSessionKey*)pWinKey);
176✔
640
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
176✔
641
  return pCur;
176✔
642
}
643

644
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
9,987✔
645
  SArray*          pWinStates = NULL;
9,987✔
646
  int32_t          index = -1;
9,987✔
647
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
9,987✔
648
  if (pCur) {
9,989✔
649
    sessionWinStateMoveToNext(pCur);
8,115✔
650
    return pCur;
8,112✔
651
  }
652

653
  void* pFileStore = getStateFileStore(pFileState);
1,874✔
654
  pCur = streamStateSessionSeekKeyNext_rocksdb(pFileStore, pWinKey);
1,874✔
655
  checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCur);
1,875✔
656
  return pCur;
1,876✔
657
}
658

659
SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey, COUNT_TYPE count) {
374✔
660
  SArray*          pWinStates = NULL;
374✔
661
  int32_t          index = -1;
374✔
662
  SStreamStateCur* pBuffCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
374✔
663
  int32_t          resSize = getRowStateRowSize(pFileState);
374✔
664
  COUNT_TYPE       winCount = 0;
374✔
665
  if (pBuffCur) {
374✔
666
    while (index >= 0) {
317✔
667
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
198✔
668
      winCount = *((COUNT_TYPE*)((char*)pPos->pRowBuff + (resSize - sizeof(COUNT_TYPE))));
198✔
669
      if (sessionStateRangeKeyCompare(pWinKey, pWinStates, index) == 0 || winCount < count) {
198✔
670
        index--;
166✔
671
      } else if (index >= 0) {
32!
672
        pBuffCur->buffIndex = index + 1;
32✔
673
        return pBuffCur;
32✔
674
      }
675
    }
676
    pBuffCur->buffIndex = 0;
119✔
677
  } else if (taosArrayGetSize(pWinStates) > 0) {
223!
678
    pBuffCur = createStateCursor(pFileState);
×
679
    if (pBuffCur == NULL) {
×
680
      return NULL;
×
681
    }
682
    pBuffCur->buffIndex = 0;
×
683
  }
684

685
  void*            pFileStore = getStateFileStore(pFileState);
342✔
686
  SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
342✔
687
  if (pCur) {
342!
688
    pCur->pStreamFileState = pFileState;
×
689
    SSessionKey key = {0};
×
690
    void*       pVal = NULL;
×
691
    int         len = 0;
×
692
    int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &len);
×
693
    if (code == TSDB_CODE_FAILED) {
×
694
      streamStateFreeCur(pCur);
×
695
      return pBuffCur;
×
696
    }
697
    winCount = *((COUNT_TYPE*)((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
×
698
    taosMemoryFreeClear(pVal);
×
699
    streamStateFreeCur(pBuffCur);
×
700
    if (sessionRangeKeyCmpr(pWinKey, &key) != 0 && winCount == count) {
×
701
      streamStateCurNext(pFileStore, pCur);
×
702
      return pCur;
×
703
    }
704
    streamStateCurPrev(pFileStore, pCur);
×
705
    while (1) {
706
      code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len);
×
707
      if (code == TSDB_CODE_FAILED) {
×
708
        streamStateCurNext(pFileStore, pCur);
×
709
        return pCur;
×
710
      }
711
      winCount = *((COUNT_TYPE*)((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
×
712
      taosMemoryFreeClear(pVal);
×
713
      if (sessionRangeKeyCmpr(pWinKey, &key) == 0 || winCount < count) {
×
714
        streamStateCurPrev(pFileStore, pCur);
×
715
      } else {
716
        streamStateCurNext(pFileStore, pCur);
×
717
        return pCur;
×
718
      }
719
    }
720
  }
721
  return pBuffCur;
342✔
722
}
723

724
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
17,399✔
725
  if (!pCur) {
17,399✔
726
    return TSDB_CODE_FAILED;
1,249✔
727
  }
728
  int32_t code = TSDB_CODE_SUCCESS;
16,150✔
729

730
  SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
16,150✔
731
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
16,150✔
732
  SArray*    pWinStates = NULL;
16,155✔
733
  if (ppBuff) {
16,155✔
734
    pWinStates = (SArray*)(*ppBuff);
15,713✔
735
  }
736

737
  if (pCur->buffIndex >= 0) {
16,155✔
738
    int32_t size = taosArrayGetSize(pWinStates);
14,472✔
739
    if (pCur->buffIndex >= size) {
14,472✔
740
      return TSDB_CODE_FAILED;
8,598✔
741
    }
742
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
5,874✔
743
    if (pVal) {
5,871✔
744
      *pVal = pPos;
1,247✔
745
    }
746
    *pKey = *(SSessionKey*)(pPos->pKey);
5,871✔
747
  } else {
748
    void* pData = NULL;
1,683✔
749
    code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen);
1,683✔
750
    if (taosArrayGetSize(pWinStates) > 0 &&
1,683✔
751
        (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) {
702!
752
      transformCursor(pCur->pStreamFileState, pCur);
32✔
753
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
32✔
754
      if (pVal) {
32!
755
        *pVal = pPos;
×
756
      }
757
      *pKey = *(SSessionKey*)(pPos->pKey);
32✔
758
      code = TSDB_CODE_SUCCESS;
32✔
759
    } else if (code == TSDB_CODE_SUCCESS && pVal) {
1,651✔
760
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
126✔
761
      if (!pNewPos || !pNewPos->pRowBuff) {
126!
762
        code = TSDB_CODE_OUT_OF_MEMORY;
×
763
        taosMemoryFreeClear(pData);
×
764
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
765
        return code;
×
766
      }
767
      memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
126✔
768
      pNewPos->needFree = true;
126✔
769
      pNewPos->beFlushed = true;
126✔
770
      memcpy(pNewPos->pRowBuff, pData, *pVLen);
126✔
771
      (*pVal) = pNewPos;
126✔
772
    }
773
    taosMemoryFreeClear(pData);
1,683!
774
  }
775
  return code;
7,554✔
776
}
777

778
void sessionWinStateMoveToNext(SStreamStateCur* pCur) {
13,551✔
779
  qTrace("move cursor to next");
13,551✔
780
  if (pCur && pCur->buffIndex >= 0) {
13,551✔
781
    pCur->buffIndex++;
9,753✔
782
  } else {
783
    streamStateCurNext_rocksdb(pCur);
3,798✔
784
  }
785
}
13,554✔
786

787
void sessionWinStateMoveToPrev(SStreamStateCur* pCur) {
×
788
  qTrace("move cursor to prev");
×
789
  if (pCur && pCur->buffIndex >= 1) {
×
790
    pCur->buffIndex--;
×
791
  } else {
792
    streamStateCurPrev_rocksdb(pCur);
×
793
  }
794
}
×
795

796
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey,
5,249✔
797
                                     range_cmpr_fn cmpFn) {
798
  SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
5,249✔
799
  SSessionKey      tmpKey = *key;
5,254✔
800
  int32_t          code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
5,254✔
801
  bool             hasCurrentPrev = true;
5,252✔
802
  if (code == TSDB_CODE_FAILED) {
5,252✔
803
    streamStateFreeCur(pCur);
966✔
804
    pCur = sessionWinStateSeekKeyNext(pFileState, key);
966✔
805
    code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
966✔
806
    hasCurrentPrev = false;
966✔
807
  }
808

809
  if (code == TSDB_CODE_FAILED) {
5,252✔
810
    code = TSDB_CODE_FAILED;
715✔
811
    goto _end;
715✔
812
  }
813

814
  if (cmpFn(key, &tmpKey) == 0) {
4,537✔
815
    *curKey = tmpKey;
3,299✔
816
    goto _end;
3,299✔
817
  } else if (!hasCurrentPrev) {
1,238✔
818
    code = TSDB_CODE_FAILED;
230✔
819
    goto _end;
230✔
820
  }
821

822
  sessionWinStateMoveToNext(pCur);
1,008✔
823
  code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
1,008✔
824
  if (code == TSDB_CODE_SUCCESS && cmpFn(key, &tmpKey) == 0) {
1,009✔
825
    *curKey = tmpKey;
446✔
826
  } else {
827
    code = TSDB_CODE_FAILED;
563✔
828
  }
829

830
_end:
5,253✔
831
  streamStateFreeCur(pCur);
5,253✔
832
  return code;
5,252✔
833
}
834

835
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
4,127✔
836
                              state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
837
  int32_t code = TSDB_CODE_SUCCESS;
4,127✔
838
  int32_t lino = 0;
4,127✔
839
  (*pWinCode) = TSDB_CODE_SUCCESS;
4,127✔
840

841
  SSessionKey* pWinKey = key;
4,127✔
842
  TSKEY        gap = 0;
4,127✔
843
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
4,127✔
844
  SArray*      pWinStates = NULL;
4,128✔
845

846
  void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
4,128✔
847
  if (ppBuff) {
4,128✔
848
    pWinStates = (SArray*)(*ppBuff);
3,558✔
849
  } else {
850
    pWinStates = taosArrayInit(16, POINTER_BYTES);
570✔
851
    if (!pWinStates) {
570!
852
      code = terrno;
×
853
      QUERY_CHECK_CODE(code, lino, _end);
×
854
    }
855

856
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
570✔
857
    QUERY_CHECK_CODE(code, lino, _end);
570!
858
  }
859

860
  TSKEY startTs = pWinKey->win.skey;
4,128✔
861
  TSKEY endTs = pWinKey->win.ekey;
4,128✔
862

863
  int32_t size = taosArrayGetSize(pWinStates);
4,128✔
864
  if (size == 0) {
4,128✔
865
    void*   pFileStore = getStateFileStore(pFileState);
981✔
866
    void*   p = NULL;
981✔
867
    int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
981✔
868
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
981✔
869
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
33✔
870
      if (!(*pVal)) {
33!
871
        code = TSDB_CODE_OUT_OF_MEMORY;
×
872
        QUERY_CHECK_CODE(code, lino, _end);
×
873
      }
874

875
      (*pWinCode) = code_file;
33✔
876
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
33✔
877
             pWinKey->win.ekey, code_file);
878
    } else {
879
      code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
948✔
880
      (*pWinCode) = TSDB_CODE_FAILED;
948✔
881
      taosMemoryFree(p);
948!
882
      QUERY_CHECK_CODE(code, lino, _end);
948!
883
    }
884
    goto _end;
981✔
885
  }
886

887
  // find the first position which is smaller than the pWinKey
888
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
3,147✔
889
  SRowBuffPos* pPos = NULL;
3,148✔
890
  int32_t      valSize = *pVLen;
3,148✔
891

892
  if (index >= 0) {
3,148✔
893
    pPos = taosArrayGetP(pWinStates, index);
3,079✔
894
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
3,079✔
895
    if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
3,079✔
896
      (*pVal) = pPos;
2,549✔
897
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
2,549✔
898
      pPos->beUsed = true;
2,549✔
899
      pPos->beFlushed = false;
2,549✔
900
      *key = *pDestWinKey;
2,549✔
901
      goto _end;
2,549✔
902
    }
903
  }
904

905
  if (index + 1 < size) {
599✔
906
    pPos = taosArrayGetP(pWinStates, index + 1);
88✔
907
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
88✔
908
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ||
174!
909
        fn(pKeyData, stateKey) == true) {
86✔
910
      (*pVal) = pPos;
7✔
911
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
7✔
912
      pPos->beUsed = true;
7✔
913
      pPos->beFlushed = false;
7✔
914
      *key = *pDestWinKey;
7✔
915
      goto _end;
7✔
916
    }
917
  }
918

919
  if (index + 1 == 0) {
592✔
920
    if (!isDeteled(pFileState, endTs)) {
64!
921
      void*   p = NULL;
64✔
922
      void*   pFileStore = getStateFileStore(pFileState);
64✔
923
      int32_t code_file =
924
          streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
64✔
925
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
64✔
926
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
55✔
927
        if (!(*pVal)) {
55!
928
          code = TSDB_CODE_OUT_OF_MEMORY;
×
929
          QUERY_CHECK_CODE(code, lino, _end);
55!
930
        }
931

932
        (*pWinCode) = code_file;
55✔
933
        qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
55✔
934
               pWinKey->win.ekey, code_file);
935
        goto _end;
55✔
936
      } else {
937
        taosMemoryFree(p);
9!
938
      }
939
    }
940
  }
941

942
  if (index == size - 1) {
537✔
943
    code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
511✔
944
    QUERY_CHECK_CODE(code, lino, _end);
511!
945

946
    (*pWinCode) = TSDB_CODE_FAILED;
511✔
947
    goto _end;
511✔
948
  }
949
  code = insertNewSessionWindow(pFileState, pWinStates, key, index + 1, (SRowBuffPos**)pVal);
26✔
950
  QUERY_CHECK_CODE(code, lino, _end);
26!
951

952
  (*pWinCode) = TSDB_CODE_FAILED;
26✔
953

954
_end:
4,129✔
955
  if (code != TSDB_CODE_SUCCESS) {
4,129!
956
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
957
  }
958
  return code;
4,129✔
959
}
960

961
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
1,545✔
962
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
1,545✔
963
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
1,543✔
964
  streamStateFreeCur(pCur);
1,544✔
965
  if (code == TSDB_CODE_SUCCESS) {
1,543!
966
    return code;
×
967
  } else {
968
    pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
1,543✔
969
  }
970

971
  code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen);
1,546✔
972
  streamStateFreeCur(pCur);
1,546✔
973
  return code;
1,546✔
974
}
975

976
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
4,862✔
977
                              int32_t* pVLen, int32_t* pWinCount) {
978
  int32_t code = TSDB_CODE_SUCCESS;
4,862✔
979
  int32_t lino = 0;
4,862✔
980
  (*pWinCount) = TSDB_CODE_SUCCESS;
4,862✔
981

982
  SSessionKey* pWinKey = pKey;
4,862✔
983
  const TSKEY  gap = 0;
4,862✔
984
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
4,862✔
985
  SArray*      pWinStates = NULL;
4,862✔
986
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
4,862✔
987
  if (ppBuff) {
4,863✔
988
    pWinStates = (SArray*)(*ppBuff);
3,550✔
989
  } else {
990
    pWinStates = taosArrayInit(16, POINTER_BYTES);
1,313✔
991
    if (!pWinStates) {
1,313!
992
      code = terrno;
×
993
      QUERY_CHECK_CODE(code, lino, _end);
×
994
    }
995

996
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1,313✔
997
    QUERY_CHECK_CODE(code, lino, _end);
1,313!
998
  }
999

1000
  TSKEY startTs = pWinKey->win.skey;
4,863✔
1001
  TSKEY endTs = pWinKey->win.ekey;
4,863✔
1002

1003
  int32_t size = taosArrayGetSize(pWinStates);
4,863✔
1004
  if (size == 0) {
4,863✔
1005
    void* pFileStore = getStateFileStore(pFileState);
1,323✔
1006
    void* pRockVal = NULL;
1,323✔
1007
    (*pWinCount) = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
1,323✔
1008
    if ((*pWinCount) == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
1,323✔
1009
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
56✔
1010
             pWinKey->win.ekey, (*pWinCount));
1011
      if ((*pWinCount) == TSDB_CODE_SUCCESS) {
56✔
1012
        int32_t     valSize = *pVLen;
55✔
1013
        COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
55✔
1014
        if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
55!
1015
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
45✔
1016
          if (!(*pVal)) {
45!
1017
            code = TSDB_CODE_OUT_OF_MEMORY;
×
1018
            QUERY_CHECK_CODE(code, lino, _end);
×
1019
          }
1020

1021
          goto _end;
45✔
1022
        }
1023
      }
1024
      pWinKey->win.skey = startTs;
11✔
1025
      pWinKey->win.ekey = endTs;
11✔
1026
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
11✔
1027
      taosMemoryFree(pRockVal);
10!
1028
      if (!(*pVal)) {
10!
1029
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1030
        QUERY_CHECK_CODE(code, lino, _end);
×
1031
      }
1032
    } else {
1033
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
1,267✔
1034
      QUERY_CHECK_CODE(code, lino, _end);
1,268!
1035

1036
      (*pWinCount) = TSDB_CODE_FAILED;
1,268✔
1037
    }
1038
    goto _end;
1,278✔
1039
  }
1040

1041
  // find the first position which is smaller than the pWinKey
1042
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
3,540✔
1043
  SRowBuffPos* pPos = NULL;
3,540✔
1044
  int32_t      valSize = *pVLen;
3,540✔
1045

1046
  if (index >= 0) {
3,540✔
1047
    pPos = taosArrayGetP(pWinStates, index);
3,528✔
1048
    COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pPos->pRowBuff) + (valSize - sizeof(COUNT_TYPE)));
3,528✔
1049
    if (inSessionWindow(pPos->pKey, startTs, gap) || (index == size - 1 && (*pWinStateCout) < winCount)) {
3,528✔
1050
      (*pVal) = pPos;
2,948✔
1051
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
2,948✔
1052
      pPos->beUsed = true;
2,948✔
1053
      pPos->beFlushed = false;
2,948✔
1054
      *pWinKey = *pDestWinKey;
2,948✔
1055
      goto _end;
2,948✔
1056
    }
1057
  }
1058

1059
  if (index == -1) {
592✔
1060
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
12!
1061
      SSessionKey tmpKey = *pWinKey;
×
1062
      void*       pRockVal = NULL;
×
1063
      void*       pFileStore = getStateFileStore(pFileState);
×
1064
      int32_t     code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen);
×
1065
      if (code_file == TSDB_CODE_SUCCESS) {
×
1066
        SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0);
×
1067
        SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey;
×
1068
        if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
×
1069
          *pWinKey = tmpKey;
×
1070
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
1071
          if (!(*pVal)) {
×
1072
            code = TSDB_CODE_OUT_OF_MEMORY;
×
1073
            QUERY_CHECK_CODE(code, lino, _end);
×
1074
          }
1075

1076
          (*pWinCount) = code_file;
×
1077
          qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1078
                 pWinKey->win.ekey, code_file);
1079
          goto _end;
×
1080
        }
1081
      }
1082
      taosMemoryFree(pRockVal);
×
1083
    }
1084
  }
1085

1086
  if (index + 1 < size) {
592✔
1087
    pPos = taosArrayGetP(pWinStates, index + 1);
56✔
1088
    (*pVal) = pPos;
56✔
1089
    SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
56✔
1090
    pPos->beUsed = true;
56✔
1091
    pPos->beFlushed = false;
56✔
1092
    *pWinKey = *pDestWinKey;
56✔
1093
    goto _end;
56✔
1094
  }
1095

1096
  code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
536✔
1097
  QUERY_CHECK_CODE(code, lino, _end);
536!
1098

1099
  (*pWinCount) = TSDB_CODE_FAILED;
536✔
1100

1101
_end:
4,863✔
1102
  if (code != TSDB_CODE_SUCCESS) {
4,863!
1103
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1104
  }
1105
  return code;
4,862✔
1106
}
1107

1108
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
261✔
1109
                                 int32_t* pVLen) {
1110
  SSessionKey* pWinKey = pKey;
261✔
1111
  const TSKEY  gap = 0;
261✔
1112
  int32_t      code = TSDB_CODE_SUCCESS;
261✔
1113
  int32_t      lino = 0;
261✔
1114
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
261✔
1115
  SArray*      pWinStates = NULL;
261✔
1116
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
261✔
1117
  if (ppBuff) {
261✔
1118
    pWinStates = (SArray*)(*ppBuff);
45✔
1119
  } else {
1120
    pWinStates = taosArrayInit(16, POINTER_BYTES);
216✔
1121
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
216✔
1122
    QUERY_CHECK_CODE(code, lino, _end);
216!
1123
  }
1124

1125
  TSKEY startTs = pWinKey->win.skey;
261✔
1126
  TSKEY endTs = pWinKey->win.ekey;
261✔
1127

1128
  int32_t size = taosArrayGetSize(pWinStates);
261✔
1129
  if (size == 0) {
261✔
1130
    void* pFileStore = getStateFileStore(pFileState);
223✔
1131
    void* pRockVal = NULL;
223✔
1132

1133
    int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
223✔
1134
    if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
223!
1135
      int32_t     valSize = *pVLen;
×
1136
      COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
×
1137
      if ((*pWinStateCount) == winCount) {
×
1138
        code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
×
1139
        QUERY_CHECK_CODE(code, lino, _end);
×
1140
      } else {
1141
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
×
1142
        if (!(*pVal)) {
×
1143
          code = TSDB_CODE_OUT_OF_MEMORY;
×
1144
          QUERY_CHECK_CODE(code, lino, _end);
×
1145
        }
1146
        qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
1147
               pWinKey->win.ekey, code_file);
1148
      }
1149
    } else {
1150
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
223✔
1151
      taosMemoryFree(pRockVal);
223!
1152
      QUERY_CHECK_CODE(code, lino, _end);
223!
1153
    }
1154
  } else {
1155
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
38✔
1156
    QUERY_CHECK_CODE(code, lino, _end);
38!
1157
  }
1158

1159
_end:
38✔
1160
  if (code != TSDB_CODE_SUCCESS) {
261!
1161
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1162
  }
1163
  return code;
261✔
1164
}
1165

1166
static int compareRangeKey(const void* pKey, const void* data, int index) {
×
1167
  SScanRange* pRange1 = (SScanRange*) pKey;
×
1168
  SScanRange* pRange2 = taosArrayGet((SArray*)data, index);
×
1169
  if (pRange1->win.skey > pRange2->win.skey) {
×
1170
    return 1;
×
1171
  } else if (pRange1->win.skey < pRange2->win.skey) {
×
1172
    return -1;
×
1173
  }
1174

1175
  if (pRange1->win.ekey > pRange2->win.ekey) {
×
1176
    return 1;
×
1177
  } else if (pRange1->win.ekey < pRange2->win.ekey) {
×
1178
    return -1;
×
1179
  }
1180

1181
  return 0;
×
1182
}
1183

1184
static int32_t scanRangeKeyCmpr(const SScanRange* pRange1, const SScanRange* pRange2) {
×
1185
  if (pRange1->win.skey > pRange2->win.ekey) {
×
1186
    return 1;
×
1187
  } else if (pRange1->win.ekey < pRange2->win.skey) {
×
1188
    return -1;
×
1189
  }
1190

1191
  return 0;
×
1192
}
1193

1194
static int32_t putRangeIdInfo(SScanRange* pRangeKey, uint64_t gpId, uint64_t uId) {
×
1195
  int32_t code = TSDB_CODE_SUCCESS;
×
1196
  int32_t lino = 0;
×
1197

1198
  code = tSimpleHashPut(pRangeKey->pUIds, &uId, sizeof(uint64_t), NULL, 0);
×
1199
  QUERY_CHECK_CODE(code, lino, _end);
×
1200
  code = tSimpleHashPut(pRangeKey->pGroupIds, &gpId, sizeof(uint64_t), NULL, 0);
×
1201
  QUERY_CHECK_CODE(code, lino, _end);
×
1202
_end:
×
1203
  if (code != TSDB_CODE_SUCCESS) {
×
1204
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1205
  }
1206
  return code;
×
1207
}
1208

1209
void mergeRangeKey(SScanRange* pRangeDest, SScanRange* pRangeSrc) {
×
1210
  pRangeDest->win.skey = TMIN(pRangeDest->win.skey, pRangeSrc->win.skey);
×
1211
  pRangeDest->win.ekey = TMAX(pRangeDest->win.ekey, pRangeSrc->win.ekey);
×
1212
  pRangeDest->calWin.skey = TMIN(pRangeDest->calWin.skey, pRangeSrc->calWin.skey);
×
1213
  pRangeDest->calWin.ekey = TMAX(pRangeDest->calWin.ekey, pRangeSrc->calWin.ekey);
×
1214
}
×
1215

1216
int32_t mergeScanRange(SArray* pRangeArray, SScanRange* pRangeKey, uint64_t gpId, uint64_t uId, int32_t* pIndex, bool* pRes, char* idStr) {
×
1217
  int32_t code = TSDB_CODE_SUCCESS;
×
1218
  int32_t lino = 0;
×
1219
  int32_t size = taosArrayGetSize(pRangeArray);
×
1220
  int32_t index = binarySearch(pRangeArray, size, pRangeKey, compareRangeKey);
×
1221
  if (index >= 0) {
×
1222
    SScanRange* pFindRangeKey = (SScanRange*) taosArrayGet(pRangeArray, index);
×
1223
    if (scanRangeKeyCmpr(pFindRangeKey, pRangeKey) == 0) {
×
1224
      mergeRangeKey(pFindRangeKey, pRangeKey);
×
1225
      code = putRangeIdInfo(pFindRangeKey, gpId, uId);
×
1226
      QUERY_CHECK_CODE(code, lino, _end);
×
1227
      *pRes = true;
×
1228
      goto _end;
×
1229
    }
1230
  }
1231
  if (index + 1 < size) {
×
1232
    SScanRange* pFindRangeKey = (SScanRange*) taosArrayGet(pRangeArray, index + 1);
×
1233
    if (scanRangeKeyCmpr(pFindRangeKey, pRangeKey) == 0) {
×
1234
      mergeRangeKey(pFindRangeKey, pRangeKey);
×
1235
      code = putRangeIdInfo(pFindRangeKey, gpId, uId);
×
1236
      QUERY_CHECK_CODE(code, lino, _end);
×
1237
      *pRes = true;
×
1238
      goto _end;
×
1239
    }
1240
  }
1241
  (*pIndex) = index;
×
1242
  *pRes = false;
×
1243

1244
_end:
×
1245
  qDebug("===stream===%s mergeScanRange start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64 ", uid:%" PRIu64
×
1246
         ", res:%d", idStr, pRangeKey->win.skey, pRangeKey->win.ekey, gpId, uId, *pRes);
1247
  if (code != TSDB_CODE_SUCCESS) {
×
1248
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1249
  }
1250
  return code;
×
1251
}
1252

1253
int32_t mergeAndSaveScanRange(STableTsDataState* pTsDataState, STimeWindow* pWin, uint64_t gpId, SRecDataInfo* pRecData, int32_t recLen) {
×
1254
  int32_t code = TSDB_CODE_SUCCESS;
×
1255
  int32_t lino = 0;
×
1256

1257
  SArray* pRangeArray = pTsDataState->pScanRanges;
×
1258
  uint64_t uId = pRecData->tableUid;
×
1259

1260
  int32_t index = 0;
×
1261
  bool merge = false;
×
1262
  SScanRange rangeKey = {.win = *pWin, .calWin = pRecData->calWin, .pUIds = NULL, .pGroupIds = NULL};
×
1263
  code = mergeScanRange(pRangeArray, &rangeKey, gpId, uId, &index, &merge, pTsDataState->pState->pTaskIdStr);
×
1264
  QUERY_CHECK_CODE(code, lino, _end);
×
1265
  if (merge == true) {
×
1266
    goto _end;
×
1267
  }
1268

1269
  int32_t size = taosArrayGetSize(pRangeArray);
×
1270
  if (size > MAX_SCAN_RANGE_SIZE) {
×
1271
    SSessionKey sesKey = {.win = *pWin, .groupId = gpId};
×
1272
    void* pVal = NULL;
×
1273
    int32_t len = 0;
×
1274
    int32_t winCode = streamStateSessionGet_rocksdb(pTsDataState->pState, &sesKey, &pVal, &len);
×
1275
    if (winCode != TSDB_CODE_SUCCESS) {
×
1276
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, &uId, sizeof(uint64_t));
×
1277
    } else {
1278
      char* pTempBuf = taosMemoryRealloc(pVal, len + sizeof(uint64_t));
×
1279
      QUERY_CHECK_NULL(pTempBuf, code, lino, _end, terrno);
×
1280
      memcpy(pTempBuf+len, &uId, sizeof(uint64_t));
×
1281
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, pTempBuf, len + sizeof(uint64_t));
×
1282
      taosMemFreeClear(pTempBuf);
×
1283
    }
1284
    QUERY_CHECK_CODE(code, lino, _end);
×
1285
    goto _end;
×
1286
  }
1287
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
1288
  rangeKey.pGroupIds = tSimpleHashInit(8, hashFn);
×
1289
  rangeKey.pUIds = tSimpleHashInit(8, hashFn);
×
1290
  code = putRangeIdInfo(&rangeKey, gpId, uId);
×
1291
  QUERY_CHECK_CODE(code, lino, _end);
×
1292
  index++;
×
1293
  taosArrayInsert(pRangeArray, index, &rangeKey);
×
1294

1295
_end:
×
1296
  if (code != TSDB_CODE_SUCCESS) {
×
1297
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1298
  }
1299
  return code;
×
1300
}
1301

1302
int32_t mergeSimpleHashMap(SSHashObj* pDest, SSHashObj* pSource) {
×
1303
  int32_t code = TSDB_CODE_SUCCESS;
×
1304
  int32_t lino = 0;
×
1305
  void*   pIte = NULL;
×
1306
  int32_t iter = 0;
×
1307
  while ((pIte = tSimpleHashIterate(pSource, pIte, &iter)) != NULL) {
×
1308
    size_t keyLen = 0;
×
1309
    void* pKey = tSimpleHashGetKey(pIte, &keyLen);
×
1310
    code = tSimpleHashPut(pDest, pKey, keyLen, pIte, sizeof(uint64_t));
×
1311
    QUERY_CHECK_CODE(code, lino, _end);
×
1312
  }
1313
  tSimpleHashCleanup(pSource);
×
1314

1315
_end:
×
1316
  if (code != TSDB_CODE_SUCCESS) {
×
1317
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1318
  }
1319
  return code;
×
1320
}
1321

1322
int32_t mergeAllScanRange(STableTsDataState* pTsDataState) {
×
1323
  int32_t code = TSDB_CODE_SUCCESS;
×
1324
  int32_t lino = 0;
×
1325
  SStreamStateCur* pCur = NULL;
×
1326
  SArray* pRangeArray = pTsDataState->pScanRanges;
×
1327
  if (taosArrayGetSize(pRangeArray) < 2) {
×
1328
    return code;
×
1329
  }
1330

1331
  for (int32_t i = 0; i < taosArrayGetSize(pRangeArray) - 1;) {
×
1332
    SScanRange* pCurRange = taosArrayGet(pRangeArray, i);
×
1333
    SScanRange* pNextRange = taosArrayGet(pRangeArray, i + 1);
×
1334
    if (scanRangeKeyCmpr(pCurRange, pNextRange) == 0) {
×
1335
      pCurRange->win.skey = TMIN(pCurRange->win.skey, pNextRange->win.skey);
×
1336
      pCurRange->win.ekey = TMAX(pCurRange->win.ekey, pNextRange->win.ekey);
×
1337
      code = mergeSimpleHashMap(pCurRange->pGroupIds, pNextRange->pGroupIds);
×
1338
      QUERY_CHECK_CODE(code, lino, _end);
×
1339
      code = mergeSimpleHashMap(pCurRange->pUIds, pNextRange->pUIds);
×
1340
      QUERY_CHECK_CODE(code, lino, _end);
×
1341
      taosArrayRemove(pRangeArray, i+1);
×
1342
      continue;
×
1343
    }
1344
    i++;
×
1345
  }
1346

1347
  int32_t winRes = TSDB_CODE_SUCCESS;
×
1348
  pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
×
1349
  while (winRes == TSDB_CODE_SUCCESS) {
×
1350
    void*       pVal = NULL;
×
1351
    int32_t     vlen = 0;
×
1352
    SSessionKey key = {0};
×
1353
    winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, &pVal, &vlen);
×
1354
    if (winRes != TSDB_CODE_SUCCESS) {
×
1355
      break;
×
1356
    }
1357
    int32_t index = 0;
×
1358
    bool merge = false;
×
1359
    SScanRange tmpRange = {.win = key.win, .pUIds = NULL, .pGroupIds = NULL};
×
1360
    int32_t num = vlen / sizeof(uint64_t);
×
1361
    uint64_t* pUids = (uint64_t*) pVal;
×
1362
    for (int32_t i = 0; i < num; i++) {
×
1363
      code = mergeScanRange(pRangeArray, &tmpRange, key.groupId, pUids[i], &index, &merge, pTsDataState->pState->pTaskIdStr);
×
1364
      QUERY_CHECK_CODE(code, lino, _end);
×
1365
    }
1366
    if (merge == true) {
×
1367
      code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
×
1368
      QUERY_CHECK_CODE(code, lino, _end);
×
1369
    }
1370
  }
1371

1372
_end:
×
1373
  streamStateFreeCur(pCur);
×
1374
  if (code != TSDB_CODE_SUCCESS) {
×
1375
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1376
  }
1377
  return code;
×
1378
}
1379

1380
int32_t popScanRange(STableTsDataState* pTsDataState, SScanRange* pRange) {
×
1381
  int32_t code = TSDB_CODE_SUCCESS;
×
1382
  int32_t lino = 0;
×
1383
  SStreamStateCur* pCur = NULL;
×
1384
  SArray* pRangeArray = pTsDataState->pScanRanges;
×
1385
  if (taosArrayGetSize(pRangeArray) > 0) {
×
1386
    (*pRange) = *(SScanRange*) taosArrayGet(pRangeArray, 0);
×
1387
    taosArrayRemove(pRangeArray, 0);
×
1388
    goto _end;
×
1389
  }
1390

1391
  {
1392
    pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
×
1393
    SRecDataInfo* pRecVal = NULL;
×
1394
    int32_t       vlen = 0;
×
1395
    SSessionKey   key = {0};
×
1396
    int32_t winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, (void**)&pRecVal, &vlen);
×
1397
    if (winRes != TSDB_CODE_SUCCESS) {
×
1398
      goto _end;
×
1399
    }
1400
    qDebug("===stream===get scan range from disc start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64,
×
1401
           key.win.skey, key.win.ekey, key.groupId);
1402

1403
    pRange->win = key.win;
×
1404
    pRange->calWin = pRecVal->calWin;
×
1405
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
1406
    if (pRange->pGroupIds == NULL) {
×
1407
      pRange->pGroupIds = tSimpleHashInit(8, hashFn);
×
1408
    }
1409
    if (pRange->pUIds == NULL) {
×
1410
      pRange->pUIds = tSimpleHashInit(8, hashFn);
×
1411
    }
1412
    code = putRangeIdInfo(pRange, key.groupId, pRecVal->tableUid);
×
1413
    QUERY_CHECK_CODE(code, lino, _end);
×
1414
    code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
×
1415
    QUERY_CHECK_CODE(code, lino, _end);
×
1416
  }
1417

1418
_end:
×
1419
  streamStateFreeCur(pCur);
×
1420
  if (code != TSDB_CODE_SUCCESS) {
×
1421
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1422
  }
1423
  return code;
×
1424
}
1425

1426
int32_t getNLastSessionStateKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
×
1427
  int32_t code = TSDB_CODE_SUCCESS;
×
1428
  int32_t lino = 0;
×
1429
  if (pCur->pHashData == NULL) {
×
1430
    return TSDB_CODE_FAILED;
×
1431
  }
1432
  SArray*  pWinStates = *((void**)pCur->pHashData);
×
1433
  int32_t size = taosArrayGetSize(pWinStates);
×
1434
  if (size == 0) {
×
1435
    return TSDB_CODE_FAILED;
×
1436
  }
1437

1438
  int32_t i = TMAX(size - num, 0);
×
1439

1440
  for ( ; i < size; i++) {
×
1441
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
×
1442
    QUERY_CHECK_NULL(pPos, code, lino, _end, terrno);
×
1443

1444
    SResultWindowInfo winInfo = {0};
×
1445
    winInfo.pStatePos = pPos;
×
1446
    winInfo.sessionWin = *(SSessionKey*)pPos->pKey;
×
1447

1448
    void* pTempRes = taosArrayPush(pRes, &winInfo);
×
1449
    QUERY_CHECK_NULL(pTempRes, code, lino, _end, terrno);
×
1450
  }
1451

1452
_end:
×
1453
  if (code != TSDB_CODE_SUCCESS) {
×
1454
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1455
  }
1456
  return code;
×
1457
}
1458

1459
bool hasSessionState(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, bool* pIsLast) {
×
1460
  SSHashObj*   pRowStateBuff = getRowStateBuff(pFileState);
×
1461
  void**       ppBuff = (void**)tSimpleHashGet(pRowStateBuff, &pKey->groupId, sizeof(uint64_t));
×
1462
  if (ppBuff == NULL) {
×
1463
    return false;
×
1464
  }
1465
  SArray*      pWinStates = (SArray*)(*ppBuff);
×
1466
  int32_t      size = taosArrayGetSize(pWinStates);
×
1467
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
×
1468
  SRowBuffPos* pPos = NULL;
×
1469

1470
  if (index >= 0) {
×
1471
    pPos = taosArrayGetP(pWinStates, index);
×
1472
    if (inSessionWindow(pPos->pKey, pKey->win.skey, gap)) {
×
1473
      *pKey = *((SSessionKey*)pPos->pKey);
×
1474
      *pIsLast = (index == size - 1);
×
1475
      return true;
×
1476
    }
1477
  }
1478

1479
  if (index + 1 < size) {
×
1480
    pPos = taosArrayGetP(pWinStates, index + 1);
×
1481
    if (inSessionWindow(pPos->pKey, pKey->win.skey, gap) ||
×
1482
        (pKey->win.ekey != INT64_MIN && inSessionWindow(pPos->pKey, pKey->win.ekey, gap))) {
×
1483
      *pKey = *((SSessionKey*)pPos->pKey);
×
1484
      *pIsLast = (index + 1 == size - 1);
×
1485
      return true;
×
1486
    }
1487
  }
1488
  return false;
×
1489
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc