• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.7
/source/libs/stream/src/streamState.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamState.h"
17
#include "executor.h"
18
#include "osMemory.h"
19
#include "rocksdb/c.h"
20
#include "streamBackendRocksdb.h"
21
#include "streamInt.h"
22
#include "tcoding.h"
23
#include "tcommon.h"
24
#include "tcompare.h"
25
#include "tref.h"
26

27
#define MAX_TABLE_NAME_NUM 200000
28

29
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
1✔
30
  if (pWin1->groupId > pWin2->groupId) {
1✔
31
    return 1;
×
32
  } else if (pWin1->groupId < pWin2->groupId) {
1✔
33
    return -1;
×
34
  }
35

36
  if (pWin1->win.skey > pWin2->win.ekey) {
1✔
37
    return 1;
×
38
  } else if (pWin1->win.ekey < pWin2->win.skey) {
1✔
39
    return -1;
×
40
  }
41

42
  return 0;
1✔
43
}
44

45
int countRangeKeyEqual(const SSessionKey* pWin1, const SSessionKey* pWin2) {
×
46
  if (pWin1->groupId == pWin2->groupId && pWin1->win.skey <= pWin2->win.skey && pWin2->win.skey <= pWin1->win.ekey) {
×
47
    return 0;
×
48
  }
49

50
  return 1;
×
51
}
52

53
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
4,294✔
54
  if (pWin1->groupId > pWin2->groupId) {
4,294✔
55
    return 1;
×
56
  } else if (pWin1->groupId < pWin2->groupId) {
4,294✔
57
    return -1;
×
58
  }
59

60
  if (pWin1->win.skey > pWin2->win.skey) {
4,294✔
61
    return 1;
671✔
62
  } else if (pWin1->win.skey < pWin2->win.skey) {
3,623✔
63
    return -1;
2,685✔
64
  }
65

66
  if (pWin1->win.ekey > pWin2->win.ekey) {
938✔
67
    return 1;
×
68
  } else if (pWin1->win.ekey < pWin2->win.ekey) {
938✔
69
    return -1;
×
70
  }
71

72
  return 0;
938✔
73
}
74

75
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
4,294✔
76
  SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
4,294✔
77
  SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;
4,294✔
78

79
  if (pWin1->opNum > pWin2->opNum) {
4,294✔
80
    return 1;
×
81
  } else if (pWin1->opNum < pWin2->opNum) {
4,294✔
82
    return -1;
×
83
  }
84

85
  return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
4,294✔
86
}
87

88
int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
2,674✔
89
  SStateKey* pWin1 = (SStateKey*)pKey1;
2,674✔
90
  SStateKey* pWin2 = (SStateKey*)pKey2;
2,674✔
91

92
  if (pWin1->opNum > pWin2->opNum) {
2,674✔
93
    return 1;
14✔
94
  } else if (pWin1->opNum < pWin2->opNum) {
2,660✔
95
    return -1;
92✔
96
  }
97

98
  return winKeyCmprImpl(&pWin1->key, &pWin2->key);
2,568✔
99
}
100

101
SStreamState* streamStateRecalatedOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
×
102
  int32_t      code = TSDB_CODE_SUCCESS;
×
103
  int32_t      lino = 0;
×
104
  SStreamTask* pStreamTask = pTask;
×
105
  if (!streamTaskShouldRecalated(pStreamTask)) {
×
106
    stError("failed to open recalation stream state %s", path);
×
107
    terrno = TSDB_CODE_INVALID_PARA;
×
108
    return NULL;
×
109
  }
110

111
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
×
112
  stDebug("open stream state %p, %s", pState, path);
×
113

114
  if (pState == NULL) {
×
115
    code = terrno;
×
116
    QUERY_CHECK_CODE(code, lino, _end);
×
117
  }
118

119
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
×
120
  if (pState->pTdbState == NULL) {
×
121
    streamStateDestroy(pState, true);
×
122
    code = terrno;
×
123
    QUERY_CHECK_CODE(code, lino, _end);
×
124
  }
125

126
  pState->streamId = streamId;
×
127
  pState->taskId = taskId;
×
128

129
  pState->pTdbState->recalc = 1;
×
130

131
  TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x-%s",
×
132
                        pState->streamId, pState->taskId, "recalc"));
133

134
  code = streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr, 1);
×
135
  QUERY_CHECK_CODE(code, lino, _end);
×
136

137
  SStreamMeta* pMeta = pStreamTask->pMeta;
×
138
  pState->pTdbState->pOwner = pTask;
×
139
  pState->pFileState = NULL;
×
140
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
×
141
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
×
142
  QUERY_CHECK_NULL(pState->parNameMap, code, lino, _end, terrno);
×
143

144
  stInfo("open state %p on backend %p 0x%" PRIx64 "-%d succ", pState, pMeta->streamBackend, pState->streamId,
×
145
         pState->taskId);
146
  return pState;
×
147
_end:
×
148
  if (code != TSDB_CODE_SUCCESS) {
×
149
    qError("0x%x %s recalated failed at line %d since %s", taskId, __func__, lino, tstrerror(code));
×
150
  }
151

152
  return NULL;
×
153
}
154
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
23✔
155
  int32_t      code = TSDB_CODE_SUCCESS;
23✔
156
  int32_t      lino = 0;
23✔
157
  SStreamTask* pStreamTask = pTask;
23✔
158

159
  SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
23✔
160
  stDebug("s-task:%s open stream state %p, %s", pStreamTask->id.idStr, pState, path);
23✔
161

162
  TAOS_UNUSED(tsnprintf(pState->pTaskIdStr, sizeof(pState->pTaskIdStr), "TID:0x%x QID:0x%" PRIx64,
23✔
163
                        taskId, streamId));
164

165
  if (pState == NULL) {
23✔
166
    code = terrno;
×
167
    QUERY_CHECK_CODE(code, lino, _end);
×
168
  }
169

170
  pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
23✔
171
  if (pState->pTdbState == NULL) {
23✔
172
    streamStateDestroy(pState, true);
×
173
    code = terrno;
×
174
    QUERY_CHECK_CODE(code, lino, _end);
×
175
  }
176

177
  pState->streamId = streamId;
23✔
178
  pState->taskId = taskId;
23✔
179
  TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x",
23✔
180
                        pState->streamId, pState->taskId));
181
  // recal id + cal
182
  code = streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr, 0);
23✔
183
  QUERY_CHECK_CODE(code, lino, _end);
23✔
184

185
  SStreamMeta* pMeta = pStreamTask->pMeta;
23✔
186
  pState->pTdbState->pOwner = pTask;
23✔
187
  pState->pFileState = NULL;
23✔
188
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
23✔
189
  pState->parNameMap = tSimpleHashInit(1024, hashFn);
23✔
190
  QUERY_CHECK_NULL(pState->parNameMap, code, lino, _end, terrno);
23✔
191

192
  stInfo("s-task:%s open state %p on backend %p 0x%" PRIx64 "-%d succ", pStreamTask->id.idStr, pState,
23✔
193
         pMeta->streamBackend, pState->streamId, pState->taskId);
194
  return pState;
23✔
195

196
_end:
×
197
  if (code != TSDB_CODE_SUCCESS) {
×
198
    qError("0x%x %s failed at line %d since %s", taskId, __func__, lino, tstrerror(code));
×
199
  }
200
  return NULL;
×
201
}
202

203
int32_t streamStateDelTaskDb(SStreamState* pState) {
×
204
  SStreamTask* pTask = pState->pTdbState->pOwner;
×
205
  taskDbRemoveRef(pTask->pBackend);
×
206
  taosMemoryFree(pTask);
×
207
  return 0;
×
208
}
209
void streamStateClose(SStreamState* pState, bool remove) {
20✔
210
  SStreamTask* pTask = pState->pTdbState->pOwner;
20✔
211
  streamStateDestroy(pState, remove);
20✔
212
}
20✔
213

214
int32_t streamStateBegin(SStreamState* pState) { return 0; }
×
215

216
void streamStateCommit(SStreamState* pState) {
6✔
217
  if (pState->pFileState) {
6✔
218
    SStreamSnapshot* pShot = getSnapshot(pState->pFileState);
6✔
219
    flushSnapshot(pState->pFileState, pShot, true);
6✔
220
  }
221
}
6✔
222

223
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
×
224
  int32_t code = TSDB_CODE_SUCCESS;
×
225
  int32_t lino = 0;
×
226
  void*   pVal = NULL;
×
227
  int32_t len = getRowStateRowSize(pState->pFileState);
×
228
  int32_t tmpLen = len;
×
229
  code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &tmpLen);
×
230
  QUERY_CHECK_CODE(code, lino, _end);
×
231

232
  char*   buf = ((SRowBuffPos*)pVal)->pRowBuff;
×
233
  int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
×
234
  memcpy(buf + len - rowSize, value, vLen);
×
235

236
_end:
×
237
  if (code != TSDB_CODE_SUCCESS) {
×
238
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
239
  }
240
  return code;
×
241
}
242
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
×
243
  int32_t code = TSDB_CODE_SUCCESS;
×
244
  int32_t lino = 0;
×
245
  void*   pVal = NULL;
×
246
  int32_t len = getRowStateRowSize(pState->pFileState);
×
247
  int32_t tmpLen = len;
×
248
  code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &tmpLen);
×
249
  QUERY_CHECK_CODE(code, lino, _end);
×
250

251
  char*   buf = ((SRowBuffPos*)pVal)->pRowBuff;
×
252
  int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
×
253
  *ppVal = buf + len - rowSize;
×
254
  streamStateReleaseBuf(pState, pVal, false);
×
255

256
_end:
×
257
  if (code != TSDB_CODE_SUCCESS) {
×
258
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
259
  }
260
  return code;
×
261
}
262

263
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { return 0; }
×
264

265
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
1,349✔
266
  return addRowBuffIfNotExist(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode);
1,349✔
267
}
268

269
bool streamStateCheck(SStreamState* pState, const SWinKey* pKey, bool hasLimit, bool* pIsLast) {
×
270
  return hasRowBuff(pState->pFileState, pKey, hasLimit, pIsLast);
×
271
}
272

273
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
1,386✔
274
  int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
1,386✔
275
  streamStateReleaseBuf(pState, pos, false);
1,386✔
276
  return code;
1,386✔
277
}
278

279
void streamStateDel(SStreamState* pState, const SWinKey* key) {
×
280
  deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
×
281
}
×
282

283
void streamStateDelByGroupId(SStreamState* pState, uint64_t groupId) {
×
284
  deleteRowBuffByGroupId(pState->pFileState, groupId);
×
285
}
×
286

287
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
×
288
  return streamStateFillPut_rocksdb(pState, key, value, vLen);
×
289
}
290

291
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
×
292
  if (pState->pFileState) {
×
293
    return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode);
×
294
  }
295
  return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
×
296
}
297

298
int32_t streamStateFillAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
×
299
                                     int32_t* pWinCode) {
300
  return getHashSortRowBuff(pState->pFileState, key, pVal, pVLen, pWinCode);
×
301
}
302

303
int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
×
304
                               int32_t* pWinCode) {
305
  return getHashSortNextRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);
×
306
}
307

308
int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
×
309
                               int32_t* pWinCode) {
310
  return getHashSortPrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);
×
311
}
312

313
void streamStateFillDel(SStreamState* pState, const SWinKey* key) {
×
314
  int32_t code = streamStateFillDel_rocksdb(pState, key);
×
315
  qTrace("%s at line %d res %d", __func__, __LINE__, code);
×
316
}
×
317

318
void streamStateClear(SStreamState* pState) { streamFileStateClear(pState->pFileState); }
188✔
319

320
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) {
29✔
321
  pState->number = number;
29✔
322
  pState->tsIndex = tsIdex;
29✔
323
}
29✔
324

325
void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
3✔
326
  int32_t code = TSDB_CODE_SUCCESS;
3✔
327
  int32_t lino = 0;
3✔
328

329
  char* cfName = "default";
3✔
330
  void* batch = streamStateCreateBatch();
3✔
331
  code = streamStatePutBatch(pState, cfName, batch, pKey, pVal, vLen, 0);
3✔
332
  QUERY_CHECK_CODE(code, lino, _end);
3✔
333

334
  code = streamStatePutBatch_rocksdb(pState, batch);
3✔
335
  QUERY_CHECK_CODE(code, lino, _end);
3✔
336

337
_end:
3✔
338
  if (code != TSDB_CODE_SUCCESS) {
3✔
339
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
340
  }
341
  streamStateDestroyBatch(batch);
3✔
342
}
3✔
343

344
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) {
36✔
345
  int32_t code = TSDB_CODE_SUCCESS;
36✔
346
  code = streamDefaultGet_rocksdb(pState, pKey, pVal, pLen);
36✔
347
  return code;
36✔
348
}
349

350
int32_t streamStateCreate(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
×
351
  return createRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
×
352
}
353

354
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
1,349✔
355
                                 int32_t* pWinCode) {
356
  int32_t code = TSDB_CODE_SUCCESS;
1,349✔
357
  int32_t lino = 0;
1,349✔
358

359
  bool       isEnd = false;
1,349✔
360
  SSHashObj* pSearchBuff = getSearchBuff(pState->pFileState);
1,349✔
361
  if (pSearchBuff != NULL) {
1,349✔
362
    SArray* pWinStates = NULL;
×
363
    code = addArrayBuffIfNotExist(pSearchBuff, pKey->groupId, &pWinStates);
×
364
    QUERY_CHECK_CODE(code, lino, _end);
×
365

366
    // recover
367
    if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pState->pFileState)) {
×
368
      code = recoverHashSortBuff(pState->pFileState, pWinStates, pKey->groupId);
×
369
      QUERY_CHECK_CODE(code, lino, _end);
×
370
    }
371

372
    code = addSearchItem(pState->pFileState, pWinStates, pKey, &isEnd);
×
373
    QUERY_CHECK_CODE(code, lino, _end);
×
374
  }
375

376
  if (isEnd) {
1,349✔
377
    code = streamStateCreate(pState, pKey, pVal, pVLen);
×
378
    QUERY_CHECK_CODE(code, lino, _end);
×
379
    (*pWinCode) = TSDB_CODE_FAILED;
×
380
  } else {
381
    code = streamStateGet(pState, pKey, pVal, pVLen, pWinCode);
1,349✔
382
    QUERY_CHECK_CODE(code, lino, _end);
1,349✔
383
  }
384

385
_end:
1,349✔
386
  if (code != TSDB_CODE_SUCCESS) {
1,349✔
387
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
388
  }
389
  return code;
1,349✔
390
}
391

392
void streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) {
1,386✔
393
  if (!pVal) {
1,386✔
394
    return;
×
395
  }
396
  streamFileStateReleaseBuff(pState->pFileState, pVal, used);
1,386✔
397
}
398

399
void streamStateClearBuff(SStreamState* pState, void* pVal) { streamFileStateClearBuff(pState->pFileState, pVal); }
×
400

401
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
×
402
  return streamStateFillGetCur_rocksdb(pState, key);
×
403
}
404

405
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
×
406
  return streamStateGetAndCheckCur_rocksdb(pState, key);
×
407
}
408

409
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
×
410
  return streamStateGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, pVal, pVLen);
×
411
}
412

413
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
×
414
  return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
×
415
}
416

417
int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
×
418
  return streamStateFillGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
×
419
}
420

421
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
×
422
  return streamStateSeekKeyNext_rocksdb(pState, key);
×
423
}
424

425
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
×
426
  return streamStateFillSeekKeyNext_rocksdb(pState, key);
×
427
}
428

429
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
×
430
  return streamStateFillSeekKeyPrev_rocksdb(pState, key);
×
431
}
432

433
void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { sessionWinStateMoveToNext(pCur); }
×
434

435
void streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
×
436
  qTrace("move cursor to next");
×
437
  streamStateCurPrev_rocksdb(pCur);
×
438
}
×
439

440
void streamStateResetCur(SStreamStateCur* pCur) {
225✔
441
  if (!pCur) {
225✔
442
    return;
×
443
  }
444
  if (pCur->iter) rocksdb_iter_destroy(pCur->iter);
225✔
445
  if (pCur->snapshot) rocksdb_release_snapshot(pCur->db, pCur->snapshot);
225✔
446
  if (pCur->readOpt) rocksdb_readoptions_destroy(pCur->readOpt);
225✔
447

448
  memset(pCur, 0, sizeof(SStreamStateCur));
225✔
449

450
  pCur->buffIndex = -1;
225✔
451
}
452

453
void streamStateFreeCur(SStreamStateCur* pCur) {
242✔
454
  if (!pCur) {
242✔
455
    return;
17✔
456
  }
457
  streamStateResetCur(pCur);
225✔
458
  taosMemoryFree(pCur);
225✔
459
}
460

461
void streamStateFreeVal(void* val) { taosMemoryFree(val); }
780✔
462

463
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) {
×
464
  int32_t      code = TSDB_CODE_SUCCESS;
×
465
  int32_t      lino = 0;
×
466
  SRowBuffPos* pos = (SRowBuffPos*)value;
×
467
  if (pos->needFree) {
×
468
    if (isFlushedState(pState->pFileState, key->win.ekey, 0)) {
×
469
      if (!pos->pRowBuff) {
×
470
        goto _end;
×
471
      }
472
      code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, getFileStateRowSize(pState->pFileState));
×
473
      QUERY_CHECK_CODE(code, lino, _end);
×
474

475
      streamStateReleaseBuf(pState, pos, true);
×
476
      code = putFreeBuff(pState->pFileState, pos);
×
477
      QUERY_CHECK_CODE(code, lino, _end);
×
478

479
      stDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
×
480
              key->win.ekey, key->groupId, code);
481
    } else {
482
      pos->beFlushed = false;
×
483
      code = putSessionWinResultBuff(pState->pFileState, value);
×
484
      QUERY_CHECK_CODE(code, lino, _end);
×
485
    }
486
  }
487

488
_end:
×
489
  if (code != TSDB_CODE_SUCCESS) {
×
490
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
491
  }
492
  return code;
×
493
}
494

495
int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur,
×
496
                                                     const SSessionKey* pKey, void** pVal, int32_t* pVLen) {
497
  return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen);
×
498
}
499

500
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
×
501
  return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen, pWinCode);
×
502
}
503

504
void streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
×
505
  qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
×
506
         key->groupId);
507
  deleteRowBuff(pState->pFileState, key, sizeof(SSessionKey));
×
508
}
×
509

510
void streamStateSessionReset(SStreamState* pState, void* pVal) {
×
511
  int32_t len = getRowStateRowSize(pState->pFileState);
×
512
  memset(pVal, 0, len);
×
513
}
×
514

515
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
×
516
  return sessionWinStateSeekKeyCurrentPrev(pState->pFileState, key);
×
517
}
518

519
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
×
520
  if (pState->pFileState != NULL) {
×
521
    return sessionWinStateSeekKeyCurrentNext(pState->pFileState, key);
×
522
  }
523
  return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
×
524
}
525

526
SStreamStateCur *streamStateSessionSeekKeyPrev(SStreamState *pState, const SSessionKey *key) {
×
527
  return sessionWinStateSeekKeyPrev(pState->pFileState, key);
×
528
}
529

530
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
×
531
  return sessionWinStateSeekKeyNext(pState->pFileState, key);
×
532
}
533

534
SStreamStateCur* streamStateCountSeekKeyPrev(SStreamState* pState, const SSessionKey* key, COUNT_TYPE count) {
×
535
  return countWinStateSeekKeyPrev(pState->pFileState, key, count);
×
536
}
537

538
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
×
539
  if (pCur != NULL && pCur->pStreamFileState != NULL) {
×
540
    return sessionWinStateGetKVByCur(pCur, pKey, pVal, pVLen);
×
541
  }
542
  return streamStateSessionGetKVByCur_rocksdb(NULL, pCur, pKey, pVal, pVLen);
×
543
}
544

545
void streamStateSessionClear(SStreamState* pState) {
×
546
  sessionWinStateClear(pState->pFileState);
×
547
  streamStateSessionClear_rocksdb(pState);
×
548
}
×
549

550
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
×
551
  return sessionWinStateGetKeyByRange(pState->pFileState, key, curKey, sessionRangeKeyCmpr);
×
552
}
553

554
int32_t streamStateCountGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
×
555
  return sessionWinStateGetKeyByRange(pState->pFileState, key, curKey, countRangeKeyEqual);
×
556
}
557

558
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen,
×
559
                                        int32_t* pWinCode) {
560
  return getSessionWinResultBuff(pState->pFileState, key, gap, pVal, pVLen, pWinCode);
×
561
}
562

563
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
×
564
                                      state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
565
  return getStateWinResultBuff(pState->pFileState, key, pKeyData, keyDataLen, fn, pVal, pVLen, pWinCode);
×
566
}
567

568
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
10✔
569
  int32_t code = TSDB_CODE_SUCCESS;
10✔
570
  int32_t lino = 0;
10✔
571
  if (pState->parNameMap == NULL) {
10✔
572
    return TSDB_CODE_SUCCESS;
×
573
  }
574

575
  if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
10✔
576
    if (tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
10✔
577
      code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
10✔
578
      QUERY_CHECK_CODE(code, lino, _end);
10✔
579
    }
580
    code = streamStatePutParName_rocksdb(pState, groupId, tbname);
10✔
581
    QUERY_CHECK_CODE(code, lino, _end);
10✔
582
  }
583

584
_end:
10✔
585
  qTrace("%s tbname:%s, groupId:%"PRId64, __func__, tbname, groupId);
10✔
586
  if (code != TSDB_CODE_SUCCESS) {
10✔
587
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
588
  }
589
  return code;
10✔
590
}
591

592
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache, int32_t* pWinCode) {
780✔
593
  int32_t code = TSDB_CODE_SUCCESS;
780✔
594
  int32_t lino = 0;
780✔
595
  if (pState->parNameMap == NULL) {
780✔
596
    (*pWinCode) = TSDB_CODE_FAILED;
382✔
597
    return code;
382✔
598
  }
599

600
  void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
398✔
601
  if (!pStr) {
398✔
602
    if (onlyCache && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
279✔
603
      (*pWinCode) = TSDB_CODE_FAILED;
10✔
604
      goto _end;
10✔
605
    }
606
    (*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
269✔
607
    if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
269✔
608
      code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN);
×
609
      qDebug("put into parNameMap, total size:%d, groupId:%" PRId64 ", name:%s", tSimpleHashGetSize(pState->parNameMap),
×
610
             groupId, (char*) (*pVal));
611

612
      QUERY_CHECK_CODE(code, lino, _end);
×
613
    }
614
    goto _end;
269✔
615
  }
616

617
  *pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
119✔
618
  if (!(*pVal)) {
119✔
619
    code = terrno;
×
620
    QUERY_CHECK_CODE(code, lino, _end);
×
621
  }
622

623
  memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN);
119✔
624
  (*pWinCode) = TSDB_CODE_SUCCESS;
119✔
625

626
_end:
398✔
627
  qTrace("%s tbname:%s, groupId:%"PRId64, __func__, (char*)(*pVal), groupId);
398✔
628
  if (code != TSDB_CODE_SUCCESS) {
398✔
629
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
630
  }
631
  return code;
398✔
632
}
633

634
int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) {
×
635
  if (pState->parNameMap == NULL) {
×
636
    return TSDB_CODE_INVALID_PARA;
×
637
  }
638

639
  int32_t code = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t));
×
640
  if (TSDB_CODE_SUCCESS != code) {
×
641
    qWarn("failed to remove parname from cache, code:%d", code);
×
642
  }
643
  code = streamStateDeleteParName_rocksdb(pState, groupId);
×
644
  if (TSDB_CODE_SUCCESS != code) {
×
645
    qWarn("failed to remove parname from rocksdb, code:%d", code);
×
646
  }
647
  return TSDB_CODE_SUCCESS;
×
648
}
649

650
void streamStateSetParNameInvalid(SStreamState* pState) {
207,382✔
651
  if (pState != NULL) {
207,382✔
652
    tSimpleHashCleanup(pState->parNameMap);
722✔
653
    pState->parNameMap = NULL;
722✔
654
  }
655
}
207,382✔
656

657
void streamStateDestroy(SStreamState* pState, bool remove) {
20✔
658
  streamFileStateDestroy(pState->pFileState);
20✔
659
  tSimpleHashCleanup(pState->parNameMap);
20✔
660
  // do nothong
661
  taosMemoryFreeClear(pState->pTdbState);
20✔
662
  taosMemoryFreeClear(pState);
20✔
663
}
20✔
664

665
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
1✔
666

667
void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
13✔
668
  dst->pFileState = src->pFileState;
13✔
669
  dst->parNameMap = src->parNameMap;
13✔
670
  dst->number = src->number;
13✔
671
  dst->taskId = src->taskId;
13✔
672
  dst->streamId = src->streamId;
13✔
673
  if (dst->pTdbState == NULL) {
13✔
674
    dst->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
13✔
675
    dst->pTdbState->pOwner = taosMemoryCalloc(1, sizeof(SStreamTask));
13✔
676
  }
677
  dst->dump = 1;
13✔
678
  dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend;
13✔
679
  dst->pResultRowStore.resultRowPut = src->pResultRowStore.resultRowPut;
13✔
680
  dst->pResultRowStore.resultRowGet = src->pResultRowStore.resultRowGet;
13✔
681
  dst->pExprSupp = src->pExprSupp;
13✔
682
  return;
13✔
683
}
684
SStreamStateCur* createStreamStateCursor() {
230✔
685
  SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
230✔
686
  if (pCur == NULL) {
230✔
687
    return NULL;
×
688
  }
689
  pCur->buffIndex = -1;
230✔
690
  return pCur;
230✔
691
}
692

693
// count window
694
int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal,
×
695
                                         int32_t* pVLen, int32_t* pWinCode) {
696
  return getCountWinResultBuff(pState->pFileState, pKey, winCount, ppVal, pVLen, pWinCode);
×
697
}
698

699
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
×
700
                               int32_t* pVLen) {
701
  return createCountWinResultBuff(pState->pFileState, pKey, winCount, pVal, pVLen);
×
702
}
703

704
int32_t streamStateGroupPut(SStreamState* pState, int64_t groupId, void* value, int32_t vLen) {
×
705
  return streamFileStateGroupPut(pState->pFileState, groupId, value, vLen);
×
706
}
707

708
SStreamStateCur* streamStateGroupGetCur(SStreamState* pState) {
×
709
  SStreamStateCur* pCur = createStateCursor(pState->pFileState);
×
710
  pCur->hashIter = 0;
×
711
  pCur->pHashData = NULL;
×
712
  SSHashObj* pMap = getGroupIdCache(pState->pFileState);
×
713
  pCur->pHashData = tSimpleHashIterate(pMap, pCur->pHashData, &pCur->hashIter);
×
714
  if (pCur->pHashData == NULL) {
×
715
    pCur->hashIter = -1;
×
716
    streamStateParTagSeekKeyNext_rocksdb(pState, INT64_MIN, pCur);
×
717
  }
718
  return pCur;
×
719
}
720

721
void streamStateGroupCurNext(SStreamStateCur* pCur) { streamFileStateGroupCurNext(pCur); }
×
722

723
int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
×
724
  if (pVal != NULL) {
×
725
    return -1;
×
726
  }
727
  return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);
×
728
}
729

730
void streamStateClearExpiredState(SStreamState* pState, int32_t numOfKeep, TSKEY minTs) {
×
731
  qDebug("===stream=== clear stream state. keep:%d, ts:%" PRId64, numOfKeep, minTs);
×
732
  if (numOfKeep == 0) {
×
733
    streamFileStateClear(pState->pFileState);
×
734
    SSHashObj* pSearchBuff = getSearchBuff(pState->pFileState);
×
735
    void*      pIte = NULL;
×
736
    int32_t    iter = 0;
×
737
    while ((pIte = tSimpleHashIterate(pSearchBuff, pIte, &iter)) != NULL) {
×
738
      SArray* pWinStates = *((void**)pIte);
×
739
      taosArrayClear(pWinStates);
×
740
    }
741
    return;
×
742
  }
743
  clearExpiredState(pState->pFileState, numOfKeep, minTs);
×
744
}
745

746
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
×
747
                           int32_t* pWinCode) {
748
  return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);
×
749
}
750

751
int32_t streamStateGetAllPrev(SStreamState* pState, const SWinKey* pKey, SArray* pResArray, int32_t maxNum) {
×
752
  return getRowStateAllPrevRow(pState->pFileState, pKey, pResArray, maxNum);
×
753
}
754

755
int32_t streamStateGetAndSetTsData(STableTsDataState* pState, uint64_t tableUid, TSKEY* pCurTs, void** ppCurPkVal,
×
756
                                   TSKEY lastTs, void* pLastPkVal, int32_t lastPkLen, int32_t* pWinCode) {
757
  return getAndSetTsData(pState, tableUid, pCurTs, ppCurPkVal, lastTs, pLastPkVal, lastPkLen, pWinCode);
×
758
}
759

760
int32_t streamStateTsDataCommit(STableTsDataState* pState) {
×
761
  int32_t code = doTsDataCommit(pState);
×
762
  if (code != TSDB_CODE_SUCCESS) return code;
×
763
  return doRangeDataCommit(pState);
×
764
}
765

766
int32_t streamStateInitTsDataState(STableTsDataState** ppTsDataState, int8_t pkType, int32_t pkLen, void* pState,
×
767
                                   void* pOtherState) {
768
  return initTsDataState(ppTsDataState, pkType, pkLen, pState, pOtherState);
×
769
}
770

771
void streamStateDestroyTsDataState(STableTsDataState* pTsDataState) { destroyTsDataState(pTsDataState); }
×
772

773
int32_t streamStateRecoverTsData(STableTsDataState* pTsDataState) { return recoverTsData(pTsDataState); }
×
774

775
SStreamStateCur* streamStateGetLastStateCur(SStreamState* pState) { return getLastStateCur(pState->pFileState, getSearchBuff); }
×
776

777
void streamStateLastStateCurNext(SStreamStateCur* pCur) { moveLastStateCurNext(pCur, getSearchBuff); }
×
778

779
int32_t streamStateNLastStateGetKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
×
780
  return getNLastStateKVByCur(pCur, num, pRes);
×
781
}
782

783
SStreamStateCur* streamStateGetLastSessionStateCur(SStreamState* pState) { return getLastStateCur(pState->pFileState, getRowStateBuff); }
×
784

785
void streamStateLastSessionStateCurNext(SStreamStateCur* pCur) { moveLastStateCurNext(pCur, getRowStateBuff); }
×
786

787
int32_t streamStateNLastSessionStateGetKVByCur(SStreamStateCur* pCur, int32_t num, SArray* pRes) {
×
788
  return getNLastSessionStateKVByCur(pCur, num, pRes);
×
789
}
790

791
int32_t streamStateReloadTsDataState(STableTsDataState* pTsDataState) { return reloadTsDataState(pTsDataState); }
×
792

793
int32_t streamStateMergeAndSaveScanRange(STableTsDataState* pTsDataState, STimeWindow* pWin, uint64_t gpId,
×
794
                                         SRecDataInfo* pRecData, int32_t len) {
795
  return mergeAndSaveScanRange(pTsDataState, pWin, gpId, pRecData, len);
×
796
}
797

798
int32_t streamStateMergeAllScanRange(STableTsDataState* pTsDataState) { return mergeAllScanRange(pTsDataState); }
×
799

800
int32_t streamStatePopScanRange(STableTsDataState* pTsDataState, SScanRange* pRange) {
×
801
  return popScanRange(pTsDataState, pRange);
×
802
}
803

804
int32_t streamStateGetNumber(SStreamState* pState) { return pState->number; }
×
805

806
int32_t streamStateDeleteInfo(SStreamState* pState, void* pKey, int32_t keyLen) {
×
807
  return streamDefaultDel_rocksdb(pState, pKey);
×
808
}
809

810
int32_t streamStateSessionSaveToDisk(STableTsDataState* pTblState, SSessionKey* pKey, SRecDataInfo* pVal,
×
811
                                     int32_t vLen) {
812
  SStreamState* pState = pTblState->pState;
×
813
  qDebug("===stream===%s save recalculate range.recId:%d. start:%" PRId64 ",end:%" PRId64 ",groupId:%" PRIu64
×
814
         ". cal start:%" PRId64 ",cal end:%" PRId64 ",tbl uid:%" PRIu64 ",data version:%" PRId64 ",mode:%d",
815
         pState->pTaskIdStr, pState->number, pKey->win.skey, pKey->win.ekey, pKey->groupId, pVal->calWin.skey,
816
         pVal->calWin.ekey, pVal->tableUid, pVal->dataVersion, pVal->mode);
817
  return saveRecInfoToDisk(pTblState, pKey, pVal, vLen);
×
818
}
819

820
int32_t streamStateFlushReaminInfoToDisk(STableTsDataState* pTblState) {
×
821
  return flushRemainRecInfoToDisk(pTblState);
×
822
}
823

824
int32_t streamStateSessionDeleteAll(SStreamState* pState) {
×
825
  SSessionKey key = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
×
826
  while (1) {
×
827
    SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);
×
828
    SSessionKey      delKey = {0};
×
829
    int32_t          winRes = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &delKey, NULL, 0);
×
830
    if (winRes != TSDB_CODE_SUCCESS) {
×
831
      streamStateFreeCur(pCur);
×
832
      break;
×
833
    }
834
    streamStateSessionDel_rocksdb(pState, &delKey);
×
835
    streamStateFreeCur(pCur);
×
836
  }
837
  return TSDB_CODE_SUCCESS;
×
838
}
839

840
int32_t streamStateSetRecFlag(SStreamState* pState, const void* pKey, int32_t keyLen, int32_t mode) {
×
841
  return setStateRecFlag(pState->pFileState, pKey, keyLen, mode);
×
842
}
843

844
int32_t streamStateGetRecFlag(SStreamState* pState, const void* pKey, int32_t keyLen, int32_t* pMode) {
×
845
  return getStateRecFlag(pState->pFileState, pKey, keyLen, pMode);
×
846
}
847

848
void streamStateClearExpiredSessionState(SStreamState* pState, int32_t numOfKeep, TSKEY minTs, SSHashObj* pFlushGroup) {
×
849
  if (numOfKeep == 0) {
×
850
    void* pBuff = getRowStateBuff(pState->pFileState);
×
851
    tSimpleHashSetFreeFp(pBuff, freeArrayPtr);
×
852
    streamFileStateClear(pState->pFileState);
×
853
    return;
×
854
  }
855
  clearExpiredSessionState(pState->pFileState, numOfKeep, minTs, pFlushGroup);
×
856
}
857

858
bool streamStateCheckSessionState(SStreamState* pState, SSessionKey* pKey, TSKEY gap, bool* pIsLast) {
×
859
  return hasSessionState(pState->pFileState, pKey, gap, pIsLast);
×
860
}
861

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc