• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4143

24 May 2025 03:30AM UTC coverage: 32.868% (-29.4%) from 62.238%
#4143

push

travis-ci

web-flow
test: migrate stream cases (#31164)

76401 of 312956 branches covered (24.41%)

Branch coverage included in aggregate %.

128686 of 311012 relevant lines covered (41.38%)

579734.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/include/libs/executor/storageapi.h
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#ifndef TDENGINE_STORAGEAPI_H
17
#define TDENGINE_STORAGEAPI_H
18

19
#include "function.h"
20
#include "index.h"
21
#include "taosdef.h"
22
#include "tcommon.h"
23
#include "tmsg.h"
24
#include "tscalablebf.h"
25
#include "tsimplehash.h"
26

27
#ifdef __cplusplus
28
extern "C" {
29
#endif
30

31
#define TIMEWINDOW_RANGE_CONTAINED 1
32
#define TIMEWINDOW_RANGE_EXTERNAL  2
33

34
#define CACHESCAN_RETRIEVE_TYPE_ALL    0x1
35
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
36
#define CACHESCAN_RETRIEVE_LAST_ROW    0x4
37
#define CACHESCAN_RETRIEVE_LAST        0x8
38

39
#define META_READER_LOCK   0x0
40
#define META_READER_NOLOCK 0x1
41

42
#define STREAM_STATE_BUFF_HASH        1
43
#define STREAM_STATE_BUFF_SORT        2
44
#define STREAM_STATE_BUFF_HASH_SORT   3
45
#define STREAM_STATE_BUFF_HASH_SEARCH 4
46

47
typedef struct SMeta SMeta;
48
typedef TSKEY (*GetTsFun)(void*);
49

50
typedef struct SMetaEntry {
51
  int64_t  version;
52
  int8_t   type;
53
  int8_t   flags;  // TODO: need refactor?
54
  tb_uid_t uid;
55
  char*    name;
56
  union {
57
    struct {
58
      SSchemaWrapper schemaRow;
59
      SSchemaWrapper schemaTag;
60
      SRSmaParam     rsmaParam;
61
      int64_t        keep;
62
    } stbEntry;
63
    struct {
64
      int64_t  btime;
65
      int32_t  ttlDays;
66
      int32_t  commentLen;  // not include '\0'
67
      char*    comment;
68
      tb_uid_t suid;
69
      uint8_t* pTags;
70
    } ctbEntry;
71
    struct {
72
      int64_t        btime;
73
      int32_t        ttlDays;
74
      int32_t        commentLen;
75
      char*          comment;
76
      int32_t        ncid;  // next column id
77
      SSchemaWrapper schemaRow;
78
    } ntbEntry;
79
    struct {
80
      STSma* tsma;
81
    } smaEntry;
82
  };
83

84
  uint8_t* pBuf;
85

86
  SColCmprWrapper colCmpr;  // col compress alg
87
  SExtSchema*     pExtSchemas;
88
  SColRefWrapper  colRef;   // col reference for virtual table
89
} SMetaEntry;
90

91
typedef struct SMetaReader {
92
  int32_t            flags;
93
  void*              pMeta;
94
  SDecoder           coder;
95
  SMetaEntry         me;
96
  void*              pBuf;
97
  int32_t            szBuf;
98
  struct SStoreMeta* pAPI;
99
} SMetaReader;
100

101
typedef struct SMTbCursor {
102
  void*       pMeta;
103
  void*       pDbc;
104
  void*       pKey;
105
  void*       pVal;
106
  int32_t     kLen;
107
  int32_t     vLen;
108
  SMetaReader mr;
109
  int8_t      paused;
110
} SMTbCursor;
111

112
typedef struct SMCtbCursor {
113
  struct SMeta* pMeta;
114
  void*         pCur;
115
  tb_uid_t      suid;
116
  void*         pKey;
117
  void*         pVal;
118
  int           kLen;
119
  int           vLen;
120
  int8_t        paused;
121
  int           lock;
122
} SMCtbCursor;
123

124
typedef struct SRowBuffPos {
125
  void* pRowBuff;
126
  void* pKey;
127
  bool  beFlushed;
128
  bool  beUsed;
129
  bool  needFree;
130
  bool  beUpdated;
131
  bool  invalid;
132
} SRowBuffPos;
133

134
// tq
135
typedef struct SMetaTableInfo {
136
  int64_t         suid;
137
  int64_t         uid;
138
  SSchemaWrapper* schema;
139
  char            tbName[TSDB_TABLE_NAME_LEN];
140
} SMetaTableInfo;
141

142
static FORCE_INLINE void destroyMetaTableInfo(SMetaTableInfo* mtInfo){
143
  if (mtInfo == NULL) return;
×
144
  tDeleteSchemaWrapper(mtInfo->schema);
×
145
}
146

147
typedef struct SSnapContext {
148
  struct SMeta* pMeta;
149
  int64_t       snapVersion;
150
  void*         pCur;
151
  int64_t       suid;
152
  int8_t        subType;
153
  SHashObj*     idVersion;
154
  SHashObj*     suidInfo;
155
  SArray*       idList;
156
  int32_t       index;
157
  int8_t        withMeta;
158
  int8_t        queryMeta;  // true-get meta, false-get data
159
  bool          hasPrimaryKey;
160
} SSnapContext;
161

162
typedef struct {
163
  int64_t uid;
164
  int64_t ctbNum;
165
  int32_t colNum;
166
  int8_t  flags;
167
  int64_t keep;
168
} SMetaStbStats;
169

170
// clang-format off
171
/*-------------------------------------------------new api format---------------------------------------------------*/
172
typedef enum {
173
  TSD_READER_NOTIFY_DURATION_START,
174
  TSD_READER_NOTIFY_NEXT_DURATION_BLOCK,
175
} ETsdReaderNotifyType;
176

177
typedef union {
178
  struct {
179
    int32_t filesetId;
180
  } duration;
181
} STsdReaderNotifyInfo;
182

183
typedef void (*TsdReaderNotifyCbFn)(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param);
184

185
struct SFileSetReader;
186

187
typedef struct TsdReader {
188
  int32_t      (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
189
                           SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables);
190
  void         (*tsdReaderClose)();
191
  int32_t      (*tsdSetReaderTaskId)(void *pReader, const char *pId);
192
  int32_t      (*tsdSetQueryTableList)();
193
  int32_t      (*tsdNextDataBlock)();
194

195
  int32_t      (*tsdReaderRetrieveBlockSMAInfo)();
196
  int32_t      (*tsdReaderRetrieveDataBlock)();
197

198
  void         (*tsdReaderReleaseDataBlock)();
199

200
  int32_t      (*tsdReaderResetStatus)();
201
  int32_t      (*tsdReaderGetDataBlockDistInfo)();
202
  int64_t      (*tsdReaderGetNumOfInMemRows)();
203
  void         (*tsdReaderNotifyClosing)();
204

205
  void         (*tsdSetFilesetDelimited)(void* pReader);
206
  void         (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param);
207

208
  // for fileset query
209
  int32_t (*fileSetReaderOpen)(void *pVnode, struct SFileSetReader **ppReader);
210
  int32_t (*fileSetReadNext)(struct SFileSetReader *);
211
  int32_t (*fileSetGetEntryField)(struct SFileSetReader *, const char *, void *);
212
  void (*fileSetReaderClose)(struct SFileSetReader **);
213

214
  int32_t (*getProgress)(const void* pReader, void** pBuf, uint64_t* pLen);
215
  int32_t (*setProgress)(void *pReader, const void *pBuf, uint64_t len);
216
} TsdReader;
217

218
typedef struct SStoreCacheReader {
219
  int32_t  (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
220
                         SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr,
221
                         SArray *pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks);
222
  void     (*closeReader)(void *pReader);
223
  int32_t  (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
224
                           SArray *pTableUidList, bool* pGotAllRows);
225
  int32_t  (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
226
} SStoreCacheReader;
227

228
// clang-format on
229

230
/*------------------------------------------------------------------------------------------------------------------*/
231
// todo rename
232
typedef struct SStoreTqReader {
233
  struct STqReader* (*tqReaderOpen)();
234
  void (*tqReaderClose)();
235

236
  int32_t (*tqReaderSeek)();
237
  int32_t (*tqRetrieveBlock)();
238
  bool (*tqReaderNextBlockInWal)();
239
  bool (*tqNextBlockImpl)();  // todo remove it
240
  SSDataBlock* (*tqGetResultBlock)();
241
  int64_t (*tqGetResultBlockTime)();
242
  int32_t (*tqGetStreamExecProgress)();
243

244
  int32_t (*tqReaderSetColIdList)();
245
  int32_t (*tqReaderSetQueryTableList)();
246

247
  void (*tqReaderAddTables)();
248
  void (*tqReaderRemoveTables)();
249

250
  void (*tqSetTablePrimaryKey)();
251
  bool (*tqGetTablePrimaryKey)();
252
  bool (*tqReaderIsQueriedTable)();
253
  bool (*tqReaderCurrentBlockConsumed)();
254

255
  struct SWalReader* (*tqReaderGetWalReader)();  // todo remove it
256
                                                 //  int32_t (*tqReaderRetrieveTaosXBlock)();       // todo remove it
257

258
  int32_t (*tqReaderSetSubmitMsg)();  // todo remove it
259
  //  bool (*tqReaderNextBlockFilterOut)();
260

261
  int32_t (*tqReaderSetVtableInfo)();
262
} SStoreTqReader;
263

264
typedef struct SStoreSnapshotFn {
265
  bool (*taosXGetTablePrimaryKey)(SSnapContext* ctx);
266
  void (*taosXSetTablePrimaryKey)(SSnapContext* ctx, int64_t uid);
267
  int32_t (*setForSnapShot)(SSnapContext* ctx, int64_t uid);
268
  void (*destroySnapshot)(SSnapContext* ctx);
269
  int32_t (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx, SMetaTableInfo* info);
270
  int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
271
} SStoreSnapshotFn;
272

273
typedef struct SStoreMeta {
274
  SMTbCursor* (*openTableMetaCursor)(void* pVnode);                                 // metaOpenTbCursor
275
  void (*closeTableMetaCursor)(SMTbCursor* pTbCur);                                 // metaCloseTbCursor
276
  void (*pauseTableMetaCursor)(SMTbCursor* pTbCur);                                 // metaPauseTbCursor
277
  int32_t (*resumeTableMetaCursor)(SMTbCursor* pTbCur, int8_t first, int8_t move);  // metaResumeTbCursor
278
  int32_t (*cursorNext)(SMTbCursor* pTbCur, ETableType jumpTableType);              // metaTbCursorNext
279
  int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType);              // metaTbCursorPrev
280

281
  int32_t (*getTableTags)(void* pVnode, uint64_t suid, SArray* uidList);
282
  int32_t (*getTableTagsByUid)(void* pVnode, int64_t suid, SArray* uidList);
283
  const void* (*extractTagVal)(const void* tag, int16_t type, STagVal* tagVal);  // todo remove it
284

285
  int32_t (*getTableUidByName)(void* pVnode, char* tbName, uint64_t* uid);
286
  int32_t (*getTableTypeSuidByName)(void* pVnode, char* tbName, ETableType* tbType, uint64_t* suid);
287
  int32_t (*getTableNameByUid)(void* pVnode, uint64_t uid, char* tbName);
288
  bool (*isTableExisted)(void* pVnode, tb_uid_t uid);
289

290
  int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
291
  int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
292
                                   int32_t payloadLen);
293

294
  int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
295
                                bool* acquireRes);
296
  int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
297
                                int32_t payloadLen, double selectivityRatio);
298

299
  int32_t (*metaGetCachedRefDbs)(void* pVnode, tb_uid_t suid, SArray* pList);
300
  int32_t (*metaPutRefDbsToCache)(void* pVnode, tb_uid_t suid, SArray* pList);
301

302
  void* (*storeGetIndexInfo)(void* pVnode);
303
  void* (*getInvertIndex)(void* pVnode);
304
  // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
305
  int32_t (*getChildTableList)(void* pVnode, int64_t suid, SArray* list);
306
  int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);
307
  int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid);
308
  int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols, int8_t* flags);
309
  void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
310
                       int64_t* numOfNormalTables);
311
  int32_t (*getDBSize)(void* pVnode, SDbSizeStatisInfo* pInfo);
312

313
  SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock);
314
  int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
315
  void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
316
  void (*closeCtbCursor)(SMCtbCursor* pCtbCur);
317
  tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
318
} SStoreMeta;
319

320
typedef struct SStoreMetaReader {
321
  void (*initReader)(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI);
322
  void (*clearReader)(SMetaReader* pReader);
323
  void (*readerReleaseLock)(SMetaReader* pReader);
324
  int32_t (*getTableEntryByUid)(SMetaReader* pReader, tb_uid_t uid);
325
  int32_t (*getTableEntryByName)(SMetaReader* pReader, const char* name);
326
  int32_t (*getEntryGetUidCache)(SMetaReader* pReader, tb_uid_t uid);
327
} SStoreMetaReader;
328

329
typedef struct SUpdateInfo {
330
  SArray*      pTsBuckets;
331
  uint64_t     numBuckets;
332
  SArray*      pTsSBFs;
333
  uint64_t     numSBFs;
334
  int64_t      interval;
335
  int64_t      watermark;
336
  TSKEY        minTS;
337
  SScalableBf* pCloseWinSBF;
338
  SHashObj*    pMap;
339
  int64_t      maxDataVersion;
340
  int8_t       pkColType;
341
  int32_t      pkColLen;
342
  char*        pKeyBuff;
343
  char*        pValueBuff;
344

345
  int (*comparePkRowFn)(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn);
346
  __compar_fn_t comparePkCol;
347
} SUpdateInfo;
348

349
typedef struct SRecDataInfo {
350
  STimeWindow calWin;
351
  uint64_t    tableUid;
352
  int64_t     dataVersion;
353
  EStreamType mode;
354
  char        pPkColData[];
355
} SRecDataInfo;
356

357
typedef struct SScanRange {
358
  STimeWindow win;
359
  STimeWindow calWin;
360
  SSHashObj*  pGroupIds;
361
  SSHashObj*  pUIds;
362
} SScanRange;
363

364
typedef struct SResultWindowInfo {
365
  SRowBuffPos* pStatePos;
366
  SSessionKey  sessionWin;
367
  bool         isOutput;
368
} SResultWindowInfo;
369

370
typedef struct {
371
  void*   iter;      //  rocksdb_iterator_t*    iter;
372
  void*   snapshot;  //  rocksdb_snapshot_t*    snapshot;
373
  void*   readOpt;   //  rocksdb_readoptions_t* readOpt;
374
  void*   db;        //  rocksdb_t*             db;
375
  void*   pCur;
376
  int64_t number;
377
  void*   pStreamFileState;
378
  int32_t buffIndex;
379
  int32_t hashIter;
380
  void*   pHashData;
381
  int64_t minGpId;
382
} SStreamStateCur;
383

384
typedef struct STableTsDataState {
385
  SSHashObj*       pTableTsDataMap;
386
  __compar_fn_t    comparePkColFn;
387
  void*            pPkValBuff;
388
  int32_t          pkValLen;
389
  SStreamState*    pState;
390
  int32_t          curRecId;
391
  void*            pStreamTaskState;
392
  SArray*          pScanRanges;
393
  SRecDataInfo*    pRecValueBuff;
394
  int32_t          recValueLen;
395
  SStreamStateCur* pRecCur;
396
  int32_t          cfgIndex;
397
  void*            pBatch;
398
  int32_t          batchBufflen;
399
  void*            pBatchBuff;
400
} STableTsDataState;
401

402
typedef struct SStateStore {
403
  int32_t (*streamStatePutParName)(SStreamState* pState, int64_t groupId, const char* tbname);
404
  int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache,
405
                                   int32_t* pWinCode);
406
  int32_t (*streamStateDeleteParName)(SStreamState* pState, int64_t groupId);
407
  void (*streamStateSetParNameInvalid)(SStreamState* pState);
408

409
  int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
410
                                      int32_t* pWinCode);
411
  void (*streamStateReleaseBuf)(SStreamState* pState, void* pVal, bool used);
412
  void (*streamStateClearBuff)(SStreamState* pState, void* pVal);
413
  void (*streamStateFreeVal)(void* val);
414
  int32_t (*streamStateGetPrev)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal,
415
                                int32_t* pVLen, int32_t* pWinCode);
416
  int32_t (*streamStateGetAllPrev)(SStreamState* pState, const SWinKey* pKey, SArray* pResArray, int32_t maxNum);
417

418
  int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
419
  int32_t (*streamStateGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode);
420
  bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key, bool hasLimit, bool* pIsLast);
421
  bool (*streamStateCheckSessionState)(SStreamState* pState, SSessionKey* pKey, TSKEY gap, bool* pIsLast);
422
  int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
423
  void (*streamStateDel)(SStreamState* pState, const SWinKey* key);
424
  void (*streamStateDelByGroupId)(SStreamState* pState, uint64_t groupId);
425
  void (*streamStateClear)(SStreamState* pState);
426
  void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex);
427
  void (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
428
  int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
429
  int32_t (*streamStateGetNumber)(SStreamState* pState);
430
  int32_t (*streamStateDeleteInfo)(SStreamState* pState, void* pKey, int32_t keyLen);
431

432
  int32_t (*streamStateFillPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
433
  int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
434
                                int32_t* pWinCode);
435
  int32_t (*streamStateFillAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen,
436
                                          int32_t* pWinCode);
437
  void (*streamStateFillDel)(SStreamState* pState, const SWinKey* key);
438
  int32_t (*streamStateFillGetNext)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal,
439
                                    int32_t* pVLen, int32_t* pWinCode);
440
  int32_t (*streamStateFillGetPrev)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal,
441
                                    int32_t* pVLen, int32_t* pWinCode);
442

443
  void (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur);
444
  void (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur);
445

446
  SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key);
447
  SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key);
448
  SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key);
449
  SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key);
450
  void (*streamStateFreeCur)(SStreamStateCur* pCur);
451

452
  int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
453
  int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
454

455
  void (*streamStateClearExpiredState)(SStreamState* pState, int32_t numOfKeep, TSKEY minTs);
456
  void (*streamStateClearExpiredSessionState)(SStreamState* pState, int32_t numOfKeep, TSKEY minTs, SSHashObj* pFlushGroup);
457
  int32_t (*streamStateSetRecFlag)(SStreamState* pState, const void* pKey, int32_t keyLen, int32_t mode);
458
  int32_t (*streamStateGetRecFlag)(SStreamState* pState, const void* pKey, int32_t keyLen, int32_t* pMode);
459

460
  int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
461
                                             int32_t* pVLen, int32_t* pWinCode);
462
  int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);
463
  int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen,
464
                                   int32_t* pWinCode);
465
  void (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key);
466
  void (*streamStateSessionReset)(SStreamState* pState, void* pVal);
467
  void (*streamStateSessionClear)(SStreamState* pState);
468
  int32_t (*streamStateSessionGetKVByCur)(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
469
  int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
470
                                           state_key_cmpr_fn fn, void** pVal, int32_t* pVLen, int32_t* pWinCode);
471
  int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
472
  int32_t (*streamStateCountGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
473
  int32_t (*streamStateSessionAllocWinBuffByNextPosition)(SStreamState* pState, SStreamStateCur* pCur,
474
                                                          const SSessionKey* pKey, void** pVal, int32_t* pVLen);
475
  int32_t (*streamStateSessionSaveToDisk)(STableTsDataState* pTblState, SSessionKey* pKey, SRecDataInfo* pVal, int32_t vLen);
476
  int32_t (*streamStateFlushReaminInfoToDisk)(STableTsDataState* pTblState);
477
  int32_t (*streamStateSessionDeleteAll)(SStreamState* pState);
478

479
  int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount,
480
                                              void** ppVal, int32_t* pVLen, int32_t* pWinCode);
481
  int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
482
                                    int32_t* pVLen);
483

484
  int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType,
485
                            int32_t pkLen, SUpdateInfo** ppInfo);
486
  int32_t (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol,
487
                                     int32_t primaryKeyCol, TSKEY* pMaxResTs);
488
  bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
489
  bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
490
  bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
491

492
  void (*updateInfoDestroy)(SUpdateInfo* pInfo);
493
  void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count);
494
  int32_t (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count);
495

496
  int32_t (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen,
497
                             SUpdateInfo** ppInfo);
498
  void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
499
  void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
500
  int32_t (*updateInfoSerialize)(SEncoder* pEncoder, const SUpdateInfo* pInfo);
501
  int32_t (*updateInfoDeserialize)(SDecoder* pDeCoder, SUpdateInfo* pInfo);
502

503
  SStreamStateCur* (*streamStateSessionSeekKeyPrev)(SStreamState* pState, const SSessionKey* key);
504
  SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key);
505
  SStreamStateCur* (*streamStateCountSeekKeyPrev)(SStreamState* pState, const SSessionKey* pKey, COUNT_TYPE count);
506
  SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key);
507
  SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);
508

509
  int32_t (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
510
                                 GetTsFun fp, void* pFile, TSKEY delMark, const char* id, int64_t ckId, int8_t type,
511
                                 struct SStreamFileState** ppFileState);
512

513
  int32_t (*streamStateGroupPut)(SStreamState* pState, int64_t groupId, void* value, int32_t vLen);
514
  SStreamStateCur* (*streamStateGroupGetCur)(SStreamState* pState);
515
  void (*streamStateGroupCurNext)(SStreamStateCur* pCur);
516
  int32_t (*streamStateGroupGetKVByCur)(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen);
517

518
  void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
519
  void (*streamFileStateClear)(struct SStreamFileState* pFileState);
520
  bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);
521

522
  SStreamState* (*streamStateOpen)(const char* path, void* pTask, int64_t streamId, int32_t taskId);
523
  SStreamState* (*streamStateRecalatedOpen)(const char* path, void* pTask, int64_t streamId, int32_t taskId);
524
  void (*streamStateClose)(SStreamState* pState, bool remove);
525
  int32_t (*streamStateBegin)(SStreamState* pState);
526
  void (*streamStateCommit)(SStreamState* pState);
527
  void (*streamStateDestroy)(SStreamState* pState, bool remove);
528
  void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
529
  void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst);
530

531
  int32_t (*streamStateGetAndSetTsData)(STableTsDataState* pState, uint64_t tableUid, TSKEY* pCurTs, void** ppCurPkVal,
532
                                        TSKEY lastTs, void* pLastPkVal, int32_t lastPkLen, int32_t* pWinCode);
533
  int32_t (*streamStateTsDataCommit)(STableTsDataState* pState);
534
  int32_t (*streamStateInitTsDataState)(STableTsDataState** ppTsDataState, int8_t pkType, int32_t pkLen, void* pState, void* pOtherState);
535
  void (*streamStateDestroyTsDataState)(STableTsDataState* pTsDataState);
536
  int32_t (*streamStateRecoverTsData)(STableTsDataState* pTsDataState);
537
  int32_t (*streamStateReloadTsDataState)(STableTsDataState* pTsDataState);
538
  int32_t (*streamStateMergeAndSaveScanRange)(STableTsDataState* pTsDataState, STimeWindow* pWin, uint64_t gpId,
539
                                              SRecDataInfo* pRecData, int32_t len);
540
  int32_t (*streamStateMergeAllScanRange)(STableTsDataState* pTsDataState);
541
  int32_t (*streamStatePopScanRange)(STableTsDataState* pTsDataState, SScanRange* pRange);
542

543
  SStreamStateCur* (*streamStateGetLastStateCur)(SStreamState* pState);
544
  void (*streamStateLastStateCurNext)(SStreamStateCur* pCur);
545
  int32_t (*streamStateNLastStateGetKVByCur)(SStreamStateCur* pCur, int32_t num, SArray* pRes);
546
  SStreamStateCur* (*streamStateGetLastSessionStateCur)(SStreamState* pState);
547
  void (*streamStateLastSessionStateCurNext)(SStreamStateCur* pCur);
548
  int32_t (*streamStateNLastSessionStateGetKVByCur)(SStreamStateCur* pCur, int32_t num, SArray* pRes);
549
} SStateStore;
550

551
typedef struct SStorageAPI {
552
  SStoreMeta          metaFn;  // todo: refactor
553
  TsdReader           tsdReader;
554
  SStoreMetaReader    metaReaderFn;
555
  SStoreCacheReader   cacheFn;
556
  SStoreSnapshotFn    snapshotFn;
557
  SStoreTqReader      tqReaderFn;
558
  SStateStore         stateStore;
559
  SMetaDataFilterAPI  metaFilter;
560
  SFunctionStateStore functionStore;
561
} SStorageAPI;
562

563
#ifdef __cplusplus
564
}
565
#endif
566

567
#endif  // TDENGINE_STORAGEAPI_H
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc