• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.02
/source/libs/stream/src/tstreamFileState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "taos.h"
21
#include "tcommon.h"
22
#include "tcompare.h"
23
#include "thash.h"
24
#include "tsimplehash.h"
25

26
#define FLUSH_RATIO                    0.5
27
#define FLUSH_NUM                      4
28
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
29
#define MIN_NUM_OF_ROW_BUFF            10240
30
#define MIN_NUM_OF_RECOVER_ROW_BUFF    128
31
#define MIN_NUM_SEARCH_BUCKET          128
32
#define MAX_ARRAY_SIZE                 1024
33
#define MAX_GROUP_ID_NUM               200000
34
#define NUM_OF_CACHE_WIN               64
35
#define MAX_NUM_OF_CACHE_WIN           128
36
#define MIN_NUM_OF_SORT_CACHE_WIN      40960
37
#define BATCH_LIMIT                    256
38

39
#define DEFAULT_STATE_MAP_CAPACITY 10240
40
#define MAX_STATE_MAP_SIZE         10240000
41

42
// Helpers for a one-byte flag stored in the LAST byte of a (ptr, len) buffer.
// Both macro arguments are fully parenthesized so that arbitrary expressions
// (e.g. `a + b`, `cond ? x : y`) can be passed as `ptr` or `len`.
#define SET_TSDATA_FLAG(ptr, len)   ((*(char*)POINTER_SHIFT((ptr), ((len) - 1))) |= 1)
// NOTE(review): this clears the whole trailing byte, not only bit 0 — kept as in the original.
#define UNSET_TSDATA_FLAG(ptr, len) ((*(char*)POINTER_SHIFT((ptr), ((len) - 1))) &= 0)
#define HAS_TSDATA_FLAG(ptr, len)   ((*(char*)POINTER_SHIFT((ptr), ((len) - 1))) & 1)
45

46
#define TASK_KEY               "streamFileState"
47
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
48

49
struct SStreamFileState {
50
  SList*     usedBuffs;
51
  SList*     freeBuffs;
52
  void*      rowStateBuff;
53
  void*      pFileStore;
54
  int32_t    rowSize;
55
  int32_t    selectivityRowSize;
56
  int32_t    keyLen;
57
  uint64_t   preCheckPointVersion;
58
  uint64_t   checkPointVersion;
59
  TSKEY      maxTs;
60
  TSKEY      deleteMark;
61
  TSKEY      flushMark;
62
  uint64_t   maxRowCount;
63
  uint64_t   curRowCount;
64
  GetTsFun   getTs;
65
  char*      id;
66
  char*      cfName;
67
  void*      searchBuff;
68
  SSHashObj* pGroupIdMap;
69
  bool       hasFillCatch;
70
  SSHashObj* pRecFlagMap;
71

72
  _state_buff_cleanup_fn         stateBuffCleanupFn;
73
  _state_buff_remove_fn          stateBuffRemoveFn;
74
  _state_buff_remove_by_pos_fn   stateBuffRemoveByPosFn;
75
  _state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
76

77
  _state_file_remove_fn stateFileRemoveFn;
78
  _state_file_get_fn    stateFileGetFn;
79

80
  _state_fun_get_fn stateFunctionGetFn;
81
};
82

83
typedef SRowBuffPos SRowBuffInfo;
84

85
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
5,859✔
86
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
5,859✔
87
  return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
5,859✔
88
}
89

90
int fillTSKeyCompare(const void* pKey1, const void* pDatas, int pos) {
4,325✔
91
  SWinKey* pWin1 = (SWinKey*)pKey1;
4,325✔
92
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
4,325✔
93
  if (pWin1->ts > pWin2->ts) {
4,325✔
94
    return 1;
1,375✔
95
  } else if (pWin1->ts < pWin2->ts) {
2,950✔
96
    return -1;
868✔
97
  }
98

99
  return 0;
2,082✔
100
}
101

102
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
3,658✔
103
  SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
3,658✔
104
  if (pos) {
3,658✔
105
    (*pos)->beFlushed = true;
1,994✔
106
    (*pos)->invalid = true;
1,994✔
107
  }
108
  return tSimpleHashRemove(pBuff, pKey, keyLen);
3,658✔
109
}
110

111
// Drops the hash entry for pPos->pKey, but only when the hash currently maps
// that key to exactly this position object.
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  size_t        len = pFileState->keyLen;
  SRowBuffPos** ppFound = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, len);

  if (ppFound == NULL || (*ppFound) != pPos) {
    return;
  }

  int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, len);
  qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
6,103,329✔
121

122
// Clears the simple hash passed as an opaque buffer pointer.
void stateHashBuffClearFn(void* pBuff) {
  tSimpleHashClear(pBuff);
}
×
123

124
// Cleans up the simple hash passed as an opaque buffer pointer.
void stateHashBuffCleanupFn(void* pBuff) {
  tSimpleHashCleanup(pBuff);
}
3,157✔
125

126
// Deletes pKey from the backend store via streamStateDel_rocksdb() and returns its result.
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  int32_t res = streamStateDel_rocksdb(pFileState->pFileStore, pKey);
  return res;
}
129

130
// Reads the value stored for pKey from the backend store via
// streamStateGet_rocksdb(); `data`/`pDataLen` are forwarded unchanged.
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  int32_t res = streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
  return res;
}
133

134
// Allocates an SStateKey built from the SWinKey in pPos->pKey plus opNum = num.
// Returns NULL (after logging) when the allocation fails; the caller owns the result.
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateKey* pResult = taosMemoryCalloc(1, sizeof(SStateKey));
  if (pResult != NULL) {
    SWinKey* pSrc = pPos->pKey;
    pResult->key = *pSrc;
    pResult->opNum = num;
    return pResult;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
145

146
// Allocates a copy of the SWinKey held in pPos->pKey; `num` is not used.
// Returns NULL (after logging) when the allocation fails; the caller owns the result.
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SWinKey* pCopy = taosMemoryCalloc(1, sizeof(SWinKey));
  if (pCopy != NULL) {
    SWinKey* pSrc = pPos->pKey;
    *pCopy = *pSrc;
    return pCopy;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
156

157
// Deletes the session key pKey from the backend store via
// streamStateSessionDel_rocksdb() and returns its result.
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  int32_t res = streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
  return res;
}
160

161
// Reads the value of the session key pKey from the backend store via
// streamStateSessionGet_rocksdb(); `data`/`pDataLen` are forwarded unchanged.
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  int32_t res = streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
  return res;
}
164

165
// Allocates an SStateSessionKey built from the SSessionKey in pPos->pKey plus opNum = num.
// Returns NULL (after logging) when the allocation fails; the caller owns the result.
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateSessionKey* pResult = taosMemoryCalloc(1, sizeof(SStateSessionKey));
  if (pResult != NULL) {
    SSessionKey* pSrc = pPos->pKey;
    pResult->key = *pSrc;
    pResult->opNum = num;
    return pResult;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
176

177
// Decodes one fixed-width 64-bit value from pBuff into *pKey.
// `len` is not consulted here; the returned cursor is not needed.
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) {
  (void)len;
  (void)taosDecodeFixedI64(pBuff, pKey);
}
11!
178

179
// Encodes *pKey into a freshly allocated buffer of sizeof(TSKEY) bytes.
// On success *pVal points to the buffer (owned by the caller) and *pLen holds
// its size; on allocation failure *pVal is NULL and terrno is returned.
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
  *pLen = sizeof(TSKEY);

  void* pOut = taosMemoryCalloc(1, *pLen);
  (*pVal) = pOut;
  if (pOut == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  // the encoder advances its cursor argument, so work on the local copy
  (void)taosEncodeFixedI64(&pOut, *pKey);
  return TSDB_CODE_SUCCESS;
}
190

191
// Debug hook for row-buffer positions. Tracing is currently disabled, so the
// function does nothing; the trace statement is kept below for reference.
static void printSRowBuffPos(SRowBuffPos* buf, const char* info, int32_t line) {
  (void)buf;
  (void)info;
  (void)line;
  // qTrace("[StreamBuff] rowBuf:%p, %s, line:%d", buf->pRowBuff, info, line);
}
29,305,458✔
194

195
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
4,674✔
196
                            void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
197
                            SStreamFileState** ppFileState) {
198
  int32_t code = TSDB_CODE_SUCCESS;
4,674✔
199
  int32_t lino = 0;
4,674✔
200
  if (memSize <= 0) {
4,674!
201
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
×
202
  }
203
  if (rowSize == 0) {
4,674!
204
    code = TSDB_CODE_INVALID_PARA;
×
205
    QUERY_CHECK_CODE(code, lino, _end);
×
206
  }
207

208
  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
4,674!
209
  QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
4,678!
210

211
  rowSize += selectRowSize;
4,678✔
212
  pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
4,678✔
213
  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
4,678✔
214
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
4,679!
215

216
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
4,679✔
217
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
4,680!
218

219
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
4,680✔
220
  int32_t    cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
4,679✔
221
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
4,679✔
222
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
2,873✔
223
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
2,872✔
224
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
2,872✔
225
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
2,872✔
226
    pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
2,872✔
227

228
    pFileState->stateFileRemoveFn = intervalFileRemoveFn;
2,872✔
229
    pFileState->stateFileGetFn = intervalFileGetFn;
2,872✔
230
    pFileState->cfName = taosStrdup("state");
2,872!
231
    pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
2,872✔
232
  } else if (type == STREAM_STATE_BUFF_SORT) {
1,806✔
233
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
1,520✔
234
    pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
1,521✔
235
    pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
1,521✔
236
    pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
1,521✔
237
    pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
1,521✔
238

239
    pFileState->stateFileRemoveFn = sessionFileRemoveFn;
1,521✔
240
    pFileState->stateFileGetFn = sessionFileGetFn;
1,521✔
241
    pFileState->cfName = taosStrdup("sess");
1,521!
242
    pFileState->stateFunctionGetFn = getSessionRowBuff;
1,520✔
243
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
286!
244
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
286✔
245
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
286✔
246
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
286!
247
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
286✔
248
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
286✔
249
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
286✔
250
    pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
286✔
251

252
    pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
286✔
253
    pFileState->stateFileGetFn = hashSortFileGetFn;
286✔
254
    pFileState->cfName = taosStrdup("fill");
286!
255
    pFileState->stateFunctionGetFn = NULL;
286✔
256
  }
257

258
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
4,678!
259
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
4,678!
260
  QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
4,678!
261
  QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
4,678!
262

263
  if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
4,678✔
264
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
79✔
265
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
79!
266
  }
267

268
  pFileState->keyLen = keySize;
4,678✔
269
  pFileState->rowSize = rowSize;
4,678✔
270
  pFileState->selectivityRowSize = selectRowSize;
4,678✔
271
  pFileState->preCheckPointVersion = 0;
4,678✔
272
  pFileState->checkPointVersion = 1;
4,678✔
273
  pFileState->pFileStore = pFile;
4,678✔
274
  pFileState->getTs = fp;
4,678✔
275
  pFileState->curRowCount = 0;
4,678✔
276
  pFileState->deleteMark = delMark;
4,678✔
277
  pFileState->flushMark = INT64_MIN;
4,678✔
278
  pFileState->maxTs = INT64_MIN;
4,678✔
279
  pFileState->id = taosStrdup(taskId);
4,678!
280
  QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
4,680!
281

282
  pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
4,680✔
283
  QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
4,679!
284

285
  pFileState->pRecFlagMap = tSimpleHashInit(1024, hashFn);
4,679✔
286
  QUERY_CHECK_NULL(pFileState->pRecFlagMap, code, lino, _end, terrno);
4,679!
287

288

289
  pFileState->hasFillCatch = true;
4,679✔
290

291
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
4,679✔
292
    code = recoverSnapshot(pFileState, checkpointId);
2,872✔
293
  } else if (type == STREAM_STATE_BUFF_SORT) {
1,807✔
294
    code = recoverSession(pFileState, checkpointId);
1,522✔
295
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
285!
296
    code = recoverFillSnapshot(pFileState, checkpointId);
286✔
297
  }
298
  QUERY_CHECK_CODE(code, lino, _end);
4,677!
299

300
  void*   valBuf = NULL;
4,677✔
301
  int32_t len = 0;
4,677✔
302
  int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
4,677✔
303
  if (tmpRes == TSDB_CODE_SUCCESS) {
4,675✔
304
    QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
11!
305
    streamFileStateDecode(&pFileState->flushMark, valBuf, len);
11✔
306
    qDebug("===stream===flushMark  read:%" PRId64, pFileState->flushMark);
11✔
307
  }
308
  taosMemoryFreeClear(valBuf);
4,676!
309
  (*ppFileState) = pFileState;
4,676✔
310

311
_end:
4,676✔
312
  if (code != TSDB_CODE_SUCCESS) {
4,676!
313
    streamFileStateDestroy(pFileState);
×
314
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
315
  }
316
  return code;
4,676✔
317
}
318

319
// Frees a position together with its key and row buffer. NULL is accepted.
void destroyRowBuffPos(SRowBuffPos* pPos) {
  if (pPos != NULL) {
    printSRowBuffPos(pPos, __FUNCTION__, __LINE__);
    taosMemoryFreeClear(pPos->pKey);
    taosMemoryFreeClear(pPos->pRowBuff);
    taosMemoryFree(pPos);
  }
}
326

327
void destroyRowBuffPosPtr(void* ptr) {
316,975✔
328
  if (!ptr) {
316,975!
329
    return;
×
330
  }
331
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
316,975✔
332
  if (pPos != NULL && !pPos->beUsed) {
316,975!
333
    destroyRowBuffPos(pPos);
288✔
334
  }
335
  *(SRowBuffPos**)ptr = NULL;
316,975✔
336
}
337

338
void destroyRowBuffAllPosPtr(void* ptr) {
2,229,564✔
339
  if (!ptr) {
2,229,564!
340
    return;
×
341
  }
342
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
2,229,564✔
343
  destroyRowBuffPos(pPos);
2,229,564✔
344
  *(SRowBuffPos**)ptr = NULL;
2,229,567✔
345
}
346

347
// List-element destructor: `ptr` points at a stored raw buffer pointer, which
// is freed and reset.
void destroyRowBuff(void* ptr) {
  if (ptr != NULL) {
    taosMemoryFreeClear(*(void**)ptr);
  }
}
353

354
// Releases a file state and everything it owns. NULL is accepted.
// Also used by streamFileStateInit() to tear down a partially built state, so
// it must cope with members that were never set.
void streamFileStateDestroy(SStreamFileState* pFileState) {
  if (!pFileState) {
    return;
  }

  taosMemoryFree(pFileState->id);
  taosMemoryFree(pFileState->cfName);
  tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
  tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
  // The callback is still NULL when initialisation failed before a buffer type
  // was selected; calling it then would be a NULL function-pointer call.
  if (pFileState->stateBuffCleanupFn != NULL) {
    pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
  }
  sessionWinStateCleanup(pFileState->searchBuff);
  tSimpleHashCleanup(pFileState->pGroupIdMap);
  tSimpleHashCleanup(pFileState->pRecFlagMap);

  memset(pFileState, 0, sizeof(SStreamFileState));
  taosMemoryFree(pFileState);
}
371

372
// Returns the size in bytes of one row buffer of this state.
int32_t getFileStateRowSize(SStreamFileState* pFileState) {
  int32_t bytes = pFileState->rowSize;
  return bytes;
}
375

376
// Moves the row buffer of pPos (if any) onto the free list and detaches it
// from the position. Returns the tdListAppend() error code on failure, in
// which case the buffer stays attached to pPos.
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pPos->pRowBuff == NULL) {
    // nothing to recycle
    return code;
  }

  printSRowBuffPos(pPos, __FUNCTION__, __LINE__);
  code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
  QUERY_CHECK_CODE(code, lino, _end);
  pPos->pRowBuff = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
392

393
// Walks usedBuffs and recycles positions. With `all` set every position is
// recycled; otherwise only positions whose timestamp is below `ts` and that are
// not marked beUsed. A recycled position has its buffer moved to the free list,
// is removed from the state buffer (skipped when `all`), destroyed and unlinked.
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  for (SListNode* pCur = tdListNext(&iter); pCur != NULL; pCur = tdListNext(&iter)) {
    SRowBuffPos* pPos = *(SRowBuffPos**)(pCur->data);
    bool         recycle = all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed);
    if (!recycle) {
      continue;
    }

    printSRowBuffPos(pPos, __FUNCTION__, __LINE__);
    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!all) {
      pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
    }
    destroyRowBuffPos(pPos);

    SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCur);
    taosMemoryFreeClear(pDetached);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
4,849✔
421

422
// Collects positions from usedBuffs whose timestamp isFlushedState() accepts
// into pFlushList, until `max` positions carrying a row buffer were collected.
// Without `all` only positions not marked beUsed qualify; with `all` every
// position that still has a row buffer qualifies. Collected positions are
// removed from the state buffer, and unlinked from usedBuffs when not beUsed.
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  uint64_t  collected = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  SListNode* pCur = NULL;
  while ((pCur = tdListNext(&iter)) != NULL && collected < max) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pCur->data;

    if (!isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
      continue;
    }
    if (!all && pPos->beUsed) {
      continue;
    }
    if (all && !pPos->pRowBuff) {
      continue;
    }

    code = tdListAppend(pFlushList, &pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    printSRowBuffPos(pPos, __FUNCTION__, __LINE__);
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pPos);

    if (pPos->beUsed == false) {
      SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCur);
      taosMemoryFreeClear(pDetached);
    }
    if (pPos->pRowBuff) {
      collected++;
    }
  }
  qDebug("clear flushed row buff. %d rows to disk. is all:%d", listNEles(pFlushList), all);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
461

462
// Resets the state: marks go back to INT64_MIN, the state hash is cleared and
// every used position is recycled.
void streamFileStateClear(SStreamFileState* pFileState) {
  pFileState->maxTs = INT64_MIN;
  pFileState->flushMark = INT64_MIN;

  tSimpleHashClear(pFileState->rowStateBuff);
  clearExpiredRowBuff(pFileState, 0, true);
}
2,044✔
468

469
// True when the flush mark is positive.
bool needClearDiskBuff(SStreamFileState* pFileState) {
  return pFileState->flushMark > 0;
}
332✔
470

471
// Records whether the position is still in use; pFileState is not consulted.
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
  pPos->beUsed = used;
}
7,734,972✔
472

473
// Collects positions from usedBuffs whose beUsed flag equals `used` into
// pFlushList, until `max` positions carrying a row buffer were collected.
// With `used` set, positions without a row buffer are skipped. Collected
// positions are removed from the state buffer, and unlinked from usedBuffs
// when they are not beUsed.
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  uint64_t  collected = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  SListNode* pCur = NULL;
  while ((pCur = tdListNext(&iter)) != NULL && collected < max) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pCur->data;

    if (pPos->beUsed != used) {
      continue;
    }
    if (used && !pPos->pRowBuff) {
      continue;
    }

    code = tdListAppend(pFlushList, &pPos);
    QUERY_CHECK_CODE(code, lino, _end);
    printSRowBuffPos(pPos, __FUNCTION__, __LINE__);

    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pPos);

    if (pPos->beUsed == false) {
      SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCur);
      taosMemoryFreeClear(pDetached);
    }
    if (pPos->pRowBuff) {
      collected++;
    }
  }

  qInfo("%s stream state flush %d rows to disk. is used:%d", pFileState->id, listNEles(pFlushList), used);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
510

511
// Selects up to max(curRowCount * FLUSH_RATIO, FLUSH_NUM) positions, hands them
// to flushSnapshot() and moves their row buffers to the free list.
//
// Selection order: flushed-and-unused positions first; if none, unused
// positions; if still none, used positions. When a search buffer exists, all
// flushed positions that still hold a row buffer are added as well.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered. The temporary flush
// list is released on every path, including the error paths.
int32_t flushRowBuff(SStreamFileState* pFileState) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
  if (!pFlushList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
  num = TMAX(num, FLUSH_NUM);
  code = clearFlushedRowBuff(pFileState, pFlushList, num, false);
  QUERY_CHECK_CODE(code, lino, _end);

  if (isListEmpty(pFlushList)) {
    code = popUsedBuffs(pFileState, pFlushList, num, false);
    QUERY_CHECK_CODE(code, lino, _end);

    if (isListEmpty(pFlushList)) {
      code = popUsedBuffs(pFileState, pFlushList, num, true);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  if (pFileState->searchBuff) {
    code = clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  flushSnapshot(pFileState, pFlushList, false);

  SListIter fIter = {0};
  tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&fIter)) != NULL) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  // Positions that are not beUsed were already unlinked from usedBuffs by the
  // collectors above, so the flush list is their only owner: destroy them here
  // on success and on failure alike. Positions still beUsed are left alone.
  if (pFlushList != NULL) {
    tdListFreeP(pFlushList, destroyRowBuffPosPtr);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
559

560
// Makes room for new rows: expires old rows first (unless deleteMark is
// INT64_MAX), then flushes repeatedly while the free list is empty and the row
// count is at its maximum. Returns the first flush error, if any.
int32_t clearRowBuff(SStreamFileState* pFileState) {
  if (pFileState->deleteMark != INT64_MAX) {
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
  }

  for (;;) {
    int32_t code = flushRowBuff(pFileState);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    if (!isListEmpty(pFileState->freeBuffs) || pFileState->curRowCount != pFileState->maxRowCount) {
      break;
    }
  }
  return TSDB_CODE_SUCCESS;
}
572

573
// Recycles positions that are marked invalid and not beUsed: each one is
// unlinked from usedBuffs, its buffer moved to the free list, and the position
// destroyed. Stops once `max` positions carrying a row buffer were recycled.
int32_t clearFlushedRowBuffByFlag(SStreamFileState* pFileState, uint64_t max) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  uint64_t  recycled = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  SListNode* pCur = NULL;
  while ((pCur = tdListNext(&iter)) != NULL && recycled < max) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pCur->data;
    if (!pPos->invalid || pPos->beUsed) {
      continue;
    }

    SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCur);
    taosMemoryFreeClear(pDetached);
    if (pPos->pRowBuff) {
      recycled++;
    }
    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
    destroyRowBuffPos(pPos);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
603

604
// Frees row buffers without flushing: expires old rows (unless deleteMark is
// INT64_MAX), then recycles up to max(curRowCount * FLUSH_RATIO, FLUSH_NUM)
// invalid positions.
int32_t clearRowBuffNonFlush(SStreamFileState* pFileState) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pFileState->deleteMark != INT64_MAX) {
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
  }

  uint64_t limit = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
  limit = TMAX(limit, FLUSH_NUM);

  code = clearFlushedRowBuffByFlag(pFileState, limit);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
623

624
// Pops one recycled row buffer from the free list and returns it zero-filled
// (rowSize bytes). Returns NULL when the free list is empty.
void* getFreeBuff(SStreamFileState* pFileState) {
  SListNode* pHead = tdListPopHead(pFileState->freeBuffs);
  if (pHead == NULL) {
    return NULL;
  }

  void* pBuf = *(void**)pHead->data;
  memset(pBuf, 0, pFileState->rowSize);
  taosMemoryFree(pHead);
  return pBuf;
}
636

637
// Zero-fills the row buffer of pPos (rowSize bytes); no-op when it has none.
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  if (pPos->pRowBuff == NULL) {
    return;
  }
  memset(pPos->pRowBuff, 0, pFileState->rowSize);
}
×
642

643
// Creates a new position with a zeroed key (keyLen bytes) and a row buffer, and
// appends it to usedBuffs.
//
// The row buffer comes from, in order: the free list; a fresh allocation while
// curRowCount < maxRowCount; the free list again after clearRowBuff() made room.
//
// Returns the new position, or NULL on failure. On failure nothing is leaked
// and nothing is added to usedBuffs: a row buffer already obtained is handed
// back to the free list and the half-built position is destroyed.
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
  QUERY_CHECK_NULL(pPos, code, lino, _error, terrno);

  pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
  QUERY_CHECK_NULL(pPos->pKey, code, lino, _error, terrno);

  pPos->pRowBuff = getFreeBuff(pFileState);
  if (pPos->pRowBuff == NULL) {
    if (pFileState->curRowCount < pFileState->maxRowCount) {
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
      QUERY_CHECK_NULL(pPos->pRowBuff, code, lino, _error, terrno);
      pFileState->curRowCount++;
    } else {
      code = clearRowBuff(pFileState);
      QUERY_CHECK_CODE(code, lino, _error);

      pPos->pRowBuff = getFreeBuff(pFileState);
      printSRowBuffPos(pPos, __FUNCTION__, __LINE__);
    }
  }

  // verify BEFORE publishing the position, so a buffer-less position never ends up in usedBuffs
  QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  code = tdListAppend(pFileState->usedBuffs, &pPos);
  printSRowBuffPos(pPos, __FUNCTION__, __LINE__);
  QUERY_CHECK_CODE(code, lino, _error);

  return pPos;

_error:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pPos != NULL) {
    if (pPos->pRowBuff != NULL) {
      // keep the buffer in the pool (it is counted in curRowCount); if this fails
      // too, destroyRowBuffPos() below frees it together with the position
      (void)putFreeBuff(pFileState, pPos);
    }
    destroyRowBuffPos(pPos);
  }
  return NULL;
}
691

692
// Obtains a new position via getNewRowPos() and initialises its flags for
// writing (used, updated, not flushed, not invalid, needFree cleared).
// Returns NULL, after logging TSDB_CODE_OUT_OF_MEMORY, when no position is available.
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SRowBuffPos* pFresh = getNewRowPos(pFileState);
  if (pFresh == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _error);
  }

  pFresh->invalid = false;
  pFresh->needFree = false;
  pFresh->beFlushed = false;
  pFresh->beUpdated = true;
  pFresh->beUsed = true;
  return pFresh;

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return NULL;
}
713

714
// Looks up pKey in the state hash; creates a new position when it is missing.
//
//   pVal/pVLen  when pVal is non-NULL they receive the position and rowSize
//   pWinCode    TSDB_CODE_SUCCESS when the key was already buffered; for a new
//               position TSDB_CODE_FAILED, or the result of stateFileGetFn()
//               when the row was looked up in the backend
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when no position could be
// obtained, or the tSimpleHashPut() error.
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                             int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  (*pWinCode) = TSDB_CODE_SUCCESS;
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));

  // fast path: the key is already buffered
  SRowBuffPos** ppHit = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (ppHit != NULL) {
    if (pVal != NULL) {
      *pVLen = pFileState->rowSize;
      *pVal = *ppHit;
      (*ppHit)->beUsed = true;
      (*ppHit)->beFlushed = false;
    }
    return code;
  }

  SRowBuffPos* pFresh = getNewRowPosForWrite(pFileState);
  if (pFresh == NULL || pFresh->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pFresh->pKey, pKey, keyLen);
  (*pWinCode) = TSDB_CODE_FAILED;

  // a row that is not deleted but already flushed may still exist in the backend
  TSKEY ts = pFileState->getTs(pKey);
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
    void*   pStored = NULL;
    int32_t storedLen = 0;
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &pStored, &storedLen);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
      memcpy(pFresh->pRowBuff, pStored, storedLen);
    }
    taosMemoryFree(pStored);
  }

  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pFresh, POINTER_BYTES);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pVal) {
    *pVLen = pFileState->rowSize;
    *pVal = pFresh;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
765

766
// Unconditionally creates a new position for pKey and stores it in the state
// hash. When pVal is non-NULL, pVal/pVLen receive the position and rowSize.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when no position could be
// obtained, or the tSimpleHashPut() error.
int32_t createRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));

  SRowBuffPos* pFresh = getNewRowPosForWrite(pFileState);
  if (pFresh == NULL || pFresh->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pFresh->pKey, pKey, keyLen);

  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pFresh, POINTER_BYTES);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pVal != NULL) {
    *pVLen = pFileState->rowSize;
    *pVal = pFresh;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
794

795
/*
 * Remove the row identified by pKey from the in-memory row map and from the
 * backing file store, then from the search buffer when one exists.
 * The results of the two remove callbacks are traced only; nothing is
 * reported back to the caller.
 */
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
  int32_t memRes = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
  qTrace("%s at line %d res:%d", __func__, __LINE__, memRes);

  int32_t fileRes = pFileState->stateFileRemoveFn(pFileState, pKey);
  qTrace("%s at line %d res:%d", __func__, __LINE__, fileRes);

  if (pFileState->searchBuff == NULL) {
    return;
  }
  deleteHashSortRowBuff(pFileState, pKey);
}
5,016✔
804

805
/*
 * Drop every row that belongs to groupId, first from the in-memory row map and
 * then from the backing store.
 *
 * The backing store is drained one key at a time: seek to the first key of the
 * group, read it, delete it, repeat until the lookup fails.
 *
 * Fixes over the previous version:
 *  - the cursor returned by streamStateSeekKeyNext_rocksdb is now released on
 *    every iteration (it used to be leaked each time round the loop);
 *  - a failed delete ends the loop instead of re-seeking the same key forever.
 */
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
  SSHashObj* pRowMap = pFileState->rowStateBuff;
  void*      pIte = NULL;
  int32_t    iter = 0;
  while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) {
    size_t   keyLen = 0;
    SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
    if (pKey->groupId == groupId) {
      int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter);
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
    }
  }

  while (1) {
    SWinKey          tmp = {.ts = INT64_MIN, .groupId = groupId};
    SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp);
    SWinKey          delKey = {.groupId = groupId};
    int32_t          code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
    // delKey has been filled in (or the lookup failed); the cursor is no longer needed
    streamStateFreeCur(pCur);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
    qTrace("%s at line %d res:%d", __func__, __LINE__, code);
    if (code != TSDB_CODE_SUCCESS) {
      // the key is still there; seeking again would find it again and spin
      break;
    }
  }
}
×
830

831
/*
 * Reload the row value for pPos->pKey from the file store (stateFileGetFn) and
 * copy it into the row buffer already attached to pPos.
 * The temporary buffer handed back by stateFileGetFn is released after the copy.
 * Returns the code of stateFileGetFn.
 */
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t lino = 0;
  int32_t valLen = 0;
  void*   pDiskVal = NULL;

  int32_t code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pDiskVal, &valLen);
  QUERY_CHECK_CODE(code, lino, _end);

  memcpy(pPos->pRowBuff, pDiskVal, valLen);
  taosMemoryFree(pDiskVal);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
847

848
/*
 * Attach a row buffer to pPos and fill it from the file store.
 *
 * Buffer acquisition order:
 *   1. take one from the free list (getFreeBuff);
 *   2. if none is free and curRowCount is still below maxRowCount, allocate a
 *      new zeroed buffer and bump curRowCount;
 *   3. otherwise run clearRowBuff and retry the free list.
 * If no buffer is available after that, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR
 * is returned. On success the content is loaded via recoverSessionRowBuff.
 */
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  pPos->pRowBuff = getFreeBuff(pFileState);
  if (pPos->pRowBuff == NULL) {
    if (pFileState->curRowCount >= pFileState->maxRowCount) {
      // at capacity: reclaim buffers, then try the free list once more
      code = clearRowBuff(pFileState);
      QUERY_CHECK_CODE(code, lino, _end);
      pPos->pRowBuff = getFreeBuff(pFileState);
    } else {
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
      if (pPos->pRowBuff == NULL) {
        code = terrno;
        QUERY_CHECK_CODE(code, lino, _end);
      }
      pFileState->curRowCount++;
    }
    QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  code = recoverSessionRowBuff(pFileState, pPos);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
877

878
/*
 * Resolve a row position to its row buffer.
 *
 * - Buffer already attached: it is returned as is, after being refreshed from
 *   the file store when pPos->needFree is set.
 * - No buffer attached: one is acquired and loaded via recoverStateRowBuff.
 *
 * *pVal is written only on success.
 */
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  if (pPos->pRowBuff == NULL) {
    code = recoverStateRowBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pPos->needFree) {
    code = recoverSessionRowBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  (*pVal) = pPos->pRowBuff;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
905

906
/*
 * Tell whether a row for pKey should be treated as present.
 *
 * The result is true when
 *   - pKey is in the in-memory row map, or
 *   - a search buffer exists but has no entry for pKey->groupId, or
 *   - hasLimit is set and the group's window array holds at most
 *     MIN_NUM_OF_SORT_CACHE_WIN entries.
 *
 * pIsLast (optional) is set to true only when the group's window array is
 * non-empty and its last element equals pKey; it is false in every other case.
 *
 * Fix: taosArrayGetLast() is now checked for NULL before being compared, so an
 * empty per-group array no longer leads to a NULL dereference.
 */
bool hasRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, bool hasLimit, bool* pIsLast) {
  bool res = false;
  if (pIsLast != NULL) {
    (*pIsLast) = false;
  }

  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
  if (pos) {
    res = true;
  }

  void* pSearchBuff = getSearchBuff(pFileState);
  if (pSearchBuff == NULL) {
    return res;
  }

  void** ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff == NULL) {
    // the group is unknown to the search buffer
    return true;
  }

  SArray* pWinStates = (SArray*)(*ppBuff);
  if (pIsLast != NULL) {
    SWinKey* pLastKey = (SWinKey*)taosArrayGetLast(pWinStates);
    *pIsLast = (pLastKey != NULL) && (winKeyCmprImpl(pKey, pLastKey) == 0);
  }
  if (hasLimit && taosArrayGetSize(pWinStates) <= MIN_NUM_OF_SORT_CACHE_WIN) {
    res = true;
  }
  if (qDebugFlag & DEBUG_DEBUG) {
    if (taosArrayGetSize(pWinStates) > 0) {
      SWinKey* pFirstKey = (SWinKey*)taosArrayGet(pWinStates, 0);
      qDebug("===stream===check window state. buff min ts:%" PRId64 ",groupId:%" PRIu64 ".key ts:%" PRId64
             ",groupId:%" PRIu64,
             pFirstKey->ts, pFirstKey->groupId, pKey->ts, pKey->groupId);
    }
  }
  return res;
}
942

943
/*
 * Expire old rows and return the list of used buffers as the snapshot.
 *
 * The expiry mark is maxTs - deleteMark; it stays at INT64_MIN when
 * deleteMark is INT64_MAX or when maxTs is still INT64_MIN.
 */
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
  int64_t mark = INT64_MIN;
  if (pFileState->deleteMark != INT64_MAX && pFileState->maxTs != INT64_MIN) {
    mark = pFileState->maxTs - pFileState->deleteMark;
  }

  clearExpiredRowBuff(pFileState, mark, false);
  return pFileState->usedBuffs;
}
950

951
/*
 * Write every not-yet-flushed row of pSnapshot to the backing store in batches.
 *
 * pFileState  file state whose store / column family receives the rows
 * pSnapshot   list of SRowBuffPos* to walk (front to back)
 * flushState  when true and at least one row was pending in the final batch,
 *             the updated flushMark is persisted as well
 *
 * For each row that has a buffer and is not flagged beFlushed, the row is
 * flagged, flushMark is raised to the row's timestamp and the row is appended
 * to the current batch; a batch is written out whenever it reaches BATCH_LIMIT.
 * Errors are logged; nothing is returned to the caller.
 *
 * Fixes over the previous version:
 *  - `batch` is initialised before the first possible jump to _end, so the
 *    cleanup code never sees an indeterminate pointer;
 *  - a failed buffer allocation always aborts (previously a zero terrno let
 *    execution continue with buf == NULL);
 *  - the inner length variable no longer shadows the scratch-buffer length.
 */
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  char*     buf = NULL;
  void*     batch = NULL;
  SListIter iter = {0};
  tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);

  int64_t    st = taosGetTimestampMs();
  SListNode* pNode = NULL;

  int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);

  // scratch space handed to streamStatePutBatchOptimize for every row
  int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
  buf = taosMemoryCalloc(1, len);
  QUERY_CHECK_NULL(buf, code, lino, _end, terrno);

  batch = streamStateCreateBatch();
  if (!batch) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // every failure inside the loop jumps to _end, so no extra code check is needed here
  while ((pNode = tdListNext(&iter)) != NULL) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (pPos->beFlushed || !pPos->pRowBuff) {
      continue;
    }
    pPos->beFlushed = true;
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));

    qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
      code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
      streamStateClearBatch(batch);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
    QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);

    code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
                                       0, buf);
    taosMemoryFreeClear(pSKey);
    QUERY_CHECK_CODE(code, lino, _end);
    // todo handle failure
    memset(buf, 0, len);
  }

  int32_t numOfElems = streamStateGetBatchSize(batch);
  if (numOfElems <= 0) {
    // nothing left to write; the flush mark is not persisted in this case
    goto _end;
  }

  code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
  QUERY_CHECK_CODE(code, lino, _end);

  streamStateClearBatch(batch);

  clearSearchBuff(pFileState);

  int64_t elapsed = taosGetTimestampMs() - st;
  qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
         pFileState->id, numOfElems, BATCH_LIMIT, elapsed);

  if (flushState) {
    void*   valBuf = NULL;
    int32_t valLen = 0;
    code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &valLen);
    QUERY_CHECK_CODE(code, lino, _end);

    qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
    code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, valLen, 0);
    taosMemoryFree(valBuf);
    QUERY_CHECK_CODE(code, lino, _end);

    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(buf);
  if (batch != NULL) {
    streamStateDestroyBatch(batch);
  }
}
3,012✔
1040

1041
/*
 * Delete the checkpoint record stored under "<TASK_KEY>:<checkpointId>".
 * Returns the result of streamDefaultDel_rocksdb.
 */
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
  char ckKey[128] = {0};
  TAOS_UNUSED(tsnprintf(ckKey, sizeof(ckKey), "%s:%" PRId64, TASK_KEY, checkpointId));

  int32_t res = streamDefaultDel_rocksdb(pFileState->pFileStore, ckKey);
  return res;
}
1046

1047
/*
 * Remove the newest checkpoint record whose stored timestamp is older than
 * `mark`.
 *
 * The value stored under TASK_KEY is parsed as the highest checkpoint id.
 * Ids are then visited from that value down to 1; each "<TASK_KEY>:<id>"
 * record is parsed as a timestamp, and the first one below `mark` is removed,
 * which ends the scan.
 *
 * Returns TSDB_CODE_FAILED when the TASK_KEY record or any per-id record
 * cannot be read, or when a stored value does not fit the local parse buffer;
 * otherwise the code of the last read.
 *
 * Fixes over the previous version:
 *  - stored values are length-checked before being copied into the 128-byte
 *    stack buffer (the copy used to be unbounded);
 *  - the value buffer is released on the early-return paths;
 *  - memcpy is never called with a NULL source.
 */
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t maxCheckPointId = 0;
  {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s", TASK_KEY));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    if (code != 0 || len <= 0 || val == NULL || len >= (int32_t)sizeof(buf)) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }
    memcpy(buf, val, len);
    buf[len] = 0;
    maxCheckPointId = taosStr2Int64((char*)buf, NULL, 10);
    taosMemoryFree(val);
  }

  for (int64_t i = maxCheckPointId; i > 0; i--) {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s:%" PRId64, TASK_KEY, i));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    if (code != 0 || len < 0 || len >= (int32_t)sizeof(buf)) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }

    // an absent/empty value is parsed as an empty string, exactly as before
    if (val != NULL && len > 0) {
      memcpy(buf, val, len);
    }
    buf[len] = 0;
    taosMemoryFree(val);

    TSKEY ts = taosStr2Int64((char*)buf, NULL, 10);
    if (ts < mark) {
      // statekey winkey.ts < mark
      int32_t tmpRes = forceRemoveCheckpoint(pFileState, i);
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
      break;
    }
  }
  return code;
}
1088

1089
/*
 * Reload the most recent session windows from the backing store into memory.
 *
 * If maxTs has been set, expired checkpoints are cleaned first (best effort,
 * result only traced). Windows are then read from the last stored session
 * backwards until either min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) rows are
 * resident or the cursor is exhausted.
 *
 * ckId is not used by this function.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a stored value does not
 * have the expected row size, TSDB_CODE_OUT_OF_MEMORY when no window buffer
 * could be created, TSDB_CODE_SUCCESS otherwise (a failing
 * putSessionWinResultBuff only stops the scan).
 *
 * Fixes over the previous version:
 *  - the value read from the store is released on the size-mismatch path, in
 *    line with recoverSnapshot;
 *  - the result of createSessionWinBuff is checked before it is dereferenced.
 */
int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winRes = TSDB_CODE_SUCCESS;
  if (pFileState->maxTs != INT64_MIN) {
    // the comparison guards the subtraction below against going under INT64_MIN
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*       pVal = NULL;
    int32_t     vlen = 0;
    SSessionKey key = {0};
    winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    if (vlen != pFileState->rowSize) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("[StreamInternal] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],vlen:%d, rowSize:%d", key.win.skey, key.win.ekey, key.groupId, vlen, pFileState->rowSize);
      // the value never reaches createSessionWinBuff on this path, so release it here
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
    if (pPos == NULL) {
      // NOTE(review): ownership of pVal on this failure path is decided inside
      // createSessionWinBuff, which is not visible here -- confirm before freeing it.
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    pPos->beUsed = false;
    winRes = putSessionWinResultBuff(pFileState, pPos);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    winRes = streamStateSessionCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
1138

1139
/*
 * Reload the most recent rows from the backing store into the in-memory map.
 *
 * If maxTs has been set, expired checkpoints are cleaned first (best effort,
 * result only traced). Rows are then read from the last stored key backwards
 * until min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) rows are resident, the
 * cursor is exhausted, or a key older than flushMark is met.
 *
 * ckId is not used by this function.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when no row position is available,
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR on a row-size mismatch, the
 * tSimpleHashPut error if registration fails, TSDB_CODE_SUCCESS otherwise.
 *
 * Fix: whenever a freshly obtained position is thrown away, its node is also
 * removed from the tail of usedBuffs. Previously the tSimpleHashPut failure
 * path destroyed the position but left its node in the list, and the
 * size-mismatch path left a half-initialised position in the list.
 * NOTE(review): this relies on getNewRowPosForWrite appending the new position
 * at the tail of usedBuffs, which is what the pre-existing discard path
 * already assumes.
 */
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winCode = TSDB_CODE_SUCCESS;
  if (pFileState->maxTs != INT64_MIN) {
    // the comparison guards the subtraction below against going under INT64_MIN
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winCode == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*        pVal = NULL;
    int32_t      vlen = 0;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    winCode =
        streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
    qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
    if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
      // end of data, or older than the flush mark: discard the unused position
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      taosMemoryFreeClear(pVal);
      break;
    }

    if (vlen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pVal);
      // the position will never hold valid data; drop it together with its list node
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    memcpy(pNewPos->pRowBuff, pVal, vlen);
    taosMemoryFreeClear(pVal);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
    code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // do not leave a node pointing at the destroyed position in usedBuffs
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      break;
    }
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
1202

1203
/* Return the selectivityRowSize stored in the file state. */
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) {
  int32_t selRowSize = pFileState->selectivityRowSize;
  return selRowSize;
}
262,607✔
1204

1205
/*
 * Raise flushMark and maxTs to `ts` when `ts` is larger than their current
 * values; neither field is ever lowered.
 */
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
  if (ts > pFileState->flushMark) {
    pFileState->flushMark = ts;
  }
  if (ts > pFileState->maxTs) {
    pFileState->maxTs = ts;
  }
}
827✔
1209

1210
/* Return the in-memory row map (rowStateBuff) of the file state. */
void* getRowStateBuff(SStreamFileState* pFileState) {
  void* pRowMap = pFileState->rowStateBuff;
  return pRowMap;
}
37,591✔
1211
/* Return the search buffer of the file state; callers in this file treat NULL as "no search buffer". */
void* getSearchBuff(SStreamFileState* pFileState) {
  void* pSearch = pFileState->searchBuff;
  return pSearch;
}
8,637,237✔
1212

1213
/* Return the backing store handle (pFileStore) of the file state. */
void* getStateFileStore(SStreamFileState* pFileState) {
  void* pStore = pFileState->pFileStore;
  return pStore;
}
18,224✔
1214

1215
/*
 * Tell whether `ts` lies before the delete horizon (maxTs - deleteMark).
 * Always false when deleteMark is INT64_MAX or when maxTs is not positive.
 */
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
  if (pFileState->deleteMark == INT64_MAX) {
    return false;
  }
  if (pFileState->maxTs <= 0) {
    return false;
  }
  return ts < (pFileState->maxTs - pFileState->deleteMark);
}
1219

1220
/* True when `ts` is not later than flushMark + gap. */
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
  TSKEY flushedUpTo = pFileState->flushMark + gap;
  return ts <= flushedUpTo;
}
5,764,583✔
1221

1222
/* Return the current flushMark of the file state. */
TSKEY getFlushMark(SStreamFileState* pFileState) {
  TSKEY mark = pFileState->flushMark;
  return mark;
}
8✔
1223

1224
/* Return the rowSize configured on the file state. */
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
  int32_t rowSize = pFileState->rowSize;
  return rowSize;
}
263,880✔
1225

1226
/*
 * Forward to the file state's stateFunctionGetFn callback.
 * The window code produced by the callback is collected in a local and
 * discarded; only the callback's return value reaches the caller.
 */
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
  int32_t ignoredWinCode = TSDB_CODE_SUCCESS;
  int32_t code = pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen, &ignoredWinCode);
  return code;
}
1230

1231
/*
 * Reload the most recent fill-state rows from the backing store into memory.
 *
 * If maxTs has been set, expired checkpoints are cleaned first; unlike
 * recoverSnapshot, a failure there aborts the recovery. Rows are then read
 * from the last stored key backwards until
 * min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) rows are resident, the cursor
 * is exhausted, or a key that counts as flushed is met.
 *
 * ckId is not used by this function.
 *
 * Returns the deleteExpiredCheckPoint error, TSDB_CODE_OUT_OF_MEMORY when no
 * row position is available, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR on a
 * row-size mismatch, TSDB_CODE_SUCCESS otherwise (a failing tSimpleHashPut
 * only stops the scan, as before).
 *
 * Fixes over the previous version:
 *  - the position returned by getNewRowPosForWrite is checked before use, as
 *    recoverSnapshot does;
 *  - the cursor is released on every exit path (the size-mismatch path used to
 *    jump past streamStateFreeCur);
 *  - whenever a position is destroyed its node is also removed from the tail
 *    of usedBuffs, so the list never keeps a dangling pointer.
 * NOTE(review): the last point relies on getNewRowPosForWrite appending the
 * new position at the tail of usedBuffs, which is what the pre-existing
 * discard path already assumes.
 */
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamStateCur* pCur = NULL;
  if (pFileState->maxTs != INT64_MIN) {
    // the comparison guards the subtraction below against going under INT64_MIN
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    code = deleteExpiredCheckPoint(pFileState, mark);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
  if (pCur == NULL) {
    return code;
  }
  int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  int32_t winRes = TSDB_CODE_SUCCESS;
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*        pVal = NULL;
    int32_t      vlen = 0;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
    qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
    if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
      // end of data, or already covered by the flush mark: discard the unused position
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      taosMemoryFreeClear(pVal);
      break;
    }

    if (vlen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    memcpy(pNewPos->pRowBuff, pVal, vlen);
    taosMemoryFreeClear(pVal);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
    winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (winRes != TSDB_CODE_SUCCESS) {
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      break;
    }
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  if (pCur != NULL) {
    streamStateFreeCur(pCur);
  }
  return code;
}
1294

1295
/*
 * Look up the row for pKey, first in memory and then in the backing store.
 *
 * pFileState  file state to search; its maxTs is raised to the key's timestamp
 * pKey/keyLen key bytes
 * pVal        out: receives the SRowBuffPos* when the row is found
 * pVLen       out: receives pFileState->rowSize when the row is found
 * pWinCode    out: TSDB_CODE_SUCCESS when the row was found in memory or on
 *             disk; TSDB_CODE_FAILED (or the stateFileGetFn result) otherwise
 *
 * In-memory hit: the position is flagged beUsed and not beFlushed, and its row
 * buffer is reloaded if it had been detached.
 * Miss: the store is consulted only when the key is not past the delete
 * horizon and counts as flushed; a hit there is copied into a new position
 * that is registered in the in-memory map.
 *
 * The return value reports internal failures only (allocation, map insert,
 * buffer reload); "not found" is signalled through pWinCode.
 *
 * Fix: the temporary value returned by stateFileGetFn is now released on every
 * path, including the allocation-failure and map-insert-failure exits that
 * used to jump past the free.
 */
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                   int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pDiskVal = NULL;  // owned here; released at _end
  (*pWinCode) = TSDB_CODE_FAILED;
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (ppPos) {
    *pVLen = pFileState->rowSize;
    *pVal = *ppPos;
    (*ppPos)->beUsed = true;
    (*ppPos)->beFlushed = false;
    (*pWinCode) = TSDB_CODE_SUCCESS;
    if ((*ppPos)->pRowBuff == NULL) {
      code = recoverStateRowBuff(pFileState, *ppPos);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }

  TSKEY ts = pFileState->getTs(pKey);
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
    int32_t len = 0;
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &pDiskVal, &len);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
      if (!pNewPos || !pNewPos->pRowBuff) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      memcpy(pNewPos->pKey, pKey, keyLen);
      memcpy(pNewPos->pRowBuff, pDiskVal, len);
      code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
      QUERY_CHECK_CODE(code, lino, _end);

      if (pVal) {
        *pVLen = pFileState->rowSize;
        *pVal = pNewPos;
      }
    }
  }

_end:
  taosMemoryFree(pDiskVal);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1346

1347
/*
 * Record groupId in the group-id cache and in the backing store.
 *
 * A non-NULL `value` is rejected with TSDB_CODE_INVALID_PARA. A group id that
 * is already cached is a no-op. Otherwise the id is added to the cache while
 * the cache holds at most MAX_GROUP_ID_NUM entries, and is always written to
 * the store through streamStatePutParTag_rocksdb.
 */
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  if (value != NULL) {
    code = TSDB_CODE_INVALID_PARA;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (tSimpleHashGet(pFileState->pGroupIdMap, &groupId, sizeof(int64_t)) != NULL) {
    // already known
    goto _end;
  }

  if (tSimpleHashGetSize(pFileState->pGroupIdMap) <= MAX_GROUP_ID_NUM) {
    code = tSimpleHashPut(pFileState->pGroupIdMap, &groupId, sizeof(int64_t), NULL, 0);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1370

1371
/*
 * Advance a group cursor by one entry.
 *
 * While the cursor is still walking the in-memory group-id map
 * (hashIter != -1) it moves to the next map entry and tracks the largest group
 * id seen in minGpId. When the map is exhausted, hashIter becomes -1 and the
 * cursor is repositioned in the backing store via
 * streamStateParTagSeekKeyNext_rocksdb using minGpId. Once hashIter is -1 the
 * cursor is advanced with streamStateCurNext.
 */
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
  SStreamFileState* pState = (SStreamFileState*)pCur->pStreamFileState;

  if (pCur->hashIter == -1) {
    streamStateCurNext(pState->pFileStore, pCur);
    return;
  }

  int64_t curGpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
  pCur->minGpId = TMAX(pCur->minGpId, curGpId);

  pCur->pHashData = tSimpleHashIterate(pState->pGroupIdMap, pCur->pHashData, &pCur->hashIter);
  if (pCur->pHashData == NULL) {
    pCur->hashIter = -1;
    streamStateParTagSeekKeyNext_rocksdb(pState->pFileStore, pCur->minGpId, pCur);
  }
}
1389

1390
/*
 * Read the group id the cursor currently points at into *pKey.
 *
 * If the cursor holds a map entry (pHashData) the id comes from that entry and
 * TSDB_CODE_SUCCESS is returned; otherwise the read is delegated to
 * streamStateParTagGetKVByCur_rocksdb. pVal and pVLen are not written by this
 * function.
 */
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
  if (pCur->pHashData == NULL) {
    return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
  }

  *pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
  return TSDB_CODE_SUCCESS;
}
1398

1399
/* Return the group-id map (pGroupIdMap) of the file state. */
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
  SSHashObj* pGroupIds = pFileState->pGroupIdMap;
  return pGroupIds;
}
1402

1403
/*
 * Trim every per-group window array in the search buffer and drop the trimmed
 * windows from memory and, when already flushed, from the backing store.
 *
 * pFileState  file state whose search buffer is trimmed
 * numOfKeep   number of trailing windows to keep per group
 * minTs       when not INT64_MAX and a group holds more than numOfKeep
 *             windows, the keep count for that group is recomputed from the
 *             position of minTs in the array, with MIN_NUM_OF_SORT_CACHE_WIN
 *             as the lower bound
 *
 * After the search buffer has been processed, clearRowBuffNonFlush is run;
 * its failure is logged only.
 *
 * Fixes over the previous version:
 *  - the keep count recomputed for one group no longer overwrites numOfKeep
 *    for the groups visited afterwards, so the result does not depend on hash
 *    iteration order;
 *  - groups with nothing to trim are skipped, so taosArrayRemoveBatch is never
 *    called with a non-positive count.
 */
void clearExpiredState(SStreamFileState* pFileState, int32_t numOfKeep, TSKEY minTs) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pSearchBuff = pFileState->searchBuff;
  void*      pIte = NULL;
  int32_t    iter = 0;
  while ((pIte = tSimpleHashIterate(pSearchBuff, pIte, &iter)) != NULL) {
    SArray* pWinStates = *((void**)pIte);
    int32_t arraySize = TARRAY_SIZE(pWinStates);
    int32_t keepNum = numOfKeep;  // per-group copy; the caller's value applies to every group
    if (minTs != INT64_MAX && arraySize > keepNum) {
      SWinKey key = {.ts = minTs};
      key.groupId = *(uint64_t*)tSimpleHashGetKey(pIte, NULL);
      int32_t index = binarySearch(pWinStates, arraySize, &key, fillStateKeyCompare);
      keepNum = TMAX(arraySize - index, MIN_NUM_OF_SORT_CACHE_WIN);
      qDebug("modify numOfKeep, numOfKeep:%d. %s at line %d", keepNum, __func__, __LINE__);
    }

    int32_t size = arraySize - keepNum;
    if (size <= 0) {
      continue;
    }

    for (int32_t i = 0; i < size; i++) {
      SWinKey* pKey = taosArrayGet(pWinStates, i);
      int32_t  code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
      qTrace("clear expired buff, ts:%" PRId64 ",groupid:%" PRIu64 ". %s at line %d res:%d", pKey->ts, pKey->groupId, __func__, __LINE__, code_buff);

      // only windows at or before the flush mark are removed from the store
      if (isFlushedState(pFileState, pKey->ts, 0)) {
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file);
      }

      if (tSimpleHashGetSize(pFileState->pRecFlagMap) > 0) {
        tSimpleHashRemove(pFileState->pRecFlagMap, pKey, sizeof(SWinKey));
      }
    }
    taosArrayRemoveBatch(pWinStates, 0, size, NULL);
  }

  code = clearRowBuffNonFlush(pFileState);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
5,463✔
1445

1446
#ifdef BUILD_NO_CALL
1447
/*
 * Make sure a row exists for pKey (addRowBuffIfNotExist) and that pKey is
 * present in the group's window array inside the search buffer.
 *
 * The group's array is created on first use. When it is empty and
 * needClearDiskBuff() is true, it is first repopulated by recoverHashSortBuff.
 * pKey is inserted right after the position returned by binarySearch unless it
 * is already there, or unless it counts as flushed and the search found no
 * position. When the array held MAX_NUM_OF_CACHE_WIN or more entries before
 * the insert, the oldest entries are dropped.
 */
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
                              int32_t* pWinCode) {
  int32_t lino = 0;
  int32_t code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  SArray*    pGroupWins = NULL;
  void**     ppSlot = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppSlot == NULL) {
    pGroupWins = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pGroupWins, code, lino, _end, terrno);

    code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pGroupWins, POINTER_BYTES);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    pGroupWins = (SArray*)(*ppSlot);
  }

  // recover
  if (taosArrayGetSize(pGroupWins) == 0 && needClearDiskBuff(pFileState)) {
    recoverHashSortBuff(pFileState, pGroupWins, pKey->groupId);
  }

  int32_t numWins = taosArrayGetSize(pGroupWins);
  int32_t pos = binarySearch(pGroupWins, numWins, pKey, fillStateKeyCompare);
  if (pos >= 0 || !isFlushedState(pFileState, pKey->ts, 0)) {
    // pos is the position binarySearch reported for pKey; skip the insert on an exact match
    if (pos >= 0) {
      SWinKey* pFound = taosArrayGet(pGroupWins, pos);
      if (winKeyCmprImpl(pFound, pKey) == 0) {
        goto _end;
      }
    }
    void* pInserted = taosArrayInsert(pGroupWins, pos + 1, pKey);
    QUERY_CHECK_NULL(pInserted, code, lino, _end, terrno);
  }

  if (numWins >= MAX_NUM_OF_CACHE_WIN) {
    taosArrayRemoveBatch(pGroupWins, 0, numWins - NUM_OF_CACHE_WIN, NULL);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1499
#endif
1500

1501
/*
 * Read the row that precedes pKey from the state store into a fresh row buffer.
 *
 * *pWinCode receives the result of the store lookup. Only when it is
 * TSDB_CODE_SUCCESS are pResKey, *ppVal (a SRowBuffPos*) and *pVLen filled.
 * The cursor and the temporary value buffer are released on every path.
 */
static int32_t doLoadPrevRowFromDisk(SStreamFileState* pFileState, void* pState, const SWinKey* pKey,
                                     SWinKey* pResKey, void** ppVal, int32_t* pVLen, int32_t* pWinCode) {
  int32_t          code = TSDB_CODE_SUCCESS;
  void*            tmpVal = NULL;
  int32_t          len = 0;
  SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);

  (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
  if ((*pWinCode) == TSDB_CODE_SUCCESS) {
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(pNewPos->pRowBuff, tmpVal, len);
      *pVLen = getRowStateRowSize(pFileState);
      (*ppVal) = pNewPos;
    }
  }

  taosMemoryFreeClear(tmpVal);
  streamStateFreeCur(pCur);
  return code;
}

/*
 * Find the row that precedes pKey within pKey->groupId.
 *
 * The per-group key array in the search buffer is consulted first. When the
 * group has no array (and needClearDiskBuff() is true), or when no smaller key
 * is cached, the state store is queried instead.
 *
 * pResKey  - receives the key of the previous row
 * ppVal    - receives the row buffer position of the previous row
 * pVLen    - receives the row size
 * pWinCode - TSDB_CODE_SUCCESS when a previous row was found
 *
 * Returns TSDB_CODE_SUCCESS, or an error code when a row buffer could not be obtained.
 */
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
                           int32_t* pVLen, int32_t* pWinCode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SArray*    pWinStates = NULL;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  void*      pState = getStateFileStore(pFileState);
  void**     ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));

  if (ppBuff == NULL) {
    if (!needClearDiskBuff(pFileState)) {
      (*pWinCode) = TSDB_CODE_FAILED;
      return code;
    }
    qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
    code = doLoadPrevRowFromDisk(pFileState, pState, pKey, pResKey, ppVal, pVLen, pWinCode);
    QUERY_CHECK_CODE(code, lino, _end);
    return code;
  }

  pWinStates = (SArray*)(*ppBuff);
  int32_t size = taosArrayGetSize(pWinStates);
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
  if (index >= 0) {
    SWinKey* pCurKey = taosArrayGet(pWinStates, index);
    if (winKeyCmprImpl(pCurKey, pKey) == 0) {
      // the search landed on pKey itself, so the previous row is one slot earlier
      index--;
    } else {
      qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__,
             __LINE__);
    }
  }

  if (index < 0) {
    // nothing smaller is cached for this group, fall back to the state store
    code = doLoadPrevRowFromDisk(pFileState, pState, pKey, pResKey, ppVal, pVLen, pWinCode);
    QUERY_CHECK_CODE(code, lino, _end);
    return code;
  }

  SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
  *pResKey = *pPrevKey;
  return addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), ppVal, pVLen, pWinCode);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1576

1577
/*
 * Insert pKey into the sorted key array pWinStates unless it is already there.
 *
 * The insert is skipped when the key's timestamp is reported as flushed, the
 * search found no position (index < 0) and the array is not empty.
 * *pIsEnd is written only when an insert is attempted; it is true when the
 * key goes at or past the previous end of the array.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array insert fails.
 */
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey, bool* pIsEnd) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfKeys = taosArrayGetSize(pWinStates);
  int32_t pos = binarySearch(pWinStates, numOfKeys, pKey, fillTSKeyCompare);

  bool skipInsert = isFlushedState(pFileState, pKey->ts, 0) && pos < 0 && numOfKeys != 0;
  if (skipInsert) {
    goto _end;
  }

  if (pos >= 0) {
    SWinKey* pFound = taosArrayGet(pWinStates, pos);
    if (winKeyCmprImpl(pFound, pKey) == 0) {
      // already recorded, nothing to do
      goto _end;
    }
  }

  int32_t insertAt = pos + 1;
  (*pIsEnd) = (insertAt >= numOfKeys);
  void* pInserted = taosArrayInsert(pWinStates, insertAt, pKey);
  QUERY_CHECK_NULL(pInserted, code, lino, _end, terrno);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1601

1602
/*
 * Return the key array registered for groupId in pSearchBuff, creating and
 * registering an empty array of SWinKey when the group has none yet.
 *
 * ppResStates - receives the array; written only on success
 *
 * Returns TSDB_CODE_SUCCESS, or the error from array creation / hash insertion.
 */
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pWinStates = NULL;
  void**  ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);

    code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // the array was not registered in the hash, so release it here to avoid a leak
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  (*ppResStates) = pWinStates;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1625

1626
/*
 * Store ts, followed by len bytes of pVal when len is non-zero, into pBuff and
 * mark the buffer through SET_TSDATA_FLAG.
 *
 * When a payload is present the first buffLen - 1 bytes are zeroed before the
 * write, leaving the last byte untouched.
 * NOTE(review): presumably that last byte is where SET_TSDATA_FLAG keeps its
 * marker - confirm against the macro definition.
 */
static void setValueBuff(TSKEY ts, char* pVal, int32_t len, char* pBuff, int32_t buffLen) {
  const bool hasPayload = (len != 0);

  SET_TSDATA_FLAG(pBuff, buffLen);
  if (hasPayload) {
    memset(pBuff, 0, buffLen - 1);
  }

  *(TSKEY*)pBuff = ts;

  if (hasPayload) {
    memcpy(pBuff + sizeof(TSKEY), pVal, len);
  }
}
1636

1637
/*
 * Look up the timestamp (and optional primary-key value) recorded for tableUid
 * and update the record with lastTs / pLastPkVal when that is newer.
 *
 * pCurTs     - receives the recorded timestamp when a record exists
 * ppCurPkVal - receives a pointer to the recorded pk value (only when lastPkLen != 0)
 * pWinCode   - TSDB_CODE_SUCCESS when a record was found, otherwise a failure code
 *
 * Records live in pTableTsDataMap while it holds fewer than MAX_STATE_MAP_SIZE
 * entries; beyond that the state store is read and written directly.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the hash insertion.
 */
int32_t getAndSetTsData(STableTsDataState* pTsDataState, uint64_t tableUid, TSKEY* pCurTs, void** ppCurPkVal,
                        TSKEY lastTs, void* pLastPkVal, int32_t lastPkLen, int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    hasPk = (lastPkLen != 0);

  TSKEY* pDataVal = tSimpleHashGet(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t));
  if (pDataVal != NULL) {
    (*pWinCode) = TSDB_CODE_SUCCESS;
    *pCurTs = *pDataVal;
    if ((*pCurTs) < lastTs) {
      setValueBuff(lastTs, pLastPkVal, lastPkLen, (char*)pDataVal, pTsDataState->pkValLen);
    } else {
      if (hasPk) {
        (*ppCurPkVal) = POINTER_SHIFT(pDataVal, sizeof(TSKEY));
        // same timestamp: the record is replaced only when the stored pk compares lower
        if ((*pCurTs) == lastTs && pTsDataState->comparePkColFn((*ppCurPkVal), pLastPkVal) < 0) {
          setValueBuff(lastTs, pLastPkVal, lastPkLen, (char*)pDataVal, pTsDataState->pkValLen);
        }
      }
    }
  } else {
    setValueBuff(lastTs, pLastPkVal, lastPkLen, pTsDataState->pPkValBuff, pTsDataState->pkValLen);
    int32_t size = tSimpleHashGetSize(pTsDataState->pTableTsDataMap);
    if (size < MAX_STATE_MAP_SIZE) {
      (*pWinCode) = TSDB_CODE_FAILED;
      code = tSimpleHashPut(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t), pTsDataState->pPkValBuff,
                            pTsDataState->pkValLen);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      // NOTE(review): this call receives &pPkValBuff / &pkValLen and may replace both, after the
      // new value was just written into pPkValBuff above - confirm buffer ownership and which
      // value is meant to be persisted by the put below.
      (*pWinCode) = streamStateGetParTag_rocksdb(pTsDataState->pState, tableUid, &pTsDataState->pPkValBuff,
                                                 &pTsDataState->pkValLen);
      if ((*pWinCode) == TSDB_CODE_SUCCESS) {
        *pCurTs = *(TSKEY*)pTsDataState->pPkValBuff;
        if (hasPk) {
          (*ppCurPkVal) = POINTER_SHIFT(pTsDataState->pPkValBuff, sizeof(TSKEY));
        }
      }

      int32_t tmpCode = streamStatePutParTag_rocksdb(pTsDataState->pState, tableUid, pTsDataState->pPkValBuff,
                                                     pTsDataState->pkValLen);
      if (tmpCode != TSDB_CODE_SUCCESS) {
        // the write stays best-effort (the return code is unchanged) but is no longer silent
        qError("%s failed to put ts data at line %d, table id:%" PRIu64 ", code:%d", __func__, __LINE__, tableUid,
               tmpCode);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1686

1687
/*
 * Write every flagged entry of pTableTsDataMap to the "partag" column family.
 *
 * Entries are collected into a write batch that is submitted whenever it
 * reaches BATCH_LIMIT and once more at the end if anything is left. The flag
 * of an entry is cleared before the entry is added to the batch.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error code encountered.
 */
int32_t doTsDataCommit(STableTsDataState* pTsDataState) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pWriteBatch = NULL;
  char*   pScratch = NULL;

  pWriteBatch = streamStateCreateBatch();
  QUERY_CHECK_NULL(pWriteBatch, code, lino, _end, terrno);

  int     cfIdx = streamStateGetCfIdx(pTsDataState->pState, "partag");
  int32_t scratchLen = (pTsDataState->pkValLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
  pScratch = taosMemoryCalloc(1, scratchLen);
  QUERY_CHECK_NULL(pScratch, code, lino, _end, terrno);

  int32_t iter = 0;
  void*   pEntry = NULL;
  for (pEntry = tSimpleHashIterate(pTsDataState->pTableTsDataMap, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pEntry, &iter)) {
    // submit the pending batch first once it is full
    if (streamStateGetBatchSize(pWriteBatch) >= BATCH_LIMIT) {
      code = streamStatePutBatch_rocksdb(pTsDataState->pState, pWriteBatch);
      streamStateClearBatch(pWriteBatch);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (HAS_TSDATA_FLAG(pEntry, pTsDataState->pkValLen)) {
      void* pTableKey = tSimpleHashGetKey(pEntry, NULL);
      UNSET_TSDATA_FLAG(pEntry, pTsDataState->pkValLen);
      code = streamStatePutBatchOptimize(pTsDataState->pState, cfIdx, pWriteBatch, pTableKey, pEntry,
                                         pTsDataState->pkValLen, 0, pScratch);
      QUERY_CHECK_CODE(code, lino, _end);
      memset(pScratch, 0, scratchLen);
      qDebug("flush ts data,table id:%" PRIu64 , *(uint64_t*)pTableKey);
    }
  }

  // submit whatever is still pending
  if (streamStateGetBatchSize(pWriteBatch) > 0) {
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, pWriteBatch);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(pScratch);
  streamStateDestroyBatch(pWriteBatch);
  return code;
}
1734

1735
/*
 * Persist one record per scan range into the "sess" column family.
 *
 * For every range an id buffer with room for (number of uids + number of
 * group ids) uint64_t values is built and stored under the range window.
 * Pending records are submitted whenever the batch reaches BATCH_LIMIT and
 * once more at the end.
 *
 * The id buffer is sized in elements, writes into it are bounded by its
 * capacity, and it is released after being handed to the batch.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error code encountered.
 */
int32_t doRangeDataCommit(STableTsDataState* pTsDataState) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  void*     batch = NULL;
  uint64_t* pIdBuf = NULL;

  batch = streamStateCreateBatch();
  QUERY_CHECK_NULL(batch, code, lino, _end, terrno);
  int idx = streamStateGetCfIdx(pTsDataState->pState, "sess");

  int32_t numOfRanges = taosArrayGetSize(pTsDataState->pScanRanges);
  for (int32_t i = 0; i < numOfRanges; i++) {
    SScanRange* pRange = taosArrayGet(pTsDataState->pScanRanges, i);
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
      code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
      streamStateClearBatch(batch);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    SSessionKey key = {.win = pRange->win, .groupId = 0};
    int32_t     uidSize = tSimpleHashGetSize(pRange->pUIds);
    int32_t     gpIdSize = tSimpleHashGetSize(pRange->pGroupIds);
    int32_t     numOfIds = uidSize + gpIdSize;

    // allocate at least one slot so that an empty range is not mistaken for an allocation failure
    pIdBuf = (uint64_t*)taosMemoryCalloc(TMAX(numOfIds, 1), sizeof(uint64_t));
    QUERY_CHECK_NULL(pIdBuf, code, lino, _end, terrno);

    // NOTE(review): the buffer is sized from pRange->pUIds / pRange->pGroupIds but the ids are taken
    // from pTableTsDataMap - confirm which map is intended. Writes are bounded by the capacity either way.
    void*   pIte = NULL;
    int32_t iter = 0;
    int32_t numOfFilled = 0;
    while (numOfFilled < numOfIds &&
           (pIte = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pIte, &iter)) != NULL) {
      void* pTempKey = tSimpleHashGetKey(pIte, NULL);
      pIdBuf[numOfFilled] = *(uint64_t*)pTempKey;
      numOfFilled++;
    }

    // the value length is given in bytes
    code = streamStatePutBatchOptimize(pTsDataState->pState, idx, batch, &key, (void*)pIdBuf,
                                       (int32_t)(numOfIds * sizeof(uint64_t)), 0, NULL);
    // NOTE(review): freed on the assumption that the batch keeps its own copy of the value - confirm
    taosMemoryFreeClear(pIdBuf);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int32_t numOfElems = streamStateGetBatchSize(batch);
  if (numOfElems > 0) {
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateDestroyBatch(batch);
  return code;
}
1785

1786
/*
 * Allocate and initialise a STableTsDataState.
 *
 * ppTsDataState - receives the new state; written only on success
 * pkType, pkLen - primary-key type and length; pkLen == 0 means no primary key
 *                 and leaves comparePkColFn NULL
 * pState        - state store handle kept in the new object
 * pOtherState   - kept as pStreamTaskState
 *
 * On failure everything allocated here is released again. pState and
 * pOtherState are left untouched in that case.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the failing allocation.
 */
int32_t initTsDataState(STableTsDataState** ppTsDataState, int8_t pkType, int32_t pkLen, void* pState, void* pOtherState) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;

  STableTsDataState* pTsDataState = taosMemoryCalloc(1, sizeof(STableTsDataState));
  QUERY_CHECK_NULL(pTsDataState, code, lino, _end, terrno);

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
  pTsDataState->pTableTsDataMap = tSimpleHashInit(DEFAULT_STATE_MAP_CAPACITY, hashFn);
  QUERY_CHECK_NULL(pTsDataState->pTableTsDataMap, code, lino, _end, terrno);

  // layout of a value: timestamp, pk value, one trailing char
  pTsDataState->pkValLen = sizeof(TSKEY) + pkLen + sizeof(char);
  pTsDataState->pPkValBuff = taosMemoryCalloc(1, pTsDataState->pkValLen);
  QUERY_CHECK_NULL(pTsDataState->pPkValBuff, code, lino, _end, terrno);

  if (pkLen != 0) {
    pTsDataState->comparePkColFn = getKeyComparFunc(pkType, TSDB_ORDER_ASC);
  } else {
    pTsDataState->comparePkColFn = NULL;
  }

  pTsDataState->pScanRanges = taosArrayInit(64, sizeof(SScanRange));
  QUERY_CHECK_NULL(pTsDataState->pScanRanges, code, lino, _end, terrno);

  pTsDataState->pState = pState;
  pTsDataState->recValueLen = sizeof(SRecDataInfo) + pkLen;
  pTsDataState->pRecValueBuff = taosMemoryCalloc(1, pTsDataState->recValueLen);
  QUERY_CHECK_NULL(pTsDataState->pRecValueBuff, code, lino, _end, terrno);

  pTsDataState->curRecId = -1;

  pTsDataState->pStreamTaskState = pOtherState;

  pTsDataState->cfgIndex = streamStateGetCfIdx(pTsDataState->pState, "sess");
  pTsDataState->pBatch = streamStateCreateBatch();
  QUERY_CHECK_NULL(pTsDataState->pBatch, code, lino, _end, TSDB_CODE_FAILED);

  pTsDataState->batchBufflen = (pTsDataState->recValueLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
  pTsDataState->pBatchBuff = taosMemoryCalloc(1, pTsDataState->batchBufflen);
  QUERY_CHECK_NULL(pTsDataState->pBatchBuff, code, lino, _end, terrno);

  (*ppTsDataState) = pTsDataState;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (pTsDataState != NULL) {
      // the caller never receives the pointer on failure, so the partial state must be released here
      taosMemoryFreeClear(pTsDataState->pBatchBuff);
      if (pTsDataState->pBatch != NULL) {
        streamStateDestroyBatch(pTsDataState->pBatch);
        pTsDataState->pBatch = NULL;
      }
      taosMemoryFreeClear(pTsDataState->pRecValueBuff);
      if (pTsDataState->pScanRanges != NULL) {
        taosArrayDestroy(pTsDataState->pScanRanges);
        pTsDataState->pScanRanges = NULL;
      }
      taosMemoryFreeClear(pTsDataState->pPkValBuff);
      if (pTsDataState->pTableTsDataMap != NULL) {
        tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
        pTsDataState->pTableTsDataMap = NULL;
      }
      taosMemoryFreeClear(pTsDataState);
    }
  }
  return code;
}
1834

1835
/*
 * Release the two id sets owned by a scan range and reset its window to
 * INT64_MIN. The SScanRange object itself is not freed.
 */
static void destroyScanRange(SScanRange* pRange) {
  tSimpleHashCleanup(pRange->pUIds);
  tSimpleHashCleanup(pRange->pGroupIds);
  pRange->pUIds = NULL;
  pRange->pGroupIds = NULL;

  pRange->win.ekey = INT64_MIN;
  pRange->win.skey = INT64_MIN;
}
×
1843

1844
/*
 * Release a STableTsDataState together with everything it holds: the scan
 * ranges, the ts-data map, the value buffers, pState, the write batch and its
 * buffer. pStreamTaskState is only cleared, not freed.
 *
 * A NULL pTsDataState is accepted and ignored.
 */
void destroyTsDataState(STableTsDataState* pTsDataState) {
  if (pTsDataState == NULL) {
    return;
  }

  SArray* pScanRanges = pTsDataState->pScanRanges;
  int32_t size = taosArrayGetSize(pScanRanges);
  for (int32_t i = 0; i < size; i++) {
    SScanRange* pRange = taosArrayGet(pScanRanges, i);
    destroyScanRange(pRange);
  }
  taosArrayDestroy(pTsDataState->pScanRanges);
  tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
  taosMemoryFreeClear(pTsDataState->pPkValBuff);
  taosMemoryFreeClear(pTsDataState->pState);
  taosMemoryFreeClear(pTsDataState->pRecValueBuff);
  pTsDataState->pStreamTaskState = NULL;

  streamStateClearBatch(pTsDataState->pBatch);
  streamStateDestroyBatch(pTsDataState->pBatch);
  pTsDataState->pBatch = NULL;
  taosMemoryFreeClear(pTsDataState->pBatchBuff);

  taosMemoryFreeClear(pTsDataState);
}
×
1865

1866
/*
 * Load the per-table ts data stored in the state store into pTableTsDataMap.
 *
 * Stored values whose length differs from pkValLen are skipped. The flag of
 * each loaded value is cleared before it is put into the map.
 *
 * Returns TSDB_CODE_SUCCESS, or an error when the cursor cannot be created or
 * a map insertion fails.
 */
int32_t recoverTsData(STableTsDataState* pTsDataState) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamStateCur* pCur = createStateCursor(NULL);
  if (pCur == NULL) {
    // make sure a failure is reported even when terrno was not set
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    goto _end;
  }

  streamStateParTagSeekKeyNext_rocksdb(pTsDataState->pState, INT64_MIN, pCur);
  while (1) {
    uint64_t tableUid = 0;
    void*    pVal = NULL;
    int32_t  len = 0;
    int32_t  winCode = streamStateParTagGetKVByCur_rocksdb(pCur, &tableUid, &pVal, &len);
    if (winCode != TSDB_CODE_SUCCESS) {
      break;
    }
    if (pTsDataState->pkValLen != len) {
      // value does not have the expected length, ignore it
      taosMemoryFree(pVal);
      streamStateCurNext_rocksdb(pCur);
      continue;
    }
    UNSET_TSDATA_FLAG(pVal, len);
    code = tSimpleHashPut(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t), pVal, len);
    taosMemoryFree(pVal);
    QUERY_CHECK_CODE(code, lino, _end);
    streamStateCurNext_rocksdb(pCur);
  }

_end:
  if (pCur != NULL) {
    streamStateFreeCur(pCur);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1898

1899
/*
 * Create a cursor positioned on the first entry of the hash returned by
 * fn(pFileState).
 *
 * Returns NULL when the cursor cannot be created. pHashData of the returned
 * cursor is NULL when the hash yields no entry.
 */
SStreamStateCur* getLastStateCur(SStreamFileState* pFileState, getStateBuffFn fn) {
  SStreamStateCur* pCursor = createStateCursor(pFileState);
  if (pCursor != NULL) {
    SSHashObj* pBuff = fn(pFileState);
    pCursor->buffIndex = 0;
    pCursor->hashIter = 0;
    // start the iteration from the beginning
    pCursor->pHashData = tSimpleHashIterate(pBuff, NULL, &pCursor->hashIter);
  }
  return pCursor;
}
1911

1912
/*
 * Advance pCur to the next entry of the hash returned by fn() for the
 * cursor's file state. pHashData becomes whatever the iteration returns.
 */
void moveLastStateCurNext(SStreamStateCur* pCur, getStateBuffFn fn) {
  SSHashObj* pBuff = fn(pCur->pStreamFileState);
  void*      pNext = tSimpleHashIterate(pBuff, pCur->pHashData, &pCur->hashIter);
  pCur->pHashData = pNext;
}
×
1916

1917
/*
 * Append the row buffers of the last `num` keys of the current group to pRes.
 *
 * The cursor is first advanced past groups whose key array is empty. When no
 * group is left, TSDB_CODE_FAILED is returned without logging.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from the row buffer lookup /
 * array push.
 */
int32_t getNLastStateKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pGroupKeys = NULL;
  int32_t numOfKeys = 0;

  // skip groups that currently hold no key
  for (;;) {
    if (pCur->pHashData == NULL) {
      return TSDB_CODE_FAILED;
    }
    pGroupKeys = *((void**)pCur->pHashData);
    numOfKeys = taosArrayGetSize(pGroupKeys);
    if (numOfKeys > 0) {
      break;
    }
    moveLastStateCurNext(pCur, getSearchBuff);
  }

  for (int32_t idx = TMAX(numOfKeys - num, 0); idx < numOfKeys; idx++) {
    SWinKey* pKey = taosArrayGet(pGroupKeys, idx);
    void*    pRow = NULL;
    int32_t  rowLen = 0;
    int32_t  winCode = TSDB_CODE_SUCCESS;

    code = addRowBuffIfNotExist(pCur->pStreamFileState, (void*)pKey, sizeof(SWinKey), &pRow, &rowLen, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    // a missing window is only reported; the row is still appended below
    if (winCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since window not exist. ts:%" PRId64 ",groupId:%" PRIu64, __func__, __LINE__,
             pKey->ts, pKey->groupId);
    }

    void* pPushed = taosArrayPush(pRes, &pRow);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1960

1961
/*
 * Rebuild pTableTsDataMap: a new map is filled from the state store first and
 * then every entry of the current in-memory map is put on top of it. The old
 * map is replaced only after both steps succeeded.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error code encountered. On error the
 * new map is discarded and the current one is kept.
 */
int32_t reloadTsDataState(STableTsDataState* pTsDataState) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // shallow copy: only the map pointer differs from the live state
  STableTsDataState reloaded = *pTsDataState;
  reloaded.pTableTsDataMap =
      tSimpleHashInit(DEFAULT_STATE_MAP_CAPACITY, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
  QUERY_CHECK_NULL(reloaded.pTableTsDataMap, code, lino, _end, terrno);

  code = recoverTsData(&reloaded);
  QUERY_CHECK_CODE(code, lino, _end);

  int32_t iter = 0;
  void*   pEntry = NULL;
  for (pEntry = tSimpleHashIterate(pTsDataState->pTableTsDataMap, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pEntry, &iter)) {
    size_t keyLen = 0;
    void*  pEntryKey = tSimpleHashGetKey(pEntry, &keyLen);
    code = tSimpleHashPut(reloaded.pTableTsDataMap, pEntryKey, keyLen, pEntry, pTsDataState->pkValLen);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
  pTsDataState->pTableTsDataMap = reloaded.pTableTsDataMap;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tSimpleHashCleanup(reloaded.pTableTsDataMap);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1991

1992
/*
 * Add one record (pKey -> pVal, vLen bytes) to the pending write batch of
 * pTsDataState and submit the batch once it has reached BATCH_LIMIT.
 *
 * The stored key combines *pKey with the `number` field of the stream state.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error code encountered.
 */
int32_t saveRecInfoToDisk(STableTsDataState* pTsDataState, SSessionKey* pKey, SRecDataInfo* pVal, int32_t vLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStreamState*    pStreamState = (SStreamState*)pTsDataState->pState;
  SStateSessionKey stateKey = {.key = *pKey, .opNum = pStreamState->number};

  code = streamStatePutBatchOptimize(pTsDataState->pState, pTsDataState->cfgIndex, pTsDataState->pBatch, &stateKey,
                                     pVal, vLen, 0, pTsDataState->pBatchBuff);
  QUERY_CHECK_CODE(code, lino, _end);

  // reset the shared scratch buffer for the next record
  memset(pTsDataState->pBatchBuff, 0, pTsDataState->batchBufflen);

  bool batchFull = (streamStateGetBatchSize(pTsDataState->pBatch) >= BATCH_LIMIT);
  if (batchFull) {
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, pTsDataState->pBatch);
    streamStateClearBatch(pTsDataState->pBatch);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2015

2016
/*
 * Submit the pending write batch of pTsDataState to the state store and clear
 * the batch afterwards, whether or not the submit succeeded.
 *
 * Returns the result of the submit.
 */
int32_t flushRemainRecInfoToDisk(STableTsDataState* pTsDataState) {
  void*   pPending = pTsDataState->pBatch;
  int32_t putCode = streamStatePutBatch_rocksdb(pTsDataState->pState, pPending);

  streamStateClearBatch(pPending);
  return putCode;
}
2021

2022
/*
 * Refill pWinStates with up to NUM_OF_CACHE_WIN keys of groupId read from the
 * state store, walking backwards from the largest timestamp, and sort the
 * array with winKeyCmprImpl afterwards.
 *
 * The cursor is released on every path, including a failed array push.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an array push fails.
 */
int32_t recoverHashSortBuff(SStreamFileState* pFileState, SArray* pWinStates, uint64_t groupId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SWinKey          start = {.groupId = groupId, .ts = INT64_MAX};
  void*            pState = getStateFileStore(pFileState);
  SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
  for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
    SWinKey tmpKey = {.groupId = groupId};
    // only the key is needed, so no value buffer is requested
    int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pState, pCur, &tmpKey, NULL, 0);
    if (tmpRes != TSDB_CODE_SUCCESS) {
      break;
    }
    void* tmp = taosArrayPush(pWinStates, &tmpKey);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    streamStateCurPrev_rocksdb(pCur);
  }
  // keys were collected newest-first; the search buffer is sorted with winKeyCmprImpl
  taosArraySort(pWinStates, winKeyCmprImpl);

_end:
  streamStateFreeCur(pCur);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2048

2049
/*
 * Collect the row buffers of the keys that precede pKey in its group, walking
 * the cached key array backwards from the position found by binarySearch.
 * An entry equal to pKey itself is skipped.
 *
 * Nothing is collected when the group has no cached key array.
 * NOTE(review): the loop condition is `num <= maxNum`, so up to maxNum + 1
 * rows can be appended - confirm whether that is intended.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from the row buffer lookup /
 * array push.
 */
int32_t getRowStateAllPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SArray* pResArray, int32_t maxNum) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  (void)getStateFileStore(pFileState);  // result is not used here
  void**     ppGroupKeys = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
  int32_t    num = 0;

  if (ppGroupKeys == NULL) {
    goto _end;
  }

  SArray* pWinStates = (SArray*)(*ppGroupKeys);
  int32_t numOfKeys = taosArrayGetSize(pWinStates);
  int32_t index = binarySearch(pWinStates, numOfKeys, pKey, fillStateKeyCompare);
  while (index >= 0 && num <= maxNum) {
    SWinKey* pCandidate = taosArrayGet(pWinStates, index);
    index--;
    if (winKeyCmprImpl(pCandidate, pKey) == 0) {
      continue;
    }

    void*   pRow = NULL;
    int32_t rowLen = 0;
    int32_t winCode = TSDB_CODE_SUCCESS;
    code = addRowBuffIfNotExist(pFileState, (void*)pCandidate, sizeof(SWinKey), &pRow, &rowLen, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    void* pPushed = taosArrayPush(pResArray, &pRow);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
    num++;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2083

2084
/*
 * Record `mode` for the given key in pRecFlagMap.
 * Returns the result of the hash insertion.
 */
int32_t setStateRecFlag(SStreamFileState* pFileState, const void* pKey, int32_t keyLen, int32_t mode) {
  int32_t putCode = tSimpleHashPut(pFileState->pRecFlagMap, pKey, keyLen, &mode, sizeof(mode));
  return putCode;
}
2087

2088
/*
 * Read the mode recorded for the given key in pRecFlagMap.
 *
 * Returns TSDB_CODE_SUCCESS and writes *pMode when the key is present,
 * TSDB_CODE_FAILED (leaving *pMode untouched) otherwise.
 */
int32_t getStateRecFlag(SStreamFileState* pFileState, const void* pKey, int32_t keyLen, int32_t* pMode) {
  int32_t* pStoredMode = (int32_t*)tSimpleHashGet(pFileState->pRecFlagMap, pKey, keyLen);
  if (pStoredMode != NULL) {
    *pMode = *pStoredMode;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_FAILED;
}
2096

2097
/*
 * Drop expired session windows from every group of rowStateBuff, keeping the
 * newest numOfKeep entries per group.
 *
 * minTs       - when not INT64_MAX, numOfKeep is recomputed from the position
 *               of minTs in a group that holds more than numOfKeep entries
 * pFlushGroup - optional set of group ids; for a group not yet in the set the
 *               first dropped window is queued for flushing instead of being
 *               marked as flushed, and the group id is added to the set
 *
 * Errors are logged; the function has no return value.
 */
void clearExpiredSessionState(SStreamFileState* pFileState, int32_t numOfKeep, TSKEY minTs, SSHashObj* pFlushGroup) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pSessionBuff = pFileState->rowStateBuff;
  SStreamSnapshot* pFlushList = NULL;
  if (pFlushGroup != NULL) {
    pFlushList = tdListNew(POINTER_BYTES);
    // without this check a failed allocation would reach tdListAppend() as a NULL list
    QUERY_CHECK_NULL(pFlushList, code, lino, _end, terrno);
  }
  void*      pIte = NULL;
  int32_t    iter = 0;
  while ((pIte = tSimpleHashIterate(pSessionBuff, pIte, &iter)) != NULL) {
    SArray* pWinStates = *((void**)pIte);
    int32_t arraySize = TARRAY_SIZE(pWinStates);
    // NOTE(review): numOfKeep is a parameter and is overwritten here, so the new value also
    // applies to the groups visited afterwards - confirm that this carry-over is intended.
    if (minTs != INT64_MAX && arraySize > numOfKeep) {
      SSessionKey key = {.win.skey = minTs, .win.ekey = minTs};
      key.groupId = *(uint64_t*)tSimpleHashGetKey(pIte, NULL);
      int32_t index = binarySearch(pWinStates, arraySize, &key, fillStateKeyCompare);
      numOfKeep = TMAX(arraySize - index, MIN_NUM_OF_SORT_CACHE_WIN);
      qDebug("modify numOfKeep, numOfKeep:%d. %s at line %d", numOfKeep, __func__, __LINE__);
    }

    int32_t size = arraySize - numOfKeep;
    for (int32_t i = 0; i < size; i++) {
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
      SSessionKey* pKey = pPos->pKey;
      if (tSimpleHashGetSize(pFileState->pRecFlagMap) > 0) {
        tSimpleHashRemove(pFileState->pRecFlagMap, pKey, sizeof(SSessionKey));
      }
      pPos->invalid = true;

      if (i == 0 && pFlushGroup != NULL) {
        void* pGpVal = tSimpleHashGet(pFlushGroup, &pKey->groupId, sizeof(uint64_t));
        if (pGpVal == NULL) {
          // NOTE(review): the two error exits below leave pFlushList unreleased; freeing it there
          // would need care because pPos is still referenced by pWinStates at this point.
          code = tdListAppend(pFlushList, &pPos);
          QUERY_CHECK_CODE(code, lino, _end);
          code = tSimpleHashPut(pFlushGroup, &pKey->groupId, sizeof(uint64_t), NULL, 0);
          QUERY_CHECK_CODE(code, lino, _end);
          continue;
        }
      }
      pPos->beFlushed = true;
      qTrace("clear expired session buff, ts:%" PRId64 ",groupid:%" PRIu64 ". %s at line %d", pKey->win.skey, pKey->groupId, __func__, __LINE__);

      if (isFlushedState(pFileState, pKey->win.skey, 0)) {
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->win.skey, __func__, __LINE__, code_file);
      }
    }
    taosArrayRemoveBatch(pWinStates, 0, size, NULL);
  }

  if (pFlushList != NULL) {
    flushSnapshot(pFileState, pFlushList, false);
    code = clearRowBuffNonFlush(pFileState);
    // release the list before checking the code so that a failure above does not leak it
    tdListFreeP(pFlushList, destroyRowBuffPosPtr);
    pFlushList = NULL;
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc