• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4932

19 Jan 2026 12:29PM UTC coverage: 66.646% (-0.1%) from 66.749%
#4932

push

travis-ci

web-flow
chore: upgrade taospy (#34272)

202981 of 304565 relevant lines covered (66.65%)

126831443.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/source/libs/executor/inc/executorInt.h
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifndef TDENGINE_EXECUTORINT_H
16
#define TDENGINE_EXECUTORINT_H
17

18
#ifdef __cplusplus
19
extern "C" {
20
#endif
21

22
#include "os.h"
23
#include "tcommon.h"
24
#include "theap.h"
25
#include "tlosertree.h"
26
#include "tsort.h"
27
#include "tvariant.h"
28

29
#include "dataSinkMgt.h"
30
#include "executil.h"
31
#include "executor.h"
32
#include "planner.h"
33
#include "scalar.h"
34
#include "taosdef.h"
35
#include "tarray.h"
36
#include "tfill.h"
37
#include "thash.h"
38
#include "tlockfree.h"
39
#include "tmsg.h"
40
#include "tpagedbuf.h"
41
#include "tlrucache.h"
42
#include "tworker.h"
43

44
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
45

46
typedef struct STsdbReader STsdbReader;
47
typedef struct STqReader   STqReader;
48

49
typedef enum EExtWinMode {
50
  EEXT_MODE_SCALAR = 1,
51
  EEXT_MODE_AGG,
52
  EEXT_MODE_INDEFR_FUNC,
53
} EExtWinMode;
54

55
// Session-window validity helpers.
// A window (or window key) is treated as valid only when its start key win.skey is strictly
// positive; the SET_* macros mark it invalid by storing INT64_MIN into win.skey.
// The first three macros take the object itself, SET_SESSION_WIN_KEY_INVALID takes a pointer.
#define IS_VALID_SESSION_WIN(winInfo)        ((winInfo).sessionWin.win.skey > 0)
56
#define SET_SESSION_WIN_INVALID(winInfo)     ((winInfo).sessionWin.win.skey = INT64_MIN)
57
#define IS_INVALID_SESSION_WIN_KEY(winKey)   ((winKey).win.skey <= 0)
58
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
59

60
#define IS_STREAM_MODE(_task) ((_task)->execModel == OPTR_EXEC_MODEL_STREAM)
61

62
/**
63
 * If the number of generated results is greater than this value,
64
 * the query will be halted and the results returned to the client immediately.
65
 */
66
typedef struct SResultInfo {  // TODO refactor
67
  int64_t totalRows;          // total generated result size in rows
68
  int64_t totalBytes;         // total results in bytes.
69
  int32_t capacity;           // capacity of current result output buffer
70
  int32_t threshold;          // result size threshold in rows.
71
} SResultInfo;
72

73
typedef struct STableQueryInfo {
74
  TSKEY              lastKey;  // last check ts, todo remove it later
75
  SResultRowPosition pos;      // current active time window
76
} STableQueryInfo;
77

78
typedef struct SLimit {
79
  int64_t limit;       // default -1, no limit
80
  int64_t offset;
81
} SLimit;
82

83
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
84

85
enum {
86
  STREAM_RECOVER_STEP__NONE = 0,
87
  STREAM_RECOVER_STEP__PREPARE1,
88
  STREAM_RECOVER_STEP__PREPARE2,
89
  STREAM_RECOVER_STEP__SCAN1,
90
};
91

92
extern int32_t exchangeObjRefPool;
93

94
typedef struct {
95
  char*   pData;
96
  bool    isNull;
97
  int16_t type;
98
  int32_t bytes;
99
} SGroupKeys, SStateKeys;
100

101
typedef struct {
102
  char*           tablename;
103
  char*           dbname;
104
  int32_t         tversion;
105
  int32_t         rversion;
106
  SSchemaWrapper* sw;
107
  SSchemaWrapper* qsw;
108
} SSchemaInfo;
109

110
typedef struct SExchangeOpStopInfo {
111
  int32_t operatorType;
112
  int64_t refId;
113
} SExchangeOpStopInfo;
114

115
typedef struct SGcOperatorParam {
116
  int64_t sessionId;
117
  int32_t downstreamIdx;
118
  int32_t vgId;
119
  int64_t tbUid;
120
  bool    needCache;
121
} SGcOperatorParam;
122

123
typedef struct SGcNotifyOperatorParam {
124
  int32_t downstreamIdx;
125
  int32_t vgId;
126
  int64_t tbUid;
127
} SGcNotifyOperatorParam;
128

129
struct SExprSupp {
130
  SExprInfo*      pExprInfo;
131
  int32_t         numOfExprs;  // the number of scalar expression in group operator
132
  SqlFunctionCtx* pCtx;
133
  int32_t*        rowEntryInfoOffset;  // offset value for each row result cell info
134
  SFilterInfo*    pFilterInfo;
135
  bool            hasWindowOrGroup;    // denote that the function is used with time window or group
136
  bool            hasWindow;           // denote that the function is used with time window
137
  bool            hasIndefRowsFunc;
138
};
139

140
typedef enum {
141
  EX_SOURCE_DATA_NOT_READY = 0x1,
142
  EX_SOURCE_DATA_STARTED,
143
  EX_SOURCE_DATA_READY,
144
  EX_SOURCE_DATA_EXHAUSTED,
145
} EX_SOURCE_STATUS;
146

147
#define COL_MATCH_FROM_COL_ID  0x1
148
#define COL_MATCH_FROM_SLOT_ID 0x2
149

150
typedef struct SLoadRemoteDataInfo {
151
  uint64_t totalSize;     // total load bytes from remote
152
  uint64_t totalRows;     // total number of rows
153
  uint64_t totalElapsed;  // total elapsed time
154
} SLoadRemoteDataInfo;
155

156
typedef struct SLimitInfo {
157
  SLimit   limit;
158
  SLimit   slimit;
159
  uint64_t currentGroupId;
160
  int64_t  remainGroupOffset;
161
  int64_t  numOfOutputGroups;
162
  int64_t  remainOffset;
163
  int64_t  numOfOutputRows;
164
} SLimitInfo;
165

166
typedef struct SSortMergeJoinOperatorParam {
167
  bool initDownstream;
168
} SSortMergeJoinOperatorParam;
169

170
typedef enum EExchangeSourceType {
171
  EX_SRC_TYPE_STB_JOIN_SCAN = 1,
172
  EX_SRC_TYPE_VSTB_SCAN,
173
  EX_SRC_TYPE_VSTB_WIN_SCAN,
174
  EX_SRC_TYPE_VSTB_AGG_SCAN,
175
  EX_SRC_TYPE_VSTB_TAG_SCAN,
176
  EX_SRC_TYPE_VTB_WIN_SCAN,
177
} EExchangeSourceType;
178

179
typedef enum {
180
  DYN_TYPE_EXCHANGE_PARAM = 1,
181
  NOTIFY_TYPE_EXCHANGE_PARAM,
182
} EExchangeGetParamType;
183

184
typedef struct SExchangeOperatorBasicParam {
185
  EExchangeGetParamType paramType;
186
  /* dynamic scan params */
187
  int32_t               vgId;
188
  int32_t               srcOpType;
189
  bool                  tableSeq;
190
  SArray*               uidList;
191
  EExchangeSourceType   type;
192
  bool                  isNewDeployed; // used with newDeployedSrc
193
  bool                  isNewParam;
194
  uint64_t              groupid;
195
  SOrgTbInfo*           orgTbInfo;
196
  SArray*               batchOrgTbInfo; // SArray<SOrgTbInfo>
197
  SArray*               tagList;
198
  STimeWindow           window;
199
  SDownstreamSourceNode newDeployedSrc; // used with isNewDeployed
200
  /* notify scan params */
201
  TSKEY notifyTs;
202
} SExchangeOperatorBasicParam;
203

204
typedef struct SExchangeOperatorBatchParam {
205
  bool       multiParams;
206
  SSHashObj* pBatchs;  // SExchangeOperatorBasicParam
207
} SExchangeOperatorBatchParam;
208

209
typedef struct SExchangeOperatorParam {
210
  bool                        multiParams;
211
  SExchangeOperatorBasicParam basic;
212
} SExchangeOperatorParam;
213

214
typedef struct SExchangeSrcIndex {
215
  int32_t srcIdx;
216
  int32_t inUseIdx;
217
} SExchangeSrcIndex;
218

219
// Runtime state kept by an exchange operator (see the per-member notes below).
typedef struct SExchangeInfo {
220
  int64_t    seqId;
221
  SArray*    pSources;
222
  SSHashObj* pHashSources;
223
  SArray*    pSourceDataInfo;
224
  tsem_t     ready;
225
  void*      pTransporter;
226

227
  // SArray<SSDataBlock*>, result block list, used to keep the multiple blocks that
228
  // are passed by the downstream operator
229
  SArray*      pResultBlockList;
230
  SArray*      pRecycledBlocks;  // pool of small data blocks, kept to avoid repeatedly creating and destroying them.
231
  SSDataBlock* pDummyBlock;      // dummy block, does not keep data
232
  bool         seqLoadData;      // load data sequentially or not, false by default
233
  bool         dynamicOp;
234
  bool         dynTbname;         // %%tbname for stream
235
  int32_t      current;
236
  SLoadRemoteDataInfo loadInfo;
237
  int64_t             self;
238
  SLimitInfo          limitInfo;
239
  int64_t             openedTs;  // start exec timestamp, todo: move to SLoadRemoteDataInfo
240
  char*               pTaskId;
241
  SArray*             pFetchRpcHandles;
242
  bool                notifyToSend;  // need to send notify STEP DONE message
243
  TSKEY               notifyTs;      // notify timestamp
244
} SExchangeInfo;
245

246
typedef struct SScanInfo {
247
  int32_t numOfAsc;
248
  int32_t numOfDesc;
249
} SScanInfo;
250

251
typedef struct SSampleExecInfo {
252
  double   sampleRatio;  // data block sample ratio, 1 by default
253
  uint32_t seed;         // random seed value
254
} SSampleExecInfo;
255

256
enum {
257
  TABLE_SCAN__TABLE_ORDER = 1,
258
  TABLE_SCAN__BLOCK_ORDER = 2,
259
};
260

261
// Table-count processing states; values are explicit and listed from not-started to finished.
typedef enum ETableCountState {
262
  TABLE_COUNT_STATE_NONE = 0,       // before the scan starts
263
  TABLE_COUNT_STATE_SCAN = 1,       // current group is being scanned
264
  TABLE_COUNT_STATE_PROCESSED = 2,  // current group has been processed
265
  TABLE_COUNT_STATE_END = 3,        // finished, or no need to process
266
} ETableCountState;
267

268
struct SAggSupporter {
269
  SSHashObj*     pResultRowHashTable;  // quick locate the window object for each result
270
  char*          keyBuf;               // window key buffer
271
  SDiskbasedBuf* pResultBuf;           // query result buffer based on blocked-wised disk file
272
  int32_t        resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
273
  int32_t        currentPageId;  // current write page id
274
};
275

276
typedef struct {
277
  // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
278
  // current data block needs to be loaded.
279
  SInterval      interval;
280
  SAggSupporter* pAggSup;
281
  SExprSupp*     pExprSup;  // expr supporter of aggregate operator
282
} SAggOptrPushDownInfo;
283

284
typedef struct STableMetaCacheInfo {
285
  SLRUCache* pTableMetaEntryCache;  // 100 by default
286
  uint64_t   metaFetch;
287
  uint64_t   cacheHit;
288
} STableMetaCacheInfo;
289

290
typedef struct STableScanBase {
291
  STsdbReader*           dataReader;
292
  SFileBlockLoadRecorder readRecorder;
293
  SQueryTableDataCond    cond;
294
  SQueryTableDataCond    orgCond; // use for virtual super table scan
295
  SAggOptrPushDownInfo   pdInfo;
296
  SColMatchInfo          matchInfo;
297
  SReadHandle            readHandle;
298
  SExprSupp              pseudoSup;
299
  STableMetaCacheInfo    metaCache;
300
  int32_t                scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
301
  int32_t                dataBlockLoadFlag;
302
  SLimitInfo             limitInfo;
303
  // there may be more than one table list in one task, if only one vnode exists.
304
  STableListInfo* pTableListInfo;
305
  TsdReader       readerAPI;
306
} STableScanBase;
307

308
typedef struct STableScanInfo {
309
  STableScanBase  base;
310
  SScanInfo       scanInfo;
311
  int32_t         scanTimes;
312
  SSDataBlock*    pResBlock;
313
  SHashObj*       pIgnoreTables;
314
  SSampleExecInfo sample;           // sample execution info
315
  int32_t         tableStartIndex;  // current group scan start
316
  int32_t         tableEndIndex;    // current group scan end
317
  int32_t         currentGroupId;
318
  int32_t         currentTable;
319
  int8_t          scanMode;
320
  int8_t          assignBlockUid;
321
  uint8_t         countState;  // empty table count state
322
  bool            hasGroupByTag;
323
  bool            filesetDelimited;
324
  bool            needCountEmptyTable;
325
  // for virtual super table scan
326
  SSDataBlock*    pOrgBlock;
327
  bool            ignoreTag;
328
  bool            virtualStableScan;
329
  SHashObj*       readerCache;
330
  bool            newReader;
331
  SArray*         pBlockColMap;
332
  // for virtual super table batch scan
333
  int32_t         lastBatchIdx;
334
  int32_t         currentBatchIdx;
335
  STimeWindow     lastTimeWindow;
336
  SArray*         lastColArray;
337
  SArray*         lastBlockColArray;
338
  SArray*         pBatchColMap;  // SArray<SOrgTbInfo>
339
  STimeWindow     cachedTimeWindow;
340
  SArray*         cachedTagList;
341
  uint64_t        cachedGroupId;
342
} STableScanInfo;
343

344
typedef enum ESubTableInputType {
345
  SUB_TABLE_MEM_BLOCK,
346
  SUB_TABLE_EXT_PAGES,
347
} ESubTableInputType;
348

349
typedef struct STmsSubTableInput {
350
  STsdbReader*        pReader;
351
  SQueryTableDataCond tblCond;
352
  STableKeyInfo*      pKeyInfo;
353
  bool                bInMemReader;
354
  ESubTableInputType  type;
355
  SSDataBlock*        pReaderBlock;
356

357
  SArray*      aBlockPages;
358
  SSDataBlock* pPageBlock;
359
  int32_t      pageIdx;
360

361
  int32_t      rowIdx;
362
  int64_t*     aTs;
363
  SSDataBlock* pInputBlock;
364
} STmsSubTableInput;
365

366
typedef struct SBlockOrderInfo SBlockOrderInfo;
367
typedef struct STmsSubTablesMergeInfo {
368
  SBlockOrderInfo* pTsOrderInfo;
369
  SBlockOrderInfo* pPkOrderInfo;
370

371
  int32_t                 numSubTables;
372
  STmsSubTableInput*      aInputs;
373
  SMultiwayMergeTreeInfo* pTree;
374
  int32_t                 numSubTablesCompleted;
375

376
  int32_t        numTableBlocksInMem;
377
  SDiskbasedBuf* pBlocksBuf;
378

379
  int32_t numInMemReaders;
380
} STmsSubTablesMergeInfo;
381

382
typedef struct STableMergeScanInfo {
383
  int32_t         tableStartIndex;
384
  int32_t         tableEndIndex;
385
  bool            hasGroupId;
386
  uint64_t        groupId;
387
  STableScanBase  base;
388
  int32_t         bufPageSize;
389
  uint32_t        sortBufSize;  // max buffer size for in-memory sort
390
  SArray*         pSortInfo;
391
  SSortHandle*    pSortHandle;
392
  SSDataBlock*    pSortInputBlock;
393
  SSDataBlock*    pReaderBlock;
394
  int64_t         startTs;  // sort start time
395
  SLimitInfo      limitInfo;
396
  int64_t         numOfRows;
397
  SScanInfo       scanInfo;
398
  int32_t         scanTimes;
399
  int32_t         readIdx;
400
  SSDataBlock*    pResBlock;
401
  SSampleExecInfo sample;         // sample execution info
402
  SSHashObj*      mTableNumRows;  // uid->num of table rows
403
  SHashObj*       mSkipTables;
404
  int64_t         mergeLimit;
405
  SSortExecInfo   sortExecInfo;
406
  bool            needCountEmptyTable;
407
  bool            bGroupProcessed;  // the group return data means processed
408
  bool            filesetDelimited;
409
  bool            bNewFilesetEvent;
410
  bool            bNextDurationBlockEvent;
411
  int32_t         numNextDurationBlocks;
412
  SSDataBlock*    nextDurationBlocks[2];
413
  bool            rtnNextDurationBlocks;
414
  int32_t         nextDurationBlocksIdx;
415
  bool            bSortRowId;
416

417
  STmsSubTablesMergeInfo* pSubTablesMergeInfo;
418
} STableMergeScanInfo;
419

420
typedef struct STagScanFilterContext {
421
  SHashObj* colHash;
422
  int32_t   index;
423
  SArray*   cInfoList;
424
  int32_t   code;
425
} STagScanFilterContext;
426

427
typedef struct STagScanInfo {
428
  SColumnInfo*          pCols;
429
  SSDataBlock*          pRes;
430
  SColMatchInfo         matchInfo;
431
  int32_t               curPos;
432
  SReadHandle           readHandle;
433
  STableListInfo*       pTableListInfo;
434
  uint64_t              suid;
435
  void*                 pCtbCursor;
436
  SNode*                pTagCond;
437
  SNode*                pTagIndexCond;
438
  STagScanFilterContext filterCtx;
439
  SArray*               aUidTags;     // SArray<STUidTagInfo>
440
  SArray*               aFilterIdxs;  // SArray<int32_t>
441
  SStorageAPI*          pStorageAPI;
442
  SLimitInfo            limitInfo;
443
} STagScanInfo;
444

445
typedef enum EStreamScanMode {
446
  STREAM_SCAN_FROM_READERHANDLE = 1,
447
  STREAM_SCAN_FROM_RES,
448
  STREAM_SCAN_FROM_UPDATERES,
449
  STREAM_SCAN_FROM_DELETE_DATA,
450
  STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
451
  STREAM_SCAN_FROM_DATAREADER_RANGE,
452
  STREAM_SCAN_FROM_CREATE_TABLERES,
453
} EStreamScanMode;
454

455
enum {
456
  PROJECT_RETRIEVE_CONTINUE = 0x1,
457
  PROJECT_RETRIEVE_DONE = 0x2,
458
};
459

460
typedef struct SStreamAggSupporter {
461
  int32_t         resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
462
  SSDataBlock*    pScanBlock;
463
  SStreamState*   pState;
464
  int64_t         gap;        // stream session window gap
465
  SqlFunctionCtx* pDummyCtx;  // for combine
466
  SSHashObj*      pResultRows;
467
  int32_t         stateKeySize;
468
  int16_t         stateKeyType;
469
  SDiskbasedBuf*  pResultBuf;
470
  SStateStore     stateStore;
471
  STimeWindow     winRange;
472
  SStorageAPI*    pSessionAPI;
473
  struct SUpdateInfo* pUpdateInfo;
474
  int32_t             windowCount;
475
  int32_t             windowSliding;
476
  SStreamStateCur*    pCur;
477
} SStreamAggSupporter;
478

479
typedef struct SWindowSupporter {
480
  SStreamAggSupporter* pStreamAggSup;
481
  int64_t              gap;
482
  uint16_t             parentType;
483
  SAggSupporter*       pIntervalAggSup;
484
} SWindowSupporter;
485

486
typedef struct SPartitionBySupporter {
487
  SArray* pGroupCols;     // group by columns, SArray<SColumn>
488
  SArray* pGroupColVals;  // current group column values, SArray<SGroupKeys>
489
  char*   keyBuf;         // group by keys for hash
490
  bool    needCalc;       // partition by column
491
} SPartitionBySupporter;
492

493
typedef struct SPartitionDataInfo {
494
  uint64_t groupId;
495
  char*    tbname;
496
  SArray*  rowIds;
497
} SPartitionDataInfo;
498

499
typedef struct STimeWindowAggSupp {
500
  TSKEY           maxTs;
501
  TSKEY           minTs;
502
  SColumnInfoData timeWindowData;  // query time window info for scalar function execution.
503
} STimeWindowAggSupp;
504

505
// Support data for stream window-event notification (see the per-member notes below).
typedef struct SStreamNotifyEventSupp {
506
  SHashObj*    pWindowEventHashMap;  // Hash map from groupid+skey+eventType to the list node of the window event.
507
  SHashObj*    pTableNameHashMap;    // Hash map from groupid to the destination child table name.
508
  SSDataBlock* pEventBlock;          // The data block that contains all window events and results.
509
  SArray*      pSessionKeys;
510
  const char*  windowType;
511
} SStreamNotifyEventSupp;
512

513
typedef struct SSteamOpBasicInfo {
514
  int32_t                primaryPkIndex;
515
  int16_t                operatorFlag;
516
  SStreamNotifyEventSupp notifyEventSup;
517
  bool                   recvCkBlock;
518
  SSDataBlock*           pCheckpointRes;
519
  SSHashObj*             pSeDeleted;
520
  void*                  pDelIterator;
521
  SSDataBlock*           pDelRes;
522
  SArray*                pUpdated;
523
  STableTsDataState*     pTsDataState;
524
  int32_t                numOfRecv;
525
} SSteamOpBasicInfo;
526

527
typedef struct SStreamFillSupporter {
528
  int32_t        type;  // fill type
529
  SInterval      interval;
530
  SResultRowData prev;
531
  TSKEY          prevOriginKey;
532
  SResultRowData cur;
533
  SResultRowData next;
534
  TSKEY          nextOriginKey;
535
  SResultRowData nextNext;
536
  SFillColInfo*  pAllColInfo;  // fill exprs and not fill exprs
537
  SExprSupp      notFillExprSup;
538
  int32_t        numOfAllCols;  // number of all exprs, including the tags columns
539
  int32_t        numOfFillCols;
540
  int32_t        numOfNotFillCols;
541
  int32_t        rowSize;
542
  SSHashObj*     pResMap;
543
  bool           hasDelete;
544
  SStorageAPI*   pAPI;
545
  STimeWindow    winRange;
546
  int32_t        pkColBytes;
547
  __compar_fn_t  comparePkColFn;
548
  int32_t*       pOffsetInfo;
549
  bool           normalFill;
550
  void*          pEmptyRow;
551
  SArray*        pResultRange;
552
} SStreamFillSupporter;
553

554
typedef struct SStreamScanInfo {
555
  SSteamOpBasicInfo basic;
556
  SExprInfo*        pPseudoExpr;
557
  int32_t           numOfPseudoExpr;
558
  SExprSupp         tbnameCalSup;
559
  SExprSupp*        pPartTbnameSup;
560
  SExprSupp         tagCalSup;
561
  int32_t           primaryTsIndex;  // primary time stamp slot id
562
  int32_t           primaryKeyIndex;
563
  SReadHandle       readHandle;
564
  SInterval         interval;  // if the upstream is an interval operator, the interval info is also kept here.
565
  SColMatchInfo     matchInfo;
566

567
  SArray*      pBlockLists;  // multiple SSDatablock.
568
  SSDataBlock* pRes;         // result SSDataBlock
569
  SSDataBlock* pUpdateRes;   // update SSDataBlock
570
  int32_t      updateResIndex;
571
  int32_t      blockType;        // current block type
572
  int32_t      validBlockIndex;  // Is current data has returned?
573
  uint64_t     numOfExec;        // execution times
574
  STqReader*   tqReader;
575

576
  SHashObj*       pVtableMergeHandles;  // key: vtable uid, value: SStreamVtableMergeHandle
577
  SDiskbasedBuf*  pVtableMergeBuf;      // page buffer used by vtable merge
578
  SArray*         pVtableReadyHandles;
579
  STableListInfo* pTableListInfo;
580

581
  uint64_t            groupId;
582
  bool                igCheckGroupId;
583
  struct SUpdateInfo* pUpdateInfo;
584

585
  EStreamScanMode       scanMode;
586
  struct SOperatorInfo* pStreamScanOp;
587
  struct SOperatorInfo* pTableScanOp;
588
  SArray*               childIds;
589
  SWindowSupporter      windowSup;
590
  SPartitionBySupporter partitionSup;
591
  SExprSupp*            pPartScalarSup;
592
  bool                  assignBlockUid;  // assign block uid to groupId, temporarily used for generating rollup SMA.
593
  int32_t               scanWinIndex;    // for state operator
594
  SSDataBlock*          pDeleteDataRes;  // delete data SSDataBlock
595
  int32_t               deleteDataIndex;
596
  STimeWindow           updateWin;
597
  STimeWindowAggSupp    twAggSup;
598
  SSDataBlock*          pUpdateDataRes;
599
  SStreamFillSupporter* pFillSup;
600
  // status for tmq
601
  SNodeList* pGroupTags;
602
  SNode*     pTagCond;
603
  SNode*     pTagIndexCond;
604

605
  // recover
606
  int32_t      blockRecoverTotCnt;
607
  SSDataBlock* pRecoverRes;
608

609
  SSDataBlock*      pCreateTbRes;
610
  int8_t            igCheckUpdate;
611
  int8_t            igExpired;
612
  void*             pState;  // void
613
  SStoreTqReader    readerFn;
614
  SStateStore       stateStore;
615
  SSDataBlock*      pCheckpointRes;
616
  int8_t            pkColType;
617
  int32_t           pkColLen;
618
  bool              useGetResultRange;
619
  STimeWindow       lastScanRange;
620
  SSDataBlock*      pRangeScanRes;  // update SSDataBlock
621
  bool              hasPart;
622

623
  //nonblock data scan
624
  TSKEY                  recalculateInterval;
625
  __compar_fn_t          comparePkColFn;
626
  SScanRange             curRange;
627
  struct SOperatorInfo*  pRecTableScanOp;
628
  bool                   scanAllTables;
629
  SSHashObj*             pRecRangeMap;
630
  SArray*                pRecRangeRes;
631
} SStreamScanInfo;
632

633
typedef struct {
634
  struct SVnode*       vnode;  // todo remove this
635
  SSDataBlock          pRes;   // result SSDataBlock
636
  STsdbReader*         dataReader;
637
  struct SSnapContext* sContext;
638
  SStorageAPI*         pAPI;
639
  STableListInfo*      pTableListInfo;
640
} SStreamRawScanInfo;
641

642
typedef struct STableCountScanSupp {
643
  int16_t dbNameSlotId;
644
  int16_t stbNameSlotId;
645
  int16_t tbCountSlotId;
646
  bool    groupByDbName;
647
  bool    groupByStbName;
648
  char    dbNameFilter[TSDB_DB_NAME_LEN];
649
  char    stbNameFilter[TSDB_TABLE_NAME_LEN];
650
} STableCountScanSupp;
651

652
typedef struct SOptrBasicInfo {
653
  SResultRowInfo resultRowInfo;
654
  SSDataBlock*   pRes;
655
  bool           mergeResultBlock;
656
  int32_t        inputTsOrder;
657
  int32_t        outputTsOrder;
658
} SOptrBasicInfo;
659

660
typedef struct SIntervalAggOperatorInfo {
661
  SOptrBasicInfo     binfo;              // basic info
662
  SAggSupporter      aggSup;             // aggregate supporter
663
  SExprSupp          scalarSupp;         // supporter for perform scalar function
664
  SGroupResInfo      groupResInfo;       // multiple results build supporter
665
  SInterval          interval;           // interval info
666
  int32_t            primaryTsIndex;     // primary time stamp slot id from result of downstream operator.
667
  STimeWindow        win;                // query time range
668
  bool               timeWindowInterpo;  // interpolation needed or not
669
  SArray*            pInterpCols;        // interpolation columns
670
  EOPTR_EXEC_MODEL   execModel;          // operator execution model [batch model|stream model]
671
  STimeWindowAggSupp twAggSup;
672
  SArray*            pPrevValues;  //  SArray<SGroupKeys> used to keep the previous not null value for interpolation.
673
  bool               cleanGroupResInfo;
674
  struct SOperatorInfo* pOperator;
675
  // for limit optimization
676
  bool          limited;
677
  int64_t       limit;
678
  bool          slimited;
679
  int64_t       slimit;
680
  uint64_t      curGroupId;  // initialize to UINT64_MAX
681
  uint64_t      handledGroupNum;
682
  BoundedQueue* pBQ;
683
} SIntervalAggOperatorInfo;
684

685
typedef struct SMergeAlignedIntervalAggOperatorInfo {
686
  SIntervalAggOperatorInfo* intervalAggOperatorInfo;
687

688
  uint64_t     groupId;  // current groupId
689
  int64_t      curTs;    // current ts
690
  SSDataBlock* prefetchedBlock;
691
  SResultRow*  pResultRow;
692
} SMergeAlignedIntervalAggOperatorInfo;
693

694
typedef struct SOpCheckPointInfo {
695
  uint16_t  checkPointId;
696
  SHashObj* children;  // key:child id
697
} SOpCheckPointInfo;
698

699
typedef struct SDataGroupInfo {
700
  uint64_t groupId;
701
  int64_t  numOfRows;
702
  SArray*  pPageList;
703
  SArray*  blockForNotLoaded;   // SSDataBlock that data is not loaded
704
  int32_t  offsetForNotLoaded;  // read offset for SSDataBlock that data is not loaded
705
} SDataGroupInfo;
706

707
typedef struct SWindowRowsSup {
708
  STimeWindow win;
709
  TSKEY       prevTs;  // previous timestamp, used for window aggregation
710
  int32_t     startRowIndex;
711
  int32_t     numOfRows;
712
  uint64_t    groupId;
713
  uint32_t    numNullRows;  // number of continuous rows with null state col
714
  TSKEY       lastTs;  // last row's timestamp, used for checking duplicated ts
715
} SWindowRowsSup;
716

717
// Returns true when pRowSup->numNullRows is non-zero, i.e. there are continuous rows with a null state column.
718
// The state window operator needs to handle these rows specially.
719
// pRowSup is dereferenced without a NULL check.
static inline bool hasContinuousNullRows(SWindowRowsSup* pRowSup) {
2,147,483,647✔
720
  return pRowSup->numNullRows > 0;
2,147,483,647✔
721
}
722

723
// Sets pRowSup->numNullRows back to 0. Reset on initialization, or when a row with a non-null
// state column is found. pRowSup is dereferenced without a NULL check.
724
static inline void resetNumNullRows(SWindowRowsSup* pRowSup) {
2,147,483,647✔
725
  pRowSup->numNullRows = 0;
2,147,483,647✔
726
}
2,147,483,647✔
727

728
// Resets the window-row bookkeeping in pRowSup to its initial values:
//   win.skey / win.ekey                                  -> 0
//   prevTs                                               -> INT64_MIN
//   startRowIndex, groupId, numOfRows, numNullRows       -> 0
// Does nothing when pRowSup is NULL.
// NOTE(review): the lastTs member is not modified here -- confirm that is intentional.
static inline void resetWindowRowsSup(SWindowRowsSup* pRowSup) {
14,427,621✔
729
  if (NULL == pRowSup) {
14,427,621✔
730
    return;
×
731
  }
732

733
  pRowSup->win.skey = pRowSup->win.ekey = 0;
14,427,621✔
734
  // INT64_MIN serves as the "no previous timestamp" value after a reset
  pRowSup->prevTs = INT64_MIN;
14,427,621✔
735
  pRowSup->startRowIndex = pRowSup->groupId = 0;
14,427,621✔
736
  pRowSup->numOfRows = pRowSup->numNullRows = 0;
14,427,621✔
737
}
738

739
typedef int32_t (*AggImplFn)(struct SOperatorInfo* pOperator, SSDataBlock* pBlock);
740

741
typedef struct SSessionAggOperatorInfo {
742
  SOptrBasicInfo        binfo;
743
  SAggSupporter         aggSup;
744
  SExprSupp             scalarSupp;  // supporter for perform scalar function
745
  SGroupResInfo         groupResInfo;
746
  SWindowRowsSup        winSup;
747
  bool                  reptScan;  // next round scan
748
  int64_t               gap;       // session window gap
749
  int32_t               tsSlotId;  // primary timestamp slot id
750
  STimeWindowAggSupp    twAggSup;
751
  struct SOperatorInfo* pOperator;
752
  bool                  cleanGroupResInfo;
753
} SSessionAggOperatorInfo;
754

755
typedef struct SStateWindowOperatorInfo {
756
  SOptrBasicInfo        binfo;
757
  SAggSupporter         aggSup;
758
  SExprSupp             scalarSup;
759
  SGroupResInfo         groupResInfo;
760
  SWindowRowsSup        winSup;
761
  SColumn               stateCol;
762
  bool                  hasKey;    // has key means the state window has started
763
  SStateKeys            stateKey;
764
  int32_t               tsSlotId;  // primary timestamp column slot id
765
  STimeWindowAggSupp    twAggSup;
766
  struct SOperatorInfo* pOperator;
767
  bool                  cleanGroupResInfo;
768
  int64_t               trueForLimit;
769
  EStateWinExtendOption extendOption;
770
} SStateWindowOperatorInfo;
771

772

773
typedef struct SEventWindowOperatorInfo {
774
  SOptrBasicInfo     binfo;
775
  SAggSupporter      aggSup;
776
  SExprSupp          scalarSup;
777
  SWindowRowsSup     winSup;
778
  int32_t            tsSlotId;  // primary timestamp column slot id
779
  STimeWindowAggSupp twAggSup;
780
  uint64_t           groupId;  // current group id, used to identify the data block from different groups
781
  SFilterInfo*       pStartCondInfo;
782
  SFilterInfo*       pEndCondInfo;
783
  bool               inWindow;
784
  SResultRow*        pRow;
785
  SSDataBlock*       pPreDataBlock;
786
  struct SOperatorInfo*     pOperator;
787
  int64_t            trueForLimit;
788
} SEventWindowOperatorInfo;
789

790
// Test, set and clear the OP_OPENED bit(s) in an operator's status field.
// _optr must be a pointer to an object with an integer `status` member.
#define OPTR_IS_OPENED(_optr)  (((_optr)->status & OP_OPENED) == OP_OPENED)
791
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
792
#define OPTR_CLR_OPENED(_optr) ((_optr)->status &= ~OP_OPENED)
793

794
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
795

796
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName,
797
                                   SExecTaskInfo* pTaskInfo);
798
void    cleanupQueriedTableScanInfo(void* p);
799

800
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
801
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
802

803
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore);
804
void checkIndefRowsFuncs(SExprSupp* pSup);
805
void    cleanupExprSupp(SExprSupp* pSup);
806
void    cleanupExprSuppWithoutFilter(SExprSupp* pSupp);
807

808
void     cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup,
809
                                   SGroupResInfo* pGroupResInfo);
810
void     cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
811
                                    SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap);
812
void     cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
813
                           SAggSupporter *pAggSup, bool cleanHashmap);
814
void     cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
815
                                      SGroupResInfo* pGroupResInfo);
816

817
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
818
                   const char* pkey, void* pState, SFunctionStateStore* pStore);
819
void    cleanupAggSup(SAggSupporter* pAggSup);
820

821
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
822

823
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
824
                            SDiskbasedBuf* pBuf);
825

826
/**
827
 * @brief Copy data from the hash table, instead of copying from SGroupResInfo's pRow.
828
 */
829
void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
830
                              SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup);
831
void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
832
                        SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, int64_t minWindowSize);
833

834
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
835
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
836
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
837
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
838
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
839

840
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
841
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
842

843
int32_t setFunctionResultOutput(struct SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
844
                             int32_t numOfExprs);
845
// Returns an int32_t status code. NOTE(review): pResList is an SArray**, presumably an
// out-parameter filled by the callee - confirm against the definition.
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
846
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
847
void    updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
848
                             struct SOperatorInfo* pOperator);
849

850
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
851
int32_t     getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz);
852

853
// NOTE(review): presumably releases the exchange operator info passed through the opaque
// `param` pointer - confirm against the definition. Function declarations have external
// linkage by default, so no storage-class specifier is needed here.
void doDestroyExchangeOperatorInfo(void* param);
854

855
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo, SColumnInfoData** pRet);
856
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
857
                               int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache);
858

859
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
860
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
861
                            int32_t* rowEntryInfoOffset);
862
void    clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
863

864
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
865
                                   int32_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
866
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
867

868
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
869
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams);
870
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
871
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
872
                                        const void* pExtraParams, bool doSelectFunc, bool hasIndefRowsFunc);
873

874
int32_t setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
875
                          bool createDummyCol);
876

877
int32_t checkForQueryBuf(size_t numOfTables);
878

879
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle);
880

881
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
882
                                int32_t order);
883
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
884
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
885
// The parameter list (data pointer, element count, key, order) lines up with the
// __block_search_fn_t typedef declared at the top of this header.
// NOTE(review): presumably usable as the `searchFn` argument of getNumOfRowsInTimeWindow()
// and getForwardStepsInBlock(), assuming TSKEY is int64_t and int is 32 bits - TODO confirm.
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
886
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
887
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
888
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
889
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
890
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
891
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
892
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,
893
                           SStateStore* pStore);
894

895
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
896

897
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
898
                        SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
899

900
bool    groupbyTbname(SNodeList* pGroupList);
901
void    getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
902
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
903
                               int64_t* pData);
904
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
905
SExprInfo*   createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
906

907
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
908
                                 SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
909
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
910

911
void    streamOpReleaseState(struct SOperatorInfo* pOperator);
912
void    streamOpReloadState(struct SOperatorInfo* pOperator);
913
void    destroyStreamAggSupporter(SStreamAggSupporter* pSup);
914
void    clearGroupResInfo(SGroupResInfo* pGroupResInfo);
915
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
916
                        SSDataBlock* pResultBlock, SFunctionStateStore* pStore);
917
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
918
                               SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
919
                               SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
920
                               SStorageAPI* pApi, int32_t tsIndex, int8_t stateType, int32_t ratio);
921
int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
922
                       int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic, int64_t recalculateInterval);
923
int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
924
void    initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
925
void    getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
926
int32_t deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
927
                              SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd);
928
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
929
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed);
930
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar);
931
int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2);
932
void    removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins);
933
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
934
                           int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
935
                           struct SOperatorInfo* pOperator, int64_t winDelta);
936
void    setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo);
937
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo);
938
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated);
939
int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key);
940
void    removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey);
941
void    doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite,
942
                               SGroupResInfo* pGroupResInfo);
943
void    doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
944
                             SSDataBlock* pBlock, SArray* pSessionKeys);
945
int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo);
946
void    getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
947
                              SResultWindowInfo* pNextWin);
948
int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup,
949
                          SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
950
                          SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap);
951
void    releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
952
void    resetWinRange(STimeWindow* winRange);
953
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
954
void    resetUnCloseSessionWinInfo(SSHashObj* winMap);
955
void    setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
956
void    reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
957
// NOTE(review): "Flused" looks like a misspelling of "Flushed"; the identifier is kept as-is
// because renaming it in this header alone would break existing callers.
void    destroyFlusedPos(void* pRes);
958
bool    isIrowtsPseudoColumn(SExprInfo* pExprInfo);
959
bool    isIsfilledPseudoColumn(SExprInfo* pExprInfo);
960
bool    isInterpFunc(SExprInfo* pExprInfo);
961
bool    isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo);
962

963
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
964
void*   decodeSSessionKey(void* buf, SSessionKey* key);
965
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen);
966
void*   decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen);
967
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup);
968
void*   decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup);
969

970
void    destroyOperatorParamValue(void* pValues);
971
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc);
972
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq);
973
// Extended form of buildTableScanOperatorParam(): takes the same ppRes, pUidList, srcOpType
// and tableSeq parameters, plus pMap, window, isNewParam and type.
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo* pMap,
                                      bool tableSeq, STimeWindow* window, bool isNewParam, ETableScanDynType type);
974
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
975
                                          int32_t srcOpType, TSKEY notifyTs);
976
void    freeExchangeGetBasicOperatorParam(void* pParam);
977
void    freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree);
978
int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
979
                                       SSDataBlock** pResBlock);
980
void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, EStreamType mode, SSessionKey* pDelRange);
981
void    doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey);
982

983
int32_t saveDeleteInfo(SArray* pWins, SSessionKey key);
984
void    removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins);
985
int32_t copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted);
986
int32_t copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest);
987

988
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo);
989
// Counterpart of inSlidingWindow() that takes explicit calStart/calEnd keys and a block type
// in place of an SDataBlockInfo pointer.
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd,
                        EStreamType blockType);
990
bool compareVal(const char* v, const SStateKeys* pKey);
991
bool inWinRange(STimeWindow* range, STimeWindow* cur);
992
int32_t doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result);
993

994
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
995
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
996
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
997
// NOTE(review): "Igore" looks like a misspelling of "Ignore"; the identifier is kept as-is
// because renaming it in this header alone would break existing callers.
bool    getIgoreNullRes(SExprSupp* pExprSup);
998
bool    checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull);
999
int64_t getMinWindowSize(struct SOperatorInfo* pOperator);
1000

1001
void    destroyTmqScanOperatorInfo(void* param);
1002
int32_t checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out);
1003
void resetBasicOperatorState(SOptrBasicInfo* pBasicInfo);
1004

1005
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
1006

1007
#ifdef __cplusplus
1008
}
1009
#endif
1010

1011
#endif  // TDENGINE_EXECUTORINT_H
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc