• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3796

31 Mar 2025 10:39AM UTC coverage: 30.372% (-7.1%) from 37.443%
#3796

push

travis-ci

happyguoxy
test:add test cases

69287 of 309062 branches covered (22.42%)

Branch coverage included in aggregate %.

118044 of 307720 relevant lines covered (38.36%)

278592.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/stream/src/streamSessionState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "tcommon.h"
21
#include "tsimplehash.h"
22

23
#define MAX_SCAN_RANGE_SIZE 102400
24

25
int sessionStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
×
26
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
×
27
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
×
28
  return sessionWinKeyCmpr((SSessionKey*)pWin1, pWin2);
×
29
}
30

31
int sessionStateRangeKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
×
32
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
×
33
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
×
34
  return sessionRangeKeyCmpr(pWin1, pWin2);
×
35
}
36

37
/*
 * Binary search over the first `num` slots of keyList using cmpFn(key, keyList, pos).
 *
 * Returns the position of the last slot that compares smaller than or equal to
 * `key`; returns -1 when num <= 0 or when every slot compares bigger than `key`.
 */
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) {
  if (num <= 0) {
    return -1;
  }

  int low = 0;
  int high = num - 1;

  for (;;) {
    // key is at or past the current upper bound: the upper bound is the answer
    if (cmpFn(key, keyList, high) >= 0) {
      return high;
    }
    // exact hit on the lower bound
    if (cmpFn(key, keyList, low) == 0) {
      return low;
    }
    // key sorts before the lower bound: answer is the slot just before it
    if (cmpFn(key, keyList, low) < 0) {
      return low - 1;
    }

    // here low < key < high strictly, so the range holds at least two slots
    int mid = low + ((high - low + 1) >> 1);
    if (cmpFn(key, keyList, mid) < 0) {
      high = mid - 1;
    } else if (cmpFn(key, keyList, mid) > 0) {
      low = mid + 1;
    } else {
      return mid;
    }
  }
}
63

64
int64_t getSessionWindowEndkey(void* data, int32_t index) {
×
65
  SArray*       pWinInfos = (SArray*)data;
×
66
  SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
×
67
  if (ppos != NULL) {
×
68
    SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
×
69
    return pWin->win.ekey;
×
70
  } else {
71
    return 0;
×
72
  }
73
}
74

75
/*
 * True when timestamp `ts`, widened by `gap` on both sides, touches the window
 * [pKey->win.skey, pKey->win.ekey].
 */
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
  return (ts + gap >= pKey->win.skey) && (ts - gap <= pKey->win.ekey);
}
81

82
/*
 * Creates a stream state cursor and binds it to pFileState.
 * Returns NULL when createStreamStateCursor() fails.
 */
SStreamStateCur* createStateCursor(SStreamFileState* pFileState) {
  SStreamStateCur* pCursor = createStreamStateCursor();
  if (pCursor != NULL) {
    pCursor->pStreamFileState = pFileState;
  }
  return pCursor;
}
90

91
static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
×
92
                                   SRowBuffPos** ppPos) {
93
  int32_t      code = TSDB_CODE_SUCCESS;
×
94
  int32_t      lino = 0;
×
95
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
96
  if (!pNewPos || !pNewPos->pRowBuff) {
×
97
    code = TSDB_CODE_OUT_OF_MEMORY;
×
98
    QUERY_CHECK_CODE(code, lino, _end);
×
99
  }
100

101
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
×
102
  void* tmp = taosArrayPush(pWinInfos, &pNewPos);
×
103
  if (!tmp) {
×
104
    code = terrno;
×
105
    QUERY_CHECK_CODE(code, lino, _end);
×
106
  }
107
  (*ppPos) = pNewPos;
×
108

109
_end:
×
110
  if (code != TSDB_CODE_SUCCESS) {
×
111
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
112
  }
113
  return code;
×
114
}
115

116
/*
 * Obtains a writable row position, stamps it with *pKey and inserts it into
 * pWinInfos at slot `index`. On success *ppPos receives the new position.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when no row position/buffer is available,
 * or terrno when the array insert fails; *ppPos is left untouched in both cases.
 */
static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
                                      int32_t index, SRowBuffPos** ppPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SRowBuffPos* pFresh = getNewRowPosForWrite(pFileState);
  if (pFresh == NULL || pFresh->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pFresh->pKey, pKey, sizeof(SSessionKey));
  if (taosArrayInsert(pWinInfos, index, &pFresh) == NULL) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  *ppPos = pFresh;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
141

142
/*
 * Builds a detached row position (needFree = true, beFlushed = true) for *pKey.
 *
 * When `p` is non-NULL its first *pVLen bytes become the row content, provided
 * they fit into the row size reported by getRowStateRowSize(); an oversized
 * value is reported as TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR. When `p` is NULL
 * the row buffer is zero-filled instead.
 *
 * `p` is always released with taosMemoryFree() before returning.
 * Returns the new position, or NULL on any failure.
 */
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SRowBuffPos* pResPos = getNewRowPosForWrite(pFileState);
  if (pResPos == NULL || pResPos->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pResPos->pKey, pKey, sizeof(SSessionKey));
  pResPos->needFree = true;
  pResPos->beFlushed = true;

  int32_t rowSize = getRowStateRowSize(pFileState);
  if (p == NULL) {
    memset(pResPos->pRowBuff, 0, rowSize);
  } else if (*pVLen <= rowSize) {
    memcpy(pResPos->pRowBuff, p, *pVLen);
  } else {
    qError("[InternalERR] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],session window buffer is too small, *pVLen:%d, len:%d", pKey->win.skey, pKey->win.ekey, pKey->groupId, *pVLen, rowSize);
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  taosMemoryFree(p);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    return NULL;
  }
  return pResPos;
}
175

176
/*
 * Finds or creates the session window of group pKey->groupId that pKey belongs to.
 *
 * Lookup order:
 *   1. If the group has no in-memory windows, try the file store
 *      (streamStateSessionAddIfNotExist_rocksdb); otherwise append a new window.
 *   2. Binary-search the in-memory list; reuse the window at the found slot or
 *      the one right after it when it lies within `gap` of the key. In that case
 *      *pKey is overwritten with the stored window key and the position is
 *      marked beUsed = true, beFlushed = false.
 *   3. If the key sorts before every in-memory window and the state for endTs
 *      is flushed and not deleted, try the file store.
 *   4. Otherwise append/insert a brand new in-memory window.
 *
 * pFileState  stream file state owning the buffers.
 * pKey        in: key to look up; out: key of the matched in-memory window (step 2).
 * gap         session gap used for the membership tests.
 * pVal        out: SRowBuffPos* of the resulting window.
 * pVLen       out: value length as reported by the file store (file-store paths only).
 * pWinCode    out: TSDB_CODE_SUCCESS for an in-memory match, the file-store result
 *             code for file-store paths, TSDB_CODE_FAILED when a new window was created.
 *
 * Returns TSDB_CODE_SUCCESS or an error code (also logged via qError).
 */
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen,
                                int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  (*pWinCode) = TSDB_CODE_SUCCESS;
  SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
  SArray*    pWinStates = NULL;
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // the hash did not store the new array, so nobody else will release it
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  TSKEY startTs = pKey->win.skey;
  TSKEY endTs = pKey->win.ekey;

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    void*   pFileStore = getStateFileStore(pFileState);
    void*   p = NULL;
    int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
      // createSessionWinBuff() takes care of releasing p
      (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
      if (!(*pVal)) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      (*pWinCode) = code_file;
      qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
    } else {
      code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
      (*pWinCode) = TSDB_CODE_FAILED;
      taosMemoryFree(p);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }

  // find the last position whose key is smaller than or equal to pKey (-1 if none)
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
  SRowBuffPos* pPos = NULL;

  if (index >= 0) {
    pPos = taosArrayGetP(pWinStates, index);
    if (inSessionWindow(pPos->pKey, startTs, gap)) {
      (*pVal) = pPos;
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
      pPos->beUsed = true;
      pPos->beFlushed = false;
      *pKey = *pDestWinKey;
      goto _end;
    }
  }

  if (index + 1 < size) {
    // the following window may also absorb the key, via its start or its end timestamp
    pPos = taosArrayGetP(pWinStates, index + 1);
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) {
      (*pVal) = pPos;
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
      pPos->beUsed = true;
      pPos->beFlushed = false;
      *pKey = *pDestWinKey;
      goto _end;
    }
  }

  if (index + 1 == 0) {
    // key sorts before every in-memory window: it may only exist in the file store
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
      void*   p = NULL;
      void*   pFileStore = getStateFileStore(pFileState);
      int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
        (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
        if (!(*pVal)) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          QUERY_CHECK_CODE(code, lino, _end);
        }

        (*pWinCode) = code_file;
        qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
        goto _end;
      } else {
        taosMemoryFree(p);
      }
    }
  }

  if (index == size - 1) {
    // key sorts after the last window: append
    code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
    QUERY_CHECK_CODE(code, lino, _end);

    (*pWinCode) = TSDB_CODE_FAILED;
    goto _end;
  }

  code = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1, (SRowBuffPos**)pVal);
  QUERY_CHECK_CODE(code, lino, _end);

  (*pWinCode) = TSDB_CODE_FAILED;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
290

291
/*
 * Adapter from an SWinKey (passed as pKey) to getSessionWinResultBuff(): the
 * key's ts becomes both the start and the end of a session key, and the lookup
 * is done with a gap of 0. keyLen is not used.
 */
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                          int32_t* pWinCode) {
  SWinKey*    pPoint = pKey;
  SSessionKey sessKey = {0};

  sessKey.groupId = pPoint->groupId;
  sessKey.win.skey = pPoint->ts;
  sessKey.win.ekey = pPoint->ts;

  return getSessionWinResultBuff(pFileState, &sessKey, 0, pVal, pVLen, pWinCode);
}
297

298
/*
 * Inserts the row position pPos into the in-memory window list of its group
 * (taken from the session key stored in pPos->pKey), creating the list when
 * the group has none yet.
 *
 * An empty list simply receives pPos. Otherwise pPos is inserted at the slot
 * returned by binarySearch(), or at slot 0 when the search returns a negative
 * index. pPos->needFree is cleared on every path, including failures.
 *
 * Returns TSDB_CODE_SUCCESS or an error code (also logged via qError).
 */
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
  SSessionKey* pKey = pPos->pKey;
  SArray*      pWinStates = NULL;
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // the hash did not store the new array, so nobody else will release it
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    void* tmp = taosArrayPush(pWinStates, &pPos);
    if (!tmp) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }

  // binarySearch() yields the last slot whose key is smaller than or equal to pKey, -1 if none
  int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
  // NOTE(review): pPos is inserted AT the found slot (i.e. before that window), whereas
  // getSessionWinResultBuff() inserts new windows at index + 1 — confirm this is intended.
  int32_t insertAt = (index >= 0) ? index : 0;
  void*   tmp = taosArrayInsert(pWinStates, insertAt, &pPos);
  if (!tmp) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  pPos->needFree = false;
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
351

352
/*
 * Reads the session window pKey from the file store into a detached row
 * position (needFree = true, beFlushed = true).
 *
 * pWinCode  out: result of streamStateSessionGet_rocksdb(). When it is not
 *           TSDB_CODE_SUCCESS the function returns without touching *pVal.
 * pVal      out: the new SRowBuffPos* when the window was found and copied.
 * pVLen     out: length of the stored value as reported by the file store.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when no row position/buffer is available and
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the stored value is larger than
 * the row size (same rule as createSessionWinBuff()); TSDB_CODE_SUCCESS otherwise.
 */
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
                              int32_t* pWinCode) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  void*        pBuff = NULL;  // declared up front so every exit path can release it
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
  if (!pNewPos || !pNewPos->pRowBuff) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pNewPos->needFree = true;
  pNewPos->beFlushed = true;
  (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
  if ((*pWinCode) != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  // never copy more than the row buffer can hold
  int32_t len = getRowStateRowSize(pFileState);
  if (*pVLen > len) {
    qError("[InternalERR] read key:[skey:%" PRId64 ",ekey:%" PRId64 ",groupId:%" PRIu64
           "],session window buffer is too small, *pVLen:%d, len:%d",
           pKey->win.skey, pKey->win.ekey, pKey->groupId, *pVLen, len);
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
  memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
  (*pVal) = pNewPos;

_end:
  taosMemoryFreeClear(pBuff);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
379

380
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
×
381
  SSHashObj*   pSessionBuff = (SSHashObj*)pBuff;
×
382
  SSessionKey* pWinKey = (SSessionKey*)key;
×
383
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
384
  if (!ppBuff) {
×
385
    return TSDB_CODE_SUCCESS;
×
386
  }
387
  SArray* pWinStates = (SArray*)(*ppBuff);
×
388
  int32_t size = taosArrayGetSize(pWinStates);
×
389
  TSKEY   gap = 0;
×
390
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
×
391
  if (index >= 0) {
×
392
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
×
393
    if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
×
394
      pPos->beFlushed = true;
×
395
      taosArrayRemove(pWinStates, index);
×
396
    }
397
  }
398
  return TSDB_CODE_SUCCESS;
×
399
}
400

401
/*
 * Removes pPos from the in-memory window list of its group, but only when the
 * slot that binarySearch() finds for pPos's key holds exactly this pointer.
 */
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  SSessionKey* pTarget = (SSessionKey*)pPos->pKey;
  void**       ppGroup = tSimpleHashGet(getRowStateBuff(pFileState), &pTarget->groupId, sizeof(uint64_t));
  if (ppGroup == NULL) {
    return;
  }

  SArray* pWinStates = (SArray*)(*ppGroup);
  int32_t numOfWins = taosArrayGetSize(pWinStates);
  int32_t slot = binarySearch(pWinStates, numOfWins, pTarget, sessionStateKeyCompare);
  if (slot < 0) {
    return;
  }

  // identity check: a different position with a comparable key must stay
  if (taosArrayGetP(pWinStates, slot) == (void*)pPos) {
    taosArrayRemove(pWinStates, slot);
  }
}
419

420
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
×
421
                                           const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
422
  int32_t      code = TSDB_CODE_SUCCESS;
×
423
  int32_t      lino = 0;
×
424
  SRowBuffPos* pNewPos = NULL;
×
425
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
×
426
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
427
  SArray*      pWinStates = NULL;
×
428
  if (!ppBuff) {
×
429
    pWinStates = taosArrayInit(16, POINTER_BYTES);
×
430
    if (!pWinStates) {
×
431
      code = terrno;
×
432
      QUERY_CHECK_CODE(code, lino, _end);
×
433
    }
434

435
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
×
436
    QUERY_CHECK_CODE(code, lino, _end);
×
437
  } else {
438
    pWinStates = (SArray*)(*ppBuff);
×
439
  }
440
  if (!pCur) {
×
441
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
×
442
    QUERY_CHECK_CODE(code, lino, _end);
×
443

444
    goto _end;
×
445
  }
446

447
  int32_t size = taosArrayGetSize(pWinStates);
×
448
  if (pCur->buffIndex >= 0) {
×
449
    if (pCur->buffIndex >= size) {
×
450
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
×
451
      QUERY_CHECK_CODE(code, lino, _end);
×
452

453
      goto _end;
×
454
    }
455
    code = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex, &pNewPos);
×
456
    QUERY_CHECK_CODE(code, lino, _end);
×
457

458
    goto _end;
×
459
  } else {
460
    if (size > 0) {
×
461
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
×
462
      if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) {
×
463
        // pCur is invalid
464
        SSessionKey pTmpKey = *pWinKey;
×
465
        int32_t     winCode = TSDB_CODE_SUCCESS;
×
466
        code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode);
×
467
        QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
468
        QUERY_CHECK_CODE(code, lino, _end);
×
469
        goto _end;
×
470
      }
471
    }
472
    pNewPos = getNewRowPosForWrite(pFileState);
×
473
    if (!pNewPos || !pNewPos->pRowBuff) {
×
474
      code = TSDB_CODE_OUT_OF_MEMORY;
×
475
      QUERY_CHECK_CODE(code, lino, _end);
×
476
    }
477

478
    memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
×
479
    pNewPos->needFree = true;
×
480
    pNewPos->beFlushed = true;
×
481
  }
482

483
_end:
×
484
  (*ppVal) = pNewPos;
×
485
  if (code != TSDB_CODE_SUCCESS) {
×
486
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
487
  }
488
  return code;
×
489
}
490

491
/*
 * Zero-fills the row buffer of every in-memory window of every group.
 * The windows themselves stay in their lists.
 */
void sessionWinStateClear(SStreamFileState* pFileState) {
  int32_t rowSize = getRowStateRowSize(pFileState);
  void*   pGroups = getRowStateBuff(pFileState);
  void*   pEntry = NULL;
  int32_t iter = 0;

  for (pEntry = tSimpleHashIterate(pGroups, pEntry, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pGroups, pEntry, &iter)) {
    SArray* pWinStates = *((void**)pEntry);
    int32_t numOfWins = taosArrayGetSize(pWinStates);
    for (int32_t i = 0; i < numOfWins; ++i) {
      SRowBuffPos* pWinPos = taosArrayGetP(pWinStates, i);
      memset(pWinPos->pRowBuff, 0, rowSize);
    }
  }
}
×
506

507
/*
 * Hash free callback: `ptr` points at a stored SArray pointer; destroys that array.
 */
void freeArrayPtr(void* ptr) {
  taosArrayDestroy(*(void**)ptr);
}
×
511

512
void sessionWinStateCleanup(void* pBuff) {
×
513
  if (pBuff == NULL) {
×
514
    return;
×
515
  }
516
  tSimpleHashSetFreeFp(pBuff, freeArrayPtr);
×
517
  tSimpleHashCleanup(pBuff);
×
518
}
519

520
static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey,
×
521
                                                SArray** pWins, int32_t* pIndex) {
522
  SStreamStateCur* pCur = NULL;
×
523
  SSHashObj*       pSessionBuff = getRowStateBuff(pFileState);
×
524
  void**           ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
525
  if (!ppBuff) {
×
526
    return NULL;
×
527
  }
528

529
  SArray* pWinStates = (SArray*)(*ppBuff);
×
530
  int32_t size = taosArrayGetSize(pWinStates);
×
531
  TSKEY   gap = 0;
×
532
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
×
533

534
  if (pWins) {
×
535
    (*pWins) = pWinStates;
×
536
  }
537

538
  if (size > 0 && index == -1) {
×
539
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
×
540
    SSessionKey* pWin = (SSessionKey*)pPos->pKey;
×
541
    if (pWinKey->win.skey == pWin->win.skey) {
×
542
      index = 0;
×
543
    }
544
  }
545

546
  if (index >= 0) {
×
547
    pCur = createStateCursor(pFileState);
×
548
    if (pCur == NULL) {
×
549
      return NULL;
×
550
    }
551
    pCur->buffIndex = index;
×
552
    if (pIndex) {
×
553
      *pIndex = index;
×
554
    }
555
  }
556

557
  return pCur;
×
558
}
559

560
/*
 * Seeks the window at or before pWinKey: the in-memory buffer is consulted
 * first; when it yields no cursor the file store is searched and the returned
 * cursor is marked as a file-store cursor (buffIndex = -1).
 * Returns NULL when neither source yields a cursor.
 */
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
  SStreamStateCur* pBuffCursor = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL);
  if (pBuffCursor != NULL) {
    return pBuffCursor;
  }

  SStreamStateCur* pFileCursor =
      streamStateSessionSeekKeyCurrentPrev_rocksdb(getStateFileStore(pFileState), pWinKey);
  if (pFileCursor != NULL) {
    pFileCursor->buffIndex = -1;
    pFileCursor->pStreamFileState = pFileState;
  }
  return pFileCursor;
}
575

576
/*
 * Seeks the window strictly before pWinKey.
 *
 * If the buffer has a candidate slot: a slot that range-compares below the key
 * is returned as is; a slot that range-compares equal is stepped back by one
 * when it is not slot 0. In every other case the buffer cursor is released and
 * the file store is searched instead (cursor then carries buffIndex = -1).
 * Returns NULL when nothing is found.
 */
SStreamStateCur* sessionWinStateSeekKeyPrev(SStreamFileState *pFileState, const SSessionKey *pWinKey) {
  SArray*          pWinStates = NULL;
  int32_t          slot = -1;
  SStreamStateCur* pCursor = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &slot);

  if (pCursor != NULL) {
    int32_t rangeCmp = sessionStateRangeKeyCompare(pWinKey, pWinStates, slot);
    if (rangeCmp > 0) {
      return pCursor;
    }
    if (rangeCmp == 0 && slot > 0) {
      sessionWinStateMoveToPrev(pCursor);
      return pCursor;
    }
    streamStateFreeCur(pCursor);
    pCursor = NULL;
  }

  pCursor = streamStateSessionSeekKeyPrev_rocksdb(getStateFileStore(pFileState), pWinKey);
  if (pCursor != NULL) {
    pCursor->buffIndex = -1;
    pCursor->pStreamFileState = pFileState;
  }
  return pCursor;
}
601

602
/*
 * Turns pCur into a buffer cursor: resets it, points it at slot 0 and binds it
 * to pFileState. A NULL cursor is ignored.
 */
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) {
  if (pCur != NULL) {
    streamStateResetCur(pCur);
    pCur->buffIndex = 0;
    pCur->pStreamFileState = pFileState;
  }
}
610

611
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
×
612
                                    SStreamStateCur** ppCur) {
613
  SSessionKey key = {.groupId = groupId};
×
614
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL);
×
615
  if (taosArrayGetSize(pWinStates) > 0 &&
×
616
      (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
×
617
    if (!(*ppCur)) {
×
618
      (*ppCur) = createStateCursor(pFileState);
×
619
    }
620
    transformCursor(pFileState, *ppCur);
×
621
  } else if (*ppCur) {
×
622
    (*ppCur)->buffIndex = -1;
×
623
    (*ppCur)->pStreamFileState = pFileState;
×
624
  }
625
}
×
626

627
/*
 * Seeks the window at or after pWinKey.
 *
 * A buffer hit is advanced by one slot when the found window range-compares
 * below the key. Without a buffer hit the file store is searched and the
 * resulting cursor is handed to checkAndTransformCursor(), which may redirect
 * it to the buffer.
 */
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
  SArray*          pWinStates = NULL;
  int32_t          slot = -1;
  SStreamStateCur* pCursor = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &slot);

  if (pCursor == NULL) {
    pCursor = streamStateSessionSeekKeyCurrentNext_rocksdb(getStateFileStore(pFileState), (SSessionKey*)pWinKey);
    checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCursor);
    return pCursor;
  }

  if (sessionStateRangeKeyCompare(pWinKey, pWinStates, slot) > 0) {
    sessionWinStateMoveToNext(pCursor);
  }
  return pCursor;
}
643

644
/*
 * Seeks the window after pWinKey.
 *
 * A buffer hit is advanced by one slot and returned. Without a buffer hit the
 * file store is searched and the resulting cursor is handed to
 * checkAndTransformCursor(), which may redirect it to the buffer.
 */
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
  SArray*          pWinStates = NULL;
  int32_t          slot = -1;
  SStreamStateCur* pCursor = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &slot);

  if (pCursor == NULL) {
    pCursor = streamStateSessionSeekKeyNext_rocksdb(getStateFileStore(pFileState), pWinKey);
    checkAndTransformCursor(pFileState, pWinKey->groupId, pWinStates, &pCursor);
    return pCursor;
  }

  sessionWinStateMoveToNext(pCursor);
  return pCursor;
}
658

659
/*
 * Count-window variant of "seek previous".
 *
 * The window count is read from the last sizeof(COUNT_TYPE) bytes of a row
 * (row size taken from getRowStateRowSize()).
 *
 * Buffer phase: starting at the slot found for pWinKey, walk backwards while
 * the slot range-compares equal to the key or its count is below `count`. The
 * first slot that stops the walk makes the function return a buffer cursor on
 * the slot right after it. If the walk runs off the front (or the buffer has
 * windows but no slot for the key) a buffer cursor on slot 0 is kept as fallback.
 *
 * File-store phase: seek the previous key in the file store. If its value
 * cannot be read the fallback buffer cursor is returned. Otherwise the fallback
 * is released and the same backward walk is done on the file-store cursor,
 * which is finally stepped forward once and returned.
 */
SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey, COUNT_TYPE count) {
  SArray*          pWinStates = NULL;
  int32_t          slot = -1;
  SStreamStateCur* pBuffCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &slot);
  int32_t          rowSize = getRowStateRowSize(pFileState);
  size_t           countOffset = rowSize - sizeof(COUNT_TYPE);
  COUNT_TYPE       curCount = 0;

  if (pBuffCur != NULL) {
    for (; slot >= 0; slot--) {
      SRowBuffPos* pWinPos = taosArrayGetP(pWinStates, slot);
      curCount = *((COUNT_TYPE*)((char*)pWinPos->pRowBuff + countOffset));
      if (sessionStateRangeKeyCompare(pWinKey, pWinStates, slot) != 0 && !(curCount < count)) {
        pBuffCur->buffIndex = slot + 1;
        return pBuffCur;
      }
    }
    pBuffCur->buffIndex = 0;
  } else if (taosArrayGetSize(pWinStates) > 0) {
    pBuffCur = createStateCursor(pFileState);
    if (pBuffCur == NULL) {
      return NULL;
    }
    pBuffCur->buffIndex = 0;
  }

  void*            pFileStore = getStateFileStore(pFileState);
  SStreamStateCur* pFileCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
  if (pFileCur == NULL) {
    return pBuffCur;
  }

  pFileCur->pStreamFileState = pFileState;
  SSessionKey fileKey = {0};
  void*       pRow = NULL;
  int         rowLen = 0;
  int32_t     code =
      streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pFileCur, &fileKey, &pRow, &rowLen);
  if (code == TSDB_CODE_FAILED) {
    streamStateFreeCur(pFileCur);
    return pBuffCur;
  }
  curCount = *((COUNT_TYPE*)((char*)pRow + countOffset));
  taosMemoryFreeClear(pRow);
  // from here on the file-store cursor is the result; the buffer fallback is no longer needed
  streamStateFreeCur(pBuffCur);

  if (sessionRangeKeyCmpr(pWinKey, &fileKey) != 0 && curCount == count) {
    streamStateCurNext(pFileStore, pFileCur);
    return pFileCur;
  }

  streamStateCurPrev(pFileStore, pFileCur);
  for (;;) {
    code = streamStateSessionGetKVByCur_rocksdb(NULL, pFileCur, &fileKey, &pRow, &rowLen);
    if (code == TSDB_CODE_FAILED) {
      break;
    }
    curCount = *((COUNT_TYPE*)((char*)pRow + countOffset));
    taosMemoryFreeClear(pRow);
    if (sessionRangeKeyCmpr(pWinKey, &fileKey) != 0 && !(curCount < count)) {
      break;
    }
    streamStateCurPrev(pFileStore, pFileCur);
  }

  // both exits of the walk step forward once before handing the cursor back
  streamStateCurNext(pFileStore, pFileCur);
  return pFileCur;
}
723

724
/*
 * Reads the window under pCur.
 *
 * pKey   in: only groupId is used, to pick the group's in-memory list;
 *        out: key of the window that was read.
 * pVal   optional out: SRowBuffPos* of the window. For file-store hits this is
 *        a freshly allocated detached position (needFree = true, beFlushed = true).
 * pVLen  out: value length reported by the file store (file-store path only).
 *
 * Buffer cursor (buffIndex >= 0): returns the window at that slot, or
 * TSDB_CODE_FAILED when the slot is past the end of the list.
 * File-store cursor: reads from the file store; if the buffer holds windows and
 * the read failed or the key read does not range-compare before the first
 * buffered window, the cursor is switched to buffer slot 0 and that window is
 * returned instead.
 *
 * Returns TSDB_CODE_FAILED for a NULL cursor, TSDB_CODE_OUT_OF_MEMORY when no
 * row position can be allocated, otherwise the read result.
 */
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return TSDB_CODE_FAILED;
  }

  SArray* pWinStates = NULL;
  void**  ppGroup = tSimpleHashGet(getRowStateBuff(pCur->pStreamFileState), &pKey->groupId, sizeof(uint64_t));
  if (ppGroup != NULL) {
    pWinStates = (SArray*)(*ppGroup);
  }

  if (pCur->buffIndex >= 0) {
    int32_t numOfWins = taosArrayGetSize(pWinStates);
    if (pCur->buffIndex >= numOfWins) {
      return TSDB_CODE_FAILED;
    }
    SRowBuffPos* pBuffPos = taosArrayGetP(pWinStates, pCur->buffIndex);
    if (pVal != NULL) {
      *pVal = pBuffPos;
    }
    *pKey = *(SSessionKey*)(pBuffPos->pKey);
    return TSDB_CODE_SUCCESS;
  }

  void*   pData = NULL;
  int32_t code =
      streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen);

  bool switchToBuff = false;
  if (taosArrayGetSize(pWinStates) > 0) {
    switchToBuff = (code == TSDB_CODE_FAILED) || (sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0);
  }

  if (switchToBuff) {
    transformCursor(pCur->pStreamFileState, pCur);
    SRowBuffPos* pBuffPos = taosArrayGetP(pWinStates, pCur->buffIndex);
    if (pVal != NULL) {
      *pVal = pBuffPos;
    }
    *pKey = *(SSessionKey*)(pBuffPos->pKey);
    code = TSDB_CODE_SUCCESS;
  } else if (code == TSDB_CODE_SUCCESS && pVal != NULL) {
    SRowBuffPos* pDetached = getNewRowPosForWrite(pCur->pStreamFileState);
    if (pDetached == NULL || pDetached->pRowBuff == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFreeClear(pData);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
    memcpy(pDetached->pKey, pKey, sizeof(SSessionKey));
    pDetached->needFree = true;
    pDetached->beFlushed = true;
    // NOTE(review): *pVLen is not checked against the row size here — confirm callers guarantee it fits
    memcpy(pDetached->pRowBuff, pData, *pVLen);
    (*pVal) = pDetached;
  }

  taosMemoryFreeClear(pData);
  return code;
}
777

778
/*
 * Advances the cursor by one window: buffer cursors (buffIndex >= 0) bump the
 * slot index, everything else (including a NULL cursor) is passed to
 * streamStateCurNext_rocksdb().
 */
void sessionWinStateMoveToNext(SStreamStateCur* pCur) {
  qTrace("move cursor to next");
  if (pCur == NULL || pCur->buffIndex < 0) {
    streamStateCurNext_rocksdb(pCur);
    return;
  }
  pCur->buffIndex++;
}
×
786

787
// Step the cursor back by one position.
// Only a cursor whose buffIndex is at least 1 is moved inside the in-memory
// array; every other cursor (including NULL and buffIndex == 0) is forwarded
// to the rocksdb cursor helper.
void sessionWinStateMoveToPrev(SStreamStateCur* pCur) {
  qTrace("move cursor to prev");
  bool canStepBackInMem = (pCur != NULL) && (pCur->buffIndex >= 1);
  if (!canStepBackInMem) {
    streamStateCurPrev_rocksdb(pCur);
    return;
  }
  pCur->buffIndex -= 1;
}
×
795

796
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey,
×
797
                                     range_cmpr_fn cmpFn) {
798
  SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
×
799
  SSessionKey      tmpKey = *key;
×
800
  int32_t          code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
×
801
  bool             hasCurrentPrev = true;
×
802
  if (code == TSDB_CODE_FAILED) {
×
803
    streamStateFreeCur(pCur);
×
804
    pCur = sessionWinStateSeekKeyNext(pFileState, key);
×
805
    code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
×
806
    hasCurrentPrev = false;
×
807
  }
808

809
  if (code == TSDB_CODE_FAILED) {
×
810
    code = TSDB_CODE_FAILED;
×
811
    goto _end;
×
812
  }
813

814
  if (cmpFn(key, &tmpKey) == 0) {
×
815
    *curKey = tmpKey;
×
816
    goto _end;
×
817
  } else if (!hasCurrentPrev) {
×
818
    code = TSDB_CODE_FAILED;
×
819
    goto _end;
×
820
  }
821

822
  sessionWinStateMoveToNext(pCur);
×
823
  code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
×
824
  if (code == TSDB_CODE_SUCCESS && cmpFn(key, &tmpKey) == 0) {
×
825
    *curKey = tmpKey;
×
826
  } else {
827
    code = TSDB_CODE_FAILED;
×
828
  }
829

830
_end:
×
831
  streamStateFreeCur(pCur);
×
832
  return code;
×
833
}
834

835
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
×
836
                              state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
837
  int32_t code = TSDB_CODE_SUCCESS;
×
838
  int32_t lino = 0;
×
839
  (*pWinCode) = TSDB_CODE_SUCCESS;
×
840

841
  SSessionKey* pWinKey = key;
×
842
  TSKEY        gap = 0;
×
843
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
×
844
  SArray*      pWinStates = NULL;
×
845

846
  void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
×
847
  if (ppBuff) {
×
848
    pWinStates = (SArray*)(*ppBuff);
×
849
  } else {
850
    pWinStates = taosArrayInit(16, POINTER_BYTES);
×
851
    if (!pWinStates) {
×
852
      code = terrno;
×
853
      QUERY_CHECK_CODE(code, lino, _end);
×
854
    }
855

856
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
×
857
    QUERY_CHECK_CODE(code, lino, _end);
×
858
  }
859

860
  TSKEY startTs = pWinKey->win.skey;
×
861
  TSKEY endTs = pWinKey->win.ekey;
×
862

863
  int32_t size = taosArrayGetSize(pWinStates);
×
864
  if (size == 0) {
×
865
    void*   pFileStore = getStateFileStore(pFileState);
×
866
    void*   p = NULL;
×
867
    int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
×
868
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
×
869
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
×
870
      if (!(*pVal)) {
×
871
        code = TSDB_CODE_OUT_OF_MEMORY;
×
872
        QUERY_CHECK_CODE(code, lino, _end);
×
873
      }
874

875
      (*pWinCode) = code_file;
×
876
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
877
             pWinKey->win.ekey, code_file);
878
    } else {
879
      code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
×
880
      (*pWinCode) = TSDB_CODE_FAILED;
×
881
      taosMemoryFree(p);
×
882
      QUERY_CHECK_CODE(code, lino, _end);
×
883
    }
884
    goto _end;
×
885
  }
886

887
  // find the first position which is smaller than the pWinKey
888
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
×
889
  SRowBuffPos* pPos = NULL;
×
890
  int32_t      valSize = *pVLen;
×
891

892
  if (index >= 0) {
×
893
    pPos = taosArrayGetP(pWinStates, index);
×
894
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
×
895
    if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
×
896
      (*pVal) = pPos;
×
897
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
×
898
      pPos->beUsed = true;
×
899
      pPos->beFlushed = false;
×
900
      *key = *pDestWinKey;
×
901
      goto _end;
×
902
    }
903
  }
904

905
  if (index + 1 < size) {
×
906
    pPos = taosArrayGetP(pWinStates, index + 1);
×
907
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
×
908
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ||
×
909
        fn(pKeyData, stateKey) == true) {
×
910
      (*pVal) = pPos;
×
911
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
×
912
      pPos->beUsed = true;
×
913
      pPos->beFlushed = false;
×
914
      *key = *pDestWinKey;
×
915
      goto _end;
×
916
    }
917
  }
918

919
  if (index + 1 == 0) {
×
920
    if (!isDeteled(pFileState, endTs)) {
×
921
      void*   p = NULL;
×
922
      void*   pFileStore = getStateFileStore(pFileState);
×
923
      int32_t code_file =
924
          streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
×
925
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
×
926
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
×
927
        if (!(*pVal)) {
×
928
          code = TSDB_CODE_OUT_OF_MEMORY;
×
929
          QUERY_CHECK_CODE(code, lino, _end);
×
930
        }
931

932
        (*pWinCode) = code_file;
×
933
        qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
×
934
               pWinKey->win.ekey, code_file);
935
        goto _end;
×
936
      } else {
937
        taosMemoryFree(p);
×
938
      }
939
    }
940
  }
941

942
  if (index == size - 1) {
×
943
    code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
×
944
    QUERY_CHECK_CODE(code, lino, _end);
×
945

946
    (*pWinCode) = TSDB_CODE_FAILED;
×
947
    goto _end;
×
948
  }
949
  code = insertNewSessionWindow(pFileState, pWinStates, key, index + 1, (SRowBuffPos**)pVal);
×
950
  QUERY_CHECK_CODE(code, lino, _end);
×
951

952
  (*pWinCode) = TSDB_CODE_FAILED;
×
953

954
_end:
×
955
  if (code != TSDB_CODE_SUCCESS) {
×
956
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
957
  }
958
  return code;
×
959
}
960

961
// Read a count-window record from the rocksdb state store.
// The "current or next" position relative to pKey is tried first; when that
// read does not succeed, the "previous" position is tried instead.
// pKey, *pVal and *pVLen are filled by the underlying cursor read.
// Returns the result code of the last read that was attempted.
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  SStreamStateCur* pCursor = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
  int32_t          res = streamStateSessionGetKVByCur_rocksdb(pState, pCursor, pKey, pVal, pVLen);
  streamStateFreeCur(pCursor);

  if (res != TSDB_CODE_SUCCESS) {
    pCursor = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
    res = streamStateSessionGetKVByCur_rocksdb(pState, pCursor, pKey, pVal, pVLen);
    streamStateFreeCur(pCursor);
  }
  return res;
}
975

976
// Resolve the row buffer of the count window that pKey belongs to.
//
// The in-memory window array of pKey->groupId is created and registered in the
// session hash on first use. When it is empty (or the key sorts before all of
// its entries and the state has been flushed) the rocksdb store is consulted
// through getCountWinStateFromDisc(). A window whose stored counter, read from
// the trailing COUNT_TYPE of the row buffer, is still below winCount is reused;
// otherwise the next in-memory window is taken or a new one is appended.
//
// Outputs:
//   *pVal      - the SRowBuffPos of the resolved window.
//   *pKey      - overwritten with the key of the window that was picked.
//   *pWinCount - result-code style flag: the disk lookup result when the disk
//                was consulted, TSDB_CODE_FAILED when a new window was created,
//                TSDB_CODE_SUCCESS otherwise.
//
// Returns TSDB_CODE_SUCCESS or an error code (allocation / hash failures).
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
                              int32_t* pVLen, int32_t* pWinCount) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  (*pWinCount) = TSDB_CODE_SUCCESS;

  SSessionKey* pWinKey = pKey;
  const TSKEY  gap = 0;
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
  SArray*      pWinStates = NULL;
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // the hash did not take the array, so nobody else would ever release it
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  TSKEY startTs = pWinKey->win.skey;
  TSKEY endTs = pWinKey->win.ekey;

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    void* pFileStore = getStateFileStore(pFileState);
    void* pRockVal = NULL;
    // the disk lookup may overwrite *pWinKey with the key that was found
    (*pWinCount) = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
    if ((*pWinCount) == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
             pWinKey->win.ekey, (*pWinCount));
      if ((*pWinCount) == TSDB_CODE_SUCCESS) {
        int32_t     valSize = *pVLen;
        COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
        if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
          // NOTE(review): pRockVal is handed to createSessionWinBuff and not freed
          // here; presumably that helper takes ownership - confirm.
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
          if (!(*pVal)) {
            code = TSDB_CODE_OUT_OF_MEMORY;
            QUERY_CHECK_CODE(code, lino, _end);
          }

          goto _end;
        }
      }
      // the disk window cannot be reused: restore the requested range and start empty
      pWinKey->win.skey = startTs;
      pWinKey->win.ekey = endTs;
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
      taosMemoryFree(pRockVal);
      if (!(*pVal)) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        QUERY_CHECK_CODE(code, lino, _end);
      }
    } else {
      // release whatever the failed lookup may have left behind, as
      // createCountWinResultBuff does on the same path
      taosMemoryFree(pRockVal);
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
      QUERY_CHECK_CODE(code, lino, _end);

      (*pWinCount) = TSDB_CODE_FAILED;
    }
    goto _end;
  }

  // find the first position which is smaller than the pWinKey
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
  SRowBuffPos* pPos = NULL;
  int32_t      valSize = *pVLen;

  if (index >= 0) {
    pPos = taosArrayGetP(pWinStates, index);
    COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pPos->pRowBuff) + (valSize - sizeof(COUNT_TYPE)));
    // only the last window may be reused on the strength of its counter alone
    if (inSessionWindow(pPos->pKey, startTs, gap) || (index == size - 1 && (*pWinStateCout) < winCount)) {
      (*pVal) = pPos;
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
      pPos->beUsed = true;
      pPos->beFlushed = false;
      *pWinKey = *pDestWinKey;
      goto _end;
    }
  }

  if (index == -1) {
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
      SSessionKey tmpKey = *pWinKey;
      void*       pRockVal = NULL;
      void*       pFileStore = getStateFileStore(pFileState);
      int32_t     code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen);
      if (code_file == TSDB_CODE_SUCCESS) {
        SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0);
        SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey;
        // accept the disk window only when it ends before the first in-memory one
        if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
          *pWinKey = tmpKey;
          (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
          if (!(*pVal)) {
            code = TSDB_CODE_OUT_OF_MEMORY;
            QUERY_CHECK_CODE(code, lino, _end);
          }

          (*pWinCount) = code_file;
          qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
                 pWinKey->win.ekey, code_file);
          goto _end;
        }
      }
      taosMemoryFree(pRockVal);
    }
  }

  if (index + 1 < size) {
    pPos = taosArrayGetP(pWinStates, index + 1);
    (*pVal) = pPos;
    SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
    pPos->beUsed = true;
    pPos->beFlushed = false;
    *pWinKey = *pDestWinKey;
    goto _end;
  }

  code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
  QUERY_CHECK_CODE(code, lino, _end);

  (*pWinCount) = TSDB_CODE_FAILED;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1107

1108
// Create the row buffer for a new count window identified by pKey.
//
// The in-memory window array of pKey->groupId is created and registered in the
// session hash on first use. When that array is empty the rocksdb store is
// consulted: a flushed window whose trailing COUNT_TYPE counter differs from
// winCount is turned into the result buffer; in every other case a new
// session window is appended to the in-memory array.
//
// *pVal receives the SRowBuffPos of the window.
// Returns TSDB_CODE_SUCCESS or an error code (allocation / hash failures).
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
                                 int32_t* pVLen) {
  SSessionKey* pWinKey = pKey;
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
  SArray*      pWinStates = NULL;
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      // same handling as getStateWinResultBuff / getCountWinResultBuff
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // the hash did not take the array, so nobody else would ever release it
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // captured before the disk lookup, which may overwrite *pWinKey
  TSKEY endTs = pWinKey->win.ekey;

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    void* pFileStore = getStateFileStore(pFileState);
    void* pRockVal = NULL;

    int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
    if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
      int32_t     valSize = *pVLen;
      COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
      if ((*pWinStateCount) == winCount) {
        // the stored window is already complete: its value is not needed any
        // more, release it before opening a fresh window
        taosMemoryFree(pRockVal);
        pRockVal = NULL;
        code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
        QUERY_CHECK_CODE(code, lino, _end);
      } else {
        // NOTE(review): pRockVal is handed to createSessionWinBuff and not freed
        // here; presumably that helper takes ownership - confirm.
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
        if (!(*pVal)) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
               pWinKey->win.ekey, code_file);
      }
    } else {
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
      taosMemoryFree(pRockVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  } else {
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1165

1166
static int compareRangeKey(const void* pKey, const void* data, int index) {
×
1167
  SScanRange* pRange1 = (SScanRange*) pKey;
×
1168
  SScanRange* pRange2 = taosArrayGet((SArray*)data, index);
×
1169
  if (pRange1->win.skey > pRange2->win.skey) {
×
1170
    return 1;
×
1171
  } else if (pRange1->win.skey < pRange2->win.skey) {
×
1172
    return -1;
×
1173
  }
1174

1175
  if (pRange1->win.ekey > pRange2->win.ekey) {
×
1176
    return 1;
×
1177
  } else if (pRange1->win.ekey < pRange2->win.ekey) {
×
1178
    return -1;
×
1179
  }
1180

1181
  return 0;
×
1182
}
1183

1184
static int32_t scanRangeKeyCmpr(const SScanRange* pRange1, const SScanRange* pRange2) {
×
1185
  if (pRange1->win.skey > pRange2->win.ekey) {
×
1186
    return 1;
×
1187
  } else if (pRange1->win.ekey < pRange2->win.skey) {
×
1188
    return -1;
×
1189
  }
1190

1191
  return 0;
×
1192
}
1193

1194
// Record a table uid and a group id in the id sets attached to a scan range.
// Both sets are used as key-only hashes (no value payload is stored).
// Returns the first failing tSimpleHashPut code, or TSDB_CODE_SUCCESS.
static int32_t putRangeIdInfo(SScanRange* pRangeKey, uint64_t gpId, uint64_t uId) {
  int32_t lino = 0;
  int32_t code = tSimpleHashPut(pRangeKey->pUIds, &uId, sizeof(uint64_t), NULL, 0);
  QUERY_CHECK_CODE(code, lino, _end);

  code = tSimpleHashPut(pRangeKey->pGroupIds, &gpId, sizeof(uint64_t), NULL, 0);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1208

1209
// Widen pRangeDest so that both its scan window and its calculation window
// also cover the corresponding windows of pRangeSrc.
void mergeRangeKey(SScanRange* pRangeDest, SScanRange* pRangeSrc) {
  if (pRangeSrc->win.skey < pRangeDest->win.skey) {
    pRangeDest->win.skey = pRangeSrc->win.skey;
  }
  if (pRangeSrc->win.ekey > pRangeDest->win.ekey) {
    pRangeDest->win.ekey = pRangeSrc->win.ekey;
  }
  if (pRangeSrc->calWin.skey < pRangeDest->calWin.skey) {
    pRangeDest->calWin.skey = pRangeSrc->calWin.skey;
  }
  if (pRangeSrc->calWin.ekey > pRangeDest->calWin.ekey) {
    pRangeDest->calWin.ekey = pRangeSrc->calWin.ekey;
  }
}
×
1215

1216
// Try to fold pRangeKey into a range already stored in pRangeArray.
//
// binarySearch() yields the position of the closest smaller-or-equal entry;
// that entry and its right neighbour are tested for overlap with pRangeKey.
// On overlap the stored range is widened, gpId/uId are added to its id sets
// and *pRes is set to true. Otherwise *pRes is false and *pIndex receives the
// search position, so the caller can insert right after it.
int32_t mergeScanRange(SArray* pRangeArray, SScanRange* pRangeKey, uint64_t gpId, uint64_t uId, int32_t* pIndex, bool* pRes, char* idStr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t size = taosArrayGetSize(pRangeArray);
  int32_t index = binarySearch(pRangeArray, size, pRangeKey, compareRangeKey);

  // the two neighbours worth checking, and whether each one exists
  const int32_t neighbourPos[2] = {index, index + 1};
  const bool    neighbourOk[2] = {index >= 0, index + 1 < size};

  for (int32_t k = 0; k < 2; k++) {
    if (!neighbourOk[k]) {
      continue;
    }
    SScanRange* pNeighbour = (SScanRange*)taosArrayGet(pRangeArray, neighbourPos[k]);
    if (scanRangeKeyCmpr(pNeighbour, pRangeKey) != 0) {
      continue;
    }
    mergeRangeKey(pNeighbour, pRangeKey);
    code = putRangeIdInfo(pNeighbour, gpId, uId);
    QUERY_CHECK_CODE(code, lino, _end);
    *pRes = true;
    goto _end;
  }

  (*pIndex) = index;
  *pRes = false;

_end:
  qDebug("===stream===%s mergeScanRange start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64 ", uid:%" PRIu64
         ", res:%d", idStr, pRangeKey->win.skey, pRangeKey->win.ekey, gpId, uId, *pRes);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1252

1253
// Register a scan range (pWin, with the calc window and table uid taken from
// pRecData) for a later re-scan.
//
//   1. If the range overlaps one already held in pTsDataState->pScanRanges it
//      is merged into that entry.
//   2. Otherwise, when the in-memory array already holds more than
//      MAX_SCAN_RANGE_SIZE entries, the table uid is appended to the record
//      stored in rocksdb under {win, groupId}.
//   3. Otherwise a new range with its own id sets is inserted after the
//      position reported by mergeScanRange().
//
// recLen is not used by this function.
// Returns TSDB_CODE_SUCCESS or the first error encountered.
int32_t mergeAndSaveScanRange(STableTsDataState* pTsDataState, STimeWindow* pWin, uint64_t gpId, SRecDataInfo* pRecData, int32_t recLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SArray*  pRangeArray = pTsDataState->pScanRanges;
  uint64_t uId = pRecData->tableUid;

  int32_t    index = 0;
  bool       merge = false;
  SScanRange rangeKey = {.win = *pWin, .calWin = pRecData->calWin, .pUIds = NULL, .pGroupIds = NULL};
  code = mergeScanRange(pRangeArray, &rangeKey, gpId, uId, &index, &merge, pTsDataState->pState->pTaskIdStr);
  QUERY_CHECK_CODE(code, lino, _end);
  if (merge == true) {
    goto _end;
  }

  int32_t size = taosArrayGetSize(pRangeArray);
  if (size > MAX_SCAN_RANGE_SIZE) {
    // too many ranges in memory: spill this one to the state store
    SSessionKey sesKey = {.win = *pWin, .groupId = gpId};
    void*       pVal = NULL;
    int32_t     len = 0;
    int32_t     winCode = streamStateSessionGet_rocksdb(pTsDataState->pState, &sesKey, &pVal, &len);
    if (winCode != TSDB_CODE_SUCCESS) {
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, &uId, sizeof(uint64_t));
    } else {
      // append the uid to the list that is already stored for this key
      char* pTempBuf = taosMemoryRealloc(pVal, len + sizeof(uint64_t));
      if (pTempBuf == NULL) {
        // a failed realloc leaves the original buffer allocated; release it
        code = terrno;
        lino = __LINE__;
        taosMemoryFree(pVal);
        goto _end;
      }
      memcpy(pTempBuf + len, &uId, sizeof(uint64_t));
      code = streamStateSessionPut_rocksdb(pTsDataState->pState, &sesKey, pTempBuf, len + sizeof(uint64_t));
      taosMemFreeClear(pTempBuf);
    }
    QUERY_CHECK_CODE(code, lino, _end);
    goto _end;
  }

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  rangeKey.pGroupIds = tSimpleHashInit(8, hashFn);
  QUERY_CHECK_NULL(rangeKey.pGroupIds, code, lino, _end, terrno);
  rangeKey.pUIds = tSimpleHashInit(8, hashFn);
  QUERY_CHECK_NULL(rangeKey.pUIds, code, lino, _end, terrno);

  code = putRangeIdInfo(&rangeKey, gpId, uId);
  QUERY_CHECK_CODE(code, lino, _end);

  index++;
  void* pInserted = taosArrayInsert(pRangeArray, index, &rangeKey);
  QUERY_CHECK_NULL(pInserted, code, lino, _end, terrno);

  // the array element now owns the id sets
  rangeKey.pGroupIds = NULL;
  rangeKey.pUIds = NULL;

_end:
  // id sets that never made it into the array must not leak
  if (rangeKey.pGroupIds != NULL) {
    tSimpleHashCleanup(rangeKey.pGroupIds);
  }
  if (rangeKey.pUIds != NULL) {
    tSimpleHashCleanup(rangeKey.pUIds);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1301

1302
// Copy every entry of pSource into pDest and then destroy pSource.
// pSource is only cleaned up when all entries were copied; on a failed put the
// function returns the error and leaves pSource alive.
int32_t mergeSimpleHashMap(SSHashObj* pDest, SSHashObj* pSource) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pSource, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pSource, pEntry, &iter)) {
    size_t keyLen = 0;
    void*  pEntryKey = tSimpleHashGetKey(pEntry, &keyLen);
    code = tSimpleHashPut(pDest, pEntryKey, keyLen, pEntry, sizeof(uint64_t));
    QUERY_CHECK_CODE(code, lino, _end);
  }
  tSimpleHashCleanup(pSource);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1321

1322
// Compact the pending scan ranges of pTsDataState.
//
// Pass 1: adjacent in-memory ranges that overlap are fused (window widened,
//         id sets merged, right-hand element removed).
// Pass 2: every range record spilled to rocksdb is read, starting from the
//         last one and walking backwards; each table uid it stores is offered
//         to mergeScanRange(). If the last uid was merged into an in-memory
//         range, the record is deleted from the store.
//
// Nothing is done when fewer than two in-memory ranges exist.
// Returns TSDB_CODE_SUCCESS or the first error encountered.
int32_t mergeAllScanRange(STableTsDataState* pTsDataState) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamStateCur* pCur = NULL;
  void*            pVal = NULL;  // value of the record being processed, released at _end
  SArray*          pRangeArray = pTsDataState->pScanRanges;
  if (taosArrayGetSize(pRangeArray) < 2) {
    return code;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pRangeArray) - 1;) {
    SScanRange* pCurRange = taosArrayGet(pRangeArray, i);
    SScanRange* pNextRange = taosArrayGet(pRangeArray, i + 1);
    if (scanRangeKeyCmpr(pCurRange, pNextRange) == 0) {
      pCurRange->win.skey = TMIN(pCurRange->win.skey, pNextRange->win.skey);
      pCurRange->win.ekey = TMAX(pCurRange->win.ekey, pNextRange->win.ekey);
      code = mergeSimpleHashMap(pCurRange->pGroupIds, pNextRange->pGroupIds);
      QUERY_CHECK_CODE(code, lino, _end);
      code = mergeSimpleHashMap(pCurRange->pUIds, pNextRange->pUIds);
      QUERY_CHECK_CODE(code, lino, _end);
      taosArrayRemove(pRangeArray, i + 1);
      // stay on i: the fused range may overlap its new neighbour as well
      continue;
    }
    i++;
  }

  int32_t winRes = TSDB_CODE_SUCCESS;
  pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
  while (winRes == TSDB_CODE_SUCCESS) {
    int32_t     vlen = 0;
    SSessionKey key = {0};
    winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, &pVal, &vlen);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }
    int32_t    index = 0;
    bool       merge = false;
    SScanRange tmpRange = {.win = key.win, .pUIds = NULL, .pGroupIds = NULL};
    // the stored value is a packed list of table uids
    int32_t   num = vlen / sizeof(uint64_t);
    uint64_t* pUids = (uint64_t*)pVal;
    for (int32_t i = 0; i < num; i++) {
      code = mergeScanRange(pRangeArray, &tmpRange, key.groupId, pUids[i], &index, &merge, pTsDataState->pState->pTaskIdStr);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    if (merge == true) {
      code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the read hands back an allocated copy of the value; release it per record
    taosMemoryFreeClear(pVal);
    // Reading does not move the cursor, so step to the preceding record
    // explicitly; without this the loop would keep re-reading the same entry.
    // NOTE(review): direction chosen because the cursor starts at the last
    // record - confirm against streamStateSessionSeekToLast_rocksdb.
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  taosMemoryFree(pVal);
  streamStateFreeCur(pCur);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1379

1380
// Take the next pending scan range.
//
// The first element of the in-memory range array is returned (and removed)
// when there is one. Otherwise the last range record stored in rocksdb is
// loaded into *pRange - id sets are created on demand - and deleted from the
// store. When neither source has a range, *pRange is left untouched and
// TSDB_CODE_SUCCESS is returned.
//
// NOTE(review): the stored value is interpreted as SRecDataInfo here, while
// mergeAndSaveScanRange() writes a packed list of uint64_t uids under the same
// kind of key - confirm that both agree on the record layout.
int32_t popScanRange(STableTsDataState* pTsDataState, SScanRange* pRange) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamStateCur* pCur = NULL;
  SRecDataInfo*    pRecVal = NULL;  // value read from the store, released at _end
  SArray*          pRangeArray = pTsDataState->pScanRanges;
  if (taosArrayGetSize(pRangeArray) > 0) {
    (*pRange) = *(SScanRange*)taosArrayGet(pRangeArray, 0);
    taosArrayRemove(pRangeArray, 0);
    goto _end;
  }

  {
    pCur = streamStateSessionSeekToLast_rocksdb(pTsDataState->pState, INT64_MAX);
    int32_t     vlen = 0;
    SSessionKey key = {0};
    int32_t winRes = streamStateSessionGetKVByCur_rocksdb(pTsDataState->pState, pCur, &key, (void**)&pRecVal, &vlen);
    if (winRes != TSDB_CODE_SUCCESS) {
      goto _end;
    }
    qDebug("===stream===get scan range from disc start ts:%" PRId64 ",end ts:%" PRId64 ",group id:%" PRIu64,
           key.win.skey, key.win.ekey, key.groupId);

    pRange->win = key.win;
    pRange->calWin = pRecVal->calWin;
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
    if (pRange->pGroupIds == NULL) {
      pRange->pGroupIds = tSimpleHashInit(8, hashFn);
      QUERY_CHECK_NULL(pRange->pGroupIds, code, lino, _end, terrno);
    }
    if (pRange->pUIds == NULL) {
      pRange->pUIds = tSimpleHashInit(8, hashFn);
      QUERY_CHECK_NULL(pRange->pUIds, code, lino, _end, terrno);
    }
    code = putRangeIdInfo(pRange, key.groupId, pRecVal->tableUid);
    QUERY_CHECK_CODE(code, lino, _end);
    code = streamStateSessionDel_rocksdb(pTsDataState->pState, &key);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  // the cursor read hands back an allocated copy of the value
  taosMemoryFree(pRecVal);
  streamStateFreeCur(pCur);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1425

1426
// Append the last `num` in-memory windows of the cursor's group to pRes.
// Each pushed SResultWindowInfo carries the window's SRowBuffPos and a copy of
// its session key. If the group holds fewer than `num` windows, all of them
// are pushed.
// Returns TSDB_CODE_FAILED when the cursor has no hash data or the group is
// empty, and an error code when an element cannot be read or pushed.
int32_t getNLastSessionStateKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pCur->pHashData == NULL) {
    return TSDB_CODE_FAILED;
  }

  SArray* pWinStates = *((void**)pCur->pHashData);
  int32_t total = taosArrayGetSize(pWinStates);
  if (total == 0) {
    return TSDB_CODE_FAILED;
  }

  int32_t pos = (total - num > 0) ? (total - num) : 0;
  while (pos < total) {
    SRowBuffPos* pStatePos = taosArrayGetP(pWinStates, pos);
    QUERY_CHECK_NULL(pStatePos, code, lino, _end, terrno);

    SResultWindowInfo info = {0};
    info.sessionWin = *(SSessionKey*)pStatePos->pKey;
    info.pStatePos = pStatePos;

    void* pPushed = taosArrayPush(pRes, &info);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
    pos++;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1458

1459
// Check whether an in-memory session window covers pKey.
//
// The entry at the binary-search position is tested against pKey's start
// timestamp; its right neighbour is tested against the start timestamp and,
// unless the end timestamp is INT64_MIN, against the end timestamp too.
// On a hit *pKey is replaced by the stored key, *pIsLast tells whether the
// window is the last one of its group, and true is returned. On a miss both
// outputs are left untouched and false is returned.
bool hasSessionState(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, bool* pIsLast) {
  SSHashObj* pRowStateBuff = getRowStateBuff(pFileState);
  void**     ppGroupWins = (void**)tSimpleHashGet(pRowStateBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppGroupWins == NULL) {
    return false;
  }

  SArray* pWinStates = (SArray*)(*ppGroupWins);
  int32_t size = taosArrayGetSize(pWinStates);
  int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);

  if (index >= 0) {
    SRowBuffPos* pHere = taosArrayGetP(pWinStates, index);
    if (inSessionWindow(pHere->pKey, pKey->win.skey, gap)) {
      *pKey = *((SSessionKey*)pHere->pKey);
      *pIsLast = (index == size - 1);
      return true;
    }
  }

  int32_t nextIdx = index + 1;
  if (nextIdx >= size) {
    return false;
  }

  SRowBuffPos* pNext = taosArrayGetP(pWinStates, nextIdx);
  bool         hit = inSessionWindow(pNext->pKey, pKey->win.skey, gap) ||
             (pKey->win.ekey != INT64_MIN && inSessionWindow(pNext->pKey, pKey->win.ekey, gap));
  if (!hit) {
    return false;
  }

  *pKey = *((SSessionKey*)pNext->pKey);
  *pIsLast = (nextIdx == size - 1);
  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc