• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4853

14 Nov 2025 08:06AM UTC coverage: 63.951% (+0.1%) from 63.812%
#4853

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

355 of 675 new or added lines in 18 files covered. (52.59%)

2781 existing lines in 25 files now uncovered.

150719 of 235679 relevant lines covered (63.95%)

117936996.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/source/libs/executor/inc/executorInt.h
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifndef TDENGINE_EXECUTORINT_H
16
#define TDENGINE_EXECUTORINT_H
17

18
#ifdef __cplusplus
19
extern "C" {
20
#endif
21

22
#include "os.h"
23
#include "tcommon.h"
24
#include "theap.h"
25
#include "tlosertree.h"
26
#include "tsort.h"
27
#include "tvariant.h"
28

29
#include "dataSinkMgt.h"
30
#include "executil.h"
31
#include "executor.h"
32
#include "planner.h"
33
#include "scalar.h"
34
#include "taosdef.h"
35
#include "tarray.h"
36
#include "tfill.h"
37
#include "thash.h"
38
#include "tlockfree.h"
39
#include "tmsg.h"
40
#include "tpagedbuf.h"
41
#include "tlrucache.h"
42
#include "tworker.h"
43

44
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
45

46
typedef struct STsdbReader STsdbReader;
47
typedef struct STqReader   STqReader;
48

49
typedef enum SOperatorParamType { OP_GET_PARAM = 1, OP_NOTIFY_PARAM } SOperatorParamType;
50

51
typedef enum EExtWinMode {
52
  EEXT_MODE_SCALAR = 1,
53
  EEXT_MODE_AGG,
54
  EEXT_MODE_INDEFR_FUNC,
55
} EExtWinMode;
56

57
#define IS_VALID_SESSION_WIN(winInfo)        ((winInfo).sessionWin.win.skey > 0)
58
#define SET_SESSION_WIN_INVALID(winInfo)     ((winInfo).sessionWin.win.skey = INT64_MIN)
59
#define IS_INVALID_SESSION_WIN_KEY(winKey)   ((winKey).win.skey <= 0)
60
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
61

62
#define IS_STREAM_MODE(_task) ((_task)->execModel == OPTR_EXEC_MODEL_STREAM)
63

64
/**
65
 * If the number of generated results is greater than this value,
66
 * query query will be halt and return results to client immediate.
67
 */
68
typedef struct SResultInfo {  // TODO refactor
69
  int64_t totalRows;          // total generated result size in rows
70
  int64_t totalBytes;         // total results in bytes.
71
  int32_t capacity;           // capacity of current result output buffer
72
  int32_t threshold;          // result size threshold in rows.
73
} SResultInfo;
74

75
typedef struct STableQueryInfo {
76
  TSKEY              lastKey;  // last check ts, todo remove it later
77
  SResultRowPosition pos;      // current active time window
78
} STableQueryInfo;
79

80
typedef struct SLimit {
81
  int64_t limit;
82
  int64_t offset;
83
} SLimit;
84

85
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
86

87
enum {
88
  STREAM_RECOVER_STEP__NONE = 0,
89
  STREAM_RECOVER_STEP__PREPARE1,
90
  STREAM_RECOVER_STEP__PREPARE2,
91
  STREAM_RECOVER_STEP__SCAN1,
92
};
93

94
extern int32_t exchangeObjRefPool;
95

96
typedef struct {
97
  char*   pData;
98
  bool    isNull;
99
  int16_t type;
100
  int32_t bytes;
101
} SGroupKeys, SStateKeys;
102

103
typedef struct {
104
  char*           tablename;
105
  char*           dbname;
106
  int32_t         tversion;
107
  int32_t         rversion;
108
  SSchemaWrapper* sw;
109
  SSchemaWrapper* qsw;
110
} SSchemaInfo;
111

112
typedef struct SExchangeOpStopInfo {
113
  int32_t operatorType;
114
  int64_t refId;
115
} SExchangeOpStopInfo;
116

117
typedef struct SGcOperatorParam {
118
  int64_t sessionId;
119
  int32_t downstreamIdx;
120
  int32_t vgId;
121
  int64_t tbUid;
122
  bool    needCache;
123
} SGcOperatorParam;
124

125
typedef struct SGcNotifyOperatorParam {
126
  int32_t downstreamIdx;
127
  int32_t vgId;
128
  int64_t tbUid;
129
} SGcNotifyOperatorParam;
130

131
struct SExprSupp {
132
  SExprInfo*      pExprInfo;
133
  int32_t         numOfExprs;  // the number of scalar expression in group operator
134
  SqlFunctionCtx* pCtx;
135
  int32_t*        rowEntryInfoOffset;  // offset value for each row result cell info
136
  SFilterInfo*    pFilterInfo;
137
  bool            hasWindowOrGroup;
138
  bool            hasIndefRowsFunc;
139
};
140

141
typedef enum {
142
  EX_SOURCE_DATA_NOT_READY = 0x1,
143
  EX_SOURCE_DATA_STARTED,
144
  EX_SOURCE_DATA_READY,
145
  EX_SOURCE_DATA_EXHAUSTED,
146
} EX_SOURCE_STATUS;
147

148
#define COL_MATCH_FROM_COL_ID  0x1
149
#define COL_MATCH_FROM_SLOT_ID 0x2
150

151
typedef struct SLoadRemoteDataInfo {
152
  uint64_t totalSize;     // total load bytes from remote
153
  uint64_t totalRows;     // total number of rows
154
  uint64_t totalElapsed;  // total elapsed time
155
} SLoadRemoteDataInfo;
156

157
typedef struct SLimitInfo {
158
  SLimit   limit;
159
  SLimit   slimit;
160
  uint64_t currentGroupId;
161
  int64_t  remainGroupOffset;
162
  int64_t  numOfOutputGroups;
163
  int64_t  remainOffset;
164
  int64_t  numOfOutputRows;
165
} SLimitInfo;
166

167
typedef struct SSortMergeJoinOperatorParam {
168
  bool initDownstream;
169
} SSortMergeJoinOperatorParam;
170

171
typedef struct SExchangeOperatorBasicParam {
172
  int32_t               vgId;
173
  int32_t               srcOpType;
174
  bool                  tableSeq;
175
  SArray*               uidList;
176
  bool                  isVtbRefScan;
177
  bool                  isVtbTagScan;
178
  bool                  isNewDeployed; // used with newDeployedSrc
179
  SOrgTbInfo*           colMap;
180
  STimeWindow           window;
181
  SDownstreamSourceNode newDeployedSrc; // used with isNewDeployed
182
} SExchangeOperatorBasicParam;
183

184
typedef struct SExchangeOperatorBatchParam {
185
  bool       multiParams;
186
  SSHashObj* pBatchs;  // SExchangeOperatorBasicParam
187
} SExchangeOperatorBatchParam;
188

189
typedef struct SExchangeOperatorParam {
190
  bool                        multiParams;
191
  SExchangeOperatorBasicParam basic;
192
} SExchangeOperatorParam;
193

194
typedef struct SExchangeSrcIndex {
195
  int32_t srcIdx;
196
  int32_t inUseIdx;
197
} SExchangeSrcIndex;
198

199
typedef struct SExchangeInfo {
200
  int64_t    seqId;
201
  SArray*    pSources;
202
  SSHashObj* pHashSources;
203
  SArray*    pSourceDataInfo;
204
  tsem_t     ready;
205
  void*      pTransporter;
206

207
  // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
208
  // passed by downstream operator
209
  SArray*      pResultBlockList;
210
  SArray*      pRecycledBlocks;  // build a pool for small data block to avoid to repeatly create and then destroy.
211
  SSDataBlock* pDummyBlock;      // dummy block, not keep data
212
  bool         seqLoadData;      // sequential load data or not, false by default
213
  bool         dynamicOp;
214
  bool         dynTbname;         // %%tbname for stream    
215
  int32_t      current;
216
  SLoadRemoteDataInfo loadInfo;
217
  uint64_t            self;
218
  SLimitInfo          limitInfo;
219
  int64_t             openedTs;  // start exec time stamp, todo: move to SLoadRemoteDataInfo
220
  char*               pTaskId;
221
  SArray*             pFetchRpcHandles;
222
} SExchangeInfo;
223

224
typedef struct SScanInfo {
225
  int32_t numOfAsc;
226
  int32_t numOfDesc;
227
} SScanInfo;
228

229
typedef struct SSampleExecInfo {
230
  double   sampleRatio;  // data block sample ratio, 1 by default
231
  uint32_t seed;         // random seed value
232
} SSampleExecInfo;
233

234
enum {
235
  TABLE_SCAN__TABLE_ORDER = 1,
236
  TABLE_SCAN__BLOCK_ORDER = 2,
237
};
238

239
typedef enum ETableCountState {
240
  TABLE_COUNT_STATE_NONE = 0,       // before start scan
241
  TABLE_COUNT_STATE_SCAN = 1,       // cur group scanning
242
  TABLE_COUNT_STATE_PROCESSED = 2,  // cur group processed
243
  TABLE_COUNT_STATE_END = 3,        // finish or noneed to process
244
} ETableCountState;
245

246
struct SAggSupporter {
247
  SSHashObj*     pResultRowHashTable;  // quick locate the window object for each result
248
  char*          keyBuf;               // window key buffer
249
  SDiskbasedBuf* pResultBuf;           // query result buffer based on blocked-wised disk file
250
  int32_t        resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
251
  int32_t        currentPageId;  // current write page id
252
};
253

254
typedef struct {
255
  // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
256
  // current data block needs to be loaded.
257
  SInterval      interval;
258
  SAggSupporter* pAggSup;
259
  SExprSupp*     pExprSup;  // expr supporter of aggregate operator
260
} SAggOptrPushDownInfo;
261

262
typedef struct STableMetaCacheInfo {
263
  SLRUCache* pTableMetaEntryCache;  // 100 by default
264
  uint64_t   metaFetch;
265
  uint64_t   cacheHit;
266
} STableMetaCacheInfo;
267

268
typedef struct STableScanBase {
269
  STsdbReader*           dataReader;
270
  SFileBlockLoadRecorder readRecorder;
271
  SQueryTableDataCond    cond;
272
  SQueryTableDataCond    orgCond; // use for virtual super table scan
273
  SAggOptrPushDownInfo   pdInfo;
274
  SColMatchInfo          matchInfo;
275
  SReadHandle            readHandle;
276
  SExprSupp              pseudoSup;
277
  STableMetaCacheInfo    metaCache;
278
  int32_t                scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
279
  int32_t                dataBlockLoadFlag;
280
  SLimitInfo             limitInfo;
281
  // there are more than one table list exists in one task, if only one vnode exists.
282
  STableListInfo* pTableListInfo;
283
  TsdReader       readerAPI;
284
} STableScanBase;
285

286
typedef struct STableScanInfo {
287
  STableScanBase  base;
288
  SScanInfo       scanInfo;
289
  int32_t         scanTimes;
290
  SSDataBlock*    pResBlock;
291
  SHashObj*       pIgnoreTables;
292
  SSampleExecInfo sample;           // sample execution info
293
  int32_t         tableStartIndex;  // current group scan start
294
  int32_t         tableEndIndex;    // current group scan end
295
  int32_t         currentGroupId;
296
  int32_t         currentTable;
297
  int8_t          scanMode;
298
  int8_t          assignBlockUid;
299
  uint8_t         countState;  // empty table count state
300
  bool            hasGroupByTag;
301
  bool            filesetDelimited;
302
  bool            needCountEmptyTable;
303
  SSDataBlock*    pOrgBlock;
304
  bool            ignoreTag;
305
  bool            virtualStableScan;
306
  SHashObj*       readerCache;
307
  bool            newReader;
308
} STableScanInfo;
309

310
typedef enum ESubTableInputType {
311
  SUB_TABLE_MEM_BLOCK,
312
  SUB_TABLE_EXT_PAGES,
313
} ESubTableInputType;
314

315
typedef struct STmsSubTableInput {
316
  STsdbReader*        pReader;
317
  SQueryTableDataCond tblCond;
318
  STableKeyInfo*      pKeyInfo;
319
  bool                bInMemReader;
320
  ESubTableInputType  type;
321
  SSDataBlock*        pReaderBlock;
322

323
  SArray*      aBlockPages;
324
  SSDataBlock* pPageBlock;
325
  int32_t      pageIdx;
326

327
  int32_t      rowIdx;
328
  int64_t*     aTs;
329
  SSDataBlock* pInputBlock;
330
} STmsSubTableInput;
331

332
typedef struct SBlockOrderInfo SBlockOrderInfo;
333
typedef struct STmsSubTablesMergeInfo {
334
  SBlockOrderInfo* pTsOrderInfo;
335
  SBlockOrderInfo* pPkOrderInfo;
336

337
  int32_t                 numSubTables;
338
  STmsSubTableInput*      aInputs;
339
  SMultiwayMergeTreeInfo* pTree;
340
  int32_t                 numSubTablesCompleted;
341

342
  int32_t        numTableBlocksInMem;
343
  SDiskbasedBuf* pBlocksBuf;
344

345
  int32_t numInMemReaders;
346
} STmsSubTablesMergeInfo;
347

348
typedef struct STableMergeScanInfo {
349
  int32_t         tableStartIndex;
350
  int32_t         tableEndIndex;
351
  bool            hasGroupId;
352
  uint64_t        groupId;
353
  STableScanBase  base;
354
  int32_t         bufPageSize;
355
  uint32_t        sortBufSize;  // max buffer size for in-memory sort
356
  SArray*         pSortInfo;
357
  SSortHandle*    pSortHandle;
358
  SSDataBlock*    pSortInputBlock;
359
  SSDataBlock*    pReaderBlock;
360
  int64_t         startTs;  // sort start time
361
  SLimitInfo      limitInfo;
362
  int64_t         numOfRows;
363
  SScanInfo       scanInfo;
364
  int32_t         scanTimes;
365
  int32_t         readIdx;
366
  SSDataBlock*    pResBlock;
367
  SSampleExecInfo sample;         // sample execution info
368
  SSHashObj*      mTableNumRows;  // uid->num of table rows
369
  SHashObj*       mSkipTables;
370
  int64_t         mergeLimit;
371
  SSortExecInfo   sortExecInfo;
372
  bool            needCountEmptyTable;
373
  bool            bGroupProcessed;  // the group return data means processed
374
  bool            filesetDelimited;
375
  bool            bNewFilesetEvent;
376
  bool            bNextDurationBlockEvent;
377
  int32_t         numNextDurationBlocks;
378
  SSDataBlock*    nextDurationBlocks[2];
379
  bool            rtnNextDurationBlocks;
380
  int32_t         nextDurationBlocksIdx;
381
  bool            bSortRowId;
382

383
  STmsSubTablesMergeInfo* pSubTablesMergeInfo;
384
} STableMergeScanInfo;
385

386
typedef struct STagScanFilterContext {
387
  SHashObj* colHash;
388
  int32_t   index;
389
  SArray*   cInfoList;
390
  int32_t   code;
391
} STagScanFilterContext;
392

393
typedef struct STagScanInfo {
394
  SColumnInfo*          pCols;
395
  SSDataBlock*          pRes;
396
  SColMatchInfo         matchInfo;
397
  int32_t               curPos;
398
  SReadHandle           readHandle;
399
  STableListInfo*       pTableListInfo;
400
  uint64_t              suid;
401
  void*                 pCtbCursor;
402
  SNode*                pTagCond;
403
  SNode*                pTagIndexCond;
404
  STagScanFilterContext filterCtx;
405
  SArray*               aUidTags;     // SArray<STUidTagInfo>
406
  SArray*               aFilterIdxs;  // SArray<int32_t>
407
  SStorageAPI*          pStorageAPI;
408
  SLimitInfo            limitInfo;
409
} STagScanInfo;
410

411
typedef enum EStreamScanMode {
412
  STREAM_SCAN_FROM_READERHANDLE = 1,
413
  STREAM_SCAN_FROM_RES,
414
  STREAM_SCAN_FROM_UPDATERES,
415
  STREAM_SCAN_FROM_DELETE_DATA,
416
  STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
417
  STREAM_SCAN_FROM_DATAREADER_RANGE,
418
  STREAM_SCAN_FROM_CREATE_TABLERES,
419
} EStreamScanMode;
420

421
enum {
422
  PROJECT_RETRIEVE_CONTINUE = 0x1,
423
  PROJECT_RETRIEVE_DONE = 0x2,
424
};
425

426
typedef struct SStreamAggSupporter {
427
  int32_t         resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
428
  SSDataBlock*    pScanBlock;
429
  SStreamState*   pState;
430
  int64_t         gap;        // stream session window gap
431
  SqlFunctionCtx* pDummyCtx;  // for combine
432
  SSHashObj*      pResultRows;
433
  int32_t         stateKeySize;
434
  int16_t         stateKeyType;
435
  SDiskbasedBuf*  pResultBuf;
436
  SStateStore     stateStore;
437
  STimeWindow     winRange;
438
  SStorageAPI*    pSessionAPI;
439
  struct SUpdateInfo* pUpdateInfo;
440
  int32_t             windowCount;
441
  int32_t             windowSliding;
442
  SStreamStateCur*    pCur;
443
} SStreamAggSupporter;
444

445
typedef struct SWindowSupporter {
446
  SStreamAggSupporter* pStreamAggSup;
447
  int64_t              gap;
448
  uint16_t             parentType;
449
  SAggSupporter*       pIntervalAggSup;
450
} SWindowSupporter;
451

452
typedef struct SPartitionBySupporter {
453
  SArray* pGroupCols;     // group by columns, SArray<SColumn>
454
  SArray* pGroupColVals;  // current group column values, SArray<SGroupKeys>
455
  char*   keyBuf;         // group by keys for hash
456
  bool    needCalc;       // partition by column
457
} SPartitionBySupporter;
458

459
typedef struct SPartitionDataInfo {
460
  uint64_t groupId;
461
  char*    tbname;
462
  SArray*  rowIds;
463
} SPartitionDataInfo;
464

465
typedef struct STimeWindowAggSupp {
466
  int8_t          calTrigger;
467
  int8_t          calTriggerSaved;
468
  int64_t         deleteMark;
469
  int64_t         deleteMarkSaved;
470
  int64_t         waterMark;
471
  TSKEY           maxTs;
472
  TSKEY           minTs;
473
  SColumnInfoData timeWindowData;  // query time window info for scalar function execution.
474
} STimeWindowAggSupp;
475

476
typedef struct SStreamNotifyEventSupp {
477
  SHashObj*    pWindowEventHashMap;  // Hash map from gorupid+skey+eventType to the list node of window event.
478
  SHashObj*    pTableNameHashMap;    // Hash map from groupid to the dest child table name.
479
  SSDataBlock* pEventBlock;          // The datablock contains all window events and results.
480
  SArray*      pSessionKeys;
481
  const char*  windowType;
482
} SStreamNotifyEventSupp;
483

484
typedef struct SSteamOpBasicInfo {
485
  int32_t                primaryPkIndex;
486
  int16_t                operatorFlag;
487
  SStreamNotifyEventSupp notifyEventSup;
488
  bool                   recvCkBlock;
489
  SSDataBlock*           pCheckpointRes;
490
  SSHashObj*             pSeDeleted;
491
  void*                  pDelIterator;
492
  SSDataBlock*           pDelRes;
493
  SArray*                pUpdated;
494
  STableTsDataState*     pTsDataState;
495
  int32_t                numOfRecv;
496
} SSteamOpBasicInfo;
497

498
typedef struct SStreamFillSupporter {
499
  int32_t        type;  // fill type
500
  SInterval      interval;
501
  SResultRowData prev;
502
  TSKEY          prevOriginKey;
503
  SResultRowData cur;
504
  SResultRowData next;
505
  TSKEY          nextOriginKey;
506
  SResultRowData nextNext;
507
  SFillColInfo*  pAllColInfo;  // fill exprs and not fill exprs
508
  SExprSupp      notFillExprSup;
509
  int32_t        numOfAllCols;  // number of all exprs, including the tags columns
510
  int32_t        numOfFillCols;
511
  int32_t        numOfNotFillCols;
512
  int32_t        rowSize;
513
  SSHashObj*     pResMap;
514
  bool           hasDelete;
515
  SStorageAPI*   pAPI;
516
  STimeWindow    winRange;
517
  int32_t        pkColBytes;
518
  __compar_fn_t  comparePkColFn;
519
  int32_t*       pOffsetInfo;
520
  bool           normalFill;
521
  void*          pEmptyRow;
522
  SArray*        pResultRange;
523
} SStreamFillSupporter;
524

525
typedef struct SStreamScanInfo {
526
  SSteamOpBasicInfo basic;
527
  SExprInfo*        pPseudoExpr;
528
  int32_t           numOfPseudoExpr;
529
  SExprSupp         tbnameCalSup;
530
  SExprSupp*        pPartTbnameSup;
531
  SExprSupp         tagCalSup;
532
  int32_t           primaryTsIndex;  // primary time stamp slot id
533
  int32_t           primaryKeyIndex;
534
  SReadHandle       readHandle;
535
  SInterval         interval;  // if the upstream is an interval operator, the interval info is also kept here.
536
  SColMatchInfo     matchInfo;
537

538
  SArray*      pBlockLists;  // multiple SSDatablock.
539
  SSDataBlock* pRes;         // result SSDataBlock
540
  SSDataBlock* pUpdateRes;   // update SSDataBlock
541
  int32_t      updateResIndex;
542
  int32_t      blockType;        // current block type
543
  int32_t      validBlockIndex;  // Is current data has returned?
544
  uint64_t     numOfExec;        // execution times
545
  STqReader*   tqReader;
546

547
  SHashObj*       pVtableMergeHandles;  // key: vtable uid, value: SStreamVtableMergeHandle
548
  SDiskbasedBuf*  pVtableMergeBuf;      // page buffer used by vtable merge
549
  SArray*         pVtableReadyHandles;
550
  STableListInfo* pTableListInfo;
551

552
  uint64_t            groupId;
553
  bool                igCheckGroupId;
554
  struct SUpdateInfo* pUpdateInfo;
555

556
  EStreamScanMode       scanMode;
557
  struct SOperatorInfo* pStreamScanOp;
558
  struct SOperatorInfo* pTableScanOp;
559
  SArray*               childIds;
560
  SWindowSupporter      windowSup;
561
  SPartitionBySupporter partitionSup;
562
  SExprSupp*            pPartScalarSup;
563
  bool                  assignBlockUid;  // assign block uid to groupId, temporarily used for generating rollup SMA.
564
  int32_t               scanWinIndex;    // for state operator
565
  SSDataBlock*          pDeleteDataRes;  // delete data SSDataBlock
566
  int32_t               deleteDataIndex;
567
  STimeWindow           updateWin;
568
  STimeWindowAggSupp    twAggSup;
569
  SSDataBlock*          pUpdateDataRes;
570
  SStreamFillSupporter* pFillSup;
571
  // status for tmq
572
  SNodeList* pGroupTags;
573
  SNode*     pTagCond;
574
  SNode*     pTagIndexCond;
575

576
  // recover
577
  int32_t      blockRecoverTotCnt;
578
  SSDataBlock* pRecoverRes;
579

580
  SSDataBlock*      pCreateTbRes;
581
  int8_t            igCheckUpdate;
582
  int8_t            igExpired;
583
  void*             pState;  // void
584
  SStoreTqReader    readerFn;
585
  SStateStore       stateStore;
586
  SSDataBlock*      pCheckpointRes;
587
  int8_t            pkColType;
588
  int32_t           pkColLen;
589
  bool              useGetResultRange;
590
  STimeWindow       lastScanRange;
591
  SSDataBlock*      pRangeScanRes;  // update SSDataBlock
592
  bool              hasPart;
593

594
  //nonblock data scan
595
  TSKEY                  recalculateInterval;
596
  __compar_fn_t          comparePkColFn;
597
  SScanRange             curRange;
598
  struct SOperatorInfo*  pRecTableScanOp;
599
  bool                   scanAllTables;
600
  SSHashObj*             pRecRangeMap;
601
  SArray*                pRecRangeRes;
602
} SStreamScanInfo;
603

604
typedef struct {
605
  struct SVnode*       vnode;  // todo remove this
606
  SSDataBlock          pRes;   // result SSDataBlock
607
  STsdbReader*         dataReader;
608
  struct SSnapContext* sContext;
609
  SStorageAPI*         pAPI;
610
  STableListInfo*      pTableListInfo;
611
} SStreamRawScanInfo;
612

613
typedef struct STableCountScanSupp {
614
  int16_t dbNameSlotId;
615
  int16_t stbNameSlotId;
616
  int16_t tbCountSlotId;
617
  bool    groupByDbName;
618
  bool    groupByStbName;
619
  char    dbNameFilter[TSDB_DB_NAME_LEN];
620
  char    stbNameFilter[TSDB_TABLE_NAME_LEN];
621
} STableCountScanSupp;
622

623
typedef struct SOptrBasicInfo {
624
  SResultRowInfo resultRowInfo;
625
  SSDataBlock*   pRes;
626
  bool           mergeResultBlock;
627
  int32_t        inputTsOrder;
628
  int32_t        outputTsOrder;
629
} SOptrBasicInfo;
630

631
typedef struct SIntervalAggOperatorInfo {
632
  SOptrBasicInfo     binfo;              // basic info
633
  SAggSupporter      aggSup;             // aggregate supporter
634
  SExprSupp          scalarSupp;         // supporter for perform scalar function
635
  SGroupResInfo      groupResInfo;       // multiple results build supporter
636
  SInterval          interval;           // interval info
637
  int32_t            primaryTsIndex;     // primary time stamp slot id from result of downstream operator.
638
  STimeWindow        win;                // query time range
639
  bool               timeWindowInterpo;  // interpolation needed or not
640
  SArray*            pInterpCols;        // interpolation columns
641
  EOPTR_EXEC_MODEL   execModel;          // operator execution model [batch model|stream model]
642
  STimeWindowAggSupp twAggSup;
643
  SArray*            pPrevValues;  //  SArray<SGroupKeys> used to keep the previous not null value for interpolation.
644
  bool               cleanGroupResInfo;
645
  struct SOperatorInfo* pOperator;
646
  // for limit optimization
647
  bool          limited;
648
  int64_t       limit;
649
  bool          slimited;
650
  int64_t       slimit;
651
  uint64_t      curGroupId;  // initialize to UINT64_MAX
652
  uint64_t      handledGroupNum;
653
  BoundedQueue* pBQ;
654
} SIntervalAggOperatorInfo;
655

656
typedef struct SMergeAlignedIntervalAggOperatorInfo {
657
  SIntervalAggOperatorInfo* intervalAggOperatorInfo;
658

659
  uint64_t     groupId;  // current groupId
660
  int64_t      curTs;    // current ts
661
  SSDataBlock* prefetchedBlock;
662
  SResultRow*  pResultRow;
663
} SMergeAlignedIntervalAggOperatorInfo;
664

665
typedef struct SOpCheckPointInfo {
666
  uint16_t  checkPointId;
667
  SHashObj* children;  // key:child id
668
} SOpCheckPointInfo;
669

670
typedef struct SDataGroupInfo {
671
  uint64_t groupId;
672
  int64_t  numOfRows;
673
  SArray*  pPageList;
674
  SArray*  blockForNotLoaded;   // SSDataBlock that data is not loaded
675
  int32_t  offsetForNotLoaded;  // read offset for SSDataBlock that data is not loaded
676
} SDataGroupInfo;
677

678
typedef struct SWindowRowsSup {
679
  STimeWindow win;
680
  TSKEY       prevTs;
681
  int32_t     startRowIndex;
682
  int32_t     numOfRows;
683
  uint64_t    groupId;
684
  uint32_t    numNullRows;  // number of continuous rows with null state col
685
  TSKEY       lastTs; // this ts is used to record the last timestamp, so that we can know whether the new row's ts is duplicated
686
} SWindowRowsSup;
687

688
// return true if there are continuous rows with null state col
689
// state window operator needs to handle these rows specially
690
static inline bool hasContinuousNullRows(SWindowRowsSup* pRowSup) {
2,147,483,647✔
691
  return pRowSup->numNullRows > 0;
2,147,483,647✔
692
}
693

694
// reset on initialization or found of a row with non-null state col
695
static inline void resetNumNullRows(SWindowRowsSup* pRowSup) {
1,450,128,681✔
696
  pRowSup->numNullRows = 0;
1,450,128,681✔
697
}
1,450,128,681✔
698

699
static inline void resetWindowRowsSup(SWindowRowsSup* pRowSup) {
40,710✔
700
  if (NULL == pRowSup) {
40,710✔
UNCOV
701
    return;
×
702
  }
703

704
  pRowSup->win.skey = pRowSup->win.ekey = 0;
40,710✔
705
  pRowSup->prevTs = pRowSup->startRowIndex = 0;
40,710✔
706
  pRowSup->numOfRows = pRowSup->groupId = 0;
40,710✔
707
  resetNumNullRows(pRowSup);
40,710✔
708
}
709

710
typedef int32_t (*AggImplFn)(struct SOperatorInfo* pOperator, SSDataBlock* pBlock);
711

712
typedef struct SSessionAggOperatorInfo {
713
  SOptrBasicInfo        binfo;
714
  SAggSupporter         aggSup;
715
  SExprSupp             scalarSupp;  // supporter for perform scalar function
716
  SGroupResInfo         groupResInfo;
717
  SWindowRowsSup        winSup;
718
  bool                  reptScan;  // next round scan
719
  int64_t               gap;       // session window gap
720
  int32_t               tsSlotId;  // primary timestamp slot id
721
  STimeWindowAggSupp    twAggSup;
722
  struct SOperatorInfo* pOperator;
723
  bool                  cleanGroupResInfo;
724
} SSessionAggOperatorInfo;
725

726
typedef struct SStateWindowOperatorInfo {
727
  SOptrBasicInfo        binfo;
728
  SAggSupporter         aggSup;
729
  SExprSupp             scalarSup;
730
  SGroupResInfo         groupResInfo;
731
  SWindowRowsSup        winSup;
732
  SColumn               stateCol;
733
  bool                  hasKey;
734
  SStateKeys            stateKey;
735
  int32_t               tsSlotId;  // primary timestamp column slot id
736
  STimeWindowAggSupp    twAggSup;
737
  struct SOperatorInfo* pOperator;
738
  bool                  cleanGroupResInfo;
739
  int64_t               trueForLimit;
740
  EStateWinExtendOption extendOption;
741
} SStateWindowOperatorInfo;
742

743

744
typedef struct SEventWindowOperatorInfo {
745
  SOptrBasicInfo     binfo;
746
  SAggSupporter      aggSup;
747
  SExprSupp          scalarSup;
748
  SWindowRowsSup     winSup;
749
  int32_t            tsSlotId;  // primary timestamp column slot id
750
  STimeWindowAggSupp twAggSup;
751
  uint64_t           groupId;  // current group id, used to identify the data block from different groups
752
  SFilterInfo*       pStartCondInfo;
753
  SFilterInfo*       pEndCondInfo;
754
  bool               inWindow;
755
  SResultRow*        pRow;
756
  SSDataBlock*       pPreDataBlock;
757
  struct SOperatorInfo*     pOperator;
758
  int64_t            trueForLimit;
759
} SEventWindowOperatorInfo;
760

761
#define OPTR_IS_OPENED(_optr)  (((_optr)->status & OP_OPENED) == OP_OPENED)
762
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
763
#define OPTR_CLR_OPENED(_optr) ((_optr)->status &= ~OP_OPENED)
764

765
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
766

767
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName,
768
                                   SExecTaskInfo* pTaskInfo);
769
void    cleanupQueriedTableScanInfo(void* p);
770

771
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
772
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
773

774
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore);
775
void checkIndefRowsFuncs(SExprSupp* pSup);
776
void    cleanupExprSupp(SExprSupp* pSup);
777
void    cleanupExprSuppWithoutFilter(SExprSupp* pSupp);
778

779
void     cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup,
780
                                   SGroupResInfo* pGroupResInfo);
781
void     cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
782
                                    SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap);
783
void     cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
784
                           SAggSupporter *pAggSup, bool cleanHashmap);
785
void     cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
786
                                      SGroupResInfo* pGroupResInfo);
787

788
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
789
                   const char* pkey, void* pState, SFunctionStateStore* pStore);
790
void    cleanupAggSup(SAggSupporter* pAggSup);
791

792
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
793

794
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
795
                            SDiskbasedBuf* pBuf);
796

797
/**
798
 * @brief copydata from hash table, instead of copying from SGroupResInfo's pRow
799
 */
800
void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
801
                              SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup);
802
void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
803
                        SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, int64_t minWindowSize);
804

805
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
806
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
807
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
808
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
809
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
810

811
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
812
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
813

814
int32_t setFunctionResultOutput(struct SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
815
                             int32_t numOfExprs);
816
int32_t      setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);                             
817
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
818
void    updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
819
                             struct SOperatorInfo* pOperator);
820

821
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
822
int32_t     getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz);
823

824
extern void doDestroyExchangeOperatorInfo(void* param);
825

826
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo, SColumnInfoData** pRet);
827
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
828
                               int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache);
829

830
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
831
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
832
                            int32_t* rowEntryInfoOffset);
833
void    clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
834

835
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
836
                                   int32_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
837
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
838

839
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
840
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams);
841
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
842
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
843
                                        const void* pExtraParams, bool doSelectFunc, bool hasIndefRowsFunc);
844

845
int32_t setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
846
                          bool createDummyCol);
847

848
int32_t checkForQueryBuf(size_t numOfTables);
849

850
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle);
851

852
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
853
                                int32_t order);
854
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
855
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
856
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
857
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
858
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
859
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
860
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
861
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
862
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
863
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,
864
                           SStateStore* pStore);
865

866
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
867

868
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
869
                        SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
870

871
bool    groupbyTbname(SNodeList* pGroupList);
872
void    getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
873
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
874
                               int64_t* pData);
875
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
876
SExprInfo*   createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
877

878
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
879
                                 SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
880
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
881

882
void    streamOpReleaseState(struct SOperatorInfo* pOperator);
883
void    streamOpReloadState(struct SOperatorInfo* pOperator);
884
void    destroyStreamAggSupporter(SStreamAggSupporter* pSup);
885
void    clearGroupResInfo(SGroupResInfo* pGroupResInfo);
886
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
887
                        SSDataBlock* pResultBlock, SFunctionStateStore* pStore);
888
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
889
                               SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
890
                               SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
891
                               SStorageAPI* pApi, int32_t tsIndex, int8_t stateType, int32_t ratio);
892
int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
893
                       int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic, int64_t recalculateInterval);
894
int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
895
void    initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
896
void    getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
897
int32_t deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
898
                              SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd);
899
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
900
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed);
901
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar);
902
int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2);
903
void    removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins);
904
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
905
                           int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
906
                           struct SOperatorInfo* pOperator, int64_t winDelta);
907
void    setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo);
908
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo);
909
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated);
910
int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key);
911
void    removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey);
912
void    doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite,
913
                               SGroupResInfo* pGroupResInfo);
914
void    doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
915
                             SSDataBlock* pBlock, SArray* pSessionKeys);
916
int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo);
917
void    getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
918
                              SResultWindowInfo* pNextWin);
919
int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup,
920
                          SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
921
                          SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap);
922
void    releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
923
void    resetWinRange(STimeWindow* winRange);
924
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
925
void    resetUnCloseSessionWinInfo(SSHashObj* winMap);
926
void    setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
927
void    reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
928
void    destroyFlusedPos(void* pRes);
929
bool    isIrowtsPseudoColumn(SExprInfo* pExprInfo);
930
bool    isIsfilledPseudoColumn(SExprInfo* pExprInfo);
931
bool    isInterpFunc(SExprInfo* pExprInfo);
932
bool    isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo);
933

934
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
935
void*   decodeSSessionKey(void* buf, SSessionKey* key);
936
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen);
937
void*   decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen);
938
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup);
939
void*   decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup);
940

941
void    destroyOperatorParamValue(void* pValues);
942
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc);
943
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq);
944
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window);
945
void    freeExchangeGetBasicOperatorParam(void* pParam);
946
void    freeOperatorParam(SOperatorParam* pParam, SOperatorParamType type);
947
void    freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree);
948
int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
949
                                       SSDataBlock** pResBlock);
950
void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, EStreamType mode, SSessionKey* pDelRange);
951
void    doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey);
952

953
int32_t saveDeleteInfo(SArray* pWins, SSessionKey key);
954
void    removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins);
955
int32_t copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted);
956
int32_t copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest);
957

958
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo);
959
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType);
960
bool compareVal(const char* v, const SStateKeys* pKey);
961
bool inWinRange(STimeWindow* range, STimeWindow* cur);
962
int32_t doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result);
963

964
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
965
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
966
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
967
bool    getIgoreNullRes(SExprSupp* pExprSup);
968
bool    checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull);
969
int64_t getMinWindowSize(struct SOperatorInfo* pOperator);
970

971
void    destroyTmqScanOperatorInfo(void* param);
972
int32_t checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out);
973
void resetBasicOperatorState(SOptrBasicInfo* pBasicInfo);
974

975
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
976

977
#ifdef __cplusplus
978
}
979
#endif
980

981
#endif  // TDENGINE_EXECUTORINT_H
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc