• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.52
/source/libs/stream/src/tstreamFileState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tstreamFileState.h"
17

18
#include "query.h"
19
#include "streamBackendRocksdb.h"
20
#include "taos.h"
21
#include "tcommon.h"
22
#include "thash.h"
23
#include "tsimplehash.h"
24

25
#define FLUSH_RATIO                    0.5
26
#define FLUSH_NUM                      4
27
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
28
#define MIN_NUM_OF_ROW_BUFF            10240
29
#define MIN_NUM_OF_RECOVER_ROW_BUFF    128
30
#define MIN_NUM_SEARCH_BUCKET          128
31
#define MAX_ARRAY_SIZE                 1024
32
#define MAX_GROUP_ID_NUM               200000
33
#define NUM_OF_CACHE_WIN               64
34
#define MAX_NUM_OF_CACHE_WIN           128
35

36
#define TASK_KEY               "streamFileState"
37
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
38

39
struct SStreamFileState {
40
  SList*     usedBuffs;
41
  SList*     freeBuffs;
42
  void*      rowStateBuff;
43
  void*      pFileStore;
44
  int32_t    rowSize;
45
  int32_t    selectivityRowSize;
46
  int32_t    keyLen;
47
  uint64_t   preCheckPointVersion;
48
  uint64_t   checkPointVersion;
49
  TSKEY      maxTs;
50
  TSKEY      deleteMark;
51
  TSKEY      flushMark;
52
  uint64_t   maxRowCount;
53
  uint64_t   curRowCount;
54
  GetTsFun   getTs;
55
  char*      id;
56
  char*      cfName;
57
  void*      searchBuff;
58
  SSHashObj* pGroupIdMap;
59
  bool       hasFillCatch;
60

61
  _state_buff_cleanup_fn         stateBuffCleanupFn;
62
  _state_buff_remove_fn          stateBuffRemoveFn;
63
  _state_buff_remove_by_pos_fn   stateBuffRemoveByPosFn;
64
  _state_buff_create_statekey_fn stateBuffCreateStateKeyFn;
65

66
  _state_file_remove_fn stateFileRemoveFn;
67
  _state_file_get_fn    stateFileGetFn;
68

69
  _state_fun_get_fn stateFunctionGetFn;
70
};
71

72
typedef SRowBuffPos SRowBuffInfo;
73

74
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
5,206✔
75
  SWinKey* pWin2 = taosArrayGet(pDatas, pos);
5,206✔
76
  return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
5,206✔
77
}
78

79
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
3,990✔
80
  SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
3,990✔
81
  if (pos) {
3,992✔
82
    (*pos)->beFlushed = true;
2,174✔
83
  }
84
  return tSimpleHashRemove(pBuff, pKey, keyLen);
3,992✔
85
}
86

87
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
5,787,532✔
88
  size_t        keyLen = pFileState->keyLen;
5,787,532✔
89
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
5,787,532✔
90
  if (ppPos) {
5,787,689✔
91
    if ((*ppPos) == pPos) {
5,787,401!
92
      int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
5,787,406✔
93
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
5,787,390✔
94
    }
95
  }
96
}
5,787,852✔
97

98
void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); }
×
99

100
void stateHashBuffCleanupFn(void* pBuff) { tSimpleHashCleanup(pBuff); }
4,086✔
101

102
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
3,682✔
103
  return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
3,682✔
104
}
105

106
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
134,235✔
107
  return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
134,235✔
108
}
109

110
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
1,171,321✔
111
  SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey));
1,171,321!
112
  if (pStateKey == NULL) {
1,171,321!
113
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
114
    return NULL;
×
115
  }
116
  SWinKey* pWinKey = pPos->pKey;
1,171,321✔
117
  pStateKey->key = *pWinKey;
1,171,321✔
118
  pStateKey->opNum = num;
1,171,321✔
119
  return pStateKey;
1,171,321✔
120
}
121

122
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
679✔
123
  SWinKey* pStateKey = taosMemoryCalloc(1, sizeof(SWinKey));
679!
124
  if (pStateKey == NULL) {
679!
125
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
126
    return NULL;
×
127
  }
128
  SWinKey* pWinKey = pPos->pKey;
679✔
129
  *pStateKey = *pWinKey;
679✔
130
  return pStateKey;
679✔
131
}
132

133
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
1,896✔
134
  return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
1,896✔
135
}
136

137
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
216✔
138
  return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
216✔
139
}
140

141
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
928✔
142
  SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
928!
143
  if (pStateKey == NULL) {
928!
144
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
145
    return NULL;
×
146
  }
147
  SSessionKey* pWinKey = pPos->pKey;
928✔
148
  pStateKey->key = *pWinKey;
928✔
149
  pStateKey->opNum = num;
928✔
150
  return pStateKey;
928✔
151
}
152

153
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
14!
154

155
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
1,148✔
156
  *pLen = sizeof(TSKEY);
1,148✔
157
  (*pVal) = taosMemoryCalloc(1, *pLen);
1,148!
158
  if ((*pVal) == NULL) {
1,148!
159
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
160
    return terrno;
×
161
  }
162
  void*   buff = *pVal;
1,148✔
163
  int32_t tmp = taosEncodeFixedI64(&buff, *pKey);
1,148!
164
  return TSDB_CODE_SUCCESS;
1,148✔
165
}
166

167
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
5,890✔
168
                            void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type,
169
                            SStreamFileState** ppFileState) {
170
  int32_t code = TSDB_CODE_SUCCESS;
5,890✔
171
  int32_t lino = 0;
5,890✔
172
  if (memSize <= 0) {
5,890!
173
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
×
174
  }
175
  if (rowSize == 0) {
5,890!
176
    code = TSDB_CODE_INVALID_PARA;
×
177
    QUERY_CHECK_CODE(code, lino, _end);
×
178
  }
179

180
  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
5,890!
181
  QUERY_CHECK_NULL(pFileState, code, lino, _end, terrno);
5,889!
182

183
  rowSize += selectRowSize;
5,889✔
184
  pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
5,889✔
185
  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
5,889✔
186
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
5,891!
187

188
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
5,891✔
189
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
5,891!
190

191
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5,891✔
192
  int32_t    cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
5,891✔
193
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
5,891✔
194
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
4,063✔
195
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
4,061✔
196
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
4,061✔
197
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
4,061✔
198
    pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
4,061✔
199

200
    pFileState->stateFileRemoveFn = intervalFileRemoveFn;
4,061✔
201
    pFileState->stateFileGetFn = intervalFileGetFn;
4,061✔
202
    pFileState->cfName = taosStrdup("state");
4,061!
203
    pFileState->stateFunctionGetFn = addRowBuffIfNotExist;
4,063✔
204
  } else if (type == STREAM_STATE_BUFF_SORT) {
1,828✔
205
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
1,691✔
206
    pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
1,691✔
207
    pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
1,691✔
208
    pFileState->stateBuffRemoveByPosFn = deleteSessionWinStateBuffByPosFn;
1,691✔
209
    pFileState->stateBuffCreateStateKeyFn = sessionCreateStateKey;
1,691✔
210

211
    pFileState->stateFileRemoveFn = sessionFileRemoveFn;
1,691✔
212
    pFileState->stateFileGetFn = sessionFileGetFn;
1,691✔
213
    pFileState->cfName = taosStrdup("sess");
1,691!
214
    pFileState->stateFunctionGetFn = getSessionRowBuff;
1,691✔
215
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
137!
216
    pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
137✔
217
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
137✔
218
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
137!
219
    pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
137✔
220
    pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
137✔
221
    pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
137✔
222
    pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
137✔
223

224
    pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
137✔
225
    pFileState->stateFileGetFn = hashSortFileGetFn;
137✔
226
    pFileState->cfName = taosStrdup("fill");
137!
227
    pFileState->stateFunctionGetFn = NULL;
137✔
228
  }
229

230
  QUERY_CHECK_NULL(pFileState->usedBuffs, code, lino, _end, terrno);
5,891!
231
  QUERY_CHECK_NULL(pFileState->freeBuffs, code, lino, _end, terrno);
5,891!
232
  QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
5,891!
233
  QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
5,891!
234

235
  if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
5,891✔
236
    pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
70✔
237
    QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
70!
238
  }
239

240
  pFileState->keyLen = keySize;
5,891✔
241
  pFileState->rowSize = rowSize;
5,891✔
242
  pFileState->selectivityRowSize = selectRowSize;
5,891✔
243
  pFileState->preCheckPointVersion = 0;
5,891✔
244
  pFileState->checkPointVersion = 1;
5,891✔
245
  pFileState->pFileStore = pFile;
5,891✔
246
  pFileState->getTs = fp;
5,891✔
247
  pFileState->curRowCount = 0;
5,891✔
248
  pFileState->deleteMark = delMark;
5,891✔
249
  pFileState->flushMark = INT64_MIN;
5,891✔
250
  pFileState->maxTs = INT64_MIN;
5,891✔
251
  pFileState->id = taosStrdup(taskId);
5,891!
252
  QUERY_CHECK_NULL(pFileState->id, code, lino, _end, terrno);
5,890!
253

254
  pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
5,890✔
255
  QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
5,890!
256

257
  pFileState->hasFillCatch = true;
5,890✔
258

259
  if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
5,890✔
260
    code = recoverSnapshot(pFileState, checkpointId);
4,062✔
261
  } else if (type == STREAM_STATE_BUFF_SORT) {
1,828✔
262
    code = recoverSession(pFileState, checkpointId);
1,691✔
263
  } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
137!
264
    code = recoverFillSnapshot(pFileState, checkpointId);
137✔
265
  }
266
  QUERY_CHECK_CODE(code, lino, _end);
5,891!
267

268
  void*   valBuf = NULL;
5,891✔
269
  int32_t len = 0;
5,891✔
270
  int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
5,891✔
271
  if (tmpRes == TSDB_CODE_SUCCESS) {
5,891✔
272
    QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
14!
273
    streamFileStateDecode(&pFileState->flushMark, valBuf, len);
14✔
274
    qDebug("===stream===flushMark  read:%" PRId64, pFileState->flushMark);
14✔
275
  }
276
  taosMemoryFreeClear(valBuf);
5,891!
277
  (*ppFileState) = pFileState;
5,891✔
278

279
_end:
5,891✔
280
  if (code != TSDB_CODE_SUCCESS) {
5,891!
281
    streamFileStateDestroy(pFileState);
×
282
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
283
  }
284
  return code;
5,891✔
285
}
286

287
void destroyRowBuffPos(SRowBuffPos* pPos) {
10,917,896✔
288
  taosMemoryFreeClear(pPos->pKey);
10,917,896!
289
  taosMemoryFreeClear(pPos->pRowBuff);
10,941,832!
290
  taosMemoryFree(pPos);
10,948,711!
291
}
10,957,337✔
292

293
void destroyRowBuffPosPtr(void* ptr) {
1,330✔
294
  if (!ptr) {
1,330!
295
    return;
×
296
  }
297
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
1,330✔
298
  if (!pPos->beUsed) {
1,330✔
299
    destroyRowBuffPos(pPos);
1,254✔
300
  }
301
}
302

303
void destroyRowBuffAllPosPtr(void* ptr) {
5,070,244✔
304
  if (!ptr) {
5,070,244!
305
    return;
×
306
  }
307
  SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
5,070,244✔
308
  destroyRowBuffPos(pPos);
5,070,244✔
309
}
310

311
void destroyRowBuff(void* ptr) {
5,412,200✔
312
  if (!ptr) {
5,412,200!
313
    return;
×
314
  }
315
  taosMemoryFree(*(void**)ptr);
5,412,200!
316
}
317

318
void streamFileStateDestroy(SStreamFileState* pFileState) {
12,867✔
319
  if (!pFileState) {
12,867✔
320
    return;
7,108✔
321
  }
322

323
  taosMemoryFree(pFileState->id);
5,759!
324
  taosMemoryFree(pFileState->cfName);
5,759!
325
  tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
5,759✔
326
  tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
5,759✔
327
  pFileState->stateBuffCleanupFn(pFileState->rowStateBuff);
5,759✔
328
  sessionWinStateCleanup(pFileState->searchBuff);
5,759✔
329
  tSimpleHashCleanup(pFileState->pGroupIdMap);
5,759✔
330
  taosMemoryFree(pFileState);
5,759!
331
}
332

333
int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
5,845,686✔
334
  int32_t code = TSDB_CODE_SUCCESS;
5,845,686✔
335
  int32_t lino = 0;
5,845,686✔
336
  if (pPos->pRowBuff) {
5,845,686✔
337
    code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
5,845,671✔
338
    QUERY_CHECK_CODE(code, lino, _end);
5,845,546!
339
    pPos->pRowBuff = NULL;
5,845,546✔
340
  }
341

342
_end:
15✔
343
  if (code != TSDB_CODE_SUCCESS) {
5,845,561!
344
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
345
  }
346
  return code;
5,845,475✔
347
}
348

349
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
5,472✔
350
  int32_t   code = TSDB_CODE_SUCCESS;
5,472✔
351
  int32_t   lino = 0;
5,472✔
352
  SListIter iter = {0};
5,472✔
353
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
5,472✔
354

355
  SListNode* pNode = NULL;
5,473✔
356
  while ((pNode = tdListNext(&iter)) != NULL) {
7,030,766✔
357
    SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
7,025,259✔
358
    if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
7,025,259!
359
      code = putFreeBuff(pFileState, pPos);
5,843,750✔
360
      QUERY_CHECK_CODE(code, lino, _end);
5,843,516!
361

362
      if (!all) {
5,843,516✔
363
        pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
5,786,326✔
364
      }
365
      destroyRowBuffPos(pPos);
5,843,844✔
366
      SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
5,843,829✔
367
      taosMemoryFreeClear(tmp);
5,843,668!
368
    }
369
  }
370

371
_end:
5,470✔
372
  if (code != TSDB_CODE_SUCCESS) {
5,470!
373
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
374
  }
375
}
5,470✔
376

377
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool all) {
7,149✔
378
  int32_t   code = TSDB_CODE_SUCCESS;
7,149✔
379
  int32_t   lino = 0;
7,149✔
380
  uint64_t  i = 0;
7,149✔
381
  SListIter iter = {0};
7,149✔
382
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
7,149✔
383

384
  SListNode* pNode = NULL;
7,149✔
385
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
9,691✔
386
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
2,542✔
387
    if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
2,542✔
388
      if (all || !pPos->beUsed) {
779✔
389
        if (all && !pPos->pRowBuff) {
675!
390
          continue;
×
391
        }
392
        code = tdListAppend(pFlushList, &pPos);
675✔
393
        QUERY_CHECK_CODE(code, lino, _end);
675!
394

395
        pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
675✔
396
        pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
675✔
397
        SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
675✔
398
        taosMemoryFreeClear(tmp);
675!
399
        if (pPos->pRowBuff) {
675✔
400
          i++;
660✔
401
        }
402
      }
403
    }
404
  }
405

406
_end:
7,148✔
407
  if (code != TSDB_CODE_SUCCESS) {
7,148!
408
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
409
  }
410
  return code;
7,148✔
411
}
412

413
void streamFileStateClear(SStreamFileState* pFileState) {
1,965✔
414
  pFileState->flushMark = INT64_MIN;
1,965✔
415
  pFileState->maxTs = INT64_MIN;
1,965✔
416
  tSimpleHashClear(pFileState->rowStateBuff);
1,965✔
417
  clearExpiredRowBuff(pFileState, 0, true);
1,965✔
418
}
1,965✔
419

420
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
170✔
421

422
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; }
10,362,380✔
423

424
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
5,959✔
425
  int32_t   code = TSDB_CODE_SUCCESS;
5,959✔
426
  int32_t   lino = 0;
5,959✔
427
  uint64_t  i = 0;
5,959✔
428
  SListIter iter = {0};
5,959✔
429
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
5,959✔
430

431
  SListNode* pNode = NULL;
5,959✔
432
  while ((pNode = tdListNext(&iter)) != NULL && i < max) {
6,776✔
433
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
817✔
434
    if (pPos->beUsed == used) {
817✔
435
      if (used && !pPos->pRowBuff) {
655!
436
        QUERY_CHECK_CONDITION((pPos->needFree == true), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
437
        continue;
×
438
      }
439
      code = tdListAppend(pFlushList, &pPos);
655✔
440
      QUERY_CHECK_CODE(code, lino, _end);
655!
441

442
      pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
655✔
443
      pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
655✔
444
      SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
655✔
445
      taosMemoryFreeClear(tmp);
655!
446
      if (pPos->pRowBuff) {
655!
447
        i++;
655✔
448
      }
449
    }
450
  }
451

452
  qInfo("stream state flush %d rows to disk. is used:%d", listNEles(pFlushList), used);
5,959!
453

454
_end:
×
455
  if (code != TSDB_CODE_SUCCESS) {
5,959!
456
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
457
  }
458
  return code;
5,959✔
459
}
460

461
int32_t flushRowBuff(SStreamFileState* pFileState) {
3,622✔
462
  int32_t          code = TSDB_CODE_SUCCESS;
3,622✔
463
  int32_t          lino = 0;
3,622✔
464
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
3,622✔
465
  if (!pFlushList) {
3,622!
466
    code = TSDB_CODE_OUT_OF_MEMORY;
×
467
    QUERY_CHECK_CODE(code, lino, _end);
×
468
  }
469

470
  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
3,622✔
471
  num = TMAX(num, FLUSH_NUM);
3,622✔
472
  code = clearFlushedRowBuff(pFileState, pFlushList, num, false);
3,622✔
473
  QUERY_CHECK_CODE(code, lino, _end);
3,622!
474

475
  if (isListEmpty(pFlushList)) {
3,622✔
476
    code = popUsedBuffs(pFileState, pFlushList, num, false);
3,152✔
477
    QUERY_CHECK_CODE(code, lino, _end);
3,152!
478

479
    if (isListEmpty(pFlushList)) {
3,152✔
480
      code = popUsedBuffs(pFileState, pFlushList, num, true);
2,807✔
481
      QUERY_CHECK_CODE(code, lino, _end);
2,807!
482
    }
483
  }
484

485
  if (pFileState->searchBuff) {
3,622✔
486
    code = clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount, true);
3,527✔
487
    QUERY_CHECK_CODE(code, lino, _end);
3,527!
488
  }
489

490
  flushSnapshot(pFileState, pFlushList, false);
3,622✔
491

492
  SListIter fIter = {0};
3,622✔
493
  tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
3,622✔
494
  SListNode* pNode = NULL;
3,622✔
495
  while ((pNode = tdListNext(&fIter)) != NULL) {
4,952✔
496
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
1,330✔
497
    code = putFreeBuff(pFileState, pPos);
1,330✔
498
    QUERY_CHECK_CODE(code, lino, _end);
1,330!
499
  }
500

501
  tdListFreeP(pFlushList, destroyRowBuffPosPtr);
3,622✔
502

503
_end:
3,622✔
504
  if (code != TSDB_CODE_SUCCESS) {
3,622!
505
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
506
  }
507
  return code;
3,622✔
508
}
509

510
int32_t clearRowBuff(SStreamFileState* pFileState) {
4,221✔
511
  if (pFileState->deleteMark != INT64_MAX) {
4,221!
512
    clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
×
513
  }
514
  if (isListEmpty(pFileState->freeBuffs)) {
4,221✔
515
    return flushRowBuff(pFileState);
3,622✔
516
  }
517
  return TSDB_CODE_SUCCESS;
599✔
518
}
519

520
void* getFreeBuff(SStreamFileState* pFileState) {
10,934,336✔
521
  SList*     lists = pFileState->freeBuffs;
10,934,336✔
522
  int32_t    buffSize = pFileState->rowSize;
10,934,336✔
523
  SListNode* pNode = tdListPopHead(lists);
10,934,336✔
524
  if (!pNode) {
10,924,889✔
525
    return NULL;
10,872,508✔
526
  }
527
  void* ptr = *(void**)pNode->data;
52,381✔
528
  memset(ptr, 0, buffSize);
52,381✔
529
  taosMemoryFree(pNode);
52,381!
530
  return ptr;
52,506✔
531
}
532

533
void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
×
534
  if (pPos->pRowBuff) {
×
535
    memset(pPos->pRowBuff, 0, pFileState->rowSize);
×
536
  }
537
}
×
538

539
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
10,928,876✔
540
  int32_t      code = TSDB_CODE_SUCCESS;
10,928,876✔
541
  int32_t      lino = 0;
10,928,876✔
542
  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
10,928,876!
543
  if (!pPos) {
10,924,029!
544
    code = terrno;
×
545
    QUERY_CHECK_CODE(code, lino, _error);
×
546
  }
547

548
  pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
10,924,029!
549
  if (!pPos->pKey) {
10,944,302!
550
    code = terrno;
×
551
    QUERY_CHECK_CODE(code, lino, _error);
×
552
  }
553

554
  void* pBuff = getFreeBuff(pFileState);
10,944,302✔
555
  if (pBuff) {
10,923,312✔
556
    pPos->pRowBuff = pBuff;
52,178✔
557
    goto _end;
52,178✔
558
  }
559

560
  if (pFileState->curRowCount < pFileState->maxRowCount) {
10,871,134✔
561
    pBuff = taosMemoryCalloc(1, pFileState->rowSize);
10,871,120!
562
    QUERY_CHECK_NULL(pBuff, code, lino, _error, terrno);
10,851,341!
563
    pPos->pRowBuff = pBuff;
10,851,341✔
564
    pFileState->curRowCount++;
10,851,341✔
565
    goto _end;
10,851,341✔
566
  }
567

568
  code = clearRowBuff(pFileState);
14✔
569
  QUERY_CHECK_CODE(code, lino, _error);
53!
570

571
  pPos->pRowBuff = getFreeBuff(pFileState);
53✔
572

573
_end:
10,903,572✔
574
  code = tdListAppend(pFileState->usedBuffs, &pPos);
10,903,572✔
575
  QUERY_CHECK_CODE(code, lino, _error);
10,945,556!
576

577
  QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
10,945,556!
578
_error:
10,945,556✔
579
  if (code != TSDB_CODE_SUCCESS) {
10,945,556!
580
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
581
    return NULL;
×
582
  }
583

584
  return pPos;
10,945,556✔
585
}
586

587
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
10,932,230✔
588
  int32_t      code = TSDB_CODE_SUCCESS;
10,932,230✔
589
  int32_t      lino = 0;
10,932,230✔
590
  SRowBuffPos* newPos = getNewRowPos(pFileState);
10,932,230✔
591
  if (!newPos) {
10,943,209!
592
    code = TSDB_CODE_OUT_OF_MEMORY;
×
593
    QUERY_CHECK_CODE(code, lino, _error);
×
594
  }
595
  newPos->beUsed = true;
10,943,209✔
596
  newPos->beFlushed = false;
10,943,209✔
597
  newPos->needFree = false;
10,943,209✔
598
  newPos->beUpdated = true;
10,943,209✔
599
  return newPos;
10,943,209✔
600

601
_error:
×
602
  if (code != TSDB_CODE_SUCCESS) {
×
603
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
604
  }
605
  return NULL;
×
606
}
607

608
int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
11,693,098✔
609
                             int32_t* pWinCode) {
610
  int32_t code = TSDB_CODE_SUCCESS;
11,693,098✔
611
  int32_t lino = 0;
11,693,098✔
612
  (*pWinCode) = TSDB_CODE_SUCCESS;
11,693,098✔
613
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
11,693,098✔
614
  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
11,688,363✔
615
  if (pos) {
11,726,616✔
616
    if (pVal != NULL) {
801,085!
617
      *pVLen = pFileState->rowSize;
801,095✔
618
      *pVal = *pos;
801,095✔
619
      (*pos)->beUsed = true;
801,095✔
620
      (*pos)->beFlushed = false;
801,095✔
621
    }
622
    goto _end;
801,085✔
623
  }
624
  SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
10,925,531✔
625
  if (!pNewPos || !pNewPos->pRowBuff) {
10,932,242!
626
    code = TSDB_CODE_OUT_OF_MEMORY;
×
627
    QUERY_CHECK_CODE(code, lino, _end);
×
628
  }
629

630
  memcpy(pNewPos->pKey, pKey, keyLen);
10,932,242✔
631
  (*pWinCode) = TSDB_CODE_FAILED;
10,932,242✔
632

633
  TSKEY ts = pFileState->getTs(pKey);
10,932,242✔
634
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
10,924,459✔
635
    int32_t len = 0;
134,638✔
636
    void*   p = NULL;
134,638✔
637
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
134,638✔
638
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
134,638✔
639
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
134,638✔
640
      memcpy(pNewPos->pRowBuff, p, len);
541✔
641
    }
642
    taosMemoryFree(p);
134,638!
643
  }
644

645
  code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
10,906,579✔
646
  QUERY_CHECK_CODE(code, lino, _end);
10,860,449!
647

648
  if (pVal) {
10,860,449✔
649
    *pVLen = pFileState->rowSize;
10,857,470✔
650
    *pVal = pNewPos;
10,857,470✔
651
  }
652

653
_end:
2,979✔
654
  if (code != TSDB_CODE_SUCCESS) {
11,661,534!
655
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
656
  }
657
  return code;
11,658,049✔
658
}
659

660
void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
5,469✔
661
  int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
5,469✔
662
  qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff);
5,471✔
663
  int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
5,471✔
664
  qTrace("%s at line %d res:%d", __func__, __LINE__, code_file);
5,469✔
665
  if (pFileState->searchBuff != NULL) {
5,469!
666
    deleteHashSortRowBuff(pFileState, pKey);
×
667
  }
668
}
5,469✔
669

670
int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
×
671
  int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
×
672
  int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
×
673
  if (pFileState->searchBuff != NULL) {
×
674
    deleteHashSortRowBuff(pFileState, pKey);
×
675
  }
676
  if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) {
×
677
    return TSDB_CODE_SUCCESS;
×
678
  }
679
  return TSDB_CODE_FAILED;
×
680
}
681

682
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
276✔
683
  int32_t code = TSDB_CODE_SUCCESS;
276✔
684
  int32_t lino = 0;
276✔
685
  int32_t len = 0;
276✔
686
  void*   pBuff = NULL;
276✔
687
  code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
276✔
688
  QUERY_CHECK_CODE(code, lino, _end);
276!
689
  memcpy(pPos->pRowBuff, pBuff, len);
276✔
690
  taosMemoryFree(pBuff);
276!
691

692
_end:
276✔
693
  if (code != TSDB_CODE_SUCCESS) {
276!
694
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
695
  }
696
  return code;
276✔
697
}
698

699
static int32_t recoverStateRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
276✔
700
  int32_t code = TSDB_CODE_SUCCESS;
276✔
701
  int32_t lino = 0;
276✔
702
  pPos->pRowBuff = getFreeBuff(pFileState);
276✔
703
  if (!pPos->pRowBuff) {
276✔
704
    if (pFileState->curRowCount < pFileState->maxRowCount) {
43✔
705
      pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
1!
706
      if (!pPos->pRowBuff) {
1!
707
        code = terrno;
×
708
        QUERY_CHECK_CODE(code, lino, _end);
×
709
      }
710
      pFileState->curRowCount++;
1✔
711
    } else {
712
      code = clearRowBuff(pFileState);
42✔
713
      QUERY_CHECK_CODE(code, lino, _end);
42!
714
      pPos->pRowBuff = getFreeBuff(pFileState);
42✔
715
    }
716
    QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
43!
717
  }
718

719
  code = recoverSessionRowBuff(pFileState, pPos);
276✔
720
  QUERY_CHECK_CODE(code, lino, _end);
276!
721

722
_end:
276✔
723
  if (code != TSDB_CODE_SUCCESS) {
276!
724
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
725
  }
726
  return code;
276✔
727
}
728

729
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
10,293,543✔
730
  int32_t code = TSDB_CODE_SUCCESS;
10,293,543✔
731
  int32_t lino = 0;
10,293,543✔
732
  if (pPos->pRowBuff) {
10,293,543✔
733
    if (pPos->needFree) {
10,293,304!
734
      code = recoverSessionRowBuff(pFileState, pPos);
×
735
      QUERY_CHECK_CODE(code, lino, _end);
×
736
    }
737
    (*pVal) = pPos->pRowBuff;
10,293,304✔
738
    goto _end;
10,293,304✔
739
  }
740

741
  code = recoverStateRowBuff(pFileState, pPos);
239✔
742
  QUERY_CHECK_CODE(code, lino, _end);
276!
743

744
  (*pVal) = pPos->pRowBuff;
276✔
745
  if (!pPos->needFree) {
276✔
746
    code = tdListPrepend(pFileState->usedBuffs, &pPos);
76✔
747
    QUERY_CHECK_CODE(code, lino, _end);
76!
748
  }
749

750
_end:
276✔
751
  if (code != TSDB_CODE_SUCCESS) {
10,293,580!
752
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
753
  }
754
  return code;
10,293,419✔
755
}
756

757
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
28✔
758
  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
28✔
759
  if (pos) {
28✔
760
    return true;
13✔
761
  }
762
  return false;
15✔
763
}
764

765
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
3,507✔
766
  int64_t mark = (pFileState->deleteMark == INT64_MAX || pFileState->maxTs == INT64_MIN)
1,936✔
767
                     ? INT64_MIN
768
                     : pFileState->maxTs - pFileState->deleteMark;
5,443✔
769
  clearExpiredRowBuff(pFileState, mark, false);
3,507✔
770
  return pFileState->usedBuffs;
3,508✔
771
}
772

773
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
7,130✔
774
  int32_t   code = TSDB_CODE_SUCCESS;
7,130✔
775
  int32_t   lino = 0;
7,130✔
776
  SListIter iter = {0};
7,130✔
777
  tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
7,130✔
778

779
  const int32_t BATCH_LIMIT = 256;
7,130✔
780

781
  int64_t    st = taosGetTimestampMs();
7,129✔
782
  SListNode* pNode = NULL;
7,129✔
783

784
  int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
7,129✔
785

786
  int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2;
7,128✔
787
  char*   buf = taosMemoryCalloc(1, len);
7,128!
788
  if (!buf) {
7,130!
789
    code = terrno;
×
790
    QUERY_CHECK_CODE(code, lino, _end);
×
791
  }
792

793
  void* batch = streamStateCreateBatch();
7,130✔
794
  if (!batch) {
7,130!
795
    code = TSDB_CODE_OUT_OF_MEMORY;
×
796
    QUERY_CHECK_CODE(code, lino, _end);
×
797
  }
798

799
  while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
1,189,959!
800
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
1,182,828✔
801
    if (pPos->beFlushed || !pPos->pRowBuff) {
1,182,828!
802
      continue;
9,901✔
803
    }
804
    pPos->beFlushed = true;
1,172,927✔
805
    pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
1,172,927✔
806

807
    qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
1,172,927✔
808
    if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
1,172,928✔
809
      code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
4,504✔
810
      streamStateClearBatch(batch);
4,504✔
811
      QUERY_CHECK_CODE(code, lino, _end);
4,504!
812
    }
813

814
    void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
1,172,928✔
815
    QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);
1,172,928!
816

817
    code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
1,172,928✔
818
                                       0, buf);
819
    taosMemoryFreeClear(pSKey);
1,172,928!
820
    QUERY_CHECK_CODE(code, lino, _end);
1,172,928!
821
    // todo handle failure
822
    memset(buf, 0, len);
1,172,928✔
823
  }
824
  taosMemoryFreeClear(buf);
7,130!
825

826
  int32_t numOfElems = streamStateGetBatchSize(batch);
7,130✔
827
  if (numOfElems > 0) {
7,130✔
828
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
1,812✔
829
    QUERY_CHECK_CODE(code, lino, _end);
1,812!
830
  } else {
831
    goto _end;
5,318✔
832
  }
833

834
  streamStateClearBatch(batch);
1,812✔
835

836
  clearSearchBuff(pFileState);
1,812✔
837

838
  int64_t elapsed = taosGetTimestampMs() - st;
1,812✔
839
  qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
1,812✔
840
         pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
841

842
  if (flushState) {
1,812✔
843
    void*   valBuf = NULL;
1,148✔
844
    int32_t len = 0;
1,148✔
845
    code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
1,148✔
846
    QUERY_CHECK_CODE(code, lino, _end);
1,148!
847

848
    qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
1,148✔
849
    code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0);
1,148✔
850
    taosMemoryFree(valBuf);
1,148!
851
    QUERY_CHECK_CODE(code, lino, _end);
1,148!
852

853
    code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
1,148✔
854
    QUERY_CHECK_CODE(code, lino, _end);
1,148!
855
  }
856

857
_end:
664✔
858
  if (code != TSDB_CODE_SUCCESS) {
7,130!
859
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
860
  }
861
  taosMemoryFree(buf);
7,130!
862
  streamStateDestroyBatch(batch);
7,130✔
863
}
7,130✔
864

865
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
×
866
  char keyBuf[128] = {0};
×
867
  TAOS_UNUSED(tsnprintf(keyBuf, sizeof(keyBuf), "%s:%" PRId64 "", TASK_KEY, checkpointId));
×
868
  return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
×
869
}
870

871
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
×
872
  return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list);
×
873
}
874

875
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
×
876
  int32_t code = TSDB_CODE_SUCCESS;
×
877
  int64_t maxCheckPointId = 0;
×
878
  {
879
    char    buf[128] = {0};
×
880
    void*   val = NULL;
×
881
    int32_t len = 0;
×
882
    memcpy(buf, TASK_KEY, strlen(TASK_KEY));
×
883
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
×
884
    if (code != 0 || len == 0 || val == NULL) {
×
885
      return TSDB_CODE_FAILED;
×
886
    }
887
    memcpy(buf, val, len);
×
888
    buf[len] = 0;
×
889
    maxCheckPointId = taosStr2Int64((char*)buf, NULL, 10);
×
890
    taosMemoryFree(val);
×
891
  }
892
  for (int64_t i = maxCheckPointId; i > 0; i--) {
×
893
    char    buf[128] = {0};
×
894
    void*   val = 0;
×
895
    int32_t len = 0;
×
896
    TAOS_UNUSED(tsnprintf(buf, sizeof(buf), "%s:%" PRId64 "", TASK_KEY, i));
×
897
    code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
×
898
    if (code != 0) {
×
899
      return TSDB_CODE_FAILED;
×
900
    }
901
    memcpy(buf, val, len);
×
902
    buf[len] = 0;
×
903
    taosMemoryFree(val);
×
904

905
    TSKEY ts;
906
    ts = taosStr2Int64((char*)buf, NULL, 10);
×
907
    if (ts < mark) {
×
908
      // statekey winkey.ts < mark
909
      int32_t tmpRes = forceRemoveCheckpoint(pFileState, i);
×
910
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
911
      break;
×
912
    }
913
  }
914
  return code;
×
915
}
916

917
int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
1,691✔
918
  int32_t code = TSDB_CODE_SUCCESS;
1,691✔
919
  int32_t lino = 0;
1,691✔
920
  int32_t winRes = TSDB_CODE_SUCCESS;
1,691✔
921
  if (pFileState->maxTs != INT64_MIN) {
1,691!
922
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
923
                       ? INT64_MIN
924
                       : pFileState->maxTs - pFileState->deleteMark;
×
925
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
×
926
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
927
  }
928

929
  SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore, INT64_MAX);
1,691✔
930
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
1,691✔
931
  while (winRes == TSDB_CODE_SUCCESS) {
1,691!
932
    if (pFileState->curRowCount >= recoverNum) {
1,691!
933
      break;
1,691✔
934
    }
935

936
    void*       pVal = NULL;
1,691✔
937
    int32_t     vlen = 0;
1,691✔
938
    SSessionKey key = {0};
1,691✔
939
    winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
1,691✔
940
    if (winRes != TSDB_CODE_SUCCESS) {
1,691!
941
      break;
1,691✔
942
    }
943

UNCOV
944
    if (vlen != pFileState->rowSize) {
×
945
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
946
      QUERY_CHECK_CODE(code, lino, _end);
×
947
    }
948

UNCOV
949
    SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
×
UNCOV
950
    pPos->beUsed = false;
×
UNCOV
951
    winRes = putSessionWinResultBuff(pFileState, pPos);
×
UNCOV
952
    if (winRes != TSDB_CODE_SUCCESS) {
×
953
      break;
×
954
    }
955

UNCOV
956
    winRes = streamStateSessionCurPrev_rocksdb(pCur);
×
957
  }
958

959
_end:
×
960
  if (code != TSDB_CODE_SUCCESS) {
1,691!
961
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
962
  }
963
  streamStateFreeCur(pCur);
1,691✔
964
  return code;
1,691✔
965
}
966

967
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
4,062✔
968
  int32_t code = TSDB_CODE_SUCCESS;
4,062✔
969
  int32_t lino = 0;
4,062✔
970
  int32_t winCode = TSDB_CODE_SUCCESS;
4,062✔
971
  if (pFileState->maxTs != INT64_MIN) {
4,062!
972
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
973
                       ? INT64_MIN
974
                       : pFileState->maxTs - pFileState->deleteMark;
×
975
    int32_t tmpRes = deleteExpiredCheckPoint(pFileState, mark);
×
976
    qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
977
  }
978

979
  SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
4,062✔
980
  int32_t          recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
4,063✔
981
  while (winCode == TSDB_CODE_SUCCESS) {
4,063!
982
    if (pFileState->curRowCount >= recoverNum) {
4,063!
983
      break;
4,063✔
984
    }
985

986
    void*        pVal = NULL;
4,063✔
987
    int32_t      vlen = 0;
4,063✔
988
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
4,063✔
989
    if (!pNewPos || !pNewPos->pRowBuff) {
4,063!
990
      code = TSDB_CODE_OUT_OF_MEMORY;
×
991
      QUERY_CHECK_CODE(code, lino, _end);
×
992
    }
993

994
    winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
4,063✔
995
    qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
4,063✔
996
    if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
4,063!
997
      destroyRowBuffPos(pNewPos);
4,063✔
998
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
4,063✔
999
      taosMemoryFreeClear(pNode);
4,063!
1000
      taosMemoryFreeClear(pVal);
4,063!
1001
      break;
4,063✔
1002
    }
1003
    if (vlen != pFileState->rowSize) {
×
1004
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
×
1005
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1006
      taosMemoryFreeClear(pVal);
×
1007
      QUERY_CHECK_CODE(code, lino, _end);
×
1008
    }
1009
    memcpy(pNewPos->pRowBuff, pVal, vlen);
×
1010
    taosMemoryFreeClear(pVal);
×
1011
    pNewPos->beFlushed = true;
×
1012
    pNewPos->beUsed = false;
×
1013
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
×
1014
    code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
×
1015
    if (code != TSDB_CODE_SUCCESS) {
×
1016
      destroyRowBuffPos(pNewPos);
×
1017
      break;
×
1018
    }
1019
    streamStateCurPrev_rocksdb(pCur);
×
1020
  }
1021

1022
_end:
×
1023
  if (code != TSDB_CODE_SUCCESS) {
4,063!
1024
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1025
  }
1026
  streamStateFreeCur(pCur);
4,063✔
1027
  return code;
4,063✔
1028
}
1029

1030
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
247,448✔
1031

1032
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
1,490✔
1033
  pFileState->flushMark = TMAX(pFileState->flushMark, ts);
1,490✔
1034
  pFileState->maxTs = TMAX(pFileState->maxTs, ts);
1,490✔
1035
}
1,490✔
1036

1037
void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; }
38,833✔
1038
void* getSearchBuff(SStreamFileState* pFileState) { return pFileState->searchBuff; }
11,410,172✔
1039

1040
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
19,436✔
1041

1042
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
10,916,456✔
1043
  return pFileState->deleteMark != INT64_MAX && pFileState->maxTs > 0 &&
16,679,810✔
1044
         ts < (pFileState->maxTs - pFileState->deleteMark);
5,763,354✔
1045
}
1046

1047
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
7,461,277✔
1048

1049
TSKEY getFlushMark(SStreamFileState* pFileState) { return pFileState->flushMark; };
629✔
1050

1051
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
248,453✔
1052

1053
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
247,449✔
1054
  int32_t winCode = TSDB_CODE_SUCCESS;
247,449✔
1055
  return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen, &winCode);
247,449✔
1056
}
1057

1058
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
137✔
1059
  int32_t code = TSDB_CODE_SUCCESS;
137✔
1060
  int32_t lino = 0;
137✔
1061
  if (pFileState->maxTs != INT64_MIN) {
137!
1062
    int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
×
1063
                       ? INT64_MIN
1064
                       : pFileState->maxTs - pFileState->deleteMark;
×
1065
    code = deleteExpiredCheckPoint(pFileState, mark);
×
1066
    QUERY_CHECK_CODE(code, lino, _end);
×
1067
  }
1068

1069
  SStreamStateCur* pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
137✔
1070
  if (pCur == NULL) {
137✔
1071
    return code;
135✔
1072
  }
1073
  int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
2✔
1074
  int32_t winRes = TSDB_CODE_SUCCESS;
2✔
1075
  while (winRes == TSDB_CODE_SUCCESS) {
4!
1076
    if (pFileState->curRowCount >= recoverNum) {
4!
1077
      break;
2✔
1078
    }
1079

1080
    void*        pVal = NULL;
4✔
1081
    int32_t      vlen = 0;
4✔
1082
    SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
4✔
1083
    winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
4✔
1084
    qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
4!
1085
    if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
4!
1086
      destroyRowBuffPos(pNewPos);
2✔
1087
      SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
2✔
1088
      taosMemoryFreeClear(pNode);
2!
1089
      taosMemoryFreeClear(pVal);
2!
1090
      break;
2✔
1091
    }
1092

1093
    if (vlen != pFileState->rowSize) {
2!
1094
      qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
×
1095
      destroyRowBuffPos(pNewPos);
×
1096
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1097
      taosMemoryFreeClear(pVal);
×
1098
      QUERY_CHECK_CODE(code, lino, _end);
×
1099
    }
1100

1101
    memcpy(pNewPos->pRowBuff, pVal, vlen);
2✔
1102
    taosMemoryFreeClear(pVal);
2!
1103
    pNewPos->beFlushed = true;
2✔
1104
    pNewPos->beUsed = false;
2✔
1105
    qDebug("===stream=== read checkpoint state from disc. %s", __func__);
2!
1106
    winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
2✔
1107
    if (winRes != TSDB_CODE_SUCCESS) {
2!
1108
      destroyRowBuffPos(pNewPos);
×
1109
      break;
×
1110
    }
1111
    streamStateCurPrev_rocksdb(pCur);
2✔
1112
  }
1113
  streamStateFreeCur(pCur);
2✔
1114

1115
_end:
2✔
1116
  if (code != TSDB_CODE_SUCCESS) {
2!
1117
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1118
  }
1119
  return code;
2✔
1120
}
1121

1122
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
1,368✔
1123
                   int32_t* pWinCode) {
1124
  int32_t code = TSDB_CODE_SUCCESS;
1,368✔
1125
  int32_t lino = 0;
1,368✔
1126
  (*pWinCode) = TSDB_CODE_FAILED;
1,368✔
1127
  pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
1,368✔
1128
  SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
1,368✔
1129
  if (ppPos) {
1,368✔
1130
    *pVLen = pFileState->rowSize;
462✔
1131
    *pVal = *ppPos;
462✔
1132
    (*ppPos)->beUsed = true;
462✔
1133
    (*ppPos)->beFlushed = false;
462✔
1134
    (*pWinCode) = TSDB_CODE_SUCCESS;
462✔
1135
    if ((*ppPos)->pRowBuff == NULL) {
462!
1136
      code = recoverStateRowBuff(pFileState, *ppPos);
×
1137
      QUERY_CHECK_CODE(code, lino, _end);
×
1138
    }
1139
    goto _end;
462✔
1140
  }
1141
  TSKEY ts = pFileState->getTs(pKey);
906✔
1142
  if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
906!
1143
    int32_t len = 0;
21✔
1144
    void*   p = NULL;
21✔
1145
    (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
21✔
1146
    qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
21!
1147
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
21✔
1148
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
17✔
1149
      if (!pNewPos || !pNewPos->pRowBuff) {
17!
1150
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1151
        QUERY_CHECK_CODE(code, lino, _end);
×
1152
      }
1153

1154
      memcpy(pNewPos->pKey, pKey, keyLen);
17✔
1155
      memcpy(pNewPos->pRowBuff, p, len);
17✔
1156
      code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
17✔
1157
      QUERY_CHECK_CODE(code, lino, _end);
17!
1158

1159
      if (pVal) {
17!
1160
        *pVLen = pFileState->rowSize;
17✔
1161
        *pVal = pNewPos;
17✔
1162
      }
1163
    }
1164
    taosMemoryFree(p);
21!
1165
  }
1166

1167
_end:
885✔
1168
  if (code != TSDB_CODE_SUCCESS) {
1,368!
1169
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1170
  }
1171
  return code;
1,368✔
1172
}
1173

1174
int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen) {
468✔
1175
  int32_t code = TSDB_CODE_SUCCESS;
468✔
1176
  int32_t lino = 0;
468✔
1177
  if (value != NULL) {
468!
1178
    code = TSDB_CODE_INVALID_PARA;
×
1179
    QUERY_CHECK_CODE(code, lino, _end);
×
1180
  }
1181

1182
  if (tSimpleHashGet(pFileState->pGroupIdMap, &groupId, sizeof(int64_t)) == NULL) {
468✔
1183
    if (tSimpleHashGetSize(pFileState->pGroupIdMap) <= MAX_GROUP_ID_NUM) {
168!
1184
      code = tSimpleHashPut(pFileState->pGroupIdMap, &groupId, sizeof(int64_t), NULL, 0);
168✔
1185
      QUERY_CHECK_CODE(code, lino, _end);
168!
1186
    }
1187
    code = streamStatePutParTag_rocksdb(pFileState->pFileStore, groupId, value, vLen);
168✔
1188
    QUERY_CHECK_CODE(code, lino, _end);
168!
1189
  }
1190

1191
_end:
468✔
1192
  if (code != TSDB_CODE_SUCCESS) {
468!
1193
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1194
  }
1195
  return code;
468✔
1196
}
1197

1198
void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
1,291✔
1199
  SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState;
1,291✔
1200
  if (pCur->hashIter == -1) {
1,291✔
1201
    streamStateCurNext(pFileState->pFileStore, pCur);
68✔
1202
    return;
68✔
1203
  }
1204

1205
  int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
1,223!
1206
  pCur->minGpId = TMAX(pCur->minGpId, gpId);
1,223✔
1207

1208
  SSHashObj* pHash = pFileState->pGroupIdMap;
1,223✔
1209
  pCur->pHashData = tSimpleHashIterate(pHash, pCur->pHashData, &pCur->hashIter);
1,223✔
1210
  if (!pCur->pHashData) {
1,223✔
1211
    pCur->hashIter = -1;
753✔
1212
    streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
753✔
1213
    return;
753✔
1214
  }
1215
}
1216

1217
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
3,857✔
1218
  int32_t code = TSDB_CODE_SUCCESS;
3,857✔
1219
  if (pCur->pHashData) {
3,857✔
1220
    *pKey = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
1,223!
1221
    return code;
1,223✔
1222
  }
1223
  return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL);
2,634✔
1224
}
1225

1226
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
2,566✔
1227
  return pFileState->pGroupIdMap;
2,566✔
1228
}
1229

1230
void setFillInfo(SStreamFileState* pFileState) {
×
1231
  pFileState->hasFillCatch = false;
×
1232
}
×
1233

1234
void clearExpiredState(SStreamFileState* pFileState) {
4,125✔
1235
  int32_t    code = TSDB_CODE_SUCCESS;
4,125✔
1236
  int32_t    lino = 0;
4,125✔
1237
  SSHashObj* pSearchBuff = pFileState->searchBuff;
4,125✔
1238
  void*      pIte = NULL;
4,125✔
1239
  int32_t    iter = 0;
4,125✔
1240
  while ((pIte = tSimpleHashIterate(pSearchBuff, pIte, &iter)) != NULL) {
6,151✔
1241
    SArray* pWinStates = *((void**)pIte);
2,026✔
1242
    int32_t size = taosArrayGetSize(pWinStates);
2,026✔
1243
    for (int32_t i = 0; i < size - 1; i++) {
2,445✔
1244
      SWinKey* pKey = taosArrayGet(pWinStates, i);
419✔
1245
      int32_t  code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
419✔
1246
      qTrace("clear expired buff, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_buff);
419!
1247

1248
      if (isFlushedState(pFileState, pKey->ts, 0)) {
419✔
1249
        int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
310✔
1250
        qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file);
310!
1251
      }
1252
    }
1253
    taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL);
2,026✔
1254
  }
1255
  code = clearRowBuff(pFileState);
4,125✔
1256
  QUERY_CHECK_CODE(code, lino, _end);
4,126!
1257

1258
_end:
4,126✔
1259
  if (code != TSDB_CODE_SUCCESS) {
4,126!
1260
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1261
  }
1262
}
4,126✔
1263

1264
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
×
1265
                           int32_t* pWinCode) {
1266
  int32_t code = TSDB_CODE_SUCCESS;
×
1267
  int32_t lino = 0;
×
1268

1269
  code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
×
1270
  QUERY_CHECK_CODE(code, lino, _end);
×
1271

1272
  SArray*    pWinStates = NULL;
×
1273
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
×
1274
  void**     ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
×
1275
  if (ppBuff) {
×
1276
    pWinStates = (SArray*)(*ppBuff);
×
1277
  } else {
1278
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
×
1279
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
×
1280

1281
    code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
×
1282
    QUERY_CHECK_CODE(code, lino, _end);
×
1283
  }
1284

1285
  // recover
1286
  if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
×
1287
    TSKEY            ts = getFlushMark(pFileState);
×
1288
    SWinKey          start = {.groupId = pKey->groupId, .ts = INT64_MAX};
×
1289
    void*            pState = getStateFileStore(pFileState);
×
1290
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
×
1291
    for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
×
1292
      SWinKey tmpKey = {.groupId = pKey->groupId};
×
1293
      int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pState, pCur, &tmpKey, NULL, 0);
×
1294
      if (tmpRes != TSDB_CODE_SUCCESS) {
×
1295
        break;
×
1296
      }
1297
      void* tmp = taosArrayPush(pWinStates, &tmpKey);
×
1298
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
1299
      streamStateCurPrev_rocksdb(pCur);
×
1300
    }
1301
    taosArraySort(pWinStates, winKeyCmprImpl);
×
1302
    streamStateFreeCur(pCur);
×
1303
  }
1304

1305
  int32_t size = taosArrayGetSize(pWinStates);
×
1306
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
×
1307
  if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0) {
×
1308
    // find the first position which is smaller than the pKey
1309
    if (index >= 0) {
×
1310
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
×
1311
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
×
1312
        goto _end;
×
1313
      }
1314
    }
1315
    index++;
×
1316
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
×
1317
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
×
1318
  }
1319

1320
  if (size >= MAX_NUM_OF_CACHE_WIN) {
×
1321
    int32_t num = size - NUM_OF_CACHE_WIN;
×
1322
    taosArrayRemoveBatch(pWinStates, 0, num, NULL);
×
1323
  }
1324

1325
_end:
×
1326
  if (code != TSDB_CODE_SUCCESS) {
×
1327
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1328
  }
1329
  return code;
×
1330
}
1331

1332
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
105✔
1333
                           int32_t* pVLen, int32_t* pWinCode) {
1334
  int32_t    code = TSDB_CODE_SUCCESS;
105✔
1335
  int32_t    lino = 0;
105✔
1336
  SArray*    pWinStates = NULL;
105✔
1337
  SSHashObj* pSearchBuff = getSearchBuff(pFileState);
105✔
1338
  void*      pState = getStateFileStore(pFileState);
105✔
1339
  void**     ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
105✔
1340
  if (ppBuff) {
105!
1341
    pWinStates = (SArray*)(*ppBuff);
105✔
1342
  } else {
UNCOV
1343
    qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
×
UNCOV
1344
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
×
UNCOV
1345
    void*            tmpVal = NULL;
×
UNCOV
1346
    int32_t          len = 0;
×
UNCOV
1347
    (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
×
UNCOV
1348
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
×
1349
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
1350
      if (!pNewPos || !pNewPos->pRowBuff) {
×
1351
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1352
        QUERY_CHECK_CODE(code, lino, _end);
×
1353
      }
1354
      memcpy(pNewPos->pRowBuff, tmpVal, len);
×
1355
      taosMemoryFreeClear(tmpVal);
×
1356
      *pVLen = getRowStateRowSize(pFileState);
×
1357
      (*ppVal) = pNewPos;
×
1358
    }
UNCOV
1359
    streamStateFreeCur(pCur);
×
UNCOV
1360
    return code;
×
1361
  }
1362
  int32_t size = taosArrayGetSize(pWinStates);
105✔
1363
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
105✔
1364
  if (index >= 0) {
105!
1365
    SWinKey* pCurKey = taosArrayGet(pWinStates, index);
105✔
1366
    if (winKeyCmprImpl(pCurKey, pKey) == 0) {
105!
1367
      index--;
105✔
1368
    } else {
UNCOV
1369
      qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__);
×
1370
    }
1371
  }
1372
  if (index == -1) {
105✔
1373
    SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
28✔
1374
    void*            tmpVal = NULL;
28✔
1375
    int32_t          len = 0;
28✔
1376
    (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pState, pCur, pResKey, (const void**)&tmpVal, &len);
28✔
1377
    if ((*pWinCode) == TSDB_CODE_SUCCESS) {
28!
1378
      SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
×
1379
      if (!pNewPos || !pNewPos->pRowBuff) {
×
1380
        code = TSDB_CODE_OUT_OF_MEMORY;
×
1381
        QUERY_CHECK_CODE(code, lino, _end);
×
1382
      }
1383
      memcpy(pNewPos->pRowBuff, tmpVal, len);
×
1384
      taosMemoryFreeClear(tmpVal);
×
1385
      *pVLen = getRowStateRowSize(pFileState);
×
1386
      (*ppVal) = pNewPos;
×
1387
    }
1388
    streamStateFreeCur(pCur);
28✔
1389
    return code;
28✔
1390
  } else {
1391
    SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
77✔
1392
    *pResKey = *pPrevKey;
77✔
1393
    return addRowBuffIfNotExist(pFileState, (void*)pPrevKey, sizeof(SWinKey), ppVal, pVLen, pWinCode);
77✔
1394
  }
1395
  (*pWinCode) = TSDB_CODE_FAILED;
1396

1397
_end:
×
1398
  if (code != TSDB_CODE_SUCCESS) {
×
1399
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1400
  }
1401
  return code;
×
1402
}
1403

1404
int32_t addSearchItem(SStreamFileState* pFileState, SArray* pWinStates, const SWinKey* pKey) {
1,682✔
1405
  int32_t code = TSDB_CODE_SUCCESS;
1,682✔
1406
  int32_t lino = 0;
1,682✔
1407
  int32_t size = taosArrayGetSize(pWinStates);
1,682✔
1408
  int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
1,682✔
1409
  if (!isFlushedState(pFileState, pKey->ts, 0) || index >= 0 || size == 0) {
1,682!
1410
    if (index >= 0) {
1,682✔
1411
      SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
1,471✔
1412
      if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
1,471✔
1413
        goto _end;
1,052✔
1414
      }
1415
    }
1416
    index++;
630✔
1417
    void* tmp = taosArrayInsert(pWinStates, index, pKey);
630✔
1418
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
630!
1419
  }
1420

1421
  if (size >= MAX_NUM_OF_CACHE_WIN) {
630!
1422
    int32_t num = size - NUM_OF_CACHE_WIN;
×
1423
    taosArrayRemoveBatch(pWinStates, 0, num, NULL);
×
1424
  }
1425
_end:
630✔
1426
  if (code != TSDB_CODE_SUCCESS) {
1,682!
1427
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1428
  }
1429
  return code;
1,682✔
1430
}
1431

1432
int32_t addArrayBuffIfNotExist(SSHashObj* pSearchBuff, uint64_t groupId, SArray** ppResStates) {
2,701✔
1433
  int32_t code = TSDB_CODE_SUCCESS;
2,701✔
1434
  int32_t lino = 0; 
2,701✔
1435
  SArray*    pWinStates = NULL;
2,701✔
1436
  void**     ppBuff = tSimpleHashGet(pSearchBuff, &groupId, sizeof(uint64_t));
2,701✔
1437
  if (ppBuff) {
2,701✔
1438
    pWinStates = (SArray*)(*ppBuff);
2,488✔
1439
  } else {
1440
    pWinStates = taosArrayInit(16, sizeof(SWinKey));
213✔
1441
    QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
213!
1442

1443
    code = tSimpleHashPut(pSearchBuff, &groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
213✔
1444
    QUERY_CHECK_CODE(code, lino, _end);
213!
1445
  }
1446

1447
  (*ppResStates) = pWinStates;
2,701✔
1448

1449
_end:
2,701✔
1450
  if (code != TSDB_CODE_SUCCESS) {
2,701!
1451
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1452
  }
1453
  return code;
2,701✔
1454
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc