• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3615

18 Feb 2025 07:41AM UTC coverage: 62.953% (+1.6%) from 61.4%
#3615

push

travis-ci

web-flow
Merge pull request #29812 from taosdata/doc/analysis

doc: update tdgpt doc.

146885 of 299602 branches covered (49.03%)

Branch coverage included in aggregate %.

230802 of 300346 relevant lines covered (76.85%)

17263824.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.92
/source/libs/stream/src/streamSessionState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "tcommon.h"
21
#include "tsimplehash.h"
22

23
int sessionStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
38,276✔
24
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
38,276✔
25
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
38,269✔
26
  return sessionWinKeyCmpr((SSessionKey*)pWin1, pWin2);
38,269✔
27
}
28

29
int sessionStateRangeKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
1,166✔
30
  SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
1,166✔
31
  SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
1,166✔
32
  return sessionRangeKeyCmpr(pWin1, pWin2);
1,166✔
33
}
34

35
// Binary search over keyList (num entries, expected to be ordered consistently with cmpFn).
// cmpFn(key, keyList, pos) compares `key` with the entry at `pos`.
//
// Returns:
//   - -1 when num <= 0 or when key compares smaller than the first entry;
//   - otherwise the index of an entry that compares smaller than or equal to key
//     (the last entry is preferred when key compares >= to it).
//
// Each probed position is compared exactly once per iteration; the comparator result is
// cached instead of being recomputed for the `== 0`, `< 0` and `> 0` checks.
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn) {
  int firstPos = 0, lastPos = num - 1, midPos = -1;

  if (num <= 0) return -1;

  while (1) {
    // key is at or beyond the current upper bound
    if (cmpFn(key, keyList, lastPos) >= 0) return lastPos;

    int firstCmp = cmpFn(key, keyList, firstPos);
    if (firstCmp == 0) return firstPos;
    // everything in [firstPos, lastPos] is bigger than key
    if (firstCmp < 0) return firstPos - 1;

    int numOfRows = lastPos - firstPos + 1;
    midPos = (numOfRows >> 1) + firstPos;

    int midCmp = cmpFn(key, keyList, midPos);
    if (midCmp < 0) {
      lastPos = midPos - 1;
    } else if (midCmp > 0) {
      firstPos = midPos + 1;
    } else {
      break;
    }
  }

  return midPos;
}
61

62
int64_t getSessionWindowEndkey(void* data, int32_t index) {
×
63
  SArray*       pWinInfos = (SArray*)data;
×
64
  SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
×
65
  if (ppos != NULL) {
×
66
    SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
×
67
    return pWin->win.ekey;
×
68
  } else {
69
    return 0;
×
70
  }
71
}
72

73
// Returns true when ts lies within [win.skey - gap, win.ekey + gap] of pKey,
// i.e. ts + gap >= skey and ts - gap <= ekey.
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
  if (ts + gap < pKey->win.skey) {
    return false;
  }
  return ts - gap <= pKey->win.ekey;
}
79

80
// Creates a stream state cursor via createStreamStateCursor() and binds it to pFileState.
// Returns NULL when the cursor could not be created.
SStreamStateCur* createStateCursor(SStreamFileState* pFileState) {
  SStreamStateCur* pCursor = createStreamStateCursor();
  if (pCursor != NULL) {
    pCursor->pStreamFileState = pFileState;
  }
  return pCursor;
}
88

89
static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
5,785✔
90
                                   SRowBuffPos** ppPos) {
91
  int32_t      code = TSDB_CODE_SUCCESS;
5,785✔
92
  int32_t      lino = 0;
5,785✔
93
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
5,785✔
94
  if (!pNewPos || !pNewPos->pRowBuff) {
5,788!
95
    code = TSDB_CODE_OUT_OF_MEMORY;
×
96
    QUERY_CHECK_CODE(code, lino, _end);
×
97
  }
98

99
  memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
5,788✔
100
  void* tmp = taosArrayPush(pWinInfos, &pNewPos);
5,788✔
101
  if (!tmp) {
5,788!
102
    code = terrno;
×
103
    QUERY_CHECK_CODE(code, lino, _end);
×
104
  }
105
  (*ppPos) = pNewPos;
5,788✔
106

107
_end:
5,788✔
108
  if (code != TSDB_CODE_SUCCESS) {
5,788!
109
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
110
  }
111
  return code;
5,788✔
112
}
113

114
// Same as addNewSessionWindow(), but the new position is inserted into pWinInfos at `index`
// through taosArrayInsert() instead of being appended.
static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
                                      int32_t index, SRowBuffPos** ppPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SRowBuffPos* pFresh = getNewRowPosForWrite(pFileState);
  if (pFresh == NULL || pFresh->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pFresh->pKey, pKey, sizeof(SSessionKey));
  if (taosArrayInsert(pWinInfos, index, &pFresh) == NULL) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  *ppPos = pFresh;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
139

140
// Builds a row-buffer position for pKey that is marked needFree and beFlushed.
// When p is non-NULL, *pVLen bytes of p are copied into the row buffer; otherwise the row
// buffer is zeroed for getRowStateRowSize(pFileState) bytes.
// p is always released with taosMemoryFree() before returning, also on failure.
// Returns the new position, or NULL when no usable position could be obtained.
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SRowBuffPos* pResult = getNewRowPosForWrite(pFileState);
  if (pResult == NULL || pResult->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pResult->pKey, pKey, sizeof(SSessionKey));
  pResult->needFree = true;
  pResult->beFlushed = true;
  if (p != NULL) {
    memcpy(pResult->pRowBuff, p, *pVLen);
  } else {
    int32_t rowSize = getRowStateRowSize(pFileState);
    memset(pResult->pRowBuff, 0, rowSize);
  }

_end:
  taosMemoryFree(p);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    return NULL;
  }
  return pResult;
}
167

168
// Finds or creates the session window for pKey in the in-memory buffer of group pKey->groupId.
//
// Outcomes (all return TSDB_CODE_SUCCESS unless an allocation/insert step fails):
//   - a buffered window within `gap` of pKey is found: *pVal is that position, *pKey is
//     overwritten with the window's key, and *pWinCode stays TSDB_CODE_SUCCESS;
//   - the window is taken from the file store (streamStateSessionAddIfNotExist_rocksdb) or the
//     state is reported flushed: *pVal is a position built by createSessionWinBuff() and
//     *pWinCode is the store's return code;
//   - otherwise a new window is appended/inserted into the buffer and *pWinCode is
//     TSDB_CODE_FAILED.
//
// Fix over the previous version: when registering a freshly created per-group array in the
// hash fails, the array is destroyed instead of being leaked.
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen,
                                int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  (*pWinCode) = TSDB_CODE_SUCCESS;
  SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
  SArray*    pWinStates = NULL;
  void**     ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // the hash did not record the array, so nothing else will ever free it
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  TSKEY startTs = pKey->win.skey;
  TSKEY endTs = pKey->win.ekey;

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    // nothing buffered for this group yet: consult the file store first
    void*   pFileStore = getStateFileStore(pFileState);
    void*   p = NULL;
    int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
      // createSessionWinBuff() takes care of releasing p
      (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
      if (!(*pVal)) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      (*pWinCode) = code_file;
      qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
    } else {
      code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
      (*pWinCode) = TSDB_CODE_FAILED;
      taosMemoryFree(p);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }

  // find the first position which is smaller than the pKey
  int32_t      index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
  SRowBuffPos* pPos = NULL;

  if (index >= 0) {
    // candidate at or before pKey
    pPos = taosArrayGetP(pWinStates, index);
    if (inSessionWindow(pPos->pKey, startTs, gap)) {
      (*pVal) = pPos;
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
      pPos->beUsed = true;
      pPos->beFlushed = false;
      *pKey = *pDestWinKey;
      goto _end;
    }
  }

  if (index + 1 < size) {
    // candidate right after pKey; either end of the requested window may touch it
    pPos = taosArrayGetP(pWinStates, index + 1);
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) {
      (*pVal) = pPos;
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
      pPos->beUsed = true;
      pPos->beFlushed = false;
      *pKey = *pDestWinKey;
      goto _end;
    }
  }

  if (index + 1 == 0) {
    // pKey sorts before every buffered window: it may only exist in the file store
    if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
      void*   p = NULL;
      void*   pFileStore = getStateFileStore(pFileState);
      int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
        (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
        if (!(*pVal)) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          QUERY_CHECK_CODE(code, lino, _end);
        }

        (*pWinCode) = code_file;
        qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
        goto _end;
      } else {
        taosMemoryFree(p);
      }
    }
  }

  if (index == size - 1) {
    // pKey sorts after every buffered window: append
    code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
    QUERY_CHECK_CODE(code, lino, _end);

    (*pWinCode) = TSDB_CODE_FAILED;
    goto _end;
  }

  // pKey belongs between two buffered windows
  code = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1, (SRowBuffPos**)pVal);
  QUERY_CHECK_CODE(code, lino, _end);

  (*pWinCode) = TSDB_CODE_FAILED;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
282

283
// Adapter around getSessionWinResultBuff(): interprets pKey as an SWinKey, turns it into a
// zero-length session window [ts, ts] of the same group and looks it up with gap 0.
// keyLen is not used.
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                          int32_t* pWinCode) {
  SWinKey*    pSrcKey = pKey;
  SSessionKey sessKey = {.groupId = pSrcKey->groupId, .win.skey = pSrcKey->ts, .win.ekey = pSrcKey->ts};
  return getSessionWinResultBuff(pFileState, &sessKey, 0, pVal, pVLen, pWinCode);
}
289

290
// Registers an already populated position pPos in the in-memory buffer of its group
// (pPos->pKey is read as an SSessionKey) and clears pPos->needFree.
//
// Fixes over the previous version:
//   - binarySearch() returns the slot of an entry that is <= the key (or -1), so the new
//     position must go right AFTER that slot. It used to be inserted AT that slot, i.e. in
//     front of a smaller entry, which breaks the ordering binarySearch() depends on and
//     disagrees with getSessionWinResultBuff(), which inserts at index + 1.
//   - a freshly created per-group array is destroyed when it cannot be stored in the hash.
//
// NOTE(review): needFree is cleared even when insertion fails, as before — confirm that the
// caller does not rely on needFree to release pPos on the error path.
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
  SSessionKey* pKey = pPos->pKey;
  SArray*      pWinStates = NULL;
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // the hash did not record the array, so nothing else will ever free it
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    void* tmp = taosArrayPush(pWinStates, &pPos);
    if (!tmp) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }

  // index of an entry that is smaller than or equal to pKey, or -1 when pKey sorts first
  int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
  int32_t insertAt = index + 1;
  void*   tmp = NULL;
  if (insertAt >= size) {
    // pKey sorts after every buffered window
    tmp = taosArrayPush(pWinStates, &pPos);
  } else {
    tmp = taosArrayInsert(pWinStates, insertAt, &pPos);
  }
  if (!tmp) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  pPos->needFree = false;
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
343

344
// Reads the session window pKey from the file store into a new row-buffer position that is
// marked needFree and beFlushed.
// *pWinCode receives the result of streamStateSessionGet_rocksdb(); *pVal is only assigned
// when that call succeeds. The function itself returns TSDB_CODE_OUT_OF_MEMORY when no usable
// position is available and TSDB_CODE_SUCCESS otherwise.
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen,
                              int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SRowBuffPos* pResult = getNewRowPosForWrite(pFileState);
  if (pResult == NULL || pResult->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pResult->needFree = true;
  pResult->beFlushed = true;

  void* pStored = NULL;
  (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pStored, pVLen);
  if ((*pWinCode) == TSDB_CODE_SUCCESS) {
    memcpy(pResult->pKey, pKey, sizeof(SSessionKey));
    memcpy(pResult->pRowBuff, pStored, *pVLen);
    taosMemoryFreeClear(pStored);
    (*pVal) = pResult;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
371

372
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
2,065✔
373
  SSHashObj*   pSessionBuff = (SSHashObj*)pBuff;
2,065✔
374
  SSessionKey* pWinKey = (SSessionKey*)key;
2,065✔
375
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
2,065✔
376
  if (!ppBuff) {
2,065✔
377
    return TSDB_CODE_SUCCESS;
525✔
378
  }
379
  SArray* pWinStates = (SArray*)(*ppBuff);
1,540✔
380
  int32_t size = taosArrayGetSize(pWinStates);
1,540✔
381
  TSKEY   gap = 0;
1,540✔
382
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
1,540✔
383
  if (index >= 0) {
1,540✔
384
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
1,091✔
385
    if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
1,091!
386
      pPos->beFlushed = true;
1,091✔
387
      taosArrayRemove(pWinStates, index);
1,091✔
388
    }
389
  }
390
  return TSDB_CODE_SUCCESS;
1,540✔
391
}
392

393
// Removes pPos from the in-memory buffer of its group, but only when the slot found by
// binarySearch() for pPos's key holds exactly this position (pointer identity).
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  SSessionKey* pTarget = (SSessionKey*)pPos->pKey;
  void**       ppGroup = tSimpleHashGet(getRowStateBuff(pFileState), &pTarget->groupId, sizeof(uint64_t));
  if (ppGroup == NULL) {
    return;
  }

  SArray* pGroupWins = (SArray*)(*ppGroup);
  int32_t numOfWins = taosArrayGetSize(pGroupWins);
  int32_t index = binarySearch(pGroupWins, numOfWins, pTarget, sessionStateKeyCompare);
  if (index < 0) {
    return;
  }

  SRowBuffPos* pStored = taosArrayGetP(pGroupWins, index);
  if (pStored == pPos) {
    taosArrayRemove(pGroupWins, index);
  }
}
411

412
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
371✔
413
                                           const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
414
  int32_t      code = TSDB_CODE_SUCCESS;
371✔
415
  int32_t      lino = 0;
371✔
416
  SRowBuffPos* pNewPos = NULL;
371✔
417
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
371✔
418
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
373✔
419
  SArray*      pWinStates = NULL;
373✔
420
  if (!ppBuff) {
373✔
421
    pWinStates = taosArrayInit(16, POINTER_BYTES);
85✔
422
    if (!pWinStates) {
85!
423
      code = terrno;
×
424
      QUERY_CHECK_CODE(code, lino, _end);
×
425
    }
426

427
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
85✔
428
    QUERY_CHECK_CODE(code, lino, _end);
85!
429
  } else {
430
    pWinStates = (SArray*)(*ppBuff);
288✔
431
  }
432
  if (!pCur) {
373✔
433
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
88✔
434
    QUERY_CHECK_CODE(code, lino, _end);
88!
435

436
    goto _end;
88✔
437
  }
438

439
  int32_t size = taosArrayGetSize(pWinStates);
285✔
440
  if (pCur->buffIndex >= 0) {
284✔
441
    if (pCur->buffIndex >= size) {
281✔
442
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
254✔
443
      QUERY_CHECK_CODE(code, lino, _end);
254!
444

445
      goto _end;
254✔
446
    }
447
    code = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex, &pNewPos);
27✔
448
    QUERY_CHECK_CODE(code, lino, _end);
27!
449

450
    goto _end;
27✔
451
  } else {
452
    if (size > 0) {
3✔
453
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
2✔
454
      if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) {
2!
455
        // pCur is invalid
456
        SSessionKey pTmpKey = *pWinKey;
×
457
        int32_t     winCode = TSDB_CODE_SUCCESS;
×
458
        code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode);
×
459
        QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
460
        QUERY_CHECK_CODE(code, lino, _end);
×
461
        goto _end;
×
462
      }
463
    }
464
    pNewPos = getNewRowPosForWrite(pFileState);
3✔
465
    if (!pNewPos || !pNewPos->pRowBuff) {
4!
466
      code = TSDB_CODE_OUT_OF_MEMORY;
×
467
      QUERY_CHECK_CODE(code, lino, _end);
×
468
    }
469

470
    memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
4✔
471
    pNewPos->needFree = true;
4✔
472
    pNewPos->beFlushed = true;
4✔
473
  }
474

475
_end:
373✔
476
  (*ppVal) = pNewPos;
373✔
477
  if (code != TSDB_CODE_SUCCESS) {
373!
478
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
479
  }
480
  return code;
373✔
481
}
482

483
// Zeroes the row buffer (getRowStateRowSize() bytes) of every buffered window in every group
// of pFileState's in-memory buffer. The windows themselves stay in place.
void sessionWinStateClear(SStreamFileState* pFileState) {
  int32_t rowSize = getRowStateRowSize(pFileState);
  void*   pGroups = getRowStateBuff(pFileState);
  int32_t iter = 0;

  void* pEntry = tSimpleHashIterate(pGroups, NULL, &iter);
  while (pEntry != NULL) {
    SArray* pGroupWins = *((void**)pEntry);
    int32_t numOfWins = taosArrayGetSize(pGroupWins);
    for (int32_t i = 0; i < numOfWins; ++i) {
      SRowBuffPos* pWinPos = taosArrayGetP(pGroupWins, i);
      memset(pWinPos->pRowBuff, 0, rowSize);
    }
    pEntry = tSimpleHashIterate(pGroups, pEntry, &iter);
  }
}
498

499
void sessionWinStateCleanup(void* pBuff) {
6,759✔
500
  void*   pIte = NULL;
6,759✔
501
  size_t  keyLen = 0;
6,759✔
502
  int32_t iter = 0;
6,759✔
503
  while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
10,310✔
504
    SArray* pWinStates = (SArray*)(*(void**)pIte);
3,551✔
505
    taosArrayDestroy(pWinStates);
3,551✔
506
  }
507
  tSimpleHashCleanup(pBuff);
6,759✔
508
}
6,760✔
509

510
static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, const SSessionKey* pWinKey,
13,921✔
511
                                                SArray** pWins, int32_t* pIndex) {
512
  SStreamStateCur* pCur = NULL;
13,921✔
513
  SSHashObj*       pSessionBuff = getRowStateBuff(pFileState);
13,921✔
514
  void**           ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
13,923✔
515
  if (!ppBuff) {
13,928✔
516
    return NULL;
1,139✔
517
  }
518

519
  SArray* pWinStates = (SArray*)(*ppBuff);
12,789✔
520
  int32_t size = taosArrayGetSize(pWinStates);
12,789✔
521
  TSKEY   gap = 0;
12,788✔
522
  int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
12,788✔
523

524
  if (pWins) {
12,785✔
525
    (*pWins) = pWinStates;
8,038✔
526
  }
527

528
  if (size > 0 && index == -1) {
12,785✔
529
    SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
1,733✔
530
    SSessionKey* pWin = (SSessionKey*)pPos->pKey;
1,733✔
531
    if (pWinKey->win.skey == pWin->win.skey) {
1,733✔
532
      index = 0;
301✔
533
    }
534
  }
535

536
  if (index >= 0) {
12,785✔
537
    pCur = createStateCursor(pFileState);
10,006✔
538
    if (pCur == NULL) {
10,009!
539
      return NULL;
×
540
    }
541
    pCur->buffIndex = index;
10,009✔
542
    if (pIndex) {
10,009✔
543
      *pIndex = index;
6,737✔
544
    }
545
  }
546

547
  return pCur;
12,788✔
548
}
549

550
// Returns a cursor on the window at or before pWinKey. The in-memory buffer is tried first;
// if it yields nothing the file store is consulted and the resulting cursor is marked as a
// store cursor (buffIndex = -1). Returns NULL when neither source yields a cursor.
SStreamStateCur* sessionWinStateSeekKeyCurrentPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, NULL, NULL);
  if (pCur != NULL) {
    return pCur;
  }

  pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(getStateFileStore(pFileState), pWinKey);
  if (pCur != NULL) {
    pCur->buffIndex = -1;
    pCur->pStreamFileState = pFileState;
  }
  return pCur;
}
565

566
// Returns a cursor on the window strictly before pWinKey.
// Buffer first: if the located window compares below pWinKey (range compare > 0) it is used
// as is; if it compares equal and is not the first slot the cursor is moved one step back.
// In every other case the buffer cursor is released and the file store is consulted; a store
// cursor is marked with buffIndex = -1. Returns NULL when nothing is found.
SStreamStateCur* sessionWinStateSeekKeyPrev(SStreamFileState *pFileState, const SSessionKey *pWinKey) {
  SArray*          pGroupWins = NULL;
  int32_t          index = -1;
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pGroupWins, &index);
  if (pCur != NULL) {
    int32_t cmpRes = sessionStateRangeKeyCompare(pWinKey, pGroupWins, index);
    if (cmpRes > 0) {
      return pCur;
    }
    if (cmpRes == 0 && index > 0) {
      sessionWinStateMoveToPrev(pCur);
      return pCur;
    }
    streamStateFreeCur(pCur);
    pCur = NULL;
  }

  pCur = streamStateSessionSeekKeyPrev_rocksdb(getStateFileStore(pFileState), pWinKey);
  if (pCur == NULL) {
    return NULL;
  }
  pCur->buffIndex = -1;
  pCur->pStreamFileState = pFileState;
  return pCur;
}
591

592
// Turns pCur into a buffer cursor on slot 0: resets it with streamStateResetCur(), then sets
// buffIndex to 0 and binds it to pFileState. A NULL cursor is ignored.
static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) {
  if (pCur != NULL) {
    streamStateResetCur(pCur);
    pCur->buffIndex = 0;
    pCur->pStreamFileState = pFileState;
  }
}
600

601
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
1,915✔
602
                                    SStreamStateCur** ppCur) {
603
  SSessionKey key = {.groupId = groupId};
1,915✔
604
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL);
1,915✔
605
  if (taosArrayGetSize(pWinStates) > 0 &&
1,915✔
606
      (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
141!
607
    if (!(*ppCur)) {
452✔
608
      (*ppCur) = createStateCursor(pFileState);
451✔
609
    }
610
    transformCursor(pFileState, *ppCur);
452✔
611
  } else if (*ppCur) {
1,463✔
612
    (*ppCur)->buffIndex = -1;
614✔
613
    (*ppCur)->pStreamFileState = pFileState;
614✔
614
  }
615
}
1,915✔
616

617
// Returns a cursor on the window at or after pWinKey.
// If the buffer locates a window, the cursor is advanced by one when that window range-compares
// below pWinKey. Otherwise the file store is consulted and checkAndTransformCursor() decides
// whether the result should point at the store or at the start of the buffer.
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
  SArray*          pGroupWins = NULL;
  int32_t          index = -1;
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pGroupWins, &index);
  if (pCur == NULL) {
    void* pFileStore = getStateFileStore(pFileState);
    pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pFileStore, (SSessionKey*)pWinKey);
    checkAndTransformCursor(pFileState, pWinKey->groupId, pGroupWins, &pCur);
    return pCur;
  }

  if (sessionStateRangeKeyCompare(pWinKey, pGroupWins, index) > 0) {
    sessionWinStateMoveToNext(pCur);
  }
  return pCur;
}
633

634
// Returns a cursor on the window after pWinKey.
// If the buffer locates a window at or before pWinKey, the cursor is advanced by one and
// returned. Otherwise the file store is consulted and checkAndTransformCursor() decides
// whether the result should point at the store or at the start of the buffer.
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
  SArray*          pGroupWins = NULL;
  int32_t          index = -1;
  SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pGroupWins, &index);
  if (pCur == NULL) {
    void* pFileStore = getStateFileStore(pFileState);
    pCur = streamStateSessionSeekKeyNext_rocksdb(pFileStore, pWinKey);
    checkAndTransformCursor(pFileState, pWinKey->groupId, pGroupWins, &pCur);
    return pCur;
  }

  sessionWinStateMoveToNext(pCur);
  return pCur;
}
648

649
// Seeks backwards from pWinKey for count windows. The window count is read from the last
// sizeof(COUNT_TYPE) bytes of each row (row size = getRowStateRowSize()).
//
// Buffer phase: starting at the slot located for pWinKey, walk towards slot 0 while the
// window range-compares equal to pWinKey or holds fewer than `count` rows. The first window
// that stops the walk makes the cursor point at the slot right after it and ends the search.
// If the walk runs off the front (or only the buffer is non-empty), the buffer cursor points
// at slot 0 and the file store is examined.
//
// Store phase: if the store yields a cursor with a readable entry, the buffer cursor is
// released and the same backward walk is done on the store; the returned store cursor is
// positioned one step after the entry that stopped the walk. If the store has nothing
// usable, the buffer cursor (possibly NULL) is returned.
SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SSessionKey* pWinKey, COUNT_TYPE count) {
  SArray*          pGroupWins = NULL;
  int32_t          index = -1;
  SStreamStateCur* pBuffCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pGroupWins, &index);
  int32_t          resSize = getRowStateRowSize(pFileState);
  COUNT_TYPE       winCount = 0;

  if (pBuffCur != NULL) {
    for (; index >= 0; index--) {
      SRowBuffPos* pWinPos = taosArrayGetP(pGroupWins, index);
      winCount = *((COUNT_TYPE*)((char*)pWinPos->pRowBuff + (resSize - sizeof(COUNT_TYPE))));
      bool keepWalking = (sessionStateRangeKeyCompare(pWinKey, pGroupWins, index) == 0) || (winCount < count);
      if (!keepWalking) {
        pBuffCur->buffIndex = index + 1;
        return pBuffCur;
      }
    }
    pBuffCur->buffIndex = 0;
  } else if (taosArrayGetSize(pGroupWins) > 0) {
    pBuffCur = createStateCursor(pFileState);
    if (pBuffCur == NULL) {
      return NULL;
    }
    pBuffCur->buffIndex = 0;
  }

  void*            pFileStore = getStateFileStore(pFileState);
  SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey);
  if (pCur == NULL) {
    return pBuffCur;
  }

  pCur->pStreamFileState = pFileState;
  SSessionKey storeKey = {0};
  void*       pStored = NULL;
  int         len = 0;
  int32_t     code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &storeKey, &pStored, &len);
  if (code == TSDB_CODE_FAILED) {
    streamStateFreeCur(pCur);
    return pBuffCur;
  }
  winCount = *((COUNT_TYPE*)((char*)pStored + (resSize - sizeof(COUNT_TYPE))));
  taosMemoryFreeClear(pStored);
  // from here on the store cursor is the result; the buffer cursor is no longer needed
  streamStateFreeCur(pBuffCur);
  if (sessionRangeKeyCmpr(pWinKey, &storeKey) != 0 && winCount == count) {
    streamStateCurNext(pFileStore, pCur);
    return pCur;
  }

  streamStateCurPrev(pFileStore, pCur);
  for (;;) {
    code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &storeKey, &pStored, &len);
    if (code == TSDB_CODE_FAILED) {
      break;
    }
    winCount = *((COUNT_TYPE*)((char*)pStored + (resSize - sizeof(COUNT_TYPE))));
    taosMemoryFreeClear(pStored);
    if (sessionRangeKeyCmpr(pWinKey, &storeKey) == 0 || winCount < count) {
      streamStateCurPrev(pFileStore, pCur);
    } else {
      break;
    }
  }
  // step forward past the entry (or end) that stopped the backward walk
  streamStateCurNext(pFileStore, pCur);
  return pCur;
}
713

714
// Reads the window the cursor points at. pKey->groupId selects the group on entry; on success
// *pKey is overwritten with the window's key and, when pVal is given, *pVal receives a
// row-buffer position.
//
//   - NULL cursor: TSDB_CODE_FAILED.
//   - Buffer cursor (buffIndex >= 0): TSDB_CODE_FAILED when the index is past the buffered
//     windows, otherwise the buffered position itself is returned.
//   - Store cursor (buffIndex < 0): the entry is read from the file store. If the buffer is
//     non-empty and the store read failed or the store key does not sort before the first
//     buffered window, the cursor is switched to buffer slot 0 and that window is returned.
//     Otherwise, on a successful read with pVal given, the stored row is copied into a new
//     position marked needFree / beFlushed.
int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  if (pCur == NULL) {
    return TSDB_CODE_FAILED;
  }

  SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
  void**     ppGroup = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
  SArray*    pGroupWins = (ppGroup != NULL) ? (SArray*)(*ppGroup) : NULL;

  if (pCur->buffIndex >= 0) {
    int32_t numOfWins = taosArrayGetSize(pGroupWins);
    if (pCur->buffIndex >= numOfWins) {
      return TSDB_CODE_FAILED;
    }
    SRowBuffPos* pWinPos = taosArrayGetP(pGroupWins, pCur->buffIndex);
    if (pVal != NULL) {
      *pVal = pWinPos;
    }
    *pKey = *(SSessionKey*)(pWinPos->pKey);
    return TSDB_CODE_SUCCESS;
  }

  void*   pData = NULL;
  int32_t code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen);

  bool switchToBuffer = false;
  if (taosArrayGetSize(pGroupWins) > 0) {
    switchToBuffer = (code == TSDB_CODE_FAILED) || (sessionStateRangeKeyCompare(pKey, pGroupWins, 0) >= 0);
  }

  if (switchToBuffer) {
    transformCursor(pCur->pStreamFileState, pCur);
    SRowBuffPos* pWinPos = taosArrayGetP(pGroupWins, pCur->buffIndex);
    if (pVal != NULL) {
      *pVal = pWinPos;
    }
    *pKey = *(SSessionKey*)(pWinPos->pKey);
    code = TSDB_CODE_SUCCESS;
  } else if (code == TSDB_CODE_SUCCESS && pVal) {
    SRowBuffPos* pCopy = getNewRowPosForWrite(pCur->pStreamFileState);
    if (pCopy == NULL || pCopy->pRowBuff == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFreeClear(pData);
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
    memcpy(pCopy->pKey, pKey, sizeof(SSessionKey));
    pCopy->needFree = true;
    pCopy->beFlushed = true;
    memcpy(pCopy->pRowBuff, pData, *pVLen);
    (*pVal) = pCopy;
  }
  taosMemoryFreeClear(pData);
  return code;
}
767

768
// Advances the cursor by one window: a buffer cursor (buffIndex >= 0) has its index
// incremented; anything else (including a NULL cursor) is handed to
// streamStateCurNext_rocksdb().
void sessionWinStateMoveToNext(SStreamStateCur* pCur) {
  qTrace("move cursor to next");
  if (pCur == NULL || pCur->buffIndex < 0) {
    streamStateCurNext_rocksdb(pCur);
    return;
  }
  pCur->buffIndex++;
}
776

777
// Step the cursor back by one position.
// Only a cursor with buffIndex >= 1 is stepped by decrementing the index; a NULL
// cursor or one with buffIndex <= 0 is handed to streamStateCurPrev_rocksdb().
void sessionWinStateMoveToPrev(SStreamStateCur* pCur) {
  qTrace("move cursor to prev");

  if (pCur == NULL || pCur->buffIndex < 1) {
    streamStateCurPrev_rocksdb(pCur);
    return;
  }

  pCur->buffIndex--;
}
×
785

786
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey,
4,587✔
787
                                     range_cmpr_fn cmpFn) {
788
  SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
4,587✔
789
  SSessionKey      tmpKey = *key;
4,586✔
790
  int32_t          code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
4,586✔
791
  bool             hasCurrentPrev = true;
4,586✔
792
  if (code == TSDB_CODE_FAILED) {
4,586✔
793
    streamStateFreeCur(pCur);
850✔
794
    pCur = sessionWinStateSeekKeyNext(pFileState, key);
850✔
795
    code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
851✔
796
    hasCurrentPrev = false;
851✔
797
  }
798

799
  if (code == TSDB_CODE_FAILED) {
4,587✔
800
    code = TSDB_CODE_FAILED;
627✔
801
    goto _end;
627✔
802
  }
803

804
  if (cmpFn(key, &tmpKey) == 0) {
3,960✔
805
    *curKey = tmpKey;
2,932✔
806
    goto _end;
2,932✔
807
  } else if (!hasCurrentPrev) {
1,028✔
808
    code = TSDB_CODE_FAILED;
200✔
809
    goto _end;
200✔
810
  }
811

812
  sessionWinStateMoveToNext(pCur);
828✔
813
  code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
828✔
814
  if (code == TSDB_CODE_SUCCESS && cmpFn(key, &tmpKey) == 0) {
827✔
815
    *curKey = tmpKey;
380✔
816
  } else {
817
    code = TSDB_CODE_FAILED;
447✔
818
  }
819

820
_end:
4,586✔
821
  streamStateFreeCur(pCur);
4,586✔
822
  return code;
4,586✔
823
}
824

825
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
3,210✔
826
                              state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
827
  int32_t code = TSDB_CODE_SUCCESS;
3,210✔
828
  int32_t lino = 0;
3,210✔
829
  (*pWinCode) = TSDB_CODE_SUCCESS;
3,210✔
830

831
  SSessionKey* pWinKey = key;
3,210✔
832
  TSKEY        gap = 0;
3,210✔
833
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
3,210✔
834
  SArray*      pWinStates = NULL;
3,210✔
835

836
  void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
3,210✔
837
  if (ppBuff) {
3,210✔
838
    pWinStates = (SArray*)(*ppBuff);
2,778✔
839
  } else {
840
    pWinStates = taosArrayInit(16, POINTER_BYTES);
432✔
841
    if (!pWinStates) {
432!
842
      code = terrno;
×
843
      QUERY_CHECK_CODE(code, lino, _end);
×
844
    }
845

846
    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
432✔
847
    QUERY_CHECK_CODE(code, lino, _end);
432!
848
  }
849

850
  TSKEY startTs = pWinKey->win.skey;
3,210✔
851
  TSKEY endTs = pWinKey->win.ekey;
3,210✔
852

853
  int32_t size = taosArrayGetSize(pWinStates);
3,210✔
854
  if (size == 0) {
3,210✔
855
    void*   pFileStore = getStateFileStore(pFileState);
741✔
856
    void*   p = NULL;
741✔
857
    int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
741✔
858
    if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
741✔
859
      (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
28✔
860
      if (!(*pVal)) {
28!
861
        code = TSDB_CODE_OUT_OF_MEMORY;
×
862
        QUERY_CHECK_CODE(code, lino, _end);
×
863
      }
864

865
      (*pWinCode) = code_file;
28✔
866
      qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
28✔
867
             pWinKey->win.ekey, code_file);
868
    } else {
869
      code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
713✔
870
      (*pWinCode) = TSDB_CODE_FAILED;
713✔
871
      taosMemoryFree(p);
713!
872
      QUERY_CHECK_CODE(code, lino, _end);
713!
873
    }
874
    goto _end;
741✔
875
  }
876

877
  // find the first position which is smaller than the pWinKey
878
  int32_t      index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
2,469✔
879
  SRowBuffPos* pPos = NULL;
2,469✔
880
  int32_t      valSize = *pVLen;
2,469✔
881

882
  if (index >= 0) {
2,469✔
883
    pPos = taosArrayGetP(pWinStates, index);
2,363✔
884
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
2,363✔
885
    if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
2,363✔
886
      (*pVal) = pPos;
1,946✔
887
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
1,946✔
888
      pPos->beUsed = true;
1,946✔
889
      pPos->beFlushed = false;
1,946✔
890
      *key = *pDestWinKey;
1,946✔
891
      goto _end;
1,946✔
892
    }
893
  }
894

895
  if (index + 1 < size) {
523✔
896
    pPos = taosArrayGetP(pWinStates, index + 1);
129✔
897
    void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
129✔
898
    if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ||
256!
899
        fn(pKeyData, stateKey) == true) {
127✔
900
      (*pVal) = pPos;
4✔
901
      SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
4✔
902
      pPos->beUsed = true;
4✔
903
      pPos->beFlushed = false;
4✔
904
      *key = *pDestWinKey;
4✔
905
      goto _end;
4✔
906
    }
907
  }
908

909
  if (index + 1 == 0) {
519✔
910
    if (!isDeteled(pFileState, endTs)) {
104!
911
      void*   p = NULL;
104✔
912
      void*   pFileStore = getStateFileStore(pFileState);
104✔
913
      int32_t code_file =
914
          streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
104✔
915
      if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
104✔
916
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
95✔
917
        if (!(*pVal)) {
95!
918
          code = TSDB_CODE_OUT_OF_MEMORY;
×
919
          QUERY_CHECK_CODE(code, lino, _end);
95!
920
        }
921

922
        (*pWinCode) = code_file;
95✔
923
        qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
95✔
924
               pWinKey->win.ekey, code_file);
925
        goto _end;
95✔
926
      } else {
927
        taosMemoryFree(p);
9!
928
      }
929
    }
930
  }
931

932
  if (index == size - 1) {
424✔
933
    code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
394✔
934
    QUERY_CHECK_CODE(code, lino, _end);
394!
935

936
    (*pWinCode) = TSDB_CODE_FAILED;
394✔
937
    goto _end;
394✔
938
  }
939
  code = insertNewSessionWindow(pFileState, pWinStates, key, index + 1, (SRowBuffPos**)pVal);
30✔
940
  QUERY_CHECK_CODE(code, lino, _end);
30!
941

942
  (*pWinCode) = TSDB_CODE_FAILED;
30✔
943

944
_end:
3,210✔
945
  if (code != TSDB_CODE_SUCCESS) {
3,210!
946
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
947
  }
948
  return code;
3,210✔
949
}
950

951
// Read a count window from the rocksdb-backed store.
// The entry found by streamStateSessionSeekKeyCurrentNext_rocksdb() is tried
// first; if that read does not succeed, the entry found by
// streamStateSessionSeekKeyPrev_rocksdb() is read instead.
//
// @param pState  store handle passed through to the rocksdb helpers
// @param pKey    in/out key passed to the seek and read helpers
// @param pVal    receives the value read
// @param pVLen   receives the value length
// @return the code of the read that was used (the second one when the first fails)
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
  SStreamStateCur* pNextCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur_rocksdb(pState, pNextCur, pKey, pVal, pVLen);
  streamStateFreeCur(pNextCur);
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }

  SStreamStateCur* pPrevCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
  code = streamStateSessionGetKVByCur_rocksdb(pState, pPrevCur, pKey, pVal, pVLen);
  streamStateFreeCur(pPrevCur);
  return code;
}
965

966
// Find or create the row buffer of the count window that should receive the row
// described by pKey.
//
// Lookup order:
//   1. the group has no buffered window: try the file store, otherwise append a
//      new window;
//   2. the buffered window found by binarySearch() is reused when it covers the
//      start timestamp, or when it is the last window and still holds fewer than
//      winCount rows;
//   3. when the key sorts before every buffered window and its range is flushed
//      but not deleted, a window from the file store is used if it ends before
//      the first buffered window;
//   4. the buffered window right after the search position is reused if it exists;
//   5. otherwise a new window is appended.
//
// @param pFileState  state container
// @param pKey        in: window to look for; out: key of the chosen window
// @param winCount    number of rows a full count window holds
// @param pVal        receives the SRowBuffPos* of the chosen window
// @param pVLen       in/out value length; the row counter is read from the last
//                    sizeof(COUNT_TYPE) bytes of a value of that length
// @param pWinCount   despite the name this carries a result code: the file store
//                    result when the file store was consulted,
//                    TSDB_CODE_FAILED when a new window was created, otherwise
//                    TSDB_CODE_SUCCESS
// @return TSDB_CODE_SUCCESS or an error code
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
                              int32_t* pVLen, int32_t* pWinCount) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  (*pWinCount) = TSDB_CODE_SUCCESS;

  const TSKEY zeroGap = 0;
  SSHashObj*  pGroupMap = getRowStateBuff(pFileState);
  SArray*     pGroupWins = NULL;

  // fetch (or lazily create) the window array of this group
  void** ppSlot = tSimpleHashGet(pGroupMap, &pKey->groupId, sizeof(uint64_t));
  if (ppSlot == NULL) {
    pGroupWins = taosArrayInit(16, POINTER_BYTES);
    if (pGroupWins == NULL) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = tSimpleHashPut(pGroupMap, &pKey->groupId, sizeof(uint64_t), &pGroupWins, POINTER_BYTES);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    pGroupWins = (SArray*)(*ppSlot);
  }

  const TSKEY   startTs = pKey->win.skey;
  const TSKEY   endTs = pKey->win.ekey;
  const int32_t numOfWins = taosArrayGetSize(pGroupWins);

  if (numOfWins == 0) {
    void* pStore = getStateFileStore(pFileState);
    void* pDiskVal = NULL;
    (*pWinCount) = getCountWinStateFromDisc(pStore, pKey, &pDiskVal, pVLen);

    if ((*pWinCount) != TSDB_CODE_SUCCESS && !isFlushedState(pFileState, endTs, 0)) {
      // nothing usable on disk: start a brand new window in memory
      code = addNewSessionWindow(pFileState, pGroupWins, pKey, (SRowBuffPos**)pVal);
      QUERY_CHECK_CODE(code, lino, _end);

      (*pWinCount) = TSDB_CODE_FAILED;
      goto _end;
    }

    qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pKey->win.skey,
           pKey->win.ekey, (*pWinCount));
    if ((*pWinCount) == TSDB_CODE_SUCCESS) {
      int32_t     diskValLen = *pVLen;
      COUNT_TYPE* pDiskCount = (COUNT_TYPE*)((char*)(pDiskVal) + (diskValLen - sizeof(COUNT_TYPE)));
      if (inSessionWindow(pKey, startTs, zeroGap) || (*pDiskCount) < winCount) {
        (*pVal) = createSessionWinBuff(pFileState, pKey, pDiskVal, pVLen);
        if (!(*pVal)) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          QUERY_CHECK_CODE(code, lino, _end);
        }

        goto _end;
      }
    }

    // the disk value is not usable: restore the requested range and hand out an
    // empty buffer instead
    pKey->win.skey = startTs;
    pKey->win.ekey = endTs;
    (*pVal) = createSessionWinBuff(pFileState, pKey, NULL, NULL);
    taosMemoryFree(pDiskVal);
    if (!(*pVal)) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }

  // find the first position which is smaller than the key
  int32_t      idx = binarySearch(pGroupWins, numOfWins, pKey, sessionStateKeyCompare);
  int32_t      rowLen = *pVLen;
  SRowBuffPos* pCand = NULL;

  if (idx >= 0) {
    pCand = taosArrayGetP(pGroupWins, idx);
    COUNT_TYPE* pCandCount = (COUNT_TYPE*)((char*)(pCand->pRowBuff) + (rowLen - sizeof(COUNT_TYPE)));
    if (inSessionWindow(pCand->pKey, startTs, zeroGap) || (idx == numOfWins - 1 && (*pCandCount) < winCount)) {
      pCand->beUsed = true;
      pCand->beFlushed = false;
      (*pVal) = pCand;
      *pKey = *(SSessionKey*)pCand->pKey;
      goto _end;
    }
  }

  // the key sorts before every buffered window: a flushed window may sit on disk
  if (idx == -1 && !isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
    SSessionKey diskKey = *pKey;
    void*       pDiskVal = NULL;
    void*       pStore = getStateFileStore(pFileState);
    int32_t     diskCode = getCountWinStateFromDisc(pStore, &diskKey, &pDiskVal, pVLen);
    if (diskCode == TSDB_CODE_SUCCESS) {
      SRowBuffPos* pHeadPos = taosArrayGetP(pGroupWins, 0);
      SSessionKey* pHeadKey = (SSessionKey*)pHeadPos->pKey;
      if (diskKey.win.ekey < pHeadKey->win.skey) {
        *pKey = diskKey;
        (*pVal) = createSessionWinBuff(pFileState, pKey, pDiskVal, pVLen);
        if (!(*pVal)) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          QUERY_CHECK_CODE(code, lino, _end);
        }

        (*pWinCount) = diskCode;
        qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pKey->win.skey,
               pKey->win.ekey, diskCode);
        goto _end;
      }
    }
    taosMemoryFree(pDiskVal);
  }

  // reuse the buffered window that follows the search position, if any
  if (idx + 1 < numOfWins) {
    pCand = taosArrayGetP(pGroupWins, idx + 1);
    pCand->beUsed = true;
    pCand->beFlushed = false;
    (*pVal) = pCand;
    *pKey = *(SSessionKey*)pCand->pKey;
    goto _end;
  }

  code = addNewSessionWindow(pFileState, pGroupWins, pKey, (SRowBuffPos**)pVal);
  QUERY_CHECK_CODE(code, lino, _end);

  (*pWinCount) = TSDB_CODE_FAILED;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1097

1098
// Create the row buffer for a new count window of pKey's group.
//
// When the group already has buffered windows a new window is simply appended.
// When it has none, the file store is consulted first: a flushed on-disk window
// that is not yet full (its trailing COUNT_TYPE counter differs from winCount)
// is turned into the result buffer; in every other case a new window is appended.
//
// @param pFileState  state container
// @param pKey        window key; passed to getCountWinStateFromDisc(), which may
//                    overwrite it with the key read from the file store
// @param winCount    number of rows a full count window holds
// @param pVal        receives the SRowBuffPos* of the created window
// @param pVLen       in/out value length
// @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when a buffer or the group
//         array cannot be allocated, or the error code of a failing helper
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
                                 int32_t* pVLen) {
  SSessionKey* pWinKey = pKey;
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSHashObj*   pSessionBuff = getRowStateBuff(pFileState);
  SArray*      pWinStates = NULL;
  void**       ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, POINTER_BYTES);
    if (!pWinStates) {
      // do not register (and later dereference) a NULL array for this group
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // captured before the file store lookup, which may rewrite *pWinKey
  TSKEY endTs = pWinKey->win.ekey;

  int32_t size = taosArrayGetSize(pWinStates);
  if (size == 0) {
    void* pFileStore = getStateFileStore(pFileState);
    void* pRockVal = NULL;

    int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
    if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
      int32_t     valSize = *pVLen;
      COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
      if ((*pWinStateCount) == winCount) {
        // the on-disk window is already full: open a fresh one and release the
        // value that was read, since nothing else takes it over on this path
        code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
        taosMemoryFree(pRockVal);
        QUERY_CHECK_CODE(code, lino, _end);
      } else {
        // NOTE(review): createSessionWinBuff() appears to take over pRockVal (no
        // caller in this file frees a value after passing it in) - confirm.
        (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
        if (!(*pVal)) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
               pWinKey->win.ekey, code_file);
      }
    } else {
      code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
      taosMemoryFree(pRockVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  } else {
    code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc