• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3663

19 Mar 2025 09:21AM UTC coverage: 61.664% (-0.6%) from 62.28%
#3663

push

travis-ci

web-flow
docs: add defination of tmq_config_res_t & fix spell error (#30271)

153169 of 318241 branches covered (48.13%)

Branch coverage included in aggregate %.

239405 of 318390 relevant lines covered (75.19%)

5762846.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.39
/source/libs/stream/src/tstreamFileState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "taos.h"
21
#include "tcommon.h"
22
#include "tcompare.h"
23
#include "thash.h"
24
#include "tsimplehash.h"
25

26
#define FLUSH_RATIO                    0.5
27
#define FLUSH_NUM                      4
28
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
29
#define MIN_NUM_OF_ROW_BUFF            10240
30
#define MIN_NUM_OF_RECOVER_ROW_BUFF    128
31
#define MIN_NUM_SEARCH_BUCKET          128
32
#define MAX_ARRAY_SIZE                 1024
33
#define MAX_GROUP_ID_NUM               200000
34
#define NUM_OF_CACHE_WIN               64
35
#define MAX_NUM_OF_CACHE_WIN           128
36
#define MIN_NUM_OF_SORT_CACHE_WIN      40960
37
#define BATCH_LIMIT                    256
38

39
#define DEFAULT_STATE_MAP_CAPACITY 10240
40
#define MAX_STATE_MAP_SIZE         10240000
41

42
#define SET_TSDATA_FLAG(ptr, len)   ((*(char*)POINTER_SHIFT(ptr, (len - 1))) |= 1)
43
#define UNSET_TSDATA_FLAG(ptr, len) ((*(char*)POINTER_SHIFT(ptr, (len - 1))) &= 0)
44
#define HAS_TSDATA_FLAG(ptr, len)   ((*(char*)POINTER_SHIFT(ptr, (len - 1))) & 1)
45

46
#define TASK_KEY               "streamFileState"
47
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
48

49
struct SStreamFileState {
50
  SList*     usedBuffs;
51
  SList*     freeBuffs;
52
  void*      rowStateBuff;
53
  void*      pFileStore;
54
  int32_t    rowSize;
55
  int32_t    selectivityRowSize;
56
  int32_t    keyLen;
57
  uint64_t   preCheckPointVersion;
58
  uint64_t   checkPointVersion;
59
  TSKEY      maxTs;
60
  TSKEY      deleteMark;
61
  TSKEY      flushMark;
62
  uint64_t   maxRowCount;
63
  uint64_t   curRowCount;
64
  GetTsFun   getTs;
65
  char*      id;
66
  char*      cfName;
67
  void*      searchBuff;
68
  SSHashObj* pGroupIdMap;
69
  bool       hasFillCatch;
70
  SSHashObj* pRecFlagMap;
71

72
  _state_buff_cleanup_fn         stateBuffCleanupFn;
73
  _state_buff_remove_fn          stateBuffRemoveFn;
74
  _state_buff_remove_by_pos_fn   stateBuffRemoveByPosFn;
75
  _state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
76

77
  _state_file_remove_fn stateFileRemoveFn;
78
  _state_file_get_fn    stateFileGetFn;
79

80
  _state_fun_get_fn stateFunctionGetFn;
81
};
82

83
typedef SRowBuffPos SRowBuffInfo;
84

85
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
5,761✔
86
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
5,761✔
87
  return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
5,761✔
88
}
89

90
int fillTSKeyCompare(const void* pKey1, const void* pDatas, int pos) {
4,278✔
91
  SWinKey* pWin1 = (SWinKey*)pKey1;
4,278✔
92
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
4,278✔
93
  if (pWin1->ts > pWin2->ts) {
4,277✔
94
    return 1;
1,421✔
95
  } else if (pWin1->ts < pWin2->ts) {
2,856✔
96
    return -1;
851✔
97
  }
98

99
  return 0;
2,005✔
100
}
101

102
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
3,709✔
103
  SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
3,709✔
104
  if (pos) {
3,707✔
105
    (*pos)->beFlushed = true;
2,077✔
106
    (*pos)->invalid = true;
2,077✔
107
  }
108
  return tSimpleHashRemove(pBuff, pKey, keyLen);
3,707✔
109
}
110

111
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
6,269,634✔
112
  size_t        keyLen = pFileState->keyLen;
6,269,634✔
113
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
6,269,634✔
114
  if (ppPos) {
6,269,634✔
115
    if ((*ppPos) == pPos) {
6,269,552!
116
      int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
6,269,552✔
117
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
6,269,552✔
118
    }
119
  }
120
}
6,269,634✔
121

122
void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); }
×
123

124
void stateHashBuffCleanupFn(void* pBuff) { tSimpleHashCleanup(pBuff); }
4,277✔
125

126
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
3,165✔
127
  return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
3,165✔
128
}
129

130
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
171✔
131
  return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
171✔
132
}
133

134
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
602,828✔
135
  SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey));
602,828!
136
  if (pStateKey == NULL) {
602,828!
137
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
138
    return NULL;
×
139
  }
140
  SWinKey* pWinKey = pPos->pKey;
602,828✔
141
  pStateKey->key = *pWinKey;
602,828✔
142
  pStateKey->opNum = num;
602,828✔
143
  return pStateKey;
602,828✔
144
}
145

146
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
19✔
147
  SWinKey* pStateKey = taosMemoryCalloc(1, sizeof(SWinKey));
19!
148
  if (pStateKey == NULL) {
19!
149
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
150
    return NULL;
×
151
  }
152
  SWinKey* pWinKey = pPos->pKey;
19✔
153
  *pStateKey = *pWinKey;
19✔
154
  return pStateKey;
19✔
155
}
156

157
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
1,964✔
158
  return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
1,964✔
159
}
160

161
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
270✔
162
  return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
270✔
163
}
164

165
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
1,003✔
166
  SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
1,003!
167
  if (pStateKey == NULL) {
1,003!
168
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
169
    return NULL;
×
170
  }
171
  SSessionKey* pWinKey = pPos->pKey;
1,003✔
172
  pStateKey->key = *pWinKey;
1,003✔
173
  pStateKey->opNum = num;
1,003✔
174
  return pStateKey;
1,003✔
175
}
176

177
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
11!
178

179
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
1,152✔
180
  *pLen = sizeof(TSKEY);
1,152✔
181
  (*pVal) = taosMemoryCalloc(1, *pLen);
1,152!
182
  if ((*pVal) == NULL) {
1,152!
183
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
184
    return terrno;
×
185
  }
186
  void*   buff = *pVal;
1,152✔
187
  int32_t tmp = taosEncodeFixedI64(&buff, *pKey);
1,152!
188
  return TSDB_CODE_SUCCESS;
1,152✔
189
}
190

191
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
6,066✔
192
                            void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
193
                            SStreamFileState** ppFileState) {
194
  int32_t code = TSDB_CODE_SUCCESS;
6,066✔
195
  int32_t lino = 0;
6,066✔
196
  if (memSize <= 0) {
6,066!
197
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
×
198
  }
199
  if (rowSize == 0) {
6,066!
200
    code = TSDB_CODE_INVALID_PARA;
×
201
    QUERY_CHECK_CODE(code, lino, _end);
×
202
  }
203

204
  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
6,066!
205
  QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
6,070!
206

207
  rowSize += selectRowSize;
6,070✔
208
  pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
6,070✔
209
  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
6,070✔
210
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
6,069!
211

212
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
6,069✔
213
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
6,070!
214

215
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
6,070✔
216
  int32_t    cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
6,070✔
217
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
6,070✔
218
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
3,992✔
219
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
3,992✔
220
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
3,992✔
221
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
3,992✔
222
    pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
3,992✔
223

224
    pFileState->stateFileRemoveFn = intervalFileRemoveFn;
3,992✔
225
    pFileState->stateFileGetFn = intervalFileGetFn;
3,992✔
226
    pFileState->cfName = taosStrdup("state");
3,992!
227
    pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
3,992✔
228
  } else if (type == STREAM_STATE_BUFF_SORT) {
2,078✔
229
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
1,792✔
230
    pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
1,792✔
231
    pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
1,792✔
232
    pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
1,792✔
233
    pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
1,792✔
234

235
    pFileState->stateFileRemoveFn = sessionFileRemoveFn;
1,792✔
236
    pFileState->stateFileGetFn = sessionFileGetFn;
1,792✔
237
    pFileState->cfName = taosStrdup("sess");
1,792!
238
    pFileState->stateFunctionGetFn = getSessionRowBuff;
1,792✔
239
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
286!
240
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
286✔
241
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
286✔
242
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
286!
243
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
286✔
244
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
286✔
245
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
286✔
246
    pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
286✔
247

248
    pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
286✔
249
    pFileState->stateFileGetFn = hashSortFileGetFn;
286✔
250
    pFileState->cfName = taosStrdup("fill");
286!
251
    pFileState->stateFunctionGetFn = NULL;
286✔
252
  }
253

254
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
6,070!
255
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
6,070!
256
  QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
6,070!
257
  QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
6,070!
258

259
  if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
6,070✔
260
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
136✔
261
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
136!
262
  }
263

264
  pFileState->keyLen = keySize;
6,070✔
265
  pFileState->rowSize = rowSize;
6,070✔
266
  pFileState->selectivityRowSize = selectRowSize;
6,070✔
267
  pFileState->preCheckPointVersion = 0;
6,070✔
268
  pFileState->checkPointVersion = 1;
6,070✔
269
  pFileState->pFileStore = pFile;
6,070✔
270
  pFileState->getTs = fp;
6,070✔
271
  pFileState->curRowCount = 0;
6,070✔
272
  pFileState->deleteMark = delMark;
6,070✔
273
  pFileState->flushMark = INT64_MIN;
6,070✔
274
  pFileState->maxTs = INT64_MIN;
6,070✔
275
  pFileState->id = taosStrdup(taskId);
6,070!
276
  QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
6,069!
277

278
  pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
6,069✔
279
  QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
6,069!
280

281
  pFileState->pRecFlagMap = tSimpleHashInit(1024, hashFn);
6,069✔
282
  QUERY_CHECK_NULL(pFileState->pRecFlagMap, code, lino, _end, terrno);
6,069!
283

284

285
  pFileState->hasFillCatch = true;
6,069✔
286

287
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
6,069✔
288
    code = recoverSnapshot(pFileState, checkpointId);
3,991✔
289
  } else if (type == STREAM_STATE_BUFF_SORT) {
2,078✔
290
    code = recoverSession(pFileState, checkpointId);
1,792✔
291
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
286!
292
    code = recoverFillSnapshot(pFileState, checkpointId);
286✔
293
  }
294
  QUERY_CHECK_CODE(code, lino, _end);
6,070!
295

296
  void*   valBuf = NULL;
6,070✔
297
  int32_t len = 0;
6,070✔
298
  int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
6,070✔
299
  if (tmpRes == TSDB_CODE_SUCCESS) {
6,070✔
300
    QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
11!
301
    streamFileStateDecode(&pFileState->flushMark, valBuf, len);
11✔
302
    qDebug("===stream===flushMark  read:%" PRId64, pFileState->flushMark);
11✔
303
  }
304
  taosMemoryFreeClear(valBuf);
6,070!
305
  (*ppFileState) = pFileState;
6,070✔
306

307
_end:
6,070✔
308
  if (code != TSDB_CODE_SUCCESS) {
6,070!
309
    streamFileStateDestroy(pFileState);
×
310
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
311
  }
312
  return code;
6,070✔
313
}
314

315
void destroyRowBuffPos(SRowBuffPos* pPos) {
6,867,209✔
316
  taosMemoryFreeClear(pPos->pKey);
6,867,209!
317
  taosMemoryFreeClear(pPos->pRowBuff);
6,867,214!
318
  taosMemoryFree(pPos);
6,867,211!
319
}
6,867,210✔
320

321
void destroyRowBuffPosPtr(void* ptr) {
483,232✔
322
  if (!ptr) {
483,232!
323
    return;
×
324
  }
325
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
483,232✔
326
  if (!pPos->beUsed) {
483,232✔
327
    destroyRowBuffPos(pPos);
300✔
328
  }
329
}
330

331
void destroyRowBuffAllPosPtr(void* ptr) {
848,398✔
332
  if (!ptr) {
848,398!
333
    return;
×
334
  }
335
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
848,398✔
336
  destroyRowBuffPos(pPos);
848,398✔
337
}
338

339
void destroyRowBuff(void* ptr) {
5,882,644✔
340
  if (!ptr) {
5,882,644!
341
    return;
×
342
  }
343
  taosMemoryFree(*(void**)ptr);
5,882,644!
344
}
345

346
void streamFileStateDestroy(SStreamFileState* pFileState) {
13,512✔
347
  if (!pFileState) {
13,512✔
348
    return;
7,443✔
349
  }
350

351
  taosMemoryFree(pFileState->id);
6,069!
352
  taosMemoryFree(pFileState->cfName);
6,069!
353
  tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
6,069✔
354
  tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
6,069✔
355
  pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
6,069✔
356
  sessionWinStateCleanup(pFileState->searchBuff);
6,069✔
357
  tSimpleHashCleanup(pFileState->pGroupIdMap);
6,068✔
358
  tSimpleHashCleanup(pFileState->pRecFlagMap);
6,069✔
359
  taosMemoryFree(pFileState);
6,069!
360
}
361

362
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
6,498,413✔
363
  int32_t code = TSDB_CODE_SUCCESS;
6,498,413✔
364
  int32_t lino = 0;
6,498,413✔
365
  if (pPos->pRowBuff) {
6,498,413✔
366
    code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
6,498,383✔
367
    QUERY_CHECK_CODE(code, lino, _end);
6,498,383!
368
    pPos->pRowBuff = NULL;
6,498,383✔
369
  }
370

371
_end:
30✔
372
  if (code != TSDB_CODE_SUCCESS) {
6,498,413!
373
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
374
  }
375
  return code;
6,498,413✔
376
}
377

378
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
5,184✔
379
  int32_t   code = TSDB_CODE_SUCCESS;
5,184✔
380
  int32_t   lino = 0;
5,184✔
381
  SListIter iter = {0};
5,184✔
382
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
5,184✔
383

384
  SListNode* pNode = NULL;
5,184✔
385
  while ((pNode = tdListNext(&iter)) != NULL) {
6,140,736✔
386
    SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
6,135,552✔
387
    if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
6,135,552!
388
      code = putFreeBuff(pFileState, pPos);
6,014,013✔
389
      QUERY_CHECK_CODE(code, lino, _end);
6,014,013!
390

391
      if (!all) {
6,014,013✔
392
        pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
5,786,660✔
393
      }
394
      destroyRowBuffPos(pPos);
6,014,013✔
395
      SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
6,014,013✔
396
      taosMemoryFreeClear(tmp);
6,014,013!
397
    }
398
  }
399

400
_end:
5,184✔
401
  if (code != TSDB_CODE_SUCCESS) {
5,184!
402
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
403
  }
404
}
5,184✔
405

406
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
212✔
407
  int32_t   code = TSDB_CODE_SUCCESS;
212✔
408
  int32_t   lino = 0;
212✔
409
  uint64_t  i = 0;
212✔
410
  SListIter iter = {0};
212✔
411
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
212✔
412

413
  SListNode* pNode = NULL;
212✔
414
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
1,692,616!
415
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
1,692,404✔
416
    if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
1,692,404✔
417
      if (all || !pPos->beUsed) {
730,522!
418
        if (all && !pPos->pRowBuff) {
284!
419
          continue;
×
420
        }
421
        code = tdListAppend(pFlushList, &pPos);
284✔
422
        QUERY_CHECK_CODE(code, lino, _end);
284!
423

424
        pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
284✔
425
        pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
284✔
426
        if (pPos->beUsed == false) {
284!
427
          SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
284✔
428
          taosMemoryFreeClear(tmp);
284!
429
        }
430
        if (pPos->pRowBuff) {
284✔
431
          i++;
254✔
432
        }
433
      }
434
    }
435
  }
436
  qDebug("clear flushed row buff. %d rows to disk. is all:%d", listNEles(pFlushList), all);
212✔
437

438
_end:
4✔
439
  if (code != TSDB_CODE_SUCCESS) {
212!
440
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
441
  }
442
  return code;
212✔
443
}
444

445
void streamFileStateClear(SStreamFileState* pFileState) {
1,712✔
446
  pFileState->flushMark = INT64_MIN;
1,712✔
447
  pFileState->maxTs = INT64_MIN;
1,712✔
448
  tSimpleHashClear(pFileState->rowStateBuff);
1,712✔
449
  clearExpiredRowBuff(pFileState, 0, true);
1,712✔
450
}
1,712✔
451

452
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
369✔
453

454
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; }
6,283,010✔
455

456
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
80✔
457
  int32_t   code = TSDB_CODE_SUCCESS;
80✔
458
  int32_t   lino = 0;
80✔
459
  uint64_t  i = 0;
80✔
460
  SListIter iter = {0};
80✔
461
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
80✔
462

463
  SListNode* pNode = NULL;
80✔
464
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
2,897,610✔
465
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
2,897,530✔
466
    if (pPos->beUsed == used) {
2,897,530✔
467
      if (used && !pPos->pRowBuff) {
1,207,318✔
468
        continue;
724,370✔
469
      }
470
      code = tdListAppend(pFlushList, &pPos);
482,948✔
471
      QUERY_CHECK_CODE(code, lino, _end);
482,948!
472

473
      pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
482,948✔
474
      pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
482,948✔
475
      if (pPos->beUsed == false) {
482,948✔
476
        SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
16✔
477
        taosMemoryFreeClear(tmp);
16!
478
      }
479
      if (pPos->pRowBuff) {
482,948!
480
        i++;
482,948✔
481
      }
482
    }
483
  }
484

485
  qInfo("%s stream state flush %d rows to disk. is used:%d", pFileState->id, listNEles(pFlushList), used);
80!
486

487
_end:
×
488
  if (code != TSDB_CODE_SUCCESS) {
80!
489
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
490
  }
491
  return code;
80✔
492
}
493

494
int32_t flushRowBuff(SStreamFileState* pFileState) {
212✔
495
  int32_t          code = TSDB_CODE_SUCCESS;
212✔
496
  int32_t          lino = 0;
212✔
497
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
212✔
498
  if (!pFlushList) {
212!
499
    code = TSDB_CODE_OUT_OF_MEMORY;
×
500
    QUERY_CHECK_CODE(code, lino, _end);
×
501
  }
502

503
  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
212✔
504
  num = TMAX(num, FLUSH_NUM);
212✔
505
  code = clearFlushedRowBuff(pFileState, pFlushList, num, false);
212✔
506
  QUERY_CHECK_CODE(code, lino, _end);
212!
507

508
  if (isListEmpty(pFlushList)) {
212✔
509
    code = popUsedBuffs(pFileState, pFlushList, num, false);
42✔
510
    QUERY_CHECK_CODE(code, lino, _end);
42!
511

512
    if (isListEmpty(pFlushList)) {
42✔
513
      code = popUsedBuffs(pFileState, pFlushList, num, true);
38✔
514
      QUERY_CHECK_CODE(code, lino, _end);
38!
515
    }
516
  }
517

518
  if (pFileState->searchBuff) {
212!
519
    code = clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
×
520
    QUERY_CHECK_CODE(code, lino, _end);
×
521
  }
522

523
  flushSnapshot(pFileState, pFlushList, false);
212✔
524

525
  SListIter fIter = {0};
212✔
526
  tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
212✔
527
  SListNode* pNode = NULL;
212✔
528
  while ((pNode = tdListNext(&fIter)) != NULL) {
483,444✔
529
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
483,232✔
530
    code = putFreeBuff(pFileState, pPos);
483,232✔
531
    QUERY_CHECK_CODE(code, lino, _end);
483,232!
532
  }
533

534
  tdListFreeP(pFlushList, destroyRowBuffPosPtr);
212✔
535

536
_end:
212✔
537
  if (code != TSDB_CODE_SUCCESS) {
212!
538
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
539
  }
540
  return code;
212✔
541
}
542

543
int32_t clearRowBuff(SStreamFileState* pFileState) {
212✔
544
  if (pFileState->deleteMark != INT64_MAX) {
212!
545
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
×
546
  }
547
  do {
548
    int32_t code = flushRowBuff(pFileState);
212✔
549
    if (code != TSDB_CODE_SUCCESS) {
212!
550
      return code;
×
551
    }
552
  } while (isListEmpty(pFileState->freeBuffs) && pFileState->curRowCount == pFileState->maxRowCount);
212!
553
  return TSDB_CODE_SUCCESS;
212✔
554
}
555

556
int32_t clearFlushedRowBuffByFlag(SStreamFileState* pFileState, uint64_t max) {
5,272✔
557
  int32_t   code = TSDB_CODE_SUCCESS;
5,272✔
558
  int32_t   lino = 0;
5,272✔
559
  uint64_t  i = 0;
5,272✔
560
  SListIter iter = {0};
5,272✔
561
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
5,272✔
562

563
  SListNode* pNode = NULL;
5,270✔
564
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
8,705✔
565
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
3,435✔
566
    if (pPos->invalid) {
3,435✔
567
      if (!pPos->beUsed) {
504!
568
        SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
504✔
569
        taosMemoryFreeClear(tmp);
504!
570
        if (pPos->pRowBuff) {
504!
571
          i++;
504✔
572
        }
573
        code = putFreeBuff(pFileState, pPos);
504✔
574
        QUERY_CHECK_CODE(code, lino, _end);
504!
575
        destroyRowBuffPos(pPos);
504✔
576
      }
577
    }
578
  }
579

580
_end:
5,271✔
581
  if (code != TSDB_CODE_SUCCESS) {
5,271!
582
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
583
  }
584
  return code;
5,271✔
585
}
586

587
int32_t clearRowBuffNonFlush(SStreamFileState* pFileState) {
5,272✔
588
  int32_t code = TSDB_CODE_SUCCESS;
5,272✔
589
  int32_t lino = 0;
5,272✔
590

591
  if (pFileState->deleteMark != INT64_MAX) {
5,272!
592
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
×
593
  }
594

595
  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
5,272✔
596
  num = TMAX(num, FLUSH_NUM);
5,272✔
597
  code = clearFlushedRowBuffByFlag(pFileState, num);
5,272✔
598
  QUERY_CHECK_CODE(code, lino, _end);
5,271!
599

600
_end:
5,271✔
601
  if (code != TSDB_CODE_SUCCESS) {
5,271!
602
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
603
  }
604
  return code;
5,271✔
605
}
606

607
void* getFreeBuff(SStreamFileState* pFileState) {
6,867,328✔
608
  SList*     lists = pFileState->freeBuffs;
6,867,328✔
609
  int32_t    buffSize = pFileState->rowSize;
6,867,328✔
610
  SListNode* pNode = tdListPopHead(lists);
6,867,328✔
611
  if (!pNode) {
6,867,242✔
612
    return NULL;
6,369,785✔
613
  }
614
  void* ptr = *(void**)pNode->data;
497,457✔
615
  memset(ptr, 0, buffSize);
497,457✔
616
  taosMemoryFree(pNode);
497,457!
617
  return ptr;
497,455✔
618
}
619

620
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
×
621
  if (pPos->pRowBuff) {
×
622
    memset(pPos->pRowBuff, 0, pFileState->rowSize);
×
623
  }
624
}
×
625

626
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
6,866,516✔
627
  int32_t      code = TSDB_CODE_SUCCESS;
6,866,516✔
628
  int32_t      lino = 0;
6,866,516✔
629
  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
6,866,516!
630
  if (!pPos) {
6,866,966!
631
    code = terrno;
×
632
    QUERY_CHECK_CODE(code, lino, _error);
×
633
  }
634

635
  pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
6,866,966!
636
  if (!pPos->pKey) {
6,866,858!
637
    code = terrno;
×
638
    QUERY_CHECK_CODE(code, lino, _error);
×
639
  }
640

641
  void* pBuff = getFreeBuff(pFileState);
6,866,858✔
642
  if (pBuff) {
6,866,647✔
643
    pPos->pRowBuff = pBuff;
496,970✔
644
    goto _end;
496,970✔
645
  }
646

647
  if (pFileState->curRowCount < pFileState->maxRowCount) {
6,369,677✔
648
    pBuff = taosMemoryCalloc(1, pFileState->rowSize);
6,369,553!
649
    QUERY_CHECK_NULL(pBuff, code, lino, _error, terrno);
6,369,467!
650
    pPos->pRowBuff = pBuff;
6,369,467✔
651
    pFileState->curRowCount++;
6,369,467✔
652
    goto _end;
6,369,467✔
653
  }
654

655
  code = clearRowBuff(pFileState);
124✔
656
  QUERY_CHECK_CODE(code, lino, _error);
128!
657

658
  pPos->pRowBuff = getFreeBuff(pFileState);
128✔
659

660
_end:
6,866,565✔
661
  code = tdListAppend(pFileState->usedBuffs, &pPos);
6,866,565✔
662
  QUERY_CHECK_CODE(code, lino, _error);
6,866,753!
663

664
  QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
6,866,753!
665
_error:
6,866,753✔
666
  if (code != TSDB_CODE_SUCCESS) {
6,866,753!
667
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
668
    return NULL;
×
669
  }
670

671
  return pPos;
6,866,753✔
672
}
673

674
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
6,866,655✔
675
  int32_t      code = TSDB_CODE_SUCCESS;
6,866,655✔
676
  int32_t      lino = 0;
6,866,655✔
677
  SRowBuffPos* newPos = getNewRowPos(pFileState);
6,866,655✔
678
  if (!newPos) {
6,866,731!
679
    code = TSDB_CODE_OUT_OF_MEMORY;
×
680
    QUERY_CHECK_CODE(code, lino, _error);
×
681
  }
682
  newPos->beUsed = true;
6,866,731✔
683
  newPos->beFlushed = false;
6,866,731✔
684
  newPos->needFree = false;
6,866,731✔
685
  newPos->beUpdated = true;
6,866,731✔
686
  newPos->invalid = false;
6,866,731✔
687
  return newPos;
6,866,731✔
688

689
_error:
×
690
  if (code != TSDB_CODE_SUCCESS) {
×
691
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
692
  }
693
  return NULL;
×
694
}
695

696
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
7,171,953✔
697
                             int32_t* pWinCode) {
698
  int32_t code = TSDB_CODE_SUCCESS;
7,171,953✔
699
  int32_t lino = 0;
7,171,953✔
700
  (*pWinCode) = TSDB_CODE_SUCCESS;
7,171,953✔
701
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
7,171,953✔
702
  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
7,171,840✔
703
  if (pos) {
7,172,999✔
704
    if (pVal != NULL) {
317,398!
705
      *pVLen = pFileState->rowSize;
317,399✔
706
      *pVal = *pos;
317,399✔
707
      (*pos)->beUsed = true;
317,399✔
708
      (*pos)->beFlushed = false;
317,399✔
709
    }
710
    goto _end;
317,398✔
711
  }
712
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
6,855,601✔
713
  if (!pNewPos || !pNewPos->pRowBuff) {
6,855,580!
714
    code = TSDB_CODE_OUT_OF_MEMORY;
2✔
715
    QUERY_CHECK_CODE(code, lino, _end);
2!
716
  }
717

718
  memcpy(pNewPos->pKey, pKey, keyLen);
6,855,580✔
719
  (*pWinCode) = TSDB_CODE_FAILED;
6,855,580✔
720

721
  TSKEY ts = pFileState->getTs(pKey);
6,855,580✔
722
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
6,855,540✔
723
    int32_t len = 0;
91✔
724
    void*   p = NULL;
91✔
725
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
91✔
726
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
91✔
727
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
91✔
728
      memcpy(pNewPos->pRowBuff, p, len);
25✔
729
    }
730
    taosMemoryFree(p);
91!
731
  }
732

733
  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
6,855,247✔
734
  QUERY_CHECK_CODE(code, lino, _end);
6,854,979!
735

736
  if (pVal) {
6,854,979✔
737
    *pVLen = pFileState->rowSize;
6,854,900✔
738
    *pVal = pNewPos;
6,854,900✔
739
  }
740

741
_end:
79✔
742
  if (code != TSDB_CODE_SUCCESS) {
7,172,377!
743
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
744
  }
745
  return code;
7,172,280✔
746
}
747

748
int32_t createRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
292✔
749
  int32_t code = TSDB_CODE_SUCCESS;
292✔
750
  int32_t lino = 0;
292✔
751

752
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
292✔
753

754
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
292✔
755
  if (!pNewPos || !pNewPos->pRowBuff) {
292!
756
    code = TSDB_CODE_OUT_OF_MEMORY;
×
757
    QUERY_CHECK_CODE(code, lino, _end);
×
758
  }
759

760
  memcpy(pNewPos->pKey, pKey, keyLen);
292✔
761

762
  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
292✔
763
  QUERY_CHECK_CODE(code, lino, _end);
292!
764

765
  if (pVal) {
292!
766
    *pVLen = pFileState->rowSize;
292✔
767
    *pVal = pNewPos;
292✔
768
  }
769

770
_end:
×
771
  if (code != TSDB_CODE_SUCCESS) {
292!
772
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
773
  }
774
  return code;
292✔
775
}
776

777
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
5,167✔
778
  int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
5,167✔
779
  qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff);
5,169✔
780
  int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
5,169✔
781
  qTrace("%s at line %d res:%d", __func__, __LINE__, code_file);
5,168✔
782
  if (pFileState->searchBuff != NULL) {
5,168✔
783
    deleteHashSortRowBuff(pFileState, pKey);
41✔
784
  }
785
}
5,168✔
786

787
void deleteRowBuffByGroupId(SStreamFileState* pFileState, uint64_t groupId) {
55✔
788
  SSHashObj* pRowMap = pFileState->rowStateBuff;
55✔
789
  void*   pIte = NULL;
55✔
790
  int32_t iter = 0;
55✔
791
  while ((pIte = tSimpleHashIterate(pRowMap, pIte, &iter)) != NULL) {
115✔
792
    size_t keyLen = 0;
60✔
793
    SWinKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
60!
794
    if (pKey->groupId == groupId) {
60!
795
      int32_t tmpRes = tSimpleHashIterateRemove(pRowMap, pKey, keyLen, &pIte, &iter);
60✔
796
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
60!
797
    }
798
  }
799

800
  while (1) {
340✔
801
    SWinKey tmp = {.ts = INT64_MIN, .groupId = groupId};
395✔
802
    SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pFileState->pFileStore, &tmp);
395✔
803
    SWinKey delKey = {.groupId = groupId};
395✔
804
    int32_t code = streamStateGetGroupKVByCur_rocksdb(pFileState->pFileStore, pCur, &delKey, NULL, 0);
395✔
805
    if (code != TSDB_CODE_SUCCESS) {
395✔
806
      break;
55✔
807
    }
808
    code = streamStateDel_rocksdb(pFileState->pFileStore, &delKey);
340✔
809
    qTrace("%s at line %d res:%d", __func__, __LINE__, code);
340!
810
  }
811
}
55✔
812

813
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
358✔
814
  int32_t code = TSDB_CODE_SUCCESS;
358✔
815
  int32_t lino = 0;
358✔
816
  int32_t len = 0;
358✔
817
  void*   pBuff = NULL;
358✔
818
  code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
358✔
819
  QUERY_CHECK_CODE(code, lino, _end);
358!
820
  memcpy(pPos->pRowBuff, pBuff, len);
358✔
821
  taosMemoryFree(pBuff);
358!
822

823
_end:
358✔
824
  if (code != TSDB_CODE_SUCCESS) {
358!
825
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
826
  }
827
  return code;
358✔
828
}
829

830
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
358✔
831
  int32_t code = TSDB_CODE_SUCCESS;
358✔
832
  int32_t lino = 0;
358✔
833
  pPos->pRowBuff = getFreeBuff(pFileState);
358✔
834
  if (!pPos->pRowBuff) {
358✔
835
    if (pFileState->curRowCount < pFileState->maxRowCount) {
85✔
836
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
1!
837
      if (!pPos->pRowBuff) {
1!
838
        code = terrno;
×
839
        QUERY_CHECK_CODE(code, lino, _end);
×
840
      }
841
      pFileState->curRowCount++;
1✔
842
    } else {
843
      code = clearRowBuff(pFileState);
84✔
844
      QUERY_CHECK_CODE(code, lino, _end);
84!
845
      pPos->pRowBuff = getFreeBuff(pFileState);
84✔
846
    }
847
    QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
85!
848
  }
849

850
  code = recoverSessionRowBuff(pFileState, pPos);
358✔
851
  QUERY_CHECK_CODE(code, lino, _end);
358!
852

853
_end:
358✔
854
  if (code != TSDB_CODE_SUCCESS) {
358!
855
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
856
  }
857
  return code;
358✔
858
}
859

860
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
6,253,706✔
861
  int32_t code = TSDB_CODE_SUCCESS;
6,253,706✔
862
  int32_t lino = 0;
6,253,706✔
863
  if (pPos->pRowBuff) {
6,253,706✔
864
    if (pPos->needFree) {
6,253,387!
865
      code = recoverSessionRowBuff(pFileState, pPos);
×
866
      QUERY_CHECK_CODE(code, lino, _end);
×
867
    }
868
    (*pVal) = pPos->pRowBuff;
6,253,387✔
869
    goto _end;
6,253,387✔
870
  }
871

872
  code = recoverStateRowBuff(pFileState, pPos);
319✔
873
  QUERY_CHECK_CODE(code, lino, _end);
358!
874

875
  (*pVal) = pPos->pRowBuff;
358✔
876
  // if (!pPos->needFree) {
877
  //   code = tdListPrepend(pFileState->usedBuffs, &pPos);
878
  //   QUERY_CHECK_CODE(code, lino, _end);
879
  // }
880

881
_end:
6,253,745✔
882
  if (code != TSDB_CODE_SUCCESS) {
6,253,745!
883
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
884
  }
885
  return code;
6,253,745✔
886
}
887

888
bool hasRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, bool hasLimit, bool* pIsLast) {
28✔
889
  bool res = false;
28✔
890
  if (pIsLast != NULL) {
28!
891
    (*pIsLast) = false;
×
892
  }
893
  
894
  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
28✔
895
  if (pos) {
28✔
896
    res = true;
12✔
897
  }
898
  void* pSearchBuff = getSearchBuff(pFileState);
28✔
899
  if (pSearchBuff != NULL) {
28✔
900
    void** ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
3✔
901
    if (ppBuff != NULL) {
3✔
902
      SArray* pWinStates = (SArray*)(*ppBuff);
2✔
903
      if (pIsLast != NULL) {
2!
904
        SWinKey* pLastKey = (SWinKey*) taosArrayGetLast(pWinStates);
×
905
        *pIsLast = (winKeyCmprImpl(pKey, pLastKey) == 0);
×
906
      }
907
      if (hasLimit && taosArrayGetSize(pWinStates) <= MIN_NUM_OF_SORT_CACHE_WIN) {
2!
908
        res = true;
2✔
909
      }
910
      if (qDebugFlag & DEBUG_DEBUG) {
2!
911
        SWinKey* fistKey = (SWinKey*)taosArrayGet(pWinStates, 0);
2✔
912
        qDebug("===stream===check window state. buff min ts:%" PRId64 ",groupId:%" PRIu64 ".key ts:%" PRId64
2!
913
               ",groupId:%" PRIu64,
914
               fistKey->ts, fistKey->groupId, pKey->ts, pKey->groupId);
915
      }
916
    } else {
917
      res = true;
1✔
918
    }
919
  }
920
  return res;
28✔
921
}
922

923
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
3,472✔
924
  int64_t mark = (pFileState->deleteMark == INT64_MAX || pFileState->maxTs == INT64_MIN)
1,901✔
925
                     ? INT64_MIN
926
                     : pFileState->maxTs - pFileState->deleteMark;
5,373✔
927
  clearExpiredRowBuff(pFileState, mark, false);
3,472✔
928
  return pFileState->usedBuffs;
3,472✔
929
}
930

931
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
3,684✔
932
  int32_t   code = TSDB_CODE_SUCCESS;
3,684✔
933
  int32_t   lino = 0;
3,684✔
934
  SListIter iter = {0};
3,684✔
935
  tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
3,684✔
936

937
  int64_t    st = taosGetTimestampMs();
3,684✔
938
  SListNode* pNode = NULL;
3,684✔
939

940
  int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
3,684✔
941

942
  int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
3,684✔
943
  char*   buf = taosMemoryCalloc(1, len);
3,684!
944
  if (!buf) {
3,684!
945
    code = terrno;
×
946
    QUERY_CHECK_CODE(code, lino, _end);
×
947
  }
948

949
  void* batch = streamStateCreateBatch();
3,684✔
950
  if (!batch) {
3,684!
951
    code = TSDB_CODE_OUT_OF_MEMORY;
×
952
    QUERY_CHECK_CODE(code, lino, _end);
×
953
  }
954

955
  while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
608,455!
956
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
604,771✔
957
    if (pPos->beFlushed || !pPos->pRowBuff) {
604,771!
958
      continue;
921✔
959
    }
960
    pPos->beFlushed = true;
603,850✔
961
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
603,850✔
962

963
    qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
603,850✔
964
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
603,850✔
965
      code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
2,284✔
966
      streamStateClearBatch(batch);
2,284✔
967
      QUERY_CHECK_CODE(code, lino, _end);
2,284!
968
    }
969

970
    void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
603,850✔
971
    QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);
603,850!
972

973
    code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
603,850✔
974
                                       0, buf);
975
    taosMemoryFreeClear(pSKey);
603,850!
976
    QUERY_CHECK_CODE(code, lino, _end);
603,850!
977
    // todo handle failure
978
    memset(buf, 0, len);
603,850✔
979
  }
980
  taosMemoryFreeClear(buf);
3,684!
981

982
  int32_t numOfElems = streamStateGetBatchSize(batch);
3,684✔
983
  if (numOfElems > 0) {
3,684✔
984
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
1,195✔
985
    QUERY_CHECK_CODE(code, lino, _end);
1,195!
986
  } else {
987
    goto _end;
2,489✔
988
  }
989

990
  streamStateClearBatch(batch);
1,195✔
991

992
  clearSearchBuff(pFileState);
1,195✔
993

994
  int64_t elapsed = taosGetTimestampMs() - st;
1,195✔
995
  qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
1,195✔
996
         pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
997

998
  if (flushState) {
1,195✔
999
    void*   valBuf = NULL;
1,152✔
1000
    int32_t len = 0;
1,152✔
1001
    code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
1,152✔
1002
    QUERY_CHECK_CODE(code, lino, _end);
1,152!
1003

1004
    qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
1,152✔
1005
    code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0);
1,152✔
1006
    taosMemoryFree(valBuf);
1,152!
1007
    QUERY_CHECK_CODE(code, lino, _end);
1,152!
1008

1009
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
1,152✔
1010
    QUERY_CHECK_CODE(code, lino, _end);
1,152!
1011
  }
1012

1013
_end:
43✔
1014
  if (code != TSDB_CODE_SUCCESS) {
3,684!
1015
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1016
  }
1017
  taosMemoryFree(buf);
3,684!
1018
  streamStateDestroyBatch(batch);
3,684✔
1019
}
3,684✔
1020

1021
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
×
1022
  char keyBuf[128] = {0};
×
1023
  TAOS_UNUSED(tsnprintf(keyBuf, sizeof(keyBuf), "%s:%" PRId64, TASK_KEY, checkpointId));
×
1024
  return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
×
1025
}
1026

1027
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
×
1028
  int32_t code = TSDB_CODE_SUCCESS;
×
1029
  int64_t maxCheckPointId = 0;
×
1030
  {
1031
    char    buf[128] = {0};
×
1032
    void*   val = NULL;
×
1033
    int32_t len = 0;
×
1034
    memcpy(buf, TASK_KEY, strlen(TASK_KEY));
×
1035
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
×
1036
    if (code != 0 || len == 0 || val == NULL) {
×
1037
      return TSDB_CODE_FAILED;
×
1038
    }
1039
    memcpy(buf, val, len);
×
1040
    buf[len] = 0;
×
1041
    maxCheckPointId = taosStr2Int64((char*)buf, NULL, 10);
×
1042
    taosMemoryFree(val);
×
1043
  }
1044
  for (int64_t i = maxCheckPointId; i > 0; i--) {
×
1045
    char    buf[128] = {0};
×
1046
    void*   val = 0;
×
1047
    int32_t len = 0;
×
1048
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s:%" PRId64, TASK_KEY, i));
×
1049
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
×
1050
    if (code != 0) {
×
1051
      return TSDB_CODE_FAILED;
×
1052
    }
1053
    memcpy(buf, val, len);
×
1054
    buf[len] = 0;
×
1055
    taosMemoryFree(val);
×
1056

1057
    TSKEY ts;
1058
    ts = taosStr2Int64((char*)buf, NULL, 10);
×
1059
    if (ts < mark) {
×
1060
      // statekey winkey.ts < mark
1061
      int32_t tmpRes = forceRemoveCheckpoint(pFileState, i);
×
1062
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
1063
      break;
×
1064
    }
1065
  }
1066
  return code;
×
1067
}
1068

1069
int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
1,791✔
1070
  int32_t code = TSDB_CODE_SUCCESS;
1,791✔
1071
  int32_t lino = 0;
1,791✔
1072
  int32_t winRes = TSDB_CODE_SUCCESS;
1,791✔
1073
  if (pFileState->maxTs != INT64_MIN) {
1,791!
1074
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
1075
                       ? INT64_MIN
1076
                       : pFileState->maxTs - pFileState->deleteMark;
×
1077
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
×
1078
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
1079
  }
1080

1081
  SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
1,791✔
1082
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
1,792✔
1083
  while (winRes == TSDB_CODE_SUCCESS) {
1,792!
1084
    if (pFileState->curRowCount >= recoverNum) {
1,792!
1085
      break;
1,792✔
1086
    }
1087

1088
    void*       pVal = NULL;
1,792✔
1089
    int32_t     vlen = 0;
1,792✔
1090
    SSessionKey key = {0};
1,792✔
1091
    winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
1,792✔
1092
    if (winRes != TSDB_CODE_SUCCESS) {
1,792!
1093
      break;
1,792✔
1094
    }
1095

1096
    if (vlen != pFileState->rowSize) {
×
1097
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1098
      QUERY_CHECK_CODE(code, lino, _end);
×
1099
    }
1100

1101
    SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
×
1102
    pPos->beUsed = false;
×
1103
    winRes = putSessionWinResultBuff(pFileState, pPos);
×
1104
    if (winRes != TSDB_CODE_SUCCESS) {
×
1105
      break;
×
1106
    }
1107

1108
    winRes = streamStateSessionCurPrev_rocksdb(pCur);
×
1109
  }
1110

1111
_end:
×
1112
  if (code != TSDB_CODE_SUCCESS) {
1,792!
1113
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1114
  }
1115
  streamStateFreeCur(pCur);
1,792✔
1116
  return code;
1,792✔
1117
}
1118

1119
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
3,989✔
1120
  int32_t code = TSDB_CODE_SUCCESS;
3,989✔
1121
  int32_t lino = 0;
3,989✔
1122
  int32_t winCode = TSDB_CODE_SUCCESS;
3,989✔
1123
  if (pFileState->maxTs != INT64_MIN) {
3,989!
1124
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
1125
                       ? INT64_MIN
1126
                       : pFileState->maxTs - pFileState->deleteMark;
×
1127
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
×
1128
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
1129
  }
1130

1131
  SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
3,989✔
1132
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
3,992✔
1133
  while (winCode == TSDB_CODE_SUCCESS) {
3,992!
1134
    if (pFileState->curRowCount >= recoverNum) {
3,992!
1135
      break;
3,992✔
1136
    }
1137

1138
    void*        pVal = NULL;
3,992✔
1139
    int32_t      vlen = 0;
3,992✔
1140
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
3,992✔
1141
    if (!pNewPos || !pNewPos->pRowBuff) {
3,992!
1142
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1143
      QUERY_CHECK_CODE(code, lino, _end);
×
1144
    }
1145

1146
    winCode =
1147
        streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
3,992✔
1148
    qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
3,992✔
1149
    if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
3,992!
1150
      destroyRowBuffPos(pNewPos);
3,992✔
1151
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
3,992✔
1152
      taosMemoryFreeClear(pNode);
3,992!
1153
      taosMemoryFreeClear(pVal);
3,992!
1154
      break;
3,992✔
1155
    }
1156
    if (vlen != pFileState->rowSize) {
×
1157
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
×
1158
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1159
      taosMemoryFreeClear(pVal);
×
1160
      QUERY_CHECK_CODE(code, lino, _end);
×
1161
    }
1162
    memcpy(pNewPos->pRowBuff, pVal, vlen);
×
1163
    taosMemoryFreeClear(pVal);
×
1164
    pNewPos->beFlushed = true;
×
1165
    pNewPos->beUsed = false;
×
1166
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
×
1167
    code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
×
1168
    if (code != TSDB_CODE_SUCCESS) {
×
1169
      destroyRowBuffPos(pNewPos);
×
1170
      break;
×
1171
    }
1172
    streamStateCurPrev_rocksdb(pCur);
×
1173
  }
1174

1175
_end:
×
1176
  if (code != TSDB_CODE_SUCCESS) {
3,992!
1177
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1178
  }
1179
  streamStateFreeCur(pCur);
3,992✔
1180
  return code;
3,992✔
1181
}
1182

1183
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
91,218✔
1184

1185
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
1,479✔
1186
  pFileState->flushMark = TMAX(pFileState->flushMark, ts);
1,479✔
1187
  pFileState->maxTs = TMAX(pFileState->maxTs, ts);
1,479✔
1188
}
1,479✔
1189

1190
void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; }
40,079✔
1191
void* getSearchBuff(SStreamFileState* pFileState) { return pFileState->searchBuff; }
7,085,442✔
1192

1193
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
20,548✔
1194

1195
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
6,856,834✔
1196
  return pFileState->deleteMark != INT64_MAX && pFileState->maxTs > 0 &&
12,659,276!
1197
         ts < (pFileState->maxTs - pFileState->deleteMark);
5,802,442✔
1198
}
1199

1200
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
5,084,750✔
1201

1202
TSKEY getFlushMark(SStreamFileState* pFileState) { return pFileState->flushMark; };
12✔
1203

1204
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
92,156✔
1205

1206
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
91,216✔
1207
  int32_t winCode = TSDB_CODE_SUCCESS;
91,216✔
1208
  return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen, &winCode);
91,216✔
1209
}
1210

1211
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
286✔
1212
  int32_t code = TSDB_CODE_SUCCESS;
286✔
1213
  int32_t lino = 0;
286✔
1214
  if (pFileState->maxTs != INT64_MIN) {
286!
1215
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
1216
                       ? INT64_MIN
1217
                       : pFileState->maxTs - pFileState->deleteMark;
×
1218
    code = deleteExpiredCheckPoint(pFileState, mark);
×
1219
    QUERY_CHECK_CODE(code, lino, _end);
×
1220
  }
1221

1222
  SStreamStateCur* pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
286✔
1223
  if (pCur == NULL) {
286✔
1224
    return code;
284✔
1225
  }
1226
  int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
2✔
1227
  int32_t winRes = TSDB_CODE_SUCCESS;
2✔
1228
  while (winRes == TSDB_CODE_SUCCESS) {
4!
1229
    if (pFileState->curRowCount >= recoverNum) {
4!
1230
      break;
2✔
1231
    }
1232

1233
    void*        pVal = NULL;
4✔
1234
    int32_t      vlen = 0;
4✔
1235
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
4✔
1236
    winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
4✔
1237
    qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
4!
1238
    if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
4!
1239
      destroyRowBuffPos(pNewPos);
2✔
1240
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
2✔
1241
      taosMemoryFreeClear(pNode);
2!
1242
      taosMemoryFreeClear(pVal);
2!
1243
      break;
2✔
1244
    }
1245

1246
    if (vlen != pFileState->rowSize) {
2!
1247
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
×
1248
      destroyRowBuffPos(pNewPos);
×
1249
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1250
      taosMemoryFreeClear(pVal);
×
1251
      QUERY_CHECK_CODE(code, lino, _end);
×
1252
    }
1253

1254
    memcpy(pNewPos->pRowBuff, pVal, vlen);
2✔
1255
    taosMemoryFreeClear(pVal);
2!
1256
    pNewPos->beFlushed = true;
2✔
1257
    pNewPos->beUsed = false;
2✔
1258
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
2!
1259
    winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
2✔
1260
    if (winRes != TSDB_CODE_SUCCESS) {
2!
1261
      destroyRowBuffPos(pNewPos);
×
1262
      break;
×
1263
    }
1264
    streamStateCurPrev_rocksdb(pCur);
2✔
1265
  }
1266
  streamStateFreeCur(pCur);
2✔
1267

1268
_end:
2✔
1269
  if (code != TSDB_CODE_SUCCESS) {
2!
1270
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1271
  }
1272
  return code;
2✔
1273
}
1274

1275
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
1,956✔
1276
                   int32_t* pWinCode) {
1277
  int32_t code = TSDB_CODE_SUCCESS;
1,956✔
1278
  int32_t lino = 0;
1,956✔
1279
  (*pWinCode) = TSDB_CODE_FAILED;
1,956✔
1280
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
1,956✔
1281
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
1,956✔
1282
  if (ppPos) {
1,956✔
1283
    *pVLen = pFileState->rowSize;
902✔
1284
    *pVal = *ppPos;
902✔
1285
    (*ppPos)->beUsed = true;
902✔
1286
    (*ppPos)->beFlushed = false;
902✔
1287
    (*pWinCode) = TSDB_CODE_SUCCESS;
902✔
1288
    if ((*ppPos)->pRowBuff == NULL) {
902!
1289
      code = recoverStateRowBuff(pFileState, *ppPos);
×
1290
      QUERY_CHECK_CODE(code, lino, _end);
×
1291
    }
1292
    goto _end;
902✔
1293
  }
1294
  TSKEY ts = pFileState->getTs(pKey);
1,054✔
1295
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
1,054!
1296
    int32_t len = 0;
4✔
1297
    void*   p = NULL;
4✔
1298
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
4✔
1299
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
4!
1300
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
4!
1301
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
4✔
1302
      if (!pNewPos || !pNewPos->pRowBuff) {
4!
1303
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1304
        QUERY_CHECK_CODE(code, lino, _end);
×
1305
      }
1306

1307
      memcpy(pNewPos->pKey, pKey, keyLen);
4✔
1308
      memcpy(pNewPos->pRowBuff, p, len);
4✔
1309
      code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
4✔
1310
      QUERY_CHECK_CODE(code, lino, _end);
4!
1311

1312
      if (pVal) {
4!
1313
        *pVLen = pFileState->rowSize;
4✔
1314
        *pVal = pNewPos;
4✔
1315
      }
1316
    }
1317
    taosMemoryFree(p);
4!
1318
  }
1319

1320
_end:
1,050✔
1321
  if (code != TSDB_CODE_SUCCESS) {
1,956!
1322
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1323
  }
1324
  return code;
1,956✔
1325
}
1326

1327
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
542✔
1328
  int32_t code = TSDB_CODE_SUCCESS;
542✔
1329
  int32_t lino = 0;
542✔
1330
  if (value != NULL) {
542!
1331
    code = TSDB_CODE_INVALID_PARA;
×
1332
    QUERY_CHECK_CODE(code, lino, _end);
×
1333
  }
1334

1335
  if (tSimpleHashGet(pFileState->pGroupIdMap, &groupId, sizeof(int64_t)) == NULL) {
542✔
1336
    if (tSimpleHashGetSize(pFileState->pGroupIdMap) <= MAX_GROUP_ID_NUM) {
194!
1337
      code = tSimpleHashPut(pFileState->pGroupIdMap, &groupId, sizeof(int64_t), NULL, 0);
194✔
1338
      QUERY_CHECK_CODE(code, lino, _end);
194!
1339
    }
1340
    code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
194✔
1341
    QUERY_CHECK_CODE(code, lino, _end);
194!
1342
  }
1343

1344
_end:
542✔
1345
  if (code != TSDB_CODE_SUCCESS) {
542!
1346
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1347
  }
1348
  return code;
542✔
1349
}
1350

1351
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
1,451✔
1352
  SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState;
1,451✔
1353
  if (pCur->hashIter == -1) {
1,451✔
1354
    streamStateCurNext(pFileState->pFileStore, pCur);
74✔
1355
    return;
74✔
1356
  }
1357

1358
  int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
1,377!
1359
  pCur->minGpId = TMAX(pCur->minGpId, gpId);
1,377✔
1360

1361
  SSHashObj* pHash = pFileState->pGroupIdMap;
1,377✔
1362
  pCur->pHashData = tSimpleHashIterate(pHash, pCur->pHashData, &pCur->hashIter);
1,377✔
1363
  if (!pCur->pHashData) {
1,377✔
1364
    pCur->hashIter = -1;
883✔
1365
    streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
883✔
1366
    return;
883✔
1367
  }
1368
}
1369

1370
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
4,739✔
1371
  int32_t code = TSDB_CODE_SUCCESS;
4,739✔
1372
  if (pCur->pHashData) {
4,739✔
1373
    *pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
1,377!
1374
    return code;
1,377✔
1375
  }
1376
  return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
3,362✔
1377
}
1378

1379
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
3,290✔
1380
  return pFileState->pGroupIdMap;
3,290✔
1381
}
1382

1383
void clearExpiredState(SStreamFileState* pFileState, int32_t numOfKeep, TSKEY minTs) {
5,272✔
1384
  int32_t    code = TSDB_CODE_SUCCESS;
5,272✔
1385
  int32_t    lino = 0;
5,272✔
1386
  SSHashObj* pSearchBuff = pFileState->searchBuff;
5,272✔
1387
  void*      pIte = NULL;
5,272✔
1388
  int32_t    iter = 0;
5,272✔
1389
  while ((pIte = tSimpleHashIterate(pSearchBuff, pIte, &iter)) != NULL) {
7,685✔
1390
    SArray* pWinStates = *((void**)pIte);
2,413✔
1391
    int32_t arraySize = TARRAY_SIZE(pWinStates);
2,413✔
1392
    if (minTs != INT64_MAX && arraySize > numOfKeep) {
2,413✔
1393
      SWinKey key = {.ts = minTs};
10!
1394
      key.groupId = *(uint64_t*)tSimpleHashGetKey(pIte, NULL);
10✔
1395
      int32_t index = binarySearch(pWinStates, arraySize, &key, fillStateKeyCompare);
10✔
1396
      numOfKeep = TMAX(arraySize - index, MIN_NUM_OF_SORT_CACHE_WIN);
10✔
1397
      qDebug("modify numOfKeep, numOfKeep:%d. %s at line %d", numOfKeep, __func__, __LINE__);
10!
1398
    }
1399

1400
    int32_t size = arraySize - numOfKeep;
2,413✔
1401
    for (int32_t i = 0; i < size; i++) {
2,918✔
1402
      SWinKey* pKey = taosArrayGet(pWinStates, i);
505✔
1403
      int32_t  code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
505✔
1404
      qTrace("clear expired buff, ts:%" PRId64 ",groupid:%" PRIu64 ". %s at line %d res:%d", pKey->ts, pKey->groupId, __func__, __LINE__, code_buff);
505!
1405

1406
      if (isFlushedState(pFileState, pKey->ts, 0)) {
505✔
1407
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
6✔
1408
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file);
6!
1409
      }
1410

1411
      if (tSimpleHashGetSize(pFileState->pRecFlagMap) > 0) {
505!
1412
        tSimpleHashRemove(pFileState->pRecFlagMap, pKey, sizeof(SWinKey));
×
1413
      }
1414
    }
1415
    taosArrayRemoveBatch(pWinStates, 0, size, NULL);
2,413✔
1416
  }
1417
  code = clearRowBuffNonFlush(pFileState);
5,272✔
1418
  QUERY_CHECK_CODE(code, lino, _end);
5,271!
1419

1420
_end:
5,271✔
1421
  if (code != TSDB_CODE_SUCCESS) {
5,271!
1422
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1423
  }
1424
}
5,271✔
1425

1426
#ifdef BUILD_NO_CALL
1427
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
1428
                              int32_t* pWinCode) {
1429
  int32_t code = TSDB_CODE_SUCCESS;
1430
  int32_t lino = 0;
1431

1432
  code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
1433
  QUERY_CHECK_CODE(code, lino, _end);
1434

1435
  SArray*    pWinStates = NULL;
1436
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
1437
  void**     ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
1438
  if (ppBuff) {
1439
    pWinStates = (SArray*)(*ppBuff);
1440
  } else {
1441
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
1442
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
1443

1444
    code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
1445
    QUERY_CHECK_CODE(code, lino, _end);
1446
  }
1447

1448
  // recover
1449
  if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
1450
    recoverHashSortBuff(pFileState, pWinStates, pKey->groupId);
1451
  }
1452

1453
  int32_t size = taosArrayGetSize(pWinStates);
1454
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
1455
  if (!isFlushedState(pFileState, pKey->ts, 0) || index >= 0) {
1456
    // find the first position which is smaller than the pKey
1457
    if (index >= 0) {
1458
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
1459
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
1460
        goto _end;
1461
      }
1462
    }
1463
    index++;
1464
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
1465
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1466
  }
1467

1468
  if (size >= MAX_NUM_OF_CACHE_WIN) {
1469
    int32_t num = size - NUM_OF_CACHE_WIN;
1470
    taosArrayRemoveBatch(pWinStates, 0, num, NULL);
1471
  }
1472

1473
_end:
1474
  if (code != TSDB_CODE_SUCCESS) {
1475
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1476
  }
1477
  return code;
1478
}
1479
#endif
1480

1481
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
124✔
1482
                           int32_t* pVLen, int32_t* pWinCode) {
1483
  int32_t    code = TSDB_CODE_SUCCESS;
124✔
1484
  int32_t    lino = 0;
124✔
1485
  SArray*    pWinStates = NULL;
124✔
1486
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
124✔
1487
  void*      pState = getStateFileStore(pFileState);
124✔
1488
  void**     ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
124✔
1489
  if (ppBuff) {
124!
1490
    pWinStates = (SArray*)(*ppBuff);
124✔
1491
  } else if (needClearDiskBuff(pFileState)) {
×
1492
    qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
×
1493
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
×
1494
    void*            tmpVal = NULL;
×
1495
    int32_t          len = 0;
×
1496
    (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
×
1497
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
×
1498
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
1499
      if (!pNewPos || !pNewPos->pRowBuff) {
×
1500
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1501
        QUERY_CHECK_CODE(code, lino, _end);
×
1502
      }
1503
      memcpy(pNewPos->pRowBuff, tmpVal, len);
×
1504
      taosMemoryFreeClear(tmpVal);
×
1505
      *pVLen = getRowStateRowSize(pFileState);
×
1506
      (*ppVal) = pNewPos;
×
1507
    }
1508
    streamStateFreeCur(pCur);
×
1509
    return code;
×
1510
  } else {
1511
    (*pWinCode) = TSDB_CODE_FAILED;
×
1512
    return code;
×
1513
  }
1514
  int32_t size = taosArrayGetSize(pWinStates);
124✔
1515
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
124✔
1516
  if (index >= 0) {
124!
1517
    SWinKey* pCurKey = taosArrayGet(pWinStates, index);
124✔
1518
    if (winKeyCmprImpl(pCurKey, pKey) == 0) {
124!
1519
      index--;
124✔
1520
    } else {
1521
      qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__,
×
1522
             __LINE__);
1523
    }
1524
  }
1525
  if (index == -1) {
124✔
1526
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
37✔
1527
    void*            tmpVal = NULL;
37✔
1528
    int32_t          len = 0;
37✔
1529
    (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
37✔
1530
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
37!
1531
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
1532
      if (!pNewPos || !pNewPos->pRowBuff) {
×
1533
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1534
        QUERY_CHECK_CODE(code, lino, _end);
×
1535
      }
1536
      memcpy(pNewPos->pRowBuff, tmpVal, len);
×
1537
      taosMemoryFreeClear(tmpVal);
×
1538
      *pVLen = getRowStateRowSize(pFileState);
×
1539
      (*ppVal) = pNewPos;
×
1540
    }
1541
    streamStateFreeCur(pCur);
37✔
1542
    return code;
37✔
1543
  } else {
1544
    SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
87✔
1545
    *pResKey = *pPrevKey;
87✔
1546
    return addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), ppVal, pVLen, pWinCode);
87✔
1547
  }
1548
  (*pWinCode) = TSDB_CODE_FAILED;
1549

1550
_end:
×
1551
  if (code != TSDB_CODE_SUCCESS) {
×
1552
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1553
  }
1554
  return code;
×
1555
}
1556

1557
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey, bool* pIsEnd) {
2,957✔
1558
  int32_t code = TSDB_CODE_SUCCESS;
2,957✔
1559
  int32_t lino = 0;
2,957✔
1560
  int32_t size = taosArrayGetSize(pWinStates);
2,957✔
1561
  int32_t index = binarySearch(pWinStates, size, pKey, fillTSKeyCompare);
2,957✔
1562
  if (!isFlushedState(pFileState, pKey->ts, 0) || index >= 0 || size == 0) {
2,956!
1563
    if (index >= 0) {
2,956✔
1564
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
2,584✔
1565
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
2,584✔
1566
        goto _end;
1,854✔
1567
      }
1568
    }
1569
    index++;
1,102✔
1570
    (*pIsEnd) = (index >= size);
1,102✔
1571
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
1,102✔
1572
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
1,103!
1573
  }
1574

1575
_end:
1,103✔
1576
  if (code != TSDB_CODE_SUCCESS) {
2,957!
1577
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1578
  }
1579
  return code;
2,956✔
1580
}
1581

1582
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
4,522✔
1583
  int32_t code = TSDB_CODE_SUCCESS;
4,522✔
1584
  int32_t lino = 0;
4,522✔
1585
  SArray* pWinStates = NULL;
4,522✔
1586
  void**  ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
4,522✔
1587
  if (ppBuff) {
4,522✔
1588
    pWinStates = (SArray*)(*ppBuff);
4,163✔
1589
  } else {
1590
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
359✔
1591
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
359!
1592

1593
    code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
359✔
1594
    QUERY_CHECK_CODE(code, lino, _end);
359!
1595
  }
1596

1597
  (*ppResStates) = pWinStates;
4,522✔
1598

1599
_end:
4,522✔
1600
  if (code != TSDB_CODE_SUCCESS) {
4,522!
1601
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1602
  }
1603
  return code;
4,522✔
1604
}
1605

1606
static void setValueBuff(TSKEY ts, char* pVal, int32_t len, char* pBuff, int32_t buffLen) {
55✔
1607
  SET_TSDATA_FLAG(pBuff, buffLen);
55✔
1608
  if (len == 0) {
55!
1609
    *(TSKEY*)pBuff = ts;
55✔
1610
    return;
55✔
1611
  }
1612
  memset(pBuff, 0, buffLen - 1);
×
1613
  *(TSKEY*)pBuff = ts;
×
1614
  memcpy(pBuff + sizeof(TSKEY), pVal, len);
×
1615
}
1616

1617
int32_t getAndSetTsData(STableTsDataState* pTsDataState, uint64_t tableUid, TSKEY* pCurTs, void** ppCurPkVal,
55✔
1618
                        TSKEY lastTs, void* pLastPkVal, int32_t lastPkLen, int32_t* pWinCode) {
1619
  int32_t code = TSDB_CODE_SUCCESS;
55✔
1620
  int32_t lino = 0;
55✔
1621
  bool    hasPk = (lastPkLen != 0);
55✔
1622

1623
  TSKEY* pDataVal = tSimpleHashGet(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t));
55✔
1624
  if (pDataVal != NULL) {
55✔
1625
    (*pWinCode) = TSDB_CODE_SUCCESS;
33✔
1626
    *pCurTs = *pDataVal;
33✔
1627
    if ((*pCurTs) < lastTs) {
33!
1628
      setValueBuff(lastTs, pLastPkVal, lastPkLen, (char*)pDataVal, pTsDataState->pkValLen);
33✔
1629
    } else {
1630
      if (hasPk) {
×
1631
        (*ppCurPkVal) = POINTER_SHIFT(pDataVal, sizeof(TSKEY));
×
1632
        if ((*pCurTs) == lastTs && pTsDataState->comparePkColFn((*ppCurPkVal), pLastPkVal) < 0) {
×
1633
          setValueBuff(lastTs, pLastPkVal, lastPkLen, (char*)pDataVal, pTsDataState->pkValLen);
×
1634
        }
1635
      }
1636
    }
1637
  } else {
1638
    setValueBuff(lastTs, pLastPkVal, lastPkLen, pTsDataState->pPkValBuff, pTsDataState->pkValLen);
22✔
1639
    int32_t size = tSimpleHashGetSize(pTsDataState->pTableTsDataMap);
22✔
1640
    if (size < MAX_STATE_MAP_SIZE) {
22!
1641
      (*pWinCode) = TSDB_CODE_FAILED;
22✔
1642
      code = tSimpleHashPut(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t), pTsDataState->pPkValBuff,
22✔
1643
                            pTsDataState->pkValLen);
22✔
1644
      QUERY_CHECK_CODE(code, lino, _end);
22!
1645
    } else {
1646
      (*pWinCode) = streamStateGetParTag_rocksdb(pTsDataState->pState, tableUid, &pTsDataState->pPkValBuff,
×
1647
                                                 &pTsDataState->pkValLen);
1648
      if ((*pWinCode) == TSDB_CODE_SUCCESS) {
×
1649
        *pCurTs = *(TSKEY*)pTsDataState->pPkValBuff;
×
1650
        if (hasPk) {
×
1651
          (*ppCurPkVal) = POINTER_SHIFT(pTsDataState->pPkValBuff, sizeof(TSKEY));
×
1652
        }
1653
      }
1654

1655
      int32_t tmpCode = streamStatePutParTag_rocksdb(pTsDataState->pState, tableUid, pTsDataState->pPkValBuff,
×
1656
                                                     pTsDataState->pkValLen);
1657
    }
1658
  }
1659

1660
_end:
55✔
1661
  if (code != TSDB_CODE_SUCCESS) {
55!
1662
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1663
  }
1664
  return code;
55✔
1665
}
1666

1667
int32_t doTsDataCommit(STableTsDataState* pTsDataState) {
5✔
1668
  int32_t code = TSDB_CODE_SUCCESS;
5✔
1669
  int32_t lino = 0;
5✔
1670
  void*   batch = NULL;
5✔
1671
  char*   pTempBuf = NULL;
5✔
1672

1673
  batch = streamStateCreateBatch();
5✔
1674
  QUERY_CHECK_NULL(batch, code, lino, _end, terrno);
5!
1675
  int           idx = streamStateGetCfIdx(pTsDataState->pState, "partag");
5✔
1676
  int32_t       len = (pTsDataState->pkValLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
5✔
1677
  pTempBuf = taosMemoryCalloc(1, len);
5!
1678
  QUERY_CHECK_NULL(pTempBuf, code, lino, _end, terrno);
5!
1679

1680
  void*   pIte = NULL;
5✔
1681
  int32_t iter = 0;
5✔
1682
  while ((pIte = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pIte, &iter)) != NULL) {
12✔
1683
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
7!
1684
      code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
×
1685
      streamStateClearBatch(batch);
×
1686
      QUERY_CHECK_CODE(code, lino, _end);
×
1687
    }
1688

1689
    if (HAS_TSDATA_FLAG(pIte, pTsDataState->pkValLen)) {
7!
1690
      void* pKey = tSimpleHashGetKey(pIte, NULL);
7✔
1691
      UNSET_TSDATA_FLAG(pIte, pTsDataState->pkValLen);
7✔
1692
      code = streamStatePutBatchOptimize(pTsDataState->pState, idx, batch, pKey, pIte, pTsDataState->pkValLen, 0,
7✔
1693
                                         pTempBuf);
1694
      QUERY_CHECK_CODE(code, lino, _end);
7!
1695
      memset(pTempBuf, 0, len);
7✔
1696
      qDebug("flush ts data,table id:%" PRIu64 , *(uint64_t*)pKey);
7!
1697
    }
1698
  }
1699

1700
  int32_t numOfElems = streamStateGetBatchSize(batch);
5✔
1701
  if (numOfElems > 0) {
5✔
1702
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
3✔
1703
    QUERY_CHECK_CODE(code, lino, _end);
3!
1704
  }
1705

1706
_end:
5✔
1707
  if (code != TSDB_CODE_SUCCESS) {
5!
1708
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1709
  }
1710
  taosMemoryFree(pTempBuf);
5!
1711
  streamStateDestroyBatch(batch);
5✔
1712
  return code;
5✔
1713
}
1714

1715
int32_t doRangeDataCommit(STableTsDataState* pTsDataState) {
5✔
1716
  int32_t code = TSDB_CODE_SUCCESS;
5✔
1717
  int32_t lino = 0;
5✔
1718
  void*   batch = NULL;
5✔
1719

1720
  batch = streamStateCreateBatch();
5✔
1721
  QUERY_CHECK_NULL(batch, code, lino, _end, terrno);
5!
1722
  int           idx = streamStateGetCfIdx(pTsDataState->pState, "sess");
5✔
1723
  int32_t       len = (pTsDataState->pkValLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
5✔
1724

1725
  int32_t size = taosArrayGetSize(pTsDataState->pScanRanges);
5✔
1726
  for (int32_t i = 0; i < size; i++) {
5!
1727
    SScanRange* pRange = taosArrayGet(pTsDataState->pScanRanges, i);
×
1728
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
×
1729
      code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
×
1730
      streamStateClearBatch(batch);
×
1731
      QUERY_CHECK_CODE(code, lino, _end);
×
1732
    }
1733
    SSessionKey key = {.win = pRange->win, .groupId = 0};
×
1734
    int32_t     uidSize = tSimpleHashGetSize(pRange->pUIds);
×
1735
    int32_t     gpIdSize = tSimpleHashGetSize(pRange->pGroupIds);
×
1736
    int32_t     size = uidSize + gpIdSize;
×
1737
    uint64_t*   pIdBuf = (uint64_t*)taosMemoryCalloc(1, size);
×
1738
    void*       pIte = NULL;
×
1739
    int32_t     iter = 0;
×
1740
    int32_t     i = 0;
×
1741
    while ((pIte = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pIte, &iter)) != NULL) {
×
1742
      void* pTempKey = tSimpleHashGetKey(pIte, NULL);
×
1743
      pIdBuf[i] = *(uint64_t*)pTempKey;
×
1744
      i++;
×
1745
    }
1746

1747
    code = streamStatePutBatchOptimize(pTsDataState->pState, idx, batch, &key, (void*)pIdBuf, size, 0,
×
1748
                                       NULL);
1749
    QUERY_CHECK_CODE(code, lino, _end);
×
1750
  }
1751

1752
  int32_t numOfElems = streamStateGetBatchSize(batch);
5✔
1753
  if (numOfElems > 0) {
5!
1754
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, batch);
×
1755
    QUERY_CHECK_CODE(code, lino, _end);
×
1756
  }
1757

1758
_end:
5✔
1759
  if (code != TSDB_CODE_SUCCESS) {
5!
1760
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1761
  }
1762
  streamStateDestroyBatch(batch);
5✔
1763
  return code;
5✔
1764
}
1765

1766
int32_t initTsDataState(STableTsDataState** ppTsDataState, int8_t pkType, int32_t pkLen, void* pState, void* pOtherState) {
57✔
1767
  int32_t    code = TSDB_CODE_SUCCESS;
57✔
1768
  int32_t    lino = 0;
57✔
1769

1770
  STableTsDataState* pTsDataState = taosMemoryCalloc(1, sizeof(STableTsDataState));
57!
1771
  QUERY_CHECK_NULL(pTsDataState, code, lino, _end, terrno);
57!
1772

1773
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
57✔
1774
  pTsDataState->pTableTsDataMap = tSimpleHashInit(DEFAULT_STATE_MAP_CAPACITY, hashFn);
57✔
1775
  QUERY_CHECK_NULL(pTsDataState->pTableTsDataMap, code, lino, _end, terrno);
57!
1776

1777
  pTsDataState->pkValLen = sizeof(TSKEY) + pkLen + sizeof(char);
57✔
1778
  pTsDataState->pPkValBuff = taosMemoryCalloc(1, pTsDataState->pkValLen);
57!
1779
  QUERY_CHECK_NULL(pTsDataState->pPkValBuff, code, lino, _end, terrno);
57!
1780

1781
  if (pkLen != 0) {
57!
1782
    pTsDataState->comparePkColFn = getKeyComparFunc(pkType, TSDB_ORDER_ASC);
×
1783
  } else {
1784
    pTsDataState->comparePkColFn = NULL;
57✔
1785
  }
1786

1787
  pTsDataState->pScanRanges = taosArrayInit(64, sizeof(SScanRange));
57✔
1788
  QUERY_CHECK_NULL(pTsDataState->pScanRanges, code, lino, _end, terrno);
57!
1789

1790
  pTsDataState->pState = pState;
57✔
1791
  pTsDataState->recValueLen = sizeof(SRecDataInfo) + pkLen;
57✔
1792
  pTsDataState->pRecValueBuff = taosMemoryCalloc(1, pTsDataState->recValueLen);
57!
1793
  QUERY_CHECK_NULL(pTsDataState->pRecValueBuff, code, lino, _end, terrno);
57!
1794

1795
  pTsDataState->curRecId = -1;
57✔
1796

1797
  pTsDataState->pStreamTaskState = pOtherState;
57✔
1798

1799
  pTsDataState->cfgIndex = streamStateGetCfIdx(pTsDataState->pState, "sess");
57✔
1800
  pTsDataState->pBatch = streamStateCreateBatch();
57✔
1801
  QUERY_CHECK_NULL(pTsDataState->pBatch, code, lino, _end, TSDB_CODE_FAILED);
57!
1802
  
1803
  pTsDataState->batchBufflen = (pTsDataState->recValueLen + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
57✔
1804
  pTsDataState->pBatchBuff = taosMemoryCalloc(1, pTsDataState->batchBufflen);
57!
1805

1806
  (*ppTsDataState) = pTsDataState;
57✔
1807

1808
_end:
57✔
1809
  if (code != TSDB_CODE_SUCCESS) {
57!
1810
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1811
  }
1812
  return code;
57✔
1813
}
1814

1815
static void destroyScanRange(SScanRange* pRange) {
×
1816
  pRange->win.skey = INT64_MIN;
×
1817
  pRange->win.ekey = INT64_MIN;
×
1818
  tSimpleHashCleanup(pRange->pUIds);
×
1819
  pRange->pUIds = NULL;
×
1820
  tSimpleHashCleanup(pRange->pGroupIds);
×
1821
  pRange->pGroupIds = NULL;
×
1822
}
×
1823

1824
void destroyTsDataState(STableTsDataState* pTsDataState) {
57✔
1825
  SArray* pScanRanges = pTsDataState->pScanRanges;
57✔
1826
  int32_t size = taosArrayGetSize(pScanRanges);
57✔
1827
  for (int32_t i = 0; i < size; i++) {
57!
1828
    SScanRange* pRange = taosArrayGet(pScanRanges, i);
×
1829
    destroyScanRange(pRange);
×
1830
  }
1831
  taosArrayDestroy(pTsDataState->pScanRanges);
57✔
1832
  tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
57✔
1833
  taosMemoryFreeClear(pTsDataState->pPkValBuff);
57!
1834
  taosMemoryFreeClear(pTsDataState->pState);
57!
1835
  taosMemoryFreeClear(pTsDataState->pRecValueBuff);
57!
1836
  pTsDataState->pStreamTaskState = NULL;
57✔
1837

1838
  streamStateClearBatch(pTsDataState->pBatch);
57✔
1839
  streamStateDestroyBatch(pTsDataState->pBatch);
57✔
1840
  pTsDataState->pBatch = NULL;
57✔
1841
  taosMemoryFreeClear(pTsDataState->pBatchBuff);
57!
1842

1843
  taosMemoryFreeClear(pTsDataState);
57!
1844
}
57✔
1845

1846
int32_t recoverTsData(STableTsDataState* pTsDataState) {
61✔
1847
  int32_t          code = TSDB_CODE_SUCCESS;
61✔
1848
  int32_t          lino = 0;
61✔
1849
  SStreamStateCur* pCur = createStateCursor(NULL);
61✔
1850
  streamStateParTagSeekKeyNext_rocksdb(pTsDataState->pState, INT64_MIN, pCur);
61✔
1851
  while (1) {
7✔
1852
    uint64_t tableUid = 0;
68✔
1853
    void*    pVal = NULL;
68✔
1854
    int32_t  len = 0;
68✔
1855
    int32_t  winCode = streamStateParTagGetKVByCur_rocksdb(pCur, &tableUid, &pVal, &len);
68✔
1856
    if (winCode != TSDB_CODE_SUCCESS) {
68✔
1857
      break;
61✔
1858
    }
1859
    if (pTsDataState->pkValLen != len) {
7!
1860
      taosMemoryFree(pVal);
×
1861
      streamStateCurNext_rocksdb(pCur);
×
1862
      continue;
×
1863
    }
1864
    UNSET_TSDATA_FLAG(pVal, len);
7✔
1865
    code = tSimpleHashPut(pTsDataState->pTableTsDataMap, &tableUid, sizeof(uint64_t), pVal, len);
7✔
1866
    taosMemoryFree(pVal);
7!
1867
    QUERY_CHECK_CODE(code, lino, _end);
7!
1868
    streamStateCurNext_rocksdb(pCur);
7✔
1869
  }
1870

1871
_end:
61✔
1872
  streamStateFreeCur(pCur);
61✔
1873
  if (code != TSDB_CODE_SUCCESS) {
61!
1874
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1875
  }
1876
  return code;
61✔
1877
}
1878

1879
SStreamStateCur* getLastStateCur(SStreamFileState* pFileState, getStateBuffFn fn) {
2✔
1880
  SStreamStateCur* pCur = createStateCursor(pFileState);
2✔
1881
  if (pCur == NULL) {
2!
1882
    return NULL;
×
1883
  }
1884
  SSHashObj* pSearchBuff = fn(pFileState);
2✔
1885
  pCur->buffIndex = 0;
2✔
1886
  pCur->hashIter = 0;
2✔
1887
  pCur->pHashData = NULL;
2✔
1888
  pCur->pHashData = tSimpleHashIterate(pSearchBuff, pCur->pHashData, &pCur->hashIter);
2✔
1889
  return pCur;
2✔
1890
}
1891

1892
void moveLastStateCurNext(SStreamStateCur* pCur, getStateBuffFn fn) {
5✔
1893
  SSHashObj* pSearchBuff = fn(pCur->pStreamFileState);
5✔
1894
  pCur->pHashData = tSimpleHashIterate(pSearchBuff, pCur->pHashData, &pCur->hashIter);
5✔
1895
}
5✔
1896

1897
int32_t getNLastStateKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
7✔
1898
  int32_t code = TSDB_CODE_SUCCESS;
7✔
1899
  int32_t lino = 0;
7✔
1900
  SArray*  pWinStates = NULL;
7✔
1901
  int32_t size = 0;
7✔
1902

1903
  while(1) {
1904
    if (pCur->pHashData == NULL) {
7✔
1905
      return TSDB_CODE_FAILED;
2✔
1906
    }
1907
    pWinStates = *((void**)pCur->pHashData);
5✔
1908
    size = taosArrayGetSize(pWinStates);
5✔
1909
    if (size > 0) {
5!
1910
      break;
5✔
1911
    }
1912
    moveLastStateCurNext(pCur, getSearchBuff);
×
1913
  }
1914

1915
  int32_t i = TMAX(size - num, 0);
5✔
1916

1917
  for ( ; i < size; i++) {
10✔
1918
    SWinKey* pKey = taosArrayGet(pWinStates, i);
5✔
1919
    int32_t  len = 0;
5✔
1920
    void*    pVal = NULL;
5✔
1921
    int32_t  winCode = TSDB_CODE_SUCCESS;
5✔
1922
    code = addRowBuffIfNotExist(pCur->pStreamFileState, (void*)pKey, sizeof(SWinKey), &pVal, &len, &winCode);
5✔
1923
    QUERY_CHECK_CODE(code, lino, _end);
5!
1924

1925
    if (winCode != TSDB_CODE_SUCCESS) {
5!
1926
      qError("%s failed at line %d since window not exist. ts:%" PRId64 ",groupId:%" PRIu64, __func__, __LINE__,
×
1927
             pKey->ts, pKey->groupId);
1928
    }
1929

1930
    void* pTempRes = taosArrayPush(pRes, &pVal);
5✔
1931
    QUERY_CHECK_NULL(pTempRes, code, lino, _end, terrno);
5!
1932
  }
1933

1934
_end:
5✔
1935
  if (code != TSDB_CODE_SUCCESS) {
5!
1936
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1937
  }
1938
  return code;
5✔
1939
}
1940

1941
int32_t reloadTsDataState(STableTsDataState* pTsDataState) {
4✔
1942
  int32_t code = TSDB_CODE_SUCCESS;
4✔
1943
  int32_t lino = 0;
4✔
1944

1945
  STableTsDataState tmpState = *pTsDataState;
4✔
1946
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
4✔
1947
  tmpState.pTableTsDataMap = tSimpleHashInit(DEFAULT_STATE_MAP_CAPACITY, hashFn);
4✔
1948
  QUERY_CHECK_NULL(tmpState.pTableTsDataMap, code, lino, _end, terrno);
4!
1949

1950
  code = recoverTsData(&tmpState);
4✔
1951
  QUERY_CHECK_CODE(code, lino, _end);
4!
1952

1953
  void*   pIte = NULL;
4✔
1954
  int32_t iter = 0;
4✔
1955
  while ((pIte = tSimpleHashIterate(pTsDataState->pTableTsDataMap, pIte, &iter)) != NULL) {
4!
1956
    size_t keyLen = 0;
×
1957
    void* pKey = tSimpleHashGetKey(pIte, &keyLen);
×
1958
    code = tSimpleHashPut(tmpState.pTableTsDataMap, pKey, keyLen, pIte, pTsDataState->pkValLen);
×
1959
    QUERY_CHECK_CODE(code, lino, _end);
×
1960
  }
1961
  tSimpleHashCleanup(pTsDataState->pTableTsDataMap);
4✔
1962
  pTsDataState->pTableTsDataMap = tmpState.pTableTsDataMap;
4✔
1963

1964
_end:
4✔
1965
  if (code != TSDB_CODE_SUCCESS) {
4!
1966
    tSimpleHashCleanup(tmpState.pTableTsDataMap);
×
1967
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1968
  }
1969
  return code;
4✔
1970
}
1971

1972
int32_t saveRecInfoToDisk(STableTsDataState* pTsDataState, SSessionKey* pKey, SRecDataInfo* pVal, int32_t vLen) {
×
1973
  int32_t code = TSDB_CODE_SUCCESS;
×
1974
  int32_t lino = 0;
×
1975

1976
  SStateSessionKey stateKey = {.key = *pKey, .opNum = ((SStreamState*)pTsDataState->pState)->number};
×
1977
  code = streamStatePutBatchOptimize(pTsDataState->pState, pTsDataState->cfgIndex, pTsDataState->pBatch, &stateKey, pVal, vLen, 0,
×
1978
                                     pTsDataState->pBatchBuff);
1979
  QUERY_CHECK_CODE(code, lino, _end);
×
1980

1981
  memset(pTsDataState->pBatchBuff, 0, pTsDataState->batchBufflen);
×
1982

1983
  if (streamStateGetBatchSize(pTsDataState->pBatch) >= BATCH_LIMIT) {
×
1984
    code = streamStatePutBatch_rocksdb(pTsDataState->pState, pTsDataState->pBatch);
×
1985
    streamStateClearBatch(pTsDataState->pBatch);
×
1986
    QUERY_CHECK_CODE(code, lino, _end);
×
1987
  }
1988

1989
_end:
×
1990
  if (code != TSDB_CODE_SUCCESS) {
×
1991
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1992
  }
1993
  return code;
×
1994
}
1995

1996
int32_t flushRemainRecInfoToDisk(STableTsDataState* pTsDataState) {
6✔
1997
  int32_t code = streamStatePutBatch_rocksdb(pTsDataState->pState, pTsDataState->pBatch);
6✔
1998
  streamStateClearBatch(pTsDataState->pBatch);
6✔
1999
  return code;
6✔
2000
}
2001

2002
int32_t recoverHashSortBuff(SStreamFileState* pFileState, SArray* pWinStates, uint64_t groupId) {
4✔
2003
  int32_t code = TSDB_CODE_SUCCESS;
4✔
2004
  int32_t lino = 0;
4✔
2005

2006
  SWinKey          start = {.groupId = groupId, .ts = INT64_MAX};
4✔
2007
  void*            pState = getStateFileStore(pFileState);
4✔
2008
  SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
4✔
2009
  for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
8!
2010
    SWinKey tmpKey = {.groupId = groupId};
8✔
2011
    int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pState, pCur, &tmpKey, NULL, 0);
8✔
2012
    if (tmpRes != TSDB_CODE_SUCCESS) {
8✔
2013
      break;
4✔
2014
    }
2015
    void* tmp = taosArrayPush(pWinStates, &tmpKey);
4✔
2016
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
4!
2017
    streamStateCurPrev_rocksdb(pCur);
4✔
2018
  }
2019
  taosArraySort(pWinStates, winKeyCmprImpl);
4✔
2020
  streamStateFreeCur(pCur);
4✔
2021

2022
_end:
4✔
2023
  if (code != TSDB_CODE_SUCCESS) {
4!
2024
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2025
  }
2026
  return code;
4✔
2027
}
2028

2029
int32_t getRowStateAllPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SArray* pResArray, int32_t maxNum) {
6✔
2030
  int32_t    code = TSDB_CODE_SUCCESS;
6✔
2031
  int32_t    lino = 0;
6✔
2032
  SWinKey*   pPrevKey = NULL;
6✔
2033
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
6✔
2034
  void*      pState = getStateFileStore(pFileState);
6✔
2035
  void**     ppBuff = (void**)tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
6✔
2036
  int32_t    num = 0;
6✔
2037
  if (ppBuff) {
6!
2038
    SArray* pWinStates = (SArray*)(*ppBuff);
6✔
2039
    int32_t size = taosArrayGetSize(pWinStates);
6✔
2040
    int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
6✔
2041
    for (; index >= 0 && num <= maxNum; index--) {
13!
2042
      pPrevKey = taosArrayGet(pWinStates, index);
7✔
2043
      if (winKeyCmprImpl(pPrevKey, pKey) == 0) {
7!
2044
        continue;
×
2045
      }
2046
      void*   pVal = NULL;
7✔
2047
      int32_t len = 0;
7✔
2048
      int32_t winCode = TSDB_CODE_SUCCESS;
7✔
2049
      code = addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), &pVal, &len, &winCode);
7✔
2050
      QUERY_CHECK_CODE(code, lino, _end);
7!
2051
      void* tempRes = taosArrayPush(pResArray, &pVal);
7✔
2052
      QUERY_CHECK_NULL(tempRes, code, lino, _end, terrno);
7!
2053
      num++;
7✔
2054
    }
2055
  }
2056

2057
_end:
6✔
2058
  if (code != TSDB_CODE_SUCCESS) {
6!
2059
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2060
  }
2061
  return code;
6✔
2062
}
2063

2064
int32_t setStateRecFlag(SStreamFileState* pFileState, const void* pKey, int32_t keyLen, int32_t mode) {
×
2065
  return tSimpleHashPut(pFileState->pRecFlagMap, pKey, keyLen, &mode, sizeof(int32_t));
×
2066
}
2067

2068
int32_t getStateRecFlag(SStreamFileState* pFileState, const void* pKey, int32_t keyLen, int32_t* pMode) {
34✔
2069
  void* pVal = tSimpleHashGet(pFileState->pRecFlagMap, pKey, keyLen);
34✔
2070
  if (pVal == NULL) {
34!
2071
    return TSDB_CODE_FAILED;
34✔
2072
  }
2073
  *pMode = *(int32_t*) pVal;
×
2074
  return TSDB_CODE_SUCCESS;
×
2075
}
2076

2077
void clearExpiredSessionState(SStreamFileState* pFileState, int32_t numOfKeep, TSKEY minTs, SSHashObj* pFlushGroup) {
×
2078
  int32_t    code = TSDB_CODE_SUCCESS;
×
2079
  int32_t    lino = 0;
×
2080
  SSHashObj* pSessionBuff = pFileState->rowStateBuff;
×
2081
  SStreamSnapshot* pFlushList = NULL;
×
2082
  if (pFlushGroup != NULL) {
×
2083
    pFlushList = tdListNew(POINTER_BYTES);
×
2084
  }
2085
  void*      pIte = NULL;
×
2086
  int32_t    iter = 0;
×
2087
  while ((pIte = tSimpleHashIterate(pSessionBuff, pIte, &iter)) != NULL) {
×
2088
    SArray* pWinStates = *((void**)pIte);
×
2089
    int32_t arraySize = TARRAY_SIZE(pWinStates);
×
2090
    if (minTs != INT64_MAX && arraySize > numOfKeep) {
×
2091
      SSessionKey key = {.win.skey = minTs, .win.ekey = minTs};
×
2092
      key.groupId = *(uint64_t*)tSimpleHashGetKey(pIte, NULL);
×
2093
      int32_t index = binarySearch(pWinStates, arraySize, &key, fillStateKeyCompare);
×
2094
      numOfKeep = TMAX(arraySize - index, MIN_NUM_OF_SORT_CACHE_WIN);
×
2095
      qDebug("modify numOfKeep, numOfKeep:%d. %s at line %d", numOfKeep, __func__, __LINE__);
×
2096
    }
2097

2098
    int32_t size = arraySize - numOfKeep;
×
2099
    for (int32_t i = 0; i < size; i++) {
×
2100
      SRowBuffPos* pPos = taosArrayGetP(pWinStates, i);
×
2101
      SSessionKey* pKey = pPos->pKey;
×
2102
      if (tSimpleHashGetSize(pFileState->pRecFlagMap) > 0) {
×
2103
        tSimpleHashRemove(pFileState->pRecFlagMap, pKey, sizeof(SSessionKey));
×
2104
      }
2105
      pPos->invalid = true;
×
2106

2107
      if (i == 0 && pFlushGroup != NULL) {
×
2108
        void* pGpVal = tSimpleHashGet(pFlushGroup, &pKey->groupId, sizeof(uint64_t));
×
2109
        if (pGpVal == NULL) {
×
2110
          code = tdListAppend(pFlushList, &pPos);
×
2111
          QUERY_CHECK_CODE(code, lino, _end);
×
2112
          code = tSimpleHashPut(pFlushGroup, &pKey->groupId, sizeof(uint64_t), NULL, 0);
×
2113
          QUERY_CHECK_CODE(code, lino, _end);
×
2114
          continue;
×
2115
        }
2116
      }
2117
      pPos->beFlushed = true;
×
2118
      qTrace("clear expired session buff, ts:%" PRId64 ",groupid:%" PRIu64 ". %s at line %d", pKey->win.skey, pKey->groupId, __func__, __LINE__);
×
2119

2120
      if (isFlushedState(pFileState, pKey->win.skey, 0)) {
×
2121
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
×
2122
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->win.skey, __func__, __LINE__, code_file);
×
2123
      }
2124
    }
2125
    taosArrayRemoveBatch(pWinStates, 0, size, NULL);
×
2126
  }
2127

2128
  if (pFlushList != NULL) {
×
2129
    flushSnapshot(pFileState, pFlushList, false);
×
2130
    code = clearRowBuffNonFlush(pFileState);
×
2131
    QUERY_CHECK_CODE(code, lino, _end);
×
2132
    tdListFreeP(pFlushList, destroyRowBuffPosPtr);
×
2133
  }
2134

2135
_end:
×
2136
  if (code != TSDB_CODE_SUCCESS) {
×
2137
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2138
  }
2139
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc