• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.57
/source/libs/stream/src/tstreamFileState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "taos.h"
21
#include "tcommon.h"
22
#include "thash.h"
23
#include "tsimplehash.h"
24

25
// Fraction of the currently allocated rows that one flushRowBuff() pass tries to evict.
#define FLUSH_RATIO                    0.5
// Lower bound on the rows evicted per flush; maxRowCount is never set below FLUSH_NUM * 2.
#define FLUSH_NUM                      4
// Memory budget in bytes used when streamFileStateInit() is called with memSize <= 0.
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
// Upper bound on the initial capacity of the row-state hash (capped further by maxRowCount).
#define MIN_NUM_OF_ROW_BUFF            10240
// NOTE(review): the constants below are not referenced in the part of the file reviewed here.
#define MIN_NUM_OF_RECOVER_ROW_BUFF    128
// Initial capacity of the auxiliary search hash (searchBuff).
#define MIN_NUM_SEARCH_BUCKET          128
#define MAX_ARRAY_SIZE                 1024
#define MAX_GROUP_ID_NUM               200000
#define NUM_OF_CACHE_WIN               64
#define MAX_NUM_OF_CACHE_WIN           128

#define TASK_KEY               "streamFileState"
// Key under which the persisted flushMark is looked up in the backing store.
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
38

39
struct SStreamFileState {
40
  SList*     usedBuffs;
41
  SList*     freeBuffs;
42
  void*      rowStateBuff;
43
  void*      pFileStore;
44
  int32_t    rowSize;
45
  int32_t    selectivityRowSize;
46
  int32_t    keyLen;
47
  uint64_t   preCheckPointVersion;
48
  uint64_t   checkPointVersion;
49
  TSKEY      maxTs;
50
  TSKEY      deleteMark;
51
  TSKEY      flushMark;
52
  uint64_t   maxRowCount;
53
  uint64_t   curRowCount;
54
  GetTsFun   getTs;
55
  char*      id;
56
  char*      cfName;
57
  void*      searchBuff;
58
  SSHashObj* pGroupIdMap;
59
  bool       hasFillCatch;
60

61
  _state_buff_cleanup_fn         stateBuffCleanupFn;
62
  _state_buff_remove_fn          stateBuffRemoveFn;
63
  _state_buff_remove_by_pos_fn   stateBuffRemoveByPosFn;
64
  _state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
65

66
  _state_file_remove_fn stateFileRemoveFn;
67
  _state_file_get_fn    stateFileGetFn;
68

69
  _state_fun_get_fn stateFunctionGetFn;
70
};
71

72
typedef SRowBuffPos SRowBuffInfo;
73

UNCOV
74
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
×
UNCOV
75
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
×
UNCOV
76
  return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
×
77
}
78

UNCOV
79
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
×
UNCOV
80
  SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
×
UNCOV
81
  if (pos) {
×
UNCOV
82
    (*pos)->beFlushed = true;
×
83
  }
UNCOV
84
  return tSimpleHashRemove(pBuff, pKey, keyLen);
×
85
}
86

UNCOV
87
// Drops pPos from the row-state hash, but only when the hash entry stored under
// pPos->pKey is this very position (a different position under the same key is left alone).
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  size_t        keyLen = pFileState->keyLen;
  SRowBuffPos** ppStored = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
  if (ppStored == NULL || (*ppStored) != pPos) {
    return;
  }

  int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
  qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
×
97

98
// Empties the simple hash pBuff via tSimpleHashClear().
void stateHashBuffClearFn(void* pBuff) {
  tSimpleHashClear(pBuff);
}
×
99

UNCOV
100
// Releases the simple hash pBuff via tSimpleHashCleanup().
void stateHashBuffCleanupFn(void* pBuff) {
  tSimpleHashCleanup(pBuff);
}
×
101

UNCOV
102
// Deletes pKey from the persistent store of pFileState; returns the store's result code.
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  int32_t res = streamStateDel_rocksdb(pFileState->pFileStore, pKey);
  return res;
}
105

UNCOV
106
// Reads the value stored under pKey from the persistent store of pFileState.
// data and pDataLen are forwarded to streamStateGet_rocksdb(); its result code is returned.
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  int32_t res = streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
  return res;
}
109

UNCOV
110
// Allocates an SStateKey built from the window key of pPos and the operator number num.
// Returns NULL (after logging) when the allocation fails; the caller owns the result.
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateKey* pResult = taosMemoryCalloc(1, sizeof(SStateKey));
  if (pResult != NULL) {
    SWinKey* pSrcKey = pPos->pKey;
    pResult->opNum = num;
    pResult->key = *pSrcKey;
    return pResult;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
121

UNCOV
122
// Allocates a copy of the window key of pPos. num is not used.
// Returns NULL (after logging) when the allocation fails; the caller owns the result.
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SWinKey* pCopy = taosMemoryCalloc(1, sizeof(SWinKey));
  if (pCopy != NULL) {
    SWinKey* pSrcKey = pPos->pKey;
    *pCopy = *pSrcKey;
    return pCopy;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
132

UNCOV
133
// Deletes the session key pKey from the persistent store of pFileState; returns the store's result code.
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
  int32_t res = streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
  return res;
}
136

UNCOV
137
// Reads the value stored under the session key pKey from the persistent store of pFileState.
// data and pDataLen are forwarded to streamStateSessionGet_rocksdb(); its result code is returned.
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
  int32_t res = streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
  return res;
}
140

UNCOV
141
// Allocates an SStateSessionKey built from the session key of pPos and the operator number num.
// Returns NULL (after logging) when the allocation fails; the caller owns the result.
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
  SStateSessionKey* pResult = taosMemoryCalloc(1, sizeof(SStateSessionKey));
  if (pResult != NULL) {
    SSessionKey* pSrcKey = pPos->pKey;
    pResult->opNum = num;
    pResult->key = *pSrcKey;
    return pResult;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return NULL;
}
152

UNCOV
153
// Decodes one fixed-width 64-bit value from pBuff into *pKey. len is not consulted.
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) {
  (void)len;
  (void)taosDecodeFixedI64(pBuff, pKey);
}
×
154

UNCOV
155
// Encodes *pKey into a freshly allocated buffer of sizeof(TSKEY) bytes.
// On success *pVal points at the buffer (owned by the caller) and *pLen holds its size.
// On allocation failure *pVal is NULL and terrno is returned.
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
  *pLen = sizeof(TSKEY);
  (*pVal) = taosMemoryCalloc(1, *pLen);
  if ((*pVal) == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  // The encoder advances its cursor, so hand it a scratch pointer and keep *pVal at the start.
  void* cursor = *pVal;
  (void)taosEncodeFixedI64(&cursor, *pKey);
  return TSDB_CODE_SUCCESS;
}
166

167
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
1✔
168
                            void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
169
                            SStreamFileState** ppFileState) {
170
  int32_t code = TSDB_CODE_SUCCESS;
1✔
171
  int32_t lino = 0;
1✔
172
  if (memSize <= 0) {
1!
173
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
×
174
  }
175
  if (rowSize == 0) {
1!
176
    code = TSDB_CODE_INVALID_PARA;
×
177
    QUERY_CHECK_CODE(code, lino, _end);
×
178
  }
179

180
  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
1!
181
  QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
1!
182

183
  rowSize += selectRowSize;
1✔
184
  pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
1✔
185
  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
1✔
186
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
1!
187

188
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
1✔
189
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
1!
190

191
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
1✔
192
  int32_t    cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
1✔
193
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
1!
194
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
1✔
195
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
1✔
196
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
1✔
197
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
1✔
198
    pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
1✔
199

200
    pFileState->stateFileRemoveFn = intervalFileRemoveFn;
1✔
201
    pFileState->stateFileGetFn = intervalFileGetFn;
1✔
202
    pFileState->cfName = taosStrdup("state");
1!
203
    pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
1✔
UNCOV
204
  } else if (type == STREAM_STATE_BUFF_SORT) {
×
UNCOV
205
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
×
UNCOV
206
    pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
×
UNCOV
207
    pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
×
UNCOV
208
    pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
×
UNCOV
209
    pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
×
210

UNCOV
211
    pFileState->stateFileRemoveFn = sessionFileRemoveFn;
×
UNCOV
212
    pFileState->stateFileGetFn = sessionFileGetFn;
×
UNCOV
213
    pFileState->cfName = taosStrdup("sess");
×
UNCOV
214
    pFileState->stateFunctionGetFn = getSessionRowBuff;
×
UNCOV
215
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
×
UNCOV
216
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
×
UNCOV
217
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
×
UNCOV
218
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
×
UNCOV
219
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
×
UNCOV
220
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
×
UNCOV
221
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
×
UNCOV
222
    pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
×
223

UNCOV
224
    pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
×
UNCOV
225
    pFileState->stateFileGetFn = hashSortFileGetFn;
×
UNCOV
226
    pFileState->cfName = taosStrdup("fill");
×
UNCOV
227
    pFileState->stateFunctionGetFn = NULL;
×
228
  }
229

230
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
1!
231
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
1!
232
  QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
1!
233
  QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
1!
234

235
  if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
1!
UNCOV
236
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
×
UNCOV
237
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
×
238
  }
239

240
  pFileState->keyLen = keySize;
1✔
241
  pFileState->rowSize = rowSize;
1✔
242
  pFileState->selectivityRowSize = selectRowSize;
1✔
243
  pFileState->preCheckPointVersion = 0;
1✔
244
  pFileState->checkPointVersion = 1;
1✔
245
  pFileState->pFileStore = pFile;
1✔
246
  pFileState->getTs = fp;
1✔
247
  pFileState->curRowCount = 0;
1✔
248
  pFileState->deleteMark = delMark;
1✔
249
  pFileState->flushMark = INT64_MIN;
1✔
250
  pFileState->maxTs = INT64_MIN;
1✔
251
  pFileState->id = taosStrdup(taskId);
1!
252
  QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
1!
253

254
  pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
1✔
255
  QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
1!
256

257
  pFileState->hasFillCatch = true;
1✔
258

259
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
1!
260
    code = recoverSnapshot(pFileState, checkpointId);
1✔
UNCOV
261
  } else if (type == STREAM_STATE_BUFF_SORT) {
×
UNCOV
262
    code = recoverSession(pFileState, checkpointId);
×
UNCOV
263
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
×
UNCOV
264
    code = recoverFillSnapshot(pFileState, checkpointId);
×
265
  }
266
  QUERY_CHECK_CODE(code, lino, _end);
1!
267

268
  void*   valBuf = NULL;
1✔
269
  int32_t len = 0;
1✔
270
  int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
1✔
271
  if (tmpRes == TSDB_CODE_SUCCESS) {
1!
UNCOV
272
    QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
UNCOV
273
    streamFileStateDecode(&pFileState->flushMark, valBuf, len);
×
UNCOV
274
    qDebug("===stream===flushMark  read:%" PRId64, pFileState->flushMark);
×
275
  }
276
  taosMemoryFreeClear(valBuf);
1!
277
  (*ppFileState) = pFileState;
1✔
278

279
_end:
1✔
280
  if (code != TSDB_CODE_SUCCESS) {
1!
281
    streamFileStateDestroy(pFileState);
×
282
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
283
  }
284
  return code;
1✔
285
}
286

287
// Frees a row position together with the key and the row buffer it still holds.
// pPos must not be NULL.
void destroyRowBuffPos(SRowBuffPos* pPos) {
  taosMemoryFreeClear(pPos->pRowBuff);
  taosMemoryFreeClear(pPos->pKey);
  taosMemoryFree(pPos);
}
1✔
292

UNCOV
293
void destroyRowBuffPosPtr(void* ptr) {
×
UNCOV
294
  if (!ptr) {
×
295
    return;
×
296
  }
UNCOV
297
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
×
UNCOV
298
  if (!pPos->beUsed) {
×
UNCOV
299
    destroyRowBuffPos(pPos);
×
300
  }
301
}
302

UNCOV
303
void destroyRowBuffAllPosPtr(void* ptr) {
×
UNCOV
304
  if (!ptr) {
×
305
    return;
×
306
  }
UNCOV
307
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
×
UNCOV
308
  destroyRowBuffPos(pPos);
×
309
}
310

UNCOV
311
// List-element destructor: ptr points at a stored row-buffer pointer, which is freed.
void destroyRowBuff(void* ptr) {
  if (ptr != NULL) {
    void* pRow = *(void**)ptr;
    taosMemoryFree(pRow);
  }
}
317

318
// Releases a file state and everything it owns. NULL is accepted and ignored.
// Also safe for a partially initialised state, as produced when streamFileStateInit()
// fails part-way through.
void streamFileStateDestroy(SStreamFileState* pFileState) {
  if (!pFileState) {
    return;
  }

  taosMemoryFree(pFileState->id);
  taosMemoryFree(pFileState->cfName);
  if (pFileState->usedBuffs != NULL) {
    tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
  }
  if (pFileState->freeBuffs != NULL) {
    tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
  }
  // The callback is only assigned once the buffer type has been processed, so it
  // is still NULL when initialisation failed before that point.
  if (pFileState->stateBuffCleanupFn != NULL) {
    pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
  }
  sessionWinStateCleanup(pFileState->searchBuff);
  tSimpleHashCleanup(pFileState->pGroupIdMap);
  taosMemoryFree(pFileState);
}
332

UNCOV
333
// Moves the row buffer of pPos, if it has one, onto the free list and detaches it from pPos.
// Returns the list-append error on failure, in which case pPos keeps its buffer.
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pPos->pRowBuff == NULL) {
    goto _end;
  }

  code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
  QUERY_CHECK_CODE(code, lino, _end);
  pPos->pRowBuff = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
348

UNCOV
349
// Walks the used list and retires positions: every position when all is true, otherwise
// only those whose key timestamp is below ts and that are not in use.
// A retired position has its row buffer recycled onto the free list, is removed from the
// row-state hash (skipped when all is true), is destroyed and is unlinked from the used list.
// Stops at the first recycle failure, which is logged.
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  for (SListNode* pNode = tdListNext(&iter); pNode != NULL; pNode = tdListNext(&iter)) {
    SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
    bool         retire = all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed);
    if (!retire) {
      continue;
    }

    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!all) {
      pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
    }
    destroyRowBuffPos(pPos);

    SListNode* pUnlinked = tdListPopNode(pFileState->usedBuffs, pNode);
    taosMemoryFreeClear(pUnlinked);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
376

UNCOV
377
// Collects positions whose timestamp isFlushedState() reports as flushed into pFlushList.
//
// A position is collected when all is true and it still holds a row buffer, or when all is
// false and it is not in use. Each collected position raises flushMark, is removed from the
// row-state hash and, if not in use, is unlinked from the used list. The scan ends once max
// collected positions carried a row buffer. Returns the list-append error on failure.
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  uint64_t  numWithBuff = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  for (;;) {
    SListNode* pNode = tdListNext(&iter);
    if (pNode == NULL || numWithBuff >= max) {
      break;
    }

    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (!isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
      continue;
    }
    if (!all && pPos->beUsed) {
      continue;
    }
    if (all && !pPos->pRowBuff) {
      continue;
    }

    code = tdListAppend(pFlushList, &pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
    if (pPos->beUsed == false) {
      SListNode* pUnlinked = tdListPopNode(pFileState->usedBuffs, pNode);
      taosMemoryFreeClear(pUnlinked);
    }
    if (pPos->pRowBuff) {
      numWithBuff++;
    }
  }
  qDebug("clear flushed row buff. %d rows to disk. is all:%d", listNEles(pFlushList), all);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
415

UNCOV
416
// Resets the in-memory state: both marks go back to INT64_MIN, the row-state hash is
// emptied and every used position is retired.
void streamFileStateClear(SStreamFileState* pFileState) {
  pFileState->maxTs = INT64_MIN;
  pFileState->flushMark = INT64_MIN;
  tSimpleHashClear(pFileState->rowStateBuff);
  clearExpiredRowBuff(pFileState, 0, true);
}
×
422

UNCOV
423
// Reports whether flushMark is positive.
bool needClearDiskBuff(SStreamFileState* pFileState) {
  return pFileState->flushMark > 0;
}
×
424

UNCOV
425
// Sets the in-use flag of pPos to used. pFileState is not consulted.
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
  pPos->beUsed = used;
}
×
426

UNCOV
427
// Collects positions whose in-use flag equals used into pFlushList.
//
// In-use positions without a row buffer are skipped. Each collected position raises
// flushMark, is removed from the row-state hash and, if not in use, is unlinked from the
// used list. The scan ends once max collected positions carried a row buffer.
// Returns the list-append error on failure.
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  uint64_t  numWithBuff = 0;
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  for (;;) {
    SListNode* pNode = tdListNext(&iter);
    if (pNode == NULL || numWithBuff >= max) {
      break;
    }

    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (pPos->beUsed != used) {
      continue;
    }
    if (used && !pPos->pRowBuff) {
      continue;
    }

    code = tdListAppend(pFlushList, &pPos);
    QUERY_CHECK_CODE(code, lino, _end);

    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
    pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
    if (pPos->beUsed == false) {
      SListNode* pUnlinked = tdListPopNode(pFileState->usedBuffs, pNode);
      taosMemoryFreeClear(pUnlinked);
    }
    if (pPos->pRowBuff) {
      numWithBuff++;
    }
  }

  qInfo("stream state flush %d rows to disk. is used:%d", listNEles(pFlushList), used);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
464

UNCOV
465
// Evicts row buffers: picks victims, hands them to flushSnapshot() and recycles their
// buffers onto the free list.
//
// Victims are chosen in this order until something is found: already-flushed positions
// that are not in use, then any position not in use, then positions in use. When a search
// buffer exists, every already-flushed position that still holds a buffer is added too.
// Returns TSDB_CODE_SUCCESS or the error of the step that failed. The temporary victim
// list is released on every path.
int32_t flushRowBuff(SStreamFileState* pFileState) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
  if (!pFlushList) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // Aim for FLUSH_RATIO of the allocated rows, but never fewer than FLUSH_NUM.
  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
  num = TMAX(num, FLUSH_NUM);
  code = clearFlushedRowBuff(pFileState, pFlushList, num, false);
  QUERY_CHECK_CODE(code, lino, _end);

  if (isListEmpty(pFlushList)) {
    code = popUsedBuffs(pFileState, pFlushList, num, false);
    QUERY_CHECK_CODE(code, lino, _end);

    if (isListEmpty(pFlushList)) {
      code = popUsedBuffs(pFileState, pFlushList, num, true);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  if (pFileState->searchBuff) {
    code = clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  flushSnapshot(pFileState, pFlushList, false);

  SListIter fIter = {0};
  tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&fIter)) != NULL) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    code = putFreeBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // Released here rather than before the label so the error paths no longer leak the
  // list. The element destructor frees positions that are not in use, together with
  // any row buffer still attached to them.
  if (pFlushList != NULL) {
    tdListFreeP(pFlushList, destroyRowBuffPosPtr);
  }
  return code;
}
513

UNCOV
514
// Makes row buffers available again: first expires rows older than maxTs - deleteMark
// (skipped when deleteMark is INT64_MAX), then flushes until the free list is non-empty
// or the row count is no longer at its maximum.
// Returns TSDB_CODE_SUCCESS or the first flush error.
int32_t clearRowBuff(SStreamFileState* pFileState) {
  if (pFileState->deleteMark != INT64_MAX) {
    TSKEY maxTs = pFileState->maxTs;
    TSKEY mark = pFileState->deleteMark;
    // maxTs starts at INT64_MIN, so the plain subtraction can overflow (undefined
    // behaviour, and a wrapped threshold would expire every unused row).
    if (mark > 0 && maxTs < INT64_MIN + mark) {
      // The threshold lies below the smallest timestamp: nothing can be expired.
    } else if (mark < 0 && maxTs > INT64_MAX + mark) {
      // The threshold lies above the largest timestamp: saturate.
      clearExpiredRowBuff(pFileState, INT64_MAX, false);
    } else {
      clearExpiredRowBuff(pFileState, maxTs - mark, false);
    }
  }
  do {
    int32_t code = flushRowBuff(pFileState);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  } while (isListEmpty(pFileState->freeBuffs) && pFileState->curRowCount == pFileState->maxRowCount);
  return TSDB_CODE_SUCCESS;
}
526

527
// Takes one row buffer off the free list and returns it zero-filled (rowSize bytes).
// Returns NULL when the free list is empty.
void* getFreeBuff(SStreamFileState* pFileState) {
  SListNode* pHead = tdListPopHead(pFileState->freeBuffs);
  if (pHead == NULL) {
    return NULL;
  }

  void* pRow = *(void**)pHead->data;
  taosMemoryFree(pHead);
  memset(pRow, 0, pFileState->rowSize);
  return pRow;
}
539

540
// Zero-fills the row buffer of pPos (rowSize bytes); does nothing when pPos has no buffer.
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  if (pPos->pRowBuff == NULL) {
    return;
  }
  memset(pPos->pRowBuff, 0, pFileState->rowSize);
}
×
545

546
// Creates a new row position with a zeroed key and a row buffer, and appends it to the
// used list.
//
// The row buffer comes from the free list, from a fresh allocation while curRowCount is
// below maxRowCount, or from the free list again after clearRowBuff() has evicted rows.
// Returns the position, or NULL on failure (logged). A position that fails before it
// reaches the used list is released here; one that is already on the used list stays
// owned by that list.
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    onUsedList = false;

  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
  if (!pPos) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _error);
  }

  pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
  if (!pPos->pKey) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _error);
  }

  void* pBuff = getFreeBuff(pFileState);
  if (pBuff) {
    pPos->pRowBuff = pBuff;
    goto _end;
  }

  if (pFileState->curRowCount < pFileState->maxRowCount) {
    pBuff = taosMemoryCalloc(1, pFileState->rowSize);
    QUERY_CHECK_NULL(pBuff, code, lino, _error, terrno);
    pPos->pRowBuff = pBuff;
    pFileState->curRowCount++;
    goto _end;
  }

  code = clearRowBuff(pFileState);
  QUERY_CHECK_CODE(code, lino, _error);

  pPos->pRowBuff = getFreeBuff(pFileState);

_end:
  code = tdListAppend(pFileState->usedBuffs, &pPos);
  QUERY_CHECK_CODE(code, lino, _error);
  onUsedList = true;

  QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (pPos != NULL && !onUsedList) {
      // Nothing else references this position yet, so it would leak otherwise.
      // Its row buffer is freed along with it and is therefore no longer counted.
      if (pPos->pRowBuff != NULL) {
        pFileState->curRowCount--;
      }
      destroyRowBuffPos(pPos);
    }
    return NULL;
  }

  return pPos;
}
593

594
// Obtains a new row position via getNewRowPos() and marks it as in use, updated,
// not flushed and not needing a free. Returns NULL (after logging) when no position
// could be obtained.
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
  SRowBuffPos* pFresh = getNewRowPos(pFileState);
  if (pFresh == NULL) {
    int32_t code = TSDB_CODE_OUT_OF_MEMORY;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return NULL;
  }

  pFresh->beUpdated = true;
  pFresh->needFree = false;
  pFresh->beFlushed = false;
  pFresh->beUsed = true;
  return pFresh;
}
614

UNCOV
615
// Looks up the row position for pKey, creating it when the row-state hash has none.
//
// maxTs is raised to the key's timestamp first. On a hash hit the cached position is
// returned and marked in use and not flushed (only when pVal is non-NULL). On a miss a
// new position is created; when the timestamp is not deleted and already flushed, the
// row content is read back from the persistent store.
//
//   pVal / pVLen  receive the position and rowSize; both are only written when pVal is non-NULL
//   pWinCode      TSDB_CODE_SUCCESS on a hash hit, otherwise TSDB_CODE_FAILED or the
//                 result of the store read
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when no position could be obtained,
// or the hash-insert error.
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                             int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  (*pWinCode) = TSDB_CODE_SUCCESS;
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));

  SRowBuffPos** ppCached = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (ppCached != NULL) {
    if (pVal != NULL) {
      SRowBuffPos* pCached = *ppCached;
      pCached->beUsed = true;
      pCached->beFlushed = false;
      *pVLen = pFileState->rowSize;
      *pVal = pCached;
    }
    goto _end;
  }

  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
  if (pNewPos == NULL || pNewPos->pRowBuff == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  memcpy(pNewPos->pKey, pKey, keyLen);
  (*pWinCode) = TSDB_CODE_FAILED;

  TSKEY ts = pFileState->getTs(pKey);
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
    void*   pStored = NULL;
    int32_t storedLen = 0;
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &pStored, &storedLen);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
      memcpy(pNewPos->pRowBuff, pStored, storedLen);
    }
    taosMemoryFree(pStored);
  }

  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pVal != NULL) {
    *pVLen = pFileState->rowSize;
    *pVal = pNewPos;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
666

UNCOV
667
// Removes pKey from the row-state buffer and from the persistent store, tracing both
// results. When a search buffer exists, the key is removed from it as well.
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
  int32_t buffRes = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
  qTrace("%s at line %d res:%d", __func__, __LINE__, buffRes);

  int32_t fileRes = pFileState->stateFileRemoveFn(pFileState, pKey);
  qTrace("%s at line %d res:%d", __func__, __LINE__, fileRes);

  if (pFileState->searchBuff == NULL) {
    return;
  }
  deleteHashSortRowBuff(pFileState, pKey);
}
×
676

UNCOV
677
// Removes every entry of the group groupId, first from the row-state hash and then
// from the persistent store.
//
// The store is purged one key at a time: seek from the smallest timestamp of the group,
// fetch the group's next key, delete it, repeat. The loop ends when no further key of the
// group is found or when a delete fails.
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
  SSHashObj* pRowMap = pFileState->rowStateBuff;
  void*      pIte = NULL;
  int32_t    iter = 0;
  while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) {
    size_t   keyLen = 0;
    SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
    if (pKey->groupId == groupId) {
      int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter);
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
    }
  }

  while (1) {
    SWinKey tmp = {.ts = INT64_MIN, .groupId = groupId};
    // NOTE(review): pCur is not released in this function; confirm whether the
    // cursor needs an explicit free.
    SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp);
    SWinKey          delKey = {.groupId = groupId};
    int32_t          code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
    code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
    qTrace("%s at line %d res:%d", __func__, __LINE__, code);
    if (code != TSDB_CODE_SUCCESS) {
      // The key is still in the store, so the next seek would find it again and
      // the loop would never end.
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      break;
    }
  }
}
×
702

UNCOV
703
// Reloads the row content of pPos from the persistent store into pPos->pRowBuff.
// Returns the store's error (logged) when the key cannot be read; the row buffer is
// then left unchanged.
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t lino = 0;
  void*   pStored = NULL;
  int32_t storedLen = 0;

  int32_t code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pStored, &storedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  memcpy(pPos->pRowBuff, pStored, storedLen);
  taosMemoryFree(pStored);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
719

UNCOV
720
/**
 * Attach a row buffer to pPos and fill it with the value stored for pPos->pKey.
 *
 * The buffer comes from getFreeBuff() when one is available. Otherwise a new
 * one is allocated while curRowCount is below maxRowCount, or clearRowBuff()
 * is run and getFreeBuff() is tried once more.
 *
 * @return TSDB_CODE_SUCCESS or the first error code encountered
 */
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  pPos->pRowBuff = getFreeBuff(pFileState);
  if (pPos->pRowBuff == NULL) {
    if (pFileState->curRowCount >= pFileState->maxRowCount) {
      // Row budget exhausted: reclaim buffers, then retry the free list.
      code = clearRowBuff(pFileState);
      QUERY_CHECK_CODE(code, lino, _end);
      pPos->pRowBuff = getFreeBuff(pFileState);
    } else {
      // Still under budget: grow by one freshly zeroed row.
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
      if (pPos->pRowBuff == NULL) {
        code = terrno;
        QUERY_CHECK_CODE(code, lino, _end);
      }
      pFileState->curRowCount++;
    }
    QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  code = recoverSessionRowBuff(pFileState, pPos);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
749

UNCOV
750
/**
 * Return the row buffer that belongs to pPos, loading it first if necessary.
 *
 * - No buffer attached: recoverStateRowBuff() attaches and fills one.
 * - Buffer attached and pPos->needFree set: the buffer content is refreshed
 *   through recoverSessionRowBuff().
 * - Otherwise the attached buffer is returned as is.
 *
 * @param pVal  receives pPos->pRowBuff on success; left untouched on failure
 * @return TSDB_CODE_SUCCESS or the error code of the recovery step
 */
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  if (pPos->pRowBuff == NULL) {
    code = recoverStateRowBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pPos->needFree) {
    code = recoverSessionRowBuff(pFileState, pPos);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  (*pVal) = pPos->pRowBuff;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
777

UNCOV
778
/**
 * Tell whether the in-memory row map (rowStateBuff) holds an entry for pKey.
 *
 * @param pKey    key bytes to look up
 * @param keyLen  length of pKey in bytes
 * @return true when the lookup yields an entry, false otherwise
 */
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
  SRowBuffPos** ppEntry = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  return ppEntry != NULL;
}
785

UNCOV
786
/**
 * Run clearExpiredRowBuff() with the current expiry mark and hand back the
 * list of used row buffers.
 *
 * The mark is maxTs - deleteMark. It falls back to INT64_MIN when deleteMark
 * is INT64_MAX or when maxTs is still INT64_MIN.
 *
 * @return pFileState->usedBuffs
 */
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
  int64_t mark = INT64_MIN;
  if (pFileState->deleteMark != INT64_MAX && pFileState->maxTs != INT64_MIN) {
    mark = pFileState->maxTs - pFileState->deleteMark;
  }

  clearExpiredRowBuff(pFileState, mark, false);
  return pFileState->usedBuffs;
}
793

UNCOV
794
/**
 * Write every not-yet-flushed row of pSnapshot to the state store in batches
 * and, when requested, persist the resulting flushMark.
 *
 * Rows that are already marked beFlushed or that have no row buffer are
 * skipped. A batch is submitted once it reaches BATCH_LIMIT entries and once
 * more after the loop for the remainder. When no row was pending after the
 * loop, the function returns without touching the search buffer or flushMark.
 *
 * @param pFileState  file state owning the store, column family and flushMark
 * @param pSnapshot   list of SRowBuffPos* entries to flush
 * @param flushState  when true, also store the encoded flushMark under
 *                    STREAM_STATE_INFO_NAME in the "default" column family
 */
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  // Both resources are released at _end, so they must hold a defined value
  // before the first jump to that label.
  char*     buf = NULL;
  void*     batch = NULL;
  SListIter iter = {0};
  tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);

  const int32_t BATCH_LIMIT = 256;

  int64_t    st = taosGetTimestampMs();
  SListNode* pNode = NULL;

  int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);

  // Scratch buffer handed to streamStatePutBatchOptimize for each row.
  int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
  buf = taosMemoryCalloc(1, len);
  if (!buf) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  batch = streamStateCreateBatch();
  if (!batch) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (pPos->beFlushed || !pPos->pRowBuff) {
      continue;
    }
    pPos->beFlushed = true;
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));

    qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
      code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
      streamStateClearBatch(batch);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
    QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);

    code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
                                       0, buf);
    taosMemoryFreeClear(pSKey);
    QUERY_CHECK_CODE(code, lino, _end);
    // todo handle failure
    memset(buf, 0, len);
  }
  taosMemoryFreeClear(buf);

  int32_t numOfElems = streamStateGetBatchSize(batch);
  if (numOfElems > 0) {
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    goto _end;
  }

  streamStateClearBatch(batch);

  clearSearchBuff(pFileState);

  int64_t elapsed = taosGetTimestampMs() - st;
  qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
         pFileState->id, numOfElems, BATCH_LIMIT, elapsed);

  if (flushState) {
    void*   valBuf = NULL;
    int32_t valLen = 0;
    code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &valLen);
    QUERY_CHECK_CODE(code, lino, _end);

    qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
    code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, valLen, 0);
    taosMemoryFree(valBuf);
    QUERY_CHECK_CODE(code, lino, _end);

    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  taosMemoryFree(buf);
  if (batch != NULL) {
    streamStateDestroyBatch(batch);
  }
}
×
885

886
/**
 * Delete the entry stored under "<TASK_KEY>:<checkpointId>" from the state store.
 *
 * @param checkpointId  checkpoint whose entry is removed
 * @return result of streamDefaultDel_rocksdb()
 */
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
  char checkpointKey[128] = {0};

  TAOS_UNUSED(tsnprintf(checkpointKey, sizeof(checkpointKey), "%s:%" PRId64 "", TASK_KEY, checkpointId));

  return streamDefaultDel_rocksdb(pFileState->pFileStore, checkpointKey);
}
891

892
/**
 * Remove the newest checkpoint entry whose stored timestamp is older than `mark`.
 *
 * The value stored under TASK_KEY is parsed as the highest checkpoint id.
 * Ids are then visited downwards; each "<TASK_KEY>:<id>" value is parsed as a
 * timestamp, and the first id with a timestamp below `mark` is removed, which
 * ends the scan.
 *
 * @param mark  timestamps strictly below this value count as expired
 * @return TSDB_CODE_FAILED when a lookup fails or a stored value does not fit
 *         the local parse buffer, otherwise the code of the last lookup
 */
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t maxCheckPointId = 0;
  {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    memcpy(buf, TASK_KEY, strlen(TASK_KEY));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    // The value is copied into buf and NUL-terminated below, so it has to be
    // strictly shorter than buf.
    if (code != 0 || val == NULL || len <= 0 || len >= (int32_t)sizeof(buf)) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }
    memcpy(buf, val, len);
    buf[len] = 0;
    maxCheckPointId = taosStr2Int64((char*)buf, NULL, 10);
    taosMemoryFree(val);
  }
  for (int64_t i = maxCheckPointId; i > 0; i--) {
    char    buf[128] = {0};
    void*   val = NULL;
    int32_t len = 0;
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s:%" PRId64 "", TASK_KEY, i));
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
    if (code != 0 || len < 0 || len >= (int32_t)sizeof(buf)) {
      taosMemoryFree(val);
      return TSDB_CODE_FAILED;
    }
    if (val != NULL && len > 0) {
      memcpy(buf, val, len);
    }
    buf[len] = 0;
    taosMemoryFree(val);

    TSKEY ts = taosStr2Int64((char*)buf, NULL, 10);
    if (ts < mark) {
      // statekey winkey.ts < mark
      int32_t tmpRes = forceRemoveCheckpoint(pFileState, i);
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
      break;
    }
  }
  return code;
}
933

UNCOV
934
/**
 * Reload the most recent session windows from the state store into memory.
 *
 * Expired checkpoint entries are cleaned up first (best effort; the result is
 * only traced). The store is then walked backwards from its last session
 * entry until TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) rows are cached
 * or the cursor runs out.
 *
 * @param ckId  not referenced by this function
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR on a
 *         row-size mismatch, or TSDB_CODE_OUT_OF_MEMORY when no window buffer
 *         could be created
 */
int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winRes = TSDB_CODE_SUCCESS;
  if (pFileState->maxTs != INT64_MIN) {
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*       pVal = NULL;
    int32_t     vlen = 0;
    SSessionKey key = {0};
    winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    if (vlen != pFileState->rowSize) {
      // The value is not handed on to createSessionWinBuff() on this path, so
      // it has to be released here (same handling as in recoverSnapshot()).
      taosMemoryFreeClear(pVal);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
    if (pPos == NULL) {
      // NOTE(review): who owns pVal when createSessionWinBuff() fails is not
      // visible here, so it is deliberately not freed - confirm against the helper.
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }
    pPos->beUsed = false;
    winRes = putSessionWinResultBuff(pFileState, pPos);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }

    winRes = streamStateSessionCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
983

984
/**
 * Reload the most recent rows from the state store into the in-memory row map.
 *
 * Expired checkpoint entries are cleaned up first (result only traced). The
 * store is then read backwards from its last entry. The walk ends when
 * TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) rows are cached, when the
 * cursor yields no further entry, or when an entry's timestamp lies below
 * flushMark.
 *
 * @param ckId  not referenced by this function
 * @return TSDB_CODE_SUCCESS or the first error code encountered
 */
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t winCode = TSDB_CODE_SUCCESS;

  if (pFileState->maxTs != INT64_MIN) {
    int64_t mark = INT64_MIN;
    if (INT64_MIN + pFileState->deleteMark < pFileState->maxTs) {
      mark = pFileState->maxTs - pFileState->deleteMark;
    }
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
  }

  SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);

  while (winCode == TSDB_CODE_SUCCESS && pFileState->curRowCount < recoverNum) {
    int32_t      storedLen = 0;
    void*        pStored = NULL;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (pNewPos == NULL || pNewPos->pRowBuff == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey,
                                            (const void**)&pStored, &storedLen);
    qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
    if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
      // Nothing usable was read: undo the position that was just reserved.
      destroyRowBuffPos(pNewPos);
      SListNode* pTail = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pTail);
      taosMemoryFreeClear(pStored);
      break;
    }

    if (storedLen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, storedLen);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pStored);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    memcpy(pNewPos->pRowBuff, pStored, storedLen);
    taosMemoryFreeClear(pStored);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);

    code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      destroyRowBuffPos(pNewPos);
      break;
    }

    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  streamStateFreeCur(pCur);
  return code;
}
1046

UNCOV
1047
/** Accessor: returns pFileState->selectivityRowSize. */
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) {
  return pFileState->selectivityRowSize;
}
×
1048

UNCOV
1049
/**
 * Raise flushMark and maxTs to `ts` when `ts` is not below their current
 * values; neither field is ever lowered.
 */
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
  if (!(pFileState->flushMark > ts)) {
    pFileState->flushMark = ts;
  }
  if (!(pFileState->maxTs > ts)) {
    pFileState->maxTs = ts;
  }
}
×
1053

UNCOV
1054
/** Accessor: returns pFileState->rowStateBuff as an untyped pointer. */
void* getRowStateBuff(SStreamFileState* pFileState) {
  return pFileState->rowStateBuff;
}
×
1055
/** Accessor: returns pFileState->searchBuff as an untyped pointer. */
void* getSearchBuff(SStreamFileState* pFileState) {
  return pFileState->searchBuff;
}
1✔
1056

1057
/** Accessor: returns pFileState->pFileStore as an untyped pointer. */
void* getStateFileStore(SStreamFileState* pFileState) {
  return pFileState->pFileStore;
}
2✔
1058

UNCOV
1059
/**
 * Tell whether `ts` lies before the deletion horizon maxTs - deleteMark.
 *
 * Always false while deleteMark is INT64_MAX or maxTs is not positive.
 */
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
  if (pFileState->deleteMark == INT64_MAX) {
    return false;
  }
  if (!(pFileState->maxTs > 0)) {
    return false;
  }
  return ts < (pFileState->maxTs - pFileState->deleteMark);
}
1063

UNCOV
1064
/** Returns true when `ts` is not greater than flushMark + gap. */
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
  return ts <= (pFileState->flushMark + gap);
}
×
1065

UNCOV
1066
/** Accessor: returns pFileState->flushMark. */
TSKEY getFlushMark(SStreamFileState* pFileState) { return pFileState->flushMark; }
×
1067

1068
/** Accessor: returns pFileState->rowSize. */
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
  return pFileState->rowSize;
}
1✔
1069

UNCOV
1070
/**
 * Forward the lookup to the stateFunctionGetFn callback.
 *
 * The window code the callback reports is collected in a local and discarded;
 * only the callback's return value reaches the caller.
 */
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
  int32_t discardedWinCode = TSDB_CODE_SUCCESS;

  return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen, &discardedWinCode);
}
1074

UNCOV
1075
/**
 * Reload the most recent fill-state rows from the state store into memory.
 *
 * Unlike recoverSnapshot(), a failing deleteExpiredCheckPoint() aborts the
 * recovery. The store is then read backwards from its last fill entry until
 * TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, maxRowCount) rows are cached, the cursor
 * yields no further entry, or an entry is already covered by flushMark.
 *
 * @param ckId  not referenced by this function
 * @return TSDB_CODE_SUCCESS or the first error code encountered
 */
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int32_t          winRes = TSDB_CODE_SUCCESS;
  int32_t          recoverNum = 0;
  // Released at _end, so it needs a defined value before the first jump there.
  SStreamStateCur* pCur = NULL;

  if (pFileState->maxTs != INT64_MIN) {
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
                       ? INT64_MIN
                       : pFileState->maxTs - pFileState->deleteMark;
    code = deleteExpiredCheckPoint(pFileState, mark);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
  if (pCur == NULL) {
    return code;
  }
  recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
  while (winRes == TSDB_CODE_SUCCESS) {
    if (pFileState->curRowCount >= recoverNum) {
      break;
    }

    void*        pVal = NULL;
    int32_t      vlen = 0;
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (!pNewPos || !pNewPos->pRowBuff) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
    qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
    if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
      // Nothing usable was read: undo the position that was just reserved.
      destroyRowBuffPos(pNewPos);
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
      taosMemoryFreeClear(pNode);
      taosMemoryFreeClear(pVal);
      break;
    }

    if (vlen != pFileState->rowSize) {
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
      destroyRowBuffPos(pNewPos);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      taosMemoryFreeClear(pVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    memcpy(pNewPos->pRowBuff, pVal, vlen);
    taosMemoryFreeClear(pVal);
    pNewPos->beFlushed = true;
    pNewPos->beUsed = false;
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
    winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
    if (winRes != TSDB_CODE_SUCCESS) {
      destroyRowBuffPos(pNewPos);
      break;
    }
    streamStateCurPrev_rocksdb(pCur);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // Freed here rather than before the label so the error paths release it too.
  streamStateFreeCur(pCur);
  return code;
}
1138

UNCOV
1139
/**
 * Look up the row position for pKey, first in memory and then in the state store.
 *
 * maxTs is raised to the key's timestamp on every call. On an in-memory hit
 * the position is marked used and not flushed, and its row buffer is restored
 * if it is missing. On a miss the store is consulted only when the timestamp
 * is not deleted and is covered by flushMark; a stored row is then copied into
 * a fresh position and registered in the row map.
 *
 * @param pVal      receives the SRowBuffPos* (not the raw row buffer)
 * @param pVLen     receives pFileState->rowSize
 * @param pWinCode  TSDB_CODE_SUCCESS when a row was found, otherwise
 *                  TSDB_CODE_FAILED or the store lookup result
 * @return TSDB_CODE_SUCCESS or the first error code encountered
 */
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
                   int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  // Buffer returned by stateFileGetFn; released at _end so that the error
  // paths below cannot leak it.
  void*   p = NULL;
  (*pWinCode) = TSDB_CODE_FAILED;
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
  if (ppPos) {
    *pVLen = pFileState->rowSize;
    *pVal = *ppPos;
    (*ppPos)->beUsed = true;
    (*ppPos)->beFlushed = false;
    (*pWinCode) = TSDB_CODE_SUCCESS;
    if ((*ppPos)->pRowBuff == NULL) {
      code = recoverStateRowBuff(pFileState, *ppPos);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    goto _end;
  }
  TSKEY ts = pFileState->getTs(pKey);
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
    int32_t len = 0;
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
      if (!pNewPos || !pNewPos->pRowBuff) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      memcpy(pNewPos->pKey, pKey, keyLen);
      memcpy(pNewPos->pRowBuff, p, len);
      code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
      QUERY_CHECK_CODE(code, lino, _end);

      if (pVal) {
        *pVLen = pFileState->rowSize;
        *pVal = pNewPos;
      }
    }
  }

_end:
  taosMemoryFree(p);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1190

UNCOV
1191
/**
 * Record `groupId` as known, both in the in-memory group map and in the state store.
 *
 * A non-NULL `value` is rejected with TSDB_CODE_INVALID_PARA. A group that is
 * already present in pGroupIdMap is left untouched. A new group is added to
 * the map only while the map size is at most MAX_GROUP_ID_NUM, but is written
 * to the store in either case.
 *
 * @return TSDB_CODE_SUCCESS or the first error code encountered
 */
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  if (value != NULL) {
    code = TSDB_CODE_INVALID_PARA;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (tSimpleHashGet(pFileState->pGroupIdMap, &groupId, sizeof(int64_t)) != NULL) {
    // Already cached, nothing to write.
    goto _end;
  }

  if (tSimpleHashGetSize(pFileState->pGroupIdMap) <= MAX_GROUP_ID_NUM) {
    code = tSimpleHashPut(pFileState->pGroupIdMap, &groupId, sizeof(int64_t), NULL, 0);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1214

UNCOV
1215
/**
 * Advance a group cursor by one entry.
 *
 * While hashIter is not -1 the cursor walks the in-memory group map and keeps
 * minGpId at the largest group id seen so far. Once the map is exhausted,
 * hashIter is set to -1 and the cursor is re-seeked in the state store from
 * minGpId; later calls step through the store only.
 */
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
  SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState;

  if (pCur->hashIter == -1) {
    streamStateCurNext(pFileState->pFileStore, pCur);
    return;
  }

  int64_t curGroupId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
  pCur->minGpId = TMAX(pCur->minGpId, curGroupId);

  pCur->pHashData = tSimpleHashIterate(pFileState->pGroupIdMap, pCur->pHashData, &pCur->hashIter);
  if (pCur->pHashData == NULL) {
    pCur->hashIter = -1;
    streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
  }
}
1233

UNCOV
1234
/**
 * Read the group id the cursor currently points at.
 *
 * While the cursor still references an in-memory map entry, the id is taken
 * from that entry; otherwise the lookup is delegated to the state store.
 * pVal and pVLen are not written by this function.
 *
 * @param pKey  receives the group id
 * @return TSDB_CODE_SUCCESS for an in-memory entry, otherwise the result of
 *         streamStateParTagGetKVByCur_rocksdb()
 */
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
  if (pCur->pHashData == NULL) {
    return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
  }

  *pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
  return TSDB_CODE_SUCCESS;
}
1242

UNCOV
1243
/** Accessor: returns the in-memory group id map (pFileState->pGroupIdMap). */
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) { return pFileState->pGroupIdMap; }
1246

UNCOV
1247
/**
 * For every group in the search buffer, drop all window keys except the last
 * one in the group's array, then run clearRowBuff().
 *
 * Each dropped key is removed from the row map through stateBuffRemoveFn and,
 * when its timestamp is covered by flushMark, from the state store through
 * stateFileRemoveFn. The per-key results are only traced.
 */
void clearExpiredState(SStreamFileState* pFileState) {
  int32_t    lino = 0;
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    iter = 0;
  void*      pEntry = NULL;
  SSHashObj* pSearchBuff = pFileState->searchBuff;

  while ((pEntry = tSimpleHashIterate(pSearchBuff, pEntry, &iter)) != NULL) {
    SArray* pWinStates = *((void**)pEntry);
    int32_t numStates = taosArrayGetSize(pWinStates);
    int32_t numExpired = numStates - 1;

    for (int32_t i = 0; i < numExpired; i++) {
      SWinKey* pKey = taosArrayGet(pWinStates, i);

      int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
      qTrace("clear expired buff, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_buff);

      if (!isFlushedState(pFileState, pKey->ts, 0)) {
        continue;
      }
      int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
      qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file);
    }

    taosArrayRemoveBatch(pWinStates, 0, numExpired, NULL);
  }

  code = clearRowBuff(pFileState);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
1276

1277
#ifdef BUILD_NO_CALL
1278
/**
 * Make sure a row exists for pKey and register the key in the per-group
 * search buffer.
 *
 * The group's key array is created on first use. When the array is empty and
 * needClearDiskBuff() holds, up to NUM_OF_CACHE_WIN keys are first recovered
 * from the state store, walking backwards from the group's last key. The
 * array is trimmed once the size measured before the insertion reaches
 * MAX_NUM_OF_CACHE_WIN.
 *
 * @return TSDB_CODE_SUCCESS or the first error code encountered
 */
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
                           int32_t* pWinCode) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  // Released at _end so that a failure inside the recovery loop cannot leak it.
  SStreamStateCur* pCur = NULL;

  code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

  SArray*    pWinStates = NULL;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  void**     ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);

    code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // recover
  if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
    SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX};
    void*   pState = getStateFileStore(pFileState);
    pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
    for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
      SWinKey tmpKey = {.groupId = pKey->groupId};
      int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pState, pCur, &tmpKey, NULL, 0);
      if (tmpRes != TSDB_CODE_SUCCESS) {
        break;
      }
      void* tmp = taosArrayPush(pWinStates, &tmpKey);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      streamStateCurPrev_rocksdb(pCur);
    }
    taosArraySort(pWinStates, winKeyCmprImpl);
  }

  int32_t size = taosArrayGetSize(pWinStates);
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
  if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0) {
    // find the first position which is smaller than the pKey
    if (index >= 0) {
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
        goto _end;
      }
    }
    index++;
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

  if (size >= MAX_NUM_OF_CACHE_WIN) {
    int32_t num = size - NUM_OF_CACHE_WIN;
    taosArrayRemoveBatch(pWinStates, 0, num, NULL);
  }

_end:
  streamStateFreeCur(pCur);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1345
#endif
1346

UNCOV
1347
// Looks up the entry preceding pKey in the backing state store and, when one is found, copies its
// value into a newly acquired row buffer.
//
// pWinCode receives the result of the store lookup. On a successful lookup pResKey is filled by the
// lookup, *ppVal is set to the new SRowBuffPos and *pVLen to getRowStateRowSize(pFileState).
// Returns TSDB_CODE_OUT_OF_MEMORY when no row buffer could be acquired, TSDB_CODE_SUCCESS otherwise.
// The cursor and the value returned by the lookup are released on every path.
static int32_t getRowStatePrevRowFromStore(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey,
                                           void** ppVal, int32_t* pVLen, int32_t* pWinCode) {
  int32_t          code = TSDB_CODE_SUCCESS;
  void*            pState = getStateFileStore(pFileState);
  SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
  void*            tmpVal = NULL;
  int32_t          len = 0;

  (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
  if ((*pWinCode) == TSDB_CODE_SUCCESS) {
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
    if (pNewPos != NULL && pNewPos->pRowBuff != NULL) {
      memcpy(pNewPos->pRowBuff, tmpVal, len);
      *pVLen = getRowStateRowSize(pFileState);
      (*ppVal) = pNewPos;
    } else {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
    // The looked-up value is a temporary copy; free it whether or not a row buffer was available.
    taosMemoryFreeClear(tmpVal);
  }

  streamStateFreeCur(pCur);
  return code;
}

// Finds the row that precedes pKey within the same group.
//
// The per-group search buffer (an array of SWinKey looked up by groupId) is consulted first:
//   - no buffer for the group: the previous row is read from the state store;
//   - pKey is the first buffered key (or nothing smaller is buffered): the previous row is read
//     from the state store;
//   - otherwise the buffered predecessor is copied to pResKey and its row buffer is obtained through
//     addRowBuffIfNotExist().
//
// pFileState: file state that owns the search buffer and the state store.
// pKey:       key whose predecessor is wanted.
// pResKey:    out, key of the predecessor.
// ppVal:      out, row buffer position of the predecessor.
// pVLen:      out, length of the row value.
// pWinCode:   out, lookup result (TSDB_CODE_SUCCESS when a predecessor was found).
// Returns TSDB_CODE_SUCCESS, or an error code (logged) when a row buffer cannot be acquired.
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
                           int32_t* pVLen, int32_t* pWinCode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SArray*    pWinStates = NULL;
  SWinKey*   pPrevKey = NULL;
  int32_t    size = 0;
  int32_t    index = -1;
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
  void**     ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));

  if (ppBuff == NULL) {
    qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
    code = getRowStatePrevRowFromStore(pFileState, pKey, pResKey, ppVal, pVLen, pWinCode);
    QUERY_CHECK_CODE(code, lino, _end);
    return code;
  }

  pWinStates = (SArray*)(*ppBuff);
  size = taosArrayGetSize(pWinStates);
  index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
  if (index >= 0) {
    SWinKey* pCurKey = taosArrayGet(pWinStates, index);
    if (winKeyCmprImpl(pCurKey, pKey) == 0) {
      // pKey itself is buffered at this slot; its predecessor is one slot earlier.
      index--;
    } else {
      qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__);
    }
  }

  if (index == -1) {
    // Nothing smaller than pKey is buffered in memory; fall back to the state store.
    code = getRowStatePrevRowFromStore(pFileState, pKey, pResKey, ppVal, pVLen, pWinCode);
    QUERY_CHECK_CODE(code, lino, _end);
    return code;
  }

  pPrevKey = taosArrayGet(pWinStates, index);
  *pResKey = *pPrevKey;
  return addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), ppVal, pVLen, pWinCode);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1418

UNCOV
1419
// Adds pKey to the search array pWinStates unless an equal key is already present.
//
// binarySearch() supplies the slot of the closest buffered key (negative when there is none); the
// new key is placed right after that slot. The insert is attempted when the key's timestamp is not
// reported as flushed by isFlushedState(), when a slot was found, or when the array is empty.
// If the array already held MAX_NUM_OF_CACHE_WIN or more entries before the insert, the leading
// (count - NUM_OF_CACHE_WIN) entries are dropped. An already-present key returns immediately
// without trimming.
//
// Returns TSDB_CODE_SUCCESS, or terrno (logged) when the array insert fails.
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfKeys = taosArrayGetSize(pWinStates);
  int32_t pos = binarySearch(pWinStates, numOfKeys, pKey, fillStateKeyCompare);

  if (!isFlushedState(pFileState, pKey->ts, 0) || pos >= 0 || numOfKeys == 0) {
    if (pos >= 0) {
      SWinKey* pFound = taosArrayGet(pWinStates, pos);
      if (winKeyCmprImpl(pFound, pKey) == 0) {
        // Same key is already buffered; nothing to add.
        goto _end;
      }
    }

    pos += 1;
    void* pInserted = taosArrayInsert(pWinStates, pos, pKey);
    QUERY_CHECK_NULL(pInserted, code, lino, _end, terrno);
  }

  if (numOfKeys >= MAX_NUM_OF_CACHE_WIN) {
    // Trim from the front, based on the element count taken before the insert.
    int32_t numToDrop = numOfKeys - NUM_OF_CACHE_WIN;
    taosArrayRemoveBatch(pWinStates, 0, numToDrop, NULL);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1446

UNCOV
1447
// Returns, through ppResStates, the SWinKey array registered for groupId in pSearchBuff, creating
// and registering an empty one (initial capacity 16) when the group has no entry yet.
//
// pSearchBuff: hash keyed by groupId whose values are SArray* pointers.
// groupId:     group to look up.
// ppResStates: out, the existing or newly created array; left untouched on failure.
// Returns TSDB_CODE_SUCCESS, terrno when the array cannot be allocated, or the error returned by
// tSimpleHashPut(). Errors are logged.
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pWinStates = NULL;
  void**  ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
  if (ppBuff) {
    pWinStates = (SArray*)(*ppBuff);
  } else {
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);

    code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // The hash did not take the array and the caller never sees it, so release it here.
      taosArrayDestroy(pWinStates);
      pWinStates = NULL;
    }
    QUERY_CHECK_CODE(code, lino, _end);
  }

  (*ppResStates) = pWinStates;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc