• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3658

14 Mar 2025 08:10AM UTC coverage: 63.25% (+0.4%) from 62.877%
#3658

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

148878 of 302527 branches covered (49.21%)

Branch coverage included in aggregate %.

88 of 99 new or added lines in 12 files covered. (88.89%)

3290 existing lines in 68 files now uncovered.

234027 of 302857 relevant lines covered (77.27%)

17847433.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.36
/source/libs/stream/src/tstreamFileState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "taos.h"
21
#include "tcommon.h"
22
#include "thash.h"
23
#include "tsimplehash.h"
24

25
#define FLUSH_RATIO                    0.5
26
#define FLUSH_NUM                      4
27
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
28
#define MIN_NUM_OF_ROW_BUFF            10240
29
#define MIN_NUM_OF_RECOVER_ROW_BUFF    128
30
#define MIN_NUM_SEARCH_BUCKET          128
31
#define MAX_ARRAY_SIZE                 1024
32
#define MAX_GROUP_ID_NUM               200000
33
#define NUM_OF_CACHE_WIN               64
34
#define MAX_NUM_OF_CACHE_WIN           128
35

36
#define TASK_KEY               "streamFileState"
37
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
38

39
struct SStreamFileState {
40
  SList*     usedBuffs;
41
  SList*     freeBuffs;
42
  void*      rowStateBuff;
43
  void*      pFileStore;
44
  int32_t    rowSize;
45
  int32_t    selectivityRowSize;
46
  int32_t    keyLen;
47
  uint64_t   preCheckPointVersion;
48
  uint64_t   checkPointVersion;
49
  TSKEY      maxTs;
50
  TSKEY      deleteMark;
51
  TSKEY      flushMark;
52
  uint64_t   maxRowCount;
53
  uint64_t   curRowCount;
54
  GetTsFun   getTs;
55
  char*      id;
56
  char*      cfName;
57
  void*      searchBuff;
58
  SSHashObj* pGroupIdMap;
59
  bool       hasFillCatch;
60

61
  _state_buff_cleanup_fn         stateBuffCleanupFn;
62
  _state_buff_remove_fn          stateBuffRemoveFn;
63
  _state_buff_remove_by_pos_fn   stateBuffRemoveByPosFn;
64
  _state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
65

66
  _state_file_remove_fn stateFileRemoveFn;
67
  _state_file_get_fn    stateFileGetFn;
68

69
  _state_fun_get_fn stateFunctionGetFn;
70
};
71

72
typedef SRowBuffPos SRowBuffInfo;
73

74
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
9,756✔
75
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
9,756✔
76
  return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
9,756✔
77
}
78

79
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
3,995✔
80
  SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
3,995✔
81
  if (pos) {
3,995✔
82
    (*pos)->beFlushed = true;
2,186✔
83
  }
84
  return tSimpleHashRemove(pBuff, pKey, keyLen);
3,995✔
85
}
86

87
/*
 * Drops the hash entry for pPos->pKey, but only when the hash still maps that
 * key to this exact position object.
 */
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  size_t        keyLen = pFileState->keyLen;
  SRowBuffPos** ppMapped = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
  if (ppMapped == NULL || *ppMapped != pPos) {
    // Key absent, or it now belongs to a different position: leave the hash alone.
    return;
  }

  int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
  qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
6,391,643✔
97

98
/* Empties the simple hash pBuff via tSimpleHashClear. */
void stateHashBuffClearFn(void* pBuff) {
  tSimpleHashClear(pBuff);
}
×
99

100
/* Destroys the simple hash pBuff via tSimpleHashCleanup. */
void stateHashBuffCleanupFn(void* pBuff) {
  tSimpleHashCleanup(pBuff);
}
4,404✔
101

102
/* Deletes pKey from the backing store through streamStateDel_rocksdb and returns its result. */
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  void* pStore = pFileState->pFileStore;
  return streamStateDel_rocksdb(pStore, pKey);
}
105

106
/*
 * Reads the value for pKey from the backing store through streamStateGet_rocksdb.
 * data and pDataLen are passed straight through; the result code is returned.
 */
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  void* pStore = pFileState->pFileStore;
  return streamStateGet_rocksdb(pStore, pKey, data, pDataLen);
}
109

110
/*
 * Allocates an SStateKey holding a copy of the SWinKey in pPos->pKey plus
 * opNum = num. Returns NULL (after logging) when the allocation fails.
 * The visible code returns the allocation without freeing it.
 */
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateKey* pNewKey = taosMemoryCalloc(1, sizeof(SStateKey));
  if (pNewKey != NULL) {
    pNewKey->key = *(SWinKey*)pPos->pKey;
    pNewKey->opNum = num;
    return pNewKey;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
121

122
/*
 * Allocates an SWinKey that is a copy of the one in pPos->pKey. num is not
 * used. Returns NULL (after logging) when the allocation fails.
 */
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SWinKey* pCopy = taosMemoryCalloc(1, sizeof(SWinKey));
  if (pCopy != NULL) {
    *pCopy = *(SWinKey*)pPos->pKey;
    return pCopy;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
132

133
/* Deletes the session key pKey from the backing store through streamStateSessionDel_rocksdb. */
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  void* pStore = pFileState->pFileStore;
  return streamStateSessionDel_rocksdb(pStore, pKey);
}
136

137
/*
 * Reads the value for the session key pKey from the backing store through
 * streamStateSessionGet_rocksdb and returns its result code.
 */
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  void* pStore = pFileState->pFileStore;
  return streamStateSessionGet_rocksdb(pStore, pKey, data, pDataLen);
}
140

141
/*
 * Allocates an SStateSessionKey holding a copy of the SSessionKey in
 * pPos->pKey plus opNum = num. Returns NULL (after logging) when the
 * allocation fails.
 */
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateSessionKey* pNewKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
  if (pNewKey != NULL) {
    pNewKey->key = *(SSessionKey*)pPos->pKey;
    pNewKey->opNum = num;
    return pNewKey;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
152

153
/*
 * Decodes one fixed-width 64-bit value from pBuff into *pKey.
 * len is not consulted by this function.
 */
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) {
  // The advanced read position returned by the decoder is not needed afterwards.
  pBuff = taosDecodeFixedI64(pBuff, pKey);
}
15!
154

155
/*
 * Serialises *pKey into a freshly allocated buffer of sizeof(TSKEY) bytes.
 * On success *pVal points at that buffer and *pLen holds its size; the
 * function does not free it. Returns terrno when the allocation fails
 * (with *pVal set to NULL), otherwise TSDB_CODE_SUCCESS.
 */
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
  *pLen = sizeof(TSKEY);

  void* pOut = taosMemoryCalloc(1, *pLen);
  *pVal = pOut;
  if (pOut == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  // The encoder advances its cursor argument, so give it a scratch copy.
  void*   pCursor = pOut;
  int32_t written = taosEncodeFixedI64(&pCursor, *pKey);
  (void)written;
  return TSDB_CODE_SUCCESS;
}
166

167
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
6,193✔
168
                            void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
169
                            SStreamFileState** ppFileState) {
170
  int32_t code = TSDB_CODE_SUCCESS;
6,193✔
171
  int32_t lino = 0;
6,193✔
172
  if (memSize <= 0) {
6,193!
173
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
×
174
  }
175
  if (rowSize == 0) {
6,193!
176
    code = TSDB_CODE_INVALID_PARA;
×
177
    QUERY_CHECK_CODE(code, lino, _end);
×
178
  }
179

180
  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
6,193!
181
  QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
6,193!
182

183
  rowSize += selectRowSize;
6,193✔
184
  pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
6,193✔
185
  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
6,193✔
186
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
6,193!
187

188
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
6,193✔
189
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
6,193!
190

191
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
6,193✔
192
  int32_t    cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
6,193✔
193
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
6,193✔
194
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
4,146✔
195
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
4,145✔
196
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
4,145✔
197
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
4,145✔
198
    pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
4,145✔
199

200
    pFileState->stateFileRemoveFn = intervalFileRemoveFn;
4,145✔
201
    pFileState->stateFileGetFn = intervalFileGetFn;
4,145✔
202
    pFileState->cfName = taosStrdup("state");
4,145!
203
    pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
4,146✔
204
  } else if (type == STREAM_STATE_BUFF_SORT) {
2,047✔
205
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
1,787✔
206
    pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
1,787✔
207
    pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
1,787✔
208
    pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
1,787✔
209
    pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
1,787✔
210

211
    pFileState->stateFileRemoveFn = sessionFileRemoveFn;
1,787✔
212
    pFileState->stateFileGetFn = sessionFileGetFn;
1,787✔
213
    pFileState->cfName = taosStrdup("sess");
1,787!
214
    pFileState->stateFunctionGetFn = getSessionRowBuff;
1,787✔
215
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
260!
216
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
260✔
217
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
260✔
218
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
260!
219
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
260✔
220
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
260✔
221
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
260✔
222
    pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
260✔
223

224
    pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
260✔
225
    pFileState->stateFileGetFn = hashSortFileGetFn;
260✔
226
    pFileState->cfName = taosStrdup("fill");
260!
227
    pFileState->stateFunctionGetFn = NULL;
260✔
228
  }
229

230
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
6,193!
231
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
6,193!
232
  QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
6,193!
233
  QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
6,193!
234

235
  if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
6,193✔
236
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
70✔
237
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
70!
238
  }
239

240
  pFileState->keyLen = keySize;
6,193✔
241
  pFileState->rowSize = rowSize;
6,193✔
242
  pFileState->selectivityRowSize = selectRowSize;
6,193✔
243
  pFileState->preCheckPointVersion = 0;
6,193✔
244
  pFileState->checkPointVersion = 1;
6,193✔
245
  pFileState->pFileStore = pFile;
6,193✔
246
  pFileState->getTs = fp;
6,193✔
247
  pFileState->curRowCount = 0;
6,193✔
248
  pFileState->deleteMark = delMark;
6,193✔
249
  pFileState->flushMark = INT64_MIN;
6,193✔
250
  pFileState->maxTs = INT64_MIN;
6,193✔
251
  pFileState->id = taosStrdup(taskId);
6,193!
252
  QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
6,193!
253

254
  pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
6,193✔
255
  QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
6,191!
256

257
  pFileState->hasFillCatch = true;
6,191✔
258

259
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
6,191✔
260
    code = recoverSnapshot(pFileState, checkpointId);
4,144✔
261
  } else if (type == STREAM_STATE_BUFF_SORT) {
2,047✔
262
    code = recoverSession(pFileState, checkpointId);
1,787✔
263
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
260!
264
    code = recoverFillSnapshot(pFileState, checkpointId);
260✔
265
  }
266
  QUERY_CHECK_CODE(code, lino, _end);
6,193!
267

268
  void*   valBuf = NULL;
6,193✔
269
  int32_t len = 0;
6,193✔
270
  int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
6,193✔
271
  if (tmpRes == TSDB_CODE_SUCCESS) {
6,193✔
272
    QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
15!
273
    streamFileStateDecode(&pFileState->flushMark, valBuf, len);
15✔
274
    qDebug("===stream===flushMark  read:%" PRId64, pFileState->flushMark);
15✔
275
  }
276
  taosMemoryFreeClear(valBuf);
6,193!
277
  (*ppFileState) = pFileState;
6,193✔
278

279
_end:
6,193✔
280
  if (code != TSDB_CODE_SUCCESS) {
6,193!
281
    streamFileStateDestroy(pFileState);
×
282
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
283
  }
284
  return code;
6,193✔
285
}
286

287
/* Frees a position object together with the key and row buffer it holds. pPos must not be NULL. */
void destroyRowBuffPos(SRowBuffPos* pPos) {
  // Release the payloads first, then the descriptor that points at them.
  taosMemoryFreeClear(pPos->pRowBuff);
  taosMemoryFreeClear(pPos->pKey);
  taosMemoryFree(pPos);
}
9,113,598✔
292

293
void destroyRowBuffPosPtr(void* ptr) {
605,391✔
294
  if (!ptr) {
605,391!
295
    return;
×
296
  }
297
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
605,391✔
298
  if (!pPos->beUsed) {
605,391✔
299
    destroyRowBuffPos(pPos);
1,732✔
300
  }
301
}
302

303
void destroyRowBuffAllPosPtr(void* ptr) {
3,090,014✔
304
  if (!ptr) {
3,090,014!
305
    return;
×
306
  }
307
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
3,090,014✔
308
  destroyRowBuffPos(pPos);
3,090,014✔
309
}
310

311
/* List-element destructor: ptr addresses a stored row-buffer pointer, which is freed. */
void destroyRowBuff(void* ptr) {
  if (ptr != NULL) {
    void** ppRow = (void**)ptr;
    taosMemoryFree(*ppRow);
  }
}
317

318
/*
 * Releases a file state and everything it owns. Accepts NULL.
 *
 * The state may be only partly initialised: streamFileStateInit calls this on
 * its failure path, possibly before any callback has been installed.
 */
void streamFileStateDestroy(SStreamFileState* pFileState) {
  if (!pFileState) {
    return;
  }

  taosMemoryFree(pFileState->id);
  taosMemoryFree(pFileState->cfName);
  tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
  tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
  if (pFileState->stateBuffCleanupFn != NULL) {
    pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
  } else {
    // No type-specific cleanup was installed yet. rowStateBuff, if present,
    // is an empty simple hash at this point, so the generic cleanup is enough.
    tSimpleHashCleanup(pFileState->rowStateBuff);
  }
  sessionWinStateCleanup(pFileState->searchBuff);
  tSimpleHashCleanup(pFileState->pGroupIdMap);
  taosMemoryFree(pFileState);
}
332

333
/*
 * Moves the row buffer of pPos, if it has one, onto the free list and clears
 * pPos->pRowBuff. Returns the list-append error code on failure, in which case
 * pPos keeps its buffer.
 */
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pPos->pRowBuff == NULL) {
    // Nothing to recycle.
    goto _end;
  }

  code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
  QUERY_CHECK_CODE(code, lino, _end);
  pPos->pRowBuff = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
348

349
/*
 * Walks usedBuffs and recycles positions. With all == true every position is
 * recycled; otherwise only positions whose key timestamp is below ts and that
 * are not marked beUsed. A recycled position has its row buffer moved to the
 * free list, is removed from the row hash (only when !all), is destroyed, and
 * its list node is released. Stops at the first putFreeBuff failure.
 */
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SListIter  walker = {0};
  SListNode* pCurNode = NULL;

  tdListInitIter(pFileState->usedBuffs, &walker, TD_LIST_FORWARD);
  while ((pCurNode = tdListNext(&walker)) != NULL) {
    SRowBuffPos* pEntry = *(SRowBuffPos**)(pCurNode->data);

    bool recycle = all || (pFileState->getTs(pEntry->pKey) < ts && !pEntry->beUsed);
    if (!recycle) {
      continue;
    }

    code = putFreeBuff(pFileState, pEntry);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!all) {
      pFileState->stateBuffRemoveByPosFn(pFileState, pEntry);
    }
    destroyRowBuffPos(pEntry);

    SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCurNode);
    taosMemoryFreeClear(pDetached);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
6,357✔
376

377
/*
 * Collects positions whose timestamp is already in flushed state into
 * pFlushList, stopping once max positions that hold a row buffer were taken.
 *
 * With all == false only positions not marked beUsed are taken. With
 * all == true used positions are taken too, but positions without a row
 * buffer are skipped. Every taken position is removed from the row hash and
 * raises flushMark; positions not marked beUsed are also detached from
 * usedBuffs.
 */
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uint64_t   taken = 0;
  SListIter  walker = {0};
  SListNode* pCurNode = NULL;

  tdListInitIter(pFileState->usedBuffs, &walker, TD_LIST_FORWARD);
  while ((pCurNode = tdListNext(&walker)) != NULL && taken < max) {
    SRowBuffPos* pEntry = *(SRowBuffPos**)pCurNode->data;

    if (!isFlushedState(pFileState, pFileState->getTs(pEntry->pKey), 0)) {
      continue;
    }
    if (!all && pEntry->beUsed) {
      continue;
    }
    if (all && !pEntry->pRowBuff) {
      continue;
    }

    code = tdListAppend(pFlushList, &pEntry);
    QUERY_CHECK_CODE(code, lino, _end);

    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pEntry->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pEntry);
    if (pEntry->beUsed == false) {
      SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCurNode);
      taosMemoryFreeClear(pDetached);
    }
    // Only positions that actually carry a buffer count against max.
    if (pEntry->pRowBuff) {
      taken++;
    }
  }
  qDebug("clear flushed row buff. %d rows to disk. is all:%d", listNEles(pFlushList), all);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
415

416
/*
 * Resets the state to empty: both marks go back to INT64_MIN, the row hash is
 * cleared and every tracked position is recycled.
 */
void streamFileStateClear(SStreamFileState* pFileState) {
  pFileState->maxTs = INT64_MIN;
  pFileState->flushMark = INT64_MIN;

  tSimpleHashClear(pFileState->rowStateBuff);
  // all == true: the timestamp argument is not consulted.
  clearExpiredRowBuff(pFileState, 0, true);
}
1,739✔
422

423
/* Returns true when flushMark is greater than zero. */
bool needClearDiskBuff(SStreamFileState* pFileState) {
  TSKEY mark = pFileState->flushMark;
  return mark > 0;
}
252✔
424

425
/* Sets the beUsed flag of pPos to used. pFileState is not touched. */
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
  pPos->beUsed = used;
}
8,439,076✔
426

427
/*
 * Collects up to max positions whose beUsed flag equals used into pFlushList.
 * When used is true, positions without a row buffer are skipped. Every taken
 * position is removed from the row hash and raises flushMark; positions not
 * marked beUsed are also detached from usedBuffs.
 */
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uint64_t   taken = 0;
  SListIter  walker = {0};
  SListNode* pCurNode = NULL;

  tdListInitIter(pFileState->usedBuffs, &walker, TD_LIST_FORWARD);
  while ((pCurNode = tdListNext(&walker)) != NULL && taken < max) {
    SRowBuffPos* pEntry = *(SRowBuffPos**)pCurNode->data;

    if (pEntry->beUsed != used) {
      continue;
    }
    if (used && !pEntry->pRowBuff) {
      continue;
    }

    code = tdListAppend(pFlushList, &pEntry);
    QUERY_CHECK_CODE(code, lino, _end);

    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pEntry->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pEntry);
    if (pEntry->beUsed == false) {
      SListNode* pDetached = tdListPopNode(pFileState->usedBuffs, pCurNode);
      taosMemoryFreeClear(pDetached);
    }
    // Only positions that actually carry a buffer count against max.
    if (pEntry->pRowBuff) {
      taken++;
    }
  }

  qInfo("stream state flush %d rows to disk. is used:%d", listNEles(pFlushList), used);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
464

465
/*
 * Flushes part of the row cache to disk and recycles the freed row buffers.
 *
 * The target is half of curRowCount (at least FLUSH_NUM) positions. Candidates
 * are picked in this order, each step running only if the list is still empty:
 * already-flushed unused positions, then any unused positions, then used ones.
 * When a search buffer exists, all already-flushed positions that still hold a
 * buffer are added as well. The collected list goes to flushSnapshot and each
 * position's buffer is moved to the free list.
 *
 * The temporary flush list is released on every path, including errors.
 */
int32_t flushRowBuff(SStreamFileState* pFileState) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
  if (!pFlushList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
  num = TMAX(num, FLUSH_NUM);
  code = clearFlushedRowBuff(pFileState, pFlushList, num, false);
  QUERY_CHECK_CODE(code, lino, _end);

  if (isListEmpty(pFlushList)) {
    code = popUsedBuffs(pFileState, pFlushList, num, false);
    QUERY_CHECK_CODE(code, lino, _end);

    if (isListEmpty(pFlushList)) {
      code = popUsedBuffs(pFileState, pFlushList, num, true);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  if (pFileState->searchBuff) {
    code = clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  flushSnapshot(pFileState, pFlushList, false);

  SListIter fIter = {0};
  tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&fIter)) != NULL) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (pFlushList != NULL) {
      // Unused positions on the list were already detached from usedBuffs, so
      // the release below destroys them together with any row buffer they
      // still hold. Stop counting those buffers as allocated.
      SListIter  eIter = {0};
      SListNode* pErrNode = NULL;
      tdListInitIter(pFlushList, &eIter, TD_LIST_FORWARD);
      while ((pErrNode = tdListNext(&eIter)) != NULL) {
        SRowBuffPos* pErrPos = *(SRowBuffPos**)pErrNode->data;
        if (!pErrPos->beUsed && pErrPos->pRowBuff != NULL && pFileState->curRowCount > 0) {
          pFileState->curRowCount--;
        }
      }
    }
  }
  // Previously this ran only on the success path, leaking the list and its
  // detached positions whenever an earlier step failed.
  if (pFlushList != NULL) {
    tdListFreeP(pFlushList, destroyRowBuffPosPtr);
  }
  return code;
}
513

514
/*
 * Frees up row buffers: first expires rows older than maxTs - deleteMark
 * (skipped when deleteMark is INT64_MAX), then flushes repeatedly until the
 * free list is non-empty or the cache is no longer at maxRowCount.
 * Returns the first flush error, otherwise TSDB_CODE_SUCCESS.
 */
int32_t clearRowBuff(SStreamFileState* pFileState) {
  if (pFileState->deleteMark != INT64_MAX) {
    // NOTE(review): maxTs starts at INT64_MIN; confirm this subtraction cannot
    // be reached with a positive deleteMark before maxTs has been raised.
    TSKEY expireBefore = pFileState->maxTs - pFileState->deleteMark;
    clearExpiredRowBuff(pFileState, expireBefore, false);
  }

  for (;;) {
    int32_t code = flushRowBuff(pFileState);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    bool stillStarved = isListEmpty(pFileState->freeBuffs) && pFileState->curRowCount == pFileState->maxRowCount;
    if (!stillStarved) {
      break;
    }
  }
  return TSDB_CODE_SUCCESS;
}
526

527
/*
 * Takes one recycled row buffer off the free list and zeroes rowSize bytes of
 * it. Returns NULL when the free list is empty.
 */
void* getFreeBuff(SStreamFileState* pFileState) {
  SListNode* pHead = tdListPopHead(pFileState->freeBuffs);
  if (pHead == NULL) {
    return NULL;
  }

  // The node only carries the buffer pointer; the node itself is discarded.
  void* pRow = *(void**)pHead->data;
  taosMemoryFree(pHead);

  int32_t rowBytes = pFileState->rowSize;
  memset(pRow, 0, rowBytes);
  return pRow;
}
539

540
/* Zeroes rowSize bytes of the row buffer of pPos; does nothing when it has none. */
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  if (pPos->pRowBuff == NULL) {
    return;
  }
  memset(pPos->pRowBuff, 0, pFileState->rowSize);
}
×
545

546
/*
 * Creates a new position with a zeroed key of keyLen bytes and a row buffer,
 * and appends it to usedBuffs.
 *
 * The row buffer comes from the free list if one is available, else from a
 * fresh allocation while curRowCount is below maxRowCount, else from the free
 * list again after clearRowBuff has made room.
 *
 * Returns the position, or NULL on failure. A position that never reached
 * usedBuffs is released here rather than leaked. If the position was appended
 * but no row buffer could be obtained, NULL is returned and the position stays
 * on usedBuffs.
 */
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  bool         tracked = false;  // true once usedBuffs holds pPos
  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
  if (!pPos) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _error);
  }

  pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
  if (!pPos->pKey) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _error);
  }

  void* pBuff = getFreeBuff(pFileState);
  if (pBuff) {
    pPos->pRowBuff = pBuff;
    goto _end;
  }

  if (pFileState->curRowCount < pFileState->maxRowCount) {
    pBuff = taosMemoryCalloc(1, pFileState->rowSize);
    QUERY_CHECK_NULL(pBuff, code, lino, _error, terrno);
    pPos->pRowBuff = pBuff;
    pFileState->curRowCount++;
    goto _end;
  }

  code = clearRowBuff(pFileState);
  QUERY_CHECK_CODE(code, lino, _error);

  pPos->pRowBuff = getFreeBuff(pFileState);

_end:
  code = tdListAppend(pFileState->usedBuffs, &pPos);
  QUERY_CHECK_CODE(code, lino, _error);
  tracked = true;

  QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (pPos != NULL && !tracked) {
      // Nothing else references pPos yet. Its row buffer, if any, is freed
      // with it, so it no longer counts as allocated.
      if (pPos->pRowBuff != NULL && pFileState->curRowCount > 0) {
        pFileState->curRowCount--;
      }
      destroyRowBuffPos(pPos);
    }
    return NULL;
  }

  return pPos;
}
593

594
/*
 * Obtains a new position through getNewRowPos and marks it for writing:
 * beUsed and beUpdated set, beFlushed and needFree cleared.
 * Returns NULL (after logging an out-of-memory error) when no position is available.
 */
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
  SRowBuffPos* pFresh = getNewRowPos(pFileState);
  if (pFresh == NULL) {
    int32_t code = TSDB_CODE_OUT_OF_MEMORY;
    int32_t lino = __LINE__;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    return NULL;
  }

  pFresh->beUpdated = true;
  pFresh->needFree = false;
  pFresh->beFlushed = false;
  pFresh->beUsed = true;
  return pFresh;
}
614

615
/*
 * Looks up pKey in the row hash and creates a position for it when absent.
 *
 *  pVal / pVLen  when pVal is non-NULL, receive the position and rowSize
 *  pWinCode      TSDB_CODE_SUCCESS when the key was cached or its row was
 *                loaded from the file store; TSDB_CODE_FAILED for a new row;
 *                otherwise the file store's lookup result
 *
 * A cached position is marked beUsed and un-flushed only when pVal is
 * non-NULL. For a new position the row is read from the file store only if the
 * timestamp is not deleted and is already in flushed state. maxTs is raised to
 * the key's timestamp in every case.
 */
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                             int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  *pWinCode = TSDB_CODE_SUCCESS;
  TSKEY seenTs = pFileState->getTs(pKey);
  pFileState->maxTs = TMAX(pFileState->maxTs, seenTs);

  SRowBuffPos** ppCached = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (ppCached != NULL) {
    if (pVal != NULL) {
      SRowBuffPos* pCached = *ppCached;
      *pVLen = pFileState->rowSize;
      *pVal = pCached;
      pCached->beUsed = true;
      pCached->beFlushed = false;
    }
    goto _end;
  }

  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
  if (pNewPos == NULL || pNewPos->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pNewPos->pKey, pKey, keyLen);
  *pWinCode = TSDB_CODE_FAILED;

  TSKEY ts = pFileState->getTs(pKey);
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
    int32_t storedLen = 0;
    void*   pStored = NULL;
    *pWinCode = pFileState->stateFileGetFn(pFileState, pKey, &pStored, &storedLen);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if (*pWinCode == TSDB_CODE_SUCCESS) {
      // NOTE(review): storedLen is not compared with rowSize here; confirm the
      // store can never return more bytes than a row buffer holds.
      memcpy(pNewPos->pRowBuff, pStored, storedLen);
    }
    taosMemoryFree(pStored);
  }

  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pVal != NULL) {
    *pVLen = pFileState->rowSize;
    *pVal = pNewPos;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
666

667
/*
 * Removes pKey from the in-memory row buffer and from the file store, and,
 * when a search buffer exists, from the hash-sort buffer too. Both removal
 * results are only traced, not returned.
 */
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
  int32_t memRes = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
  qTrace("%s at line %d res:%d", __func__, __LINE__, memRes);

  int32_t diskRes = pFileState->stateFileRemoveFn(pFileState, pKey);
  qTrace("%s at line %d res:%d", __func__, __LINE__, diskRes);

  if (pFileState->searchBuff == NULL) {
    return;
  }
  deleteHashSortRowBuff(pFileState, pKey);
}
5,632✔
676

677
/*
 * Removes every entry of the given group: first from the in-memory row hash,
 * then from the file store, one key at a time, until the store reports no
 * further key for the group.
 */
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
  SSHashObj* pRowMap = pFileState->rowStateBuff;
  void*      pEntry = NULL;
  int32_t    cursor = 0;

  // Pass 1: in-memory hash.
  while ((pEntry = tSimpleHashIterate(pRowMap, pEntry, &cursor)) != NULL) {
    size_t   keyLen = 0;
    SWinKey* pKey = tSimpleHashGetKey(pEntry, &keyLen);
    if (pKey->groupId != groupId) {
      continue;
    }
    int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pEntry, &cursor);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  // Pass 2: file store. Each round seeks from the smallest possible timestamp
  // of the group, so it always finds the first key that is still present.
  for (;;) {
    SWinKey seekKey = {.ts = INT64_MIN, .groupId = groupId};
    // NOTE(review): the cursor returned here is never released in this
    // function; confirm whether the caller of the seek owns it.
    SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &seekKey);
    SWinKey          delKey = {.groupId = groupId};
    int32_t          code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
    code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
    qTrace("%s at line %d res:%d", __func__, __LINE__, code);
  }
}
55✔
702

703
/*
 * Reloads the row of pPos from the file store into pPos->pRowBuff, which must
 * already be allocated. Returns the file store's error code on failure.
 */
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t lino = 0;
  int32_t storedLen = 0;
  void*   pStored = NULL;

  int32_t code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pStored, &storedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  // NOTE(review): storedLen is not compared with rowSize; confirm the store
  // never returns more bytes than a row buffer holds.
  memcpy(pPos->pRowBuff, pStored, storedLen);
  taosMemoryFree(pStored);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
719

720
// Attach a row buffer to pPos and fill it from the state store.
// Buffer sources, in order: the free list; a fresh allocation while
// curRowCount is below maxRowCount; otherwise the free list again after
// clearRowBuff() has run.
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pPos->pRowBuff = getFreeBuff(pFileState);
  if (pPos->pRowBuff == NULL) {
    if (pFileState->curRowCount >= pFileState->maxRowCount) {
      // Budget exhausted: reclaim buffers, then retry the free list.
      code = clearRowBuff(pFileState);
      QUERY_CHECK_CODE(code, lino, _end);
      pPos->pRowBuff = getFreeBuff(pFileState);
    } else {
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
      if (pPos->pRowBuff == NULL) {
        code = terrno;
        QUERY_CHECK_CODE(code, lino, _end);
      }
      pFileState->curRowCount++;
    }
    QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  code = recoverSessionRowBuff(pFileState, pPos);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
749

750
// Return, through *pVal, the row buffer attached to pPos.
//  - no buffer attached: one is obtained and filled via recoverStateRowBuff();
//  - buffer attached and pPos->needFree set: its content is reloaded via
//    recoverSessionRowBuff();
//  - otherwise the attached buffer is returned as is.
// *pVal is only written when the function succeeds.
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pPos->pRowBuff == NULL) {
    code = recoverStateRowBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pPos->needFree) {
    code = recoverSessionRowBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  (*pVal) = pPos->pRowBuff;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
777

778
// True when the in-memory row map (rowStateBuff) has an entry for pKey.
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
  SRowBuffPos** ppEntry = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  return ppEntry != NULL;
}
785

786
// Run clearExpiredRowBuff() with the current delete watermark and return the
// list of used buffers. The watermark is INT64_MIN (nothing expired) when
// deleteMark is INT64_MAX or maxTs is still INT64_MIN.
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
  int64_t mark = INT64_MIN;
  if (pFileState->deleteMark != INT64_MAX && pFileState->maxTs != INT64_MIN) {
    mark = pFileState->maxTs - pFileState->deleteMark;
  }
  clearExpiredRowBuff(pFileState, mark, false);
  return pFileState->usedBuffs;
}
793

794
// Write every not-yet-flushed row in pSnapshot to the state store in batches
// of at most BATCH_LIMIT entries, advancing pFileState->flushMark as rows are
// queued. When at least one row was written and flushState is true, the flush
// mark itself is persisted under STREAM_STATE_INFO_NAME.
// Errors are logged; the function has no return value.
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SListIter iter = {0};
  // Both resources are released at _end, so they must hold a defined value
  // before the first `goto _end` can be taken.
  char*   buf = NULL;
  void*   batch = NULL;
  int32_t numOfElems = 0;
  int64_t elapsed = 0;
  tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);

  const int32_t BATCH_LIMIT = 256;

  int64_t    st = taosGetTimestampMs();
  SListNode* pNode = NULL;

  int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);

  int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
  buf = taosMemoryCalloc(1, len);
  if (!buf) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  batch = streamStateCreateBatch();
  if (!batch) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (pPos->beFlushed || !pPos->pRowBuff) {
      continue;
    }
    pPos->beFlushed = true;
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));

    qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
      // Batch is full: commit it and start a new one before adding this row.
      code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
      streamStateClearBatch(batch);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
    QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);

    code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
                                       0, buf);
    taosMemoryFreeClear(pSKey);
    QUERY_CHECK_CODE(code, lino, _end);
    // todo handle failure
    memset(buf, 0, len);
  }
  taosMemoryFreeClear(buf);

  numOfElems = streamStateGetBatchSize(batch);
  if (numOfElems > 0) {
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    // Nothing pending in the last batch: skip the bookkeeping below.
    goto _end;
  }

  streamStateClearBatch(batch);

  clearSearchBuff(pFileState);

  elapsed = taosGetTimestampMs() - st;
  qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
         pFileState->id, numOfElems, BATCH_LIMIT, elapsed);

  if (flushState) {
    void*   valBuf = NULL;
    int32_t valLen = 0;
    code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &valLen);
    QUERY_CHECK_CODE(code, lino, _end);

    qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
    code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, valLen, 0);
    taosMemoryFree(valBuf);
    QUERY_CHECK_CODE(code, lino, _end);

    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(buf);
  if (batch != NULL) {
    streamStateDestroyBatch(batch);
  }
}
9,022✔
885

886
// Delete the entry keyed "<TASK_KEY>:<checkpointId>" from the default store
// and return the delete call's result.
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
  char checkpointKey[128] = {0};
  TAOS_UNUSED(tsnprintf(checkpointKey, sizeof(checkpointKey), "%s:%" PRId64 "", TASK_KEY, checkpointId));

  int32_t res = streamDefaultDel_rocksdb(pFileState->pFileStore, checkpointKey);
  return res;
}
891

892
// Walk checkpoint ids from the newest one (stored under TASK_KEY) down to 1,
// reading the timestamp stored under "<TASK_KEY>:<id>". The first checkpoint
// whose timestamp is below `mark` is removed and the walk stops.
// Returns TSDB_CODE_FAILED when a lookup fails or a stored value does not fit
// the local parse buffer; otherwise the code of the last lookup.
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t maxCheckPointId = 0;
  {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    memcpy(buf, TASK_KEY, strlen(TASK_KEY));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    if (code != 0) {
      return TSDB_CODE_FAILED;
    }
    // The value is copied into buf and NUL-terminated below, so it must leave
    // room for the terminator; an empty or missing value is a failure here.
    if (val == NULL || len <= 0 || len >= (int32_t)sizeof(buf)) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }
    memcpy(buf, val, len);
    buf[len] = 0;
    maxCheckPointId = taosStr2Int64((char*)buf, NULL, 10);
    taosMemoryFree(val);
  }
  for (int64_t i = maxCheckPointId; i > 0; i--) {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s:%" PRId64 "", TASK_KEY, i));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    if (code != 0) {
      return TSDB_CODE_FAILED;
    }
    // Reject lengths that would overflow buf (terminator included) and a
    // missing value that claims a non-zero length.
    if (len < 0 || len >= (int32_t)sizeof(buf) || (val == NULL && len > 0)) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }
    if (len > 0) {
      memcpy(buf, val, len);
    }
    buf[len] = 0;
    taosMemoryFree(val);

    TSKEY ts = taosStr2Int64((char*)buf, NULL, 10);
    if (ts < mark) {
      // statekey winkey.ts < mark
      int32_t tmpRes = forceRemoveCheckpoint(pFileState, i);
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
      break;
    }
  }
  return code;
}
933

934
// Reload session-window rows from the state store into memory, walking
// backwards from the last stored session until
// min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) rows are resident or the
// cursor is exhausted. Expired checkpoints are pruned first when maxTs is
// known; that pruning result is only traced.
// `ckId` is not used by this function.
int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winRes = TSDB_CODE_SUCCESS;
  if (pFileState->maxTs != INT64_MIN) {
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*       pVal = NULL;
    int32_t     vlen = 0;
    SSessionKey key = {0};
    winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    if (vlen != pFileState->rowSize) {
      // The value has not been handed to a row position yet, so it must be
      // released here before bailing out.
      taosMemoryFreeClear(pVal);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
    if (pPos == NULL) {
      // NOTE(review): ownership of pVal when createSessionWinBuff() fails is not
      // visible here; it is deliberately not freed to avoid a double free - confirm.
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    pPos->beUsed = false;
    winRes = putSessionWinResultBuff(pFileState, pPos);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    winRes = streamStateSessionCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
983

984
// Reload rows from the state store into memory, walking backwards from the
// last stored key until min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) rows
// are resident, the cursor is exhausted, or a key older than flushMark is
// reached. Expired checkpoints are pruned first when maxTs is known; that
// pruning result is only traced.
// `ckId` is not used by this function.
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winCode = TSDB_CODE_SUCCESS;
  if (pFileState->maxTs != INT64_MIN) {
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winCode == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*        pVal = NULL;
    int32_t      vlen = 0;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
    qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
    if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
      // Nothing (more) to recover: discard the position obtained above together
      // with the list node at the tail of usedBuffs.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      taosMemoryFreeClear(pVal);
      break;
    }
    if (vlen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
      // Same cleanup as the end-of-data path, so the unused position does not
      // stay behind in usedBuffs.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    memcpy(pNewPos->pRowBuff, pVal, vlen);
    taosMemoryFreeClear(pVal);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
    code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // Drop the list node as well; otherwise usedBuffs keeps a pointer to the
      // position destroyed here.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      break;
    }
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
1046

1047
// Accessor for pFileState->selectivityRowSize.
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) {
  return pFileState->selectivityRowSize;
}
113,679✔
1048

1049
// Raise both maxTs and flushMark to `ts` when `ts` is larger than the value
// currently stored; neither field is ever lowered here.
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
  pFileState->maxTs = TMAX(pFileState->maxTs, ts);
  pFileState->flushMark = TMAX(pFileState->flushMark, ts);
}
1,503✔
1053

1054
// Accessor for pFileState->rowStateBuff.
void* getRowStateBuff(SStreamFileState* pFileState) {
  return pFileState->rowStateBuff;
}
41,421✔
1055
// Accessor for pFileState->searchBuff.
void* getSearchBuff(SStreamFileState* pFileState) {
  return pFileState->searchBuff;
}
9,643,053✔
1056

1057
// Accessor for pFileState->pFileStore.
void* getStateFileStore(SStreamFileState* pFileState) {
  return pFileState->pFileStore;
}
21,698✔
1058

1059
// True when `ts` lies before the delete watermark (maxTs - deleteMark).
// Always false when deleteMark is INT64_MAX or maxTs is not positive.
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
  if (pFileState->deleteMark == INT64_MAX || pFileState->maxTs <= 0) {
    return false;
  }
  return ts < (pFileState->maxTs - pFileState->deleteMark);
}
1063

1064
// True when `ts` is not later than flushMark + gap.
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
  TSKEY flushedUntil = pFileState->flushMark + gap;
  return ts <= flushedUntil;
}
8,057,391✔
1065

1066
// Accessor for pFileState->flushMark.
TSKEY getFlushMark(SStreamFileState* pFileState) { return pFileState->flushMark; }
849✔
1067

1068
// Accessor for pFileState->rowSize.
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
  return pFileState->rowSize;
}
114,621✔
1069

1070
// Forward to the configured stateFunctionGetFn callback. The callback's
// window-code output is received in a local and discarded; only the
// callback's return value is passed on.
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
  int32_t ignoredWinCode = TSDB_CODE_SUCCESS;
  int32_t res = pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen, &ignoredWinCode);
  return res;
}
1074

1075
// Reload fill-state rows from the state store into memory, walking backwards
// from the last stored key until min(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount)
// rows are resident, the cursor is exhausted, or an already-flushed key is
// reached. Unlike the other recover functions, a failure while pruning expired
// checkpoints aborts the recovery.
// `ckId` is not used by this function.
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  // Declared up front and released at _end so that every error exit frees it.
  SStreamStateCur* pCur = NULL;
  if (pFileState->maxTs != INT64_MIN) {
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    code = deleteExpiredCheckPoint(pFileState, mark);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
  if (pCur == NULL) {
    return code;
  }
  int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  int32_t winRes = TSDB_CODE_SUCCESS;
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*        pVal = NULL;
    int32_t      vlen = 0;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
    qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
    if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
      // Nothing (more) to recover: discard the position obtained above together
      // with the list node at the tail of usedBuffs.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      taosMemoryFreeClear(pVal);
      break;
    }

    if (vlen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
      // Drop the list node too, so usedBuffs does not keep a pointer to the
      // position destroyed here.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    memcpy(pNewPos->pRowBuff, pVal, vlen);
    taosMemoryFreeClear(pVal);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
    winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (winRes != TSDB_CODE_SUCCESS) {
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      break;
    }
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
1138

1139
// Look up the row for pKey.
//  - Found in the in-memory map: *pVal receives the row position, it is marked
//    used and not flushed, *pWinCode is TSDB_CODE_SUCCESS, and its buffer is
//    recovered via recoverStateRowBuff() when it is not attached.
//  - Otherwise, when the key's timestamp is flushed and not expired, the row is
//    read through stateFileGetFn; on success a new position is created, filled,
//    inserted into the map and returned through *pVal.
// *pWinCode reports whether a row was found; the return value reports errors.
// maxTs is advanced to the key's timestamp as a side effect.
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                   int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  // Buffer returned by stateFileGetFn; kept at function scope so the single
  // free at _end also covers the error exits.
  void* p = NULL;
  (*pWinCode) = TSDB_CODE_FAILED;
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (ppPos) {
    *pVLen = pFileState->rowSize;
    *pVal = *ppPos;
    (*ppPos)->beUsed = true;
    (*ppPos)->beFlushed = false;
    (*pWinCode) = TSDB_CODE_SUCCESS;
    if ((*ppPos)->pRowBuff == NULL) {
      code = recoverStateRowBuff(pFileState, *ppPos);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }
  TSKEY ts = pFileState->getTs(pKey);
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
    int32_t len = 0;
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
      if (!pNewPos || !pNewPos->pRowBuff) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      memcpy(pNewPos->pKey, pKey, keyLen);
      memcpy(pNewPos->pRowBuff, p, len);
      code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
      QUERY_CHECK_CODE(code, lino, _end);

      if (pVal) {
        *pVLen = pFileState->rowSize;
        *pVal = pNewPos;
      }
    }
  }

_end:
  taosMemoryFree(p);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1190

1191
// Register `groupId`. A non-NULL `value` is rejected with
// TSDB_CODE_INVALID_PARA. A group id not yet in pGroupIdMap is added to the
// map while the map holds at most MAX_GROUP_ID_NUM entries, and is always
// written through streamStatePutParTag_rocksdb(). A group id already in the
// map is a no-op.
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (value != NULL) {
    code = TSDB_CODE_INVALID_PARA;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  bool alreadyCached = (tSimpleHashGet(pFileState->pGroupIdMap, &groupId, sizeof(int64_t)) != NULL);
  if (!alreadyCached) {
    bool cacheHasRoom = (tSimpleHashGetSize(pFileState->pGroupIdMap) <= MAX_GROUP_ID_NUM);
    if (cacheHasRoom) {
      code = tSimpleHashPut(pFileState->pGroupIdMap, &groupId, sizeof(int64_t), NULL, 0);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1214

1215
// Advance a group-id cursor. While hashIter is not -1 the cursor walks
// pGroupIdMap and records the largest group id seen in minGpId; once the map
// is exhausted, hashIter becomes -1 and the cursor is re-seeked in the state
// store after minGpId. With hashIter == -1 the call simply advances the
// store cursor.
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
  SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState;

  if (pCur->hashIter != -1) {
    int64_t curGroupId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
    pCur->minGpId = TMAX(pCur->minGpId, curGroupId);

    pCur->pHashData = tSimpleHashIterate(pFileState->pGroupIdMap, pCur->pHashData, &pCur->hashIter);
    if (pCur->pHashData == NULL) {
      pCur->hashIter = -1;
      streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
    }
  } else {
    streamStateCurNext(pFileState->pFileStore, pCur);
  }
}
1233

1234
// Read the group id the cursor currently points at into *pKey: from the hash
// entry while the cursor still has one, otherwise from the state store.
// pVal and pVLen are not used by this function.
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
  if (pCur->pHashData == NULL) {
    return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
  }

  *pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
  return TSDB_CODE_SUCCESS;
}
1242

1243
// Accessor for pFileState->pGroupIdMap.
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) { return pFileState->pGroupIdMap; }
1246

1247
// For every array stored in the search buffer, drop all window keys except the
// last one: each dropped key is removed from the in-memory row map and, when
// its timestamp is already flushed, from the state store too. Finishes by
// running clearRowBuff(). Errors are logged only.
void clearExpiredState(SStreamFileState* pFileState) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pSearchBuff = pFileState->searchBuff;
  void*      pIte = NULL;
  int32_t    iter = 0;
  while ((pIte = tSimpleHashIterate(pSearchBuff, pIte, &iter)) != NULL) {
    SArray* pWinStates = *((void**)pIte);
    int32_t size = taosArrayGetSize(pWinStates);
    for (int32_t i = 0; i < size - 1; i++) {
      SWinKey* pKey = taosArrayGet(pWinStates, i);
      int32_t  code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
      qTrace("clear expired buff, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_buff);

      if (isFlushedState(pFileState, pKey->ts, 0)) {
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file);
      }
    }
    // With zero or one element there is nothing to trim; skipping the call
    // also keeps `size - 1` from being passed as -1 for an empty array.
    if (size > 1) {
      taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL);
    }
  }
  code = clearRowBuff(pFileState);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
4,150✔
1276

1277
#ifdef BUILD_NO_CALL
1278
// Ensure a row buffer exists for pKey (via addRowBuffIfNotExist) and record
// the key in the per-group array kept in the search buffer.
// When that array is empty and needClearDiskBuff() is true, up to
// NUM_OF_CACHE_WIN of the group's newest keys are first reloaded from the
// state store. Once the array had MAX_NUM_OF_CACHE_WIN or more entries before
// the insert, its oldest entries are trimmed.
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
                           int32_t* pWinCode) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  // Kept at function scope so the error exit inside the recovery loop frees it.
  SStreamStateCur* pCur = NULL;

  code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

  SArray*    pWinStates = NULL;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  void**     ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);

    code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // recover
  if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
    TSKEY   ts = getFlushMark(pFileState);
    SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX};
    void*   pState = getStateFileStore(pFileState);
    pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
    for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
      SWinKey tmpKey = {.groupId = pKey->groupId};
      int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pState, pCur, &tmpKey, NULL, 0);
      if (tmpRes != TSDB_CODE_SUCCESS) {
        break;
      }
      void* tmp = taosArrayPush(pWinStates, &tmpKey);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      streamStateCurPrev_rocksdb(pCur);
    }
    taosArraySort(pWinStates, winKeyCmprImpl);
    streamStateFreeCur(pCur);
    pCur = NULL;
  }

  int32_t size = taosArrayGetSize(pWinStates);
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
  if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0) {
    // find the first position which is smaller than the pKey
    if (index >= 0) {
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
        goto _end;
      }
    }
    index++;
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

  if (size >= MAX_NUM_OF_CACHE_WIN) {
    int32_t num = size - NUM_OF_CACHE_WIN;
    taosArrayRemoveBatch(pWinStates, 0, num, NULL);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
1345
#endif
1346

1347
/*
 * Fetch the row that precedes pKey within pKey's group.
 *
 * The per-group search buffer (an SArray of SWinKey stored in the file state's
 * search-buff hash) is consulted first. If it holds an entry before pKey, that
 * key is copied to pResKey and the row is resolved via addRowBuffIfNotExist().
 * Otherwise (no buffer for the group, or nothing before pKey in the buffer) the
 * previous key is looked up in the rocksdb-backed state store and its value is
 * copied into a freshly obtained row buffer.
 *
 * pFileState  file state that owns the search buffer, row buffers and store
 * pKey        key whose predecessor is wanted
 * pResKey     out: key of the predecessor
 * ppVal       out: row position (SRowBuffPos*) of the predecessor; written by
 *             this function only when the store lookup succeeds, otherwise by
 *             addRowBuffIfNotExist()
 * pVLen       out: row size, set together with ppVal
 * pWinCode    out: result of the store lookup, or whatever
 *             addRowBuffIfNotExist() reports on the cached path
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when no row buffer could
 * be obtained for the value read from the store, or the return value of
 * addRowBuffIfNotExist() on the cached path.
 */
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
                           int32_t* pVLen, int32_t* pWinCode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  void*      pState = getStateFileStore(pFileState);
  void**     ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));

  if (ppBuff) {
    SArray* pWinStates = (SArray*)(*ppBuff);
    int32_t size = taosArrayGetSize(pWinStates);
    // NOTE(review): binarySearch appears to return the position of the last entry that is
    // not greater than pKey, or -1 when there is none - confirm against its definition.
    int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
    if (index >= 0) {
      SWinKey* pCurKey = taosArrayGet(pWinStates, index);
      if (winKeyCmprImpl(pCurKey, pKey) == 0) {
        // pKey itself is cached; its predecessor is the entry right before it.
        index--;
      } else {
        qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__);
      }
    }

    if (index != -1) {
      SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
      *pResKey = *pPrevKey;
      return addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), ppVal, pVLen, pWinCode);
    }
  } else {
    qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
  }

  // Nothing usable in the search buffer: read the previous key/value from the state store.
  SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
  void*            tmpVal = NULL;
  int32_t          len = 0;
  (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
  if ((*pWinCode) == TSDB_CODE_SUCCESS) {
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (pNewPos && pNewPos->pRowBuff) {
      // NOTE(review): assumes len never exceeds the row buffer size - confirm.
      memcpy(pNewPos->pRowBuff, tmpVal, len);
      *pVLen = getRowStateRowSize(pFileState);
      (*ppVal) = pNewPos;
    } else {
      code = TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
    }
    // The value returned by the store is owned by us; release it on success and failure alike.
    taosMemoryFreeClear(tmpVal);
  }

  // Always release the cursor, including on the out-of-memory path.
  streamStateFreeCur(pCur);

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1418

1419
/*
 * Insert pKey into the sorted per-group key array pWinStates, unless an equal
 * key is already present.
 *
 * The key is considered for insertion when it is not yet flushed (per
 * isFlushedState), when the array already holds an entry at or before it, or
 * when the array is empty. After the insert step, if the array held at least
 * MAX_NUM_OF_CACHE_WIN entries on entry, the oldest entries are dropped from
 * the front (count: entries-on-entry minus NUM_OF_CACHE_WIN).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array insert fails.
 */
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfWins = taosArrayGetSize(pWinStates);
  int32_t pos = binarySearch(pWinStates, numOfWins, pKey, fillStateKeyCompare);

  if (!isFlushedState(pFileState, pKey->ts, 0) || pos >= 0 || numOfWins == 0) {
    // An identical key is already cached: nothing to insert, and no trimming either.
    if (pos >= 0 && winKeyCmprImpl(taosArrayGet(pWinStates, pos), pKey) == 0) {
      goto _end;
    }

    // Place the new key right after the entry found by the search.
    void* pInserted = taosArrayInsert(pWinStates, pos + 1, pKey);
    QUERY_CHECK_NULL(pInserted, code, lino, _end, terrno);
  }

  if (numOfWins >= MAX_NUM_OF_CACHE_WIN) {
    taosArrayRemoveBatch(pWinStates, 0, numOfWins - NUM_OF_CACHE_WIN, NULL);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1446

1447
/*
 * Return the SWinKey array registered for groupId in pSearchBuff, creating and
 * registering an empty one when the group has no entry yet.
 *
 * pSearchBuff  hash keyed by group id; each value is an SArray* of SWinKey
 * groupId      group whose array is wanted
 * ppResStates  out: the group's array; left untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the array cannot be allocated, or the
 * error reported by tSimpleHashPut. On a failed put the freshly created array
 * is destroyed here, since nothing else references it.
 */
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pWinStates = NULL;
  void**  ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);

    // The hash stores the pointer value (POINTER_BYTES copied from &pWinStates).
    code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // The array was not registered and is not handed to the caller: free it to avoid a leak.
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  (*ppResStates) = pWinStates;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc