• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5016

03 Apr 2026 03:59PM UTC coverage: 72.299% (+0.01%) from 72.289%
#5016

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4055 of 5985 new or added lines in 68 files covered. (67.75%)

13126 existing lines in 156 files now uncovered.

257424 of 356056 relevant lines covered (72.3%)

133108577.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/source/libs/executor/inc/executorInt.h
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifndef TDENGINE_EXECUTORINT_H
16
#define TDENGINE_EXECUTORINT_H
17

18
#ifdef __cplusplus
19
extern "C" {
20
#endif
21

22
#include "os.h"
23
#include "tcommon.h"
24
#include "theap.h"
25
#include "tlosertree.h"
26
#include "tsort.h"
27
#include "tvariant.h"
28

29
#include "dataSinkMgt.h"
30
#include "executil.h"
31
#include "executor.h"
32
#include "planner.h"
33
#include "scalar.h"
34
#include "taosdef.h"
35
#include "tarray.h"
36
#include "tfill.h"
37
#include "thash.h"
38
#include "tlockfree.h"
39
#include "tmsg.h"
40
#include "tpagedbuf.h"
41
#include "tlrucache.h"
42
#include "tworker.h"
43

44
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
45

46
typedef struct STsdbReader STsdbReader;
47
typedef struct STqReader   STqReader;
48

49
typedef enum EExtWinMode {
50
  EEXT_MODE_SCALAR = 1,
51
  EEXT_MODE_AGG,
52
  EEXT_MODE_INDEFR_FUNC,
53
} EExtWinMode;
54

55
// Session-window validity helpers. A window (or window key) is treated as valid only when its start
// key `win.skey` is strictly positive; the SET_* macros mark it invalid by storing INT64_MIN there.
// The first three macros take the object itself (member access with `.`), while
// SET_SESSION_WIN_KEY_INVALID takes a pointer to the key (member access with `->`).
#define IS_VALID_SESSION_WIN(winInfo)        ((winInfo).sessionWin.win.skey > 0)
56
#define SET_SESSION_WIN_INVALID(winInfo)     ((winInfo).sessionWin.win.skey = INT64_MIN)
57
#define IS_INVALID_SESSION_WIN_KEY(winKey)   ((winKey).win.skey <= 0)
58
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
59

60
// Execution-model predicates on a task pointer: compare (_task)->execModel with OPTR_EXEC_MODEL_STREAM.
#define IS_NON_STREAM_MODE(_task) ((_task)->execModel != OPTR_EXEC_MODEL_STREAM)
61
#define IS_STREAM_MODE(_task) ((_task)->execModel == OPTR_EXEC_MODEL_STREAM)
62
// True when funcInfo.isMultiGroupCalc is false. Dereferences (_task)->pStreamRuntimeInfo without a
// NULL check, so the caller must make sure that pointer is set before using this macro.
#define IS_STREAM_SINGLE_GRP(_task) (!(_task)->pStreamRuntimeInfo->funcInfo.isMultiGroupCalc)
63

64
/**
65
 * If the number of generated results is greater than this value,
66
 * the query is halted and the results are returned to the client immediately.
67
 */
68
typedef struct SResultInfo {  // TODO refactor
69
  int64_t totalRows;          // total generated result size in rows
70
  int64_t totalBytes;         // total results in bytes.
71
  int32_t capacity;           // capacity of current result output buffer
72
  int32_t threshold;          // result size threshold in rows.
73
} SResultInfo;
74

75
typedef struct STableQueryInfo {
76
  TSKEY              lastKey;  // last check ts, todo remove it later
77
  SResultRowPosition pos;      // current active time window
78
} STableQueryInfo;
79

80
typedef struct SLimit {
81
  int64_t limit;       // default -1, no limit
82
  int64_t offset;
83
} SLimit;
84

85
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
86

87
enum {
88
  STREAM_RECOVER_STEP__NONE = 0,
89
  STREAM_RECOVER_STEP__PREPARE1,
90
  STREAM_RECOVER_STEP__PREPARE2,
91
  STREAM_RECOVER_STEP__SCAN1,
92
};
93

94
extern int32_t fetchObjRefPool;
95

96
typedef struct {
97
  char*   pData;
98
  bool    isNull;
99
  int16_t type;
100
  int32_t bytes;
101
} SGroupKeys, SStateKeys;
102

103
typedef struct {
104
  char*           tablename;
105
  char*           dbname;
106
  int32_t         tversion;
107
  int32_t         rversion;
108
  SSchemaWrapper* sw;
109
  SSchemaWrapper* qsw;
110
} SSchemaInfo;
111

112
typedef struct SExchangeOpStopInfo {
113
  int32_t operatorType;
114
  int64_t refId;
115
} SExchangeOpStopInfo;
116

117
typedef struct SGcOperatorParam {
118
  int64_t sessionId;
119
  int32_t downstreamIdx;
120
  int32_t vgId;
121
  int64_t tbUid;
122
  bool    needCache;
123
} SGcOperatorParam;
124

125
typedef struct SGcNotifyOperatorParam {
126
  int32_t downstreamIdx;
127
  int32_t vgId;
128
  int64_t tbUid;
129
} SGcNotifyOperatorParam;
130

131
struct SExprSupp {
132
  SExprInfo*      pExprInfo;
133
  int32_t         numOfExprs;  // the number of scalar expression in group operator
134
  SqlFunctionCtx* pCtx;
135
  int32_t*        rowEntryInfoOffset;  // offset value for each row result cell info
136
  SFilterInfo*    pFilterInfo;
137
  bool            hasWindowOrGroup;    // denote that the function is used with time window or group
138
  bool            hasWindow;           // denote that the function is used with time window
139
  bool            hasIndefRowsFunc;
140
};
141

142
typedef enum {
143
  EX_SOURCE_DATA_NOT_READY = 0x1,
144
  EX_SOURCE_DATA_STARTED,
145
  EX_SOURCE_DATA_READY,
146
  EX_SOURCE_DATA_EXHAUSTED,
147
} EX_SOURCE_STATUS;
148

149
#define COL_MATCH_FROM_COL_ID  0x1
150
#define COL_MATCH_FROM_SLOT_ID 0x2
151

152
typedef struct SLoadRemoteDataInfo {
153
  uint64_t totalSize;     // total load bytes from remote
154
  uint64_t totalRows;     // total number of rows
155
  uint64_t totalElapsed;  // total elapsed time
156
} SLoadRemoteDataInfo;
157

158
typedef struct SLimitInfo {
159
  SLimit   limit;
160
  SLimit   slimit;
161
  uint64_t currentGroupId;
162
  int64_t  remainGroupOffset;
163
  int64_t  numOfOutputGroups;
164
  int64_t  remainOffset;
165
  int64_t  numOfOutputRows;
166
} SLimitInfo;
167

168
typedef struct SSortMergeJoinOperatorParam {
169
  bool initDownstream;
170
} SSortMergeJoinOperatorParam;
171

172
typedef enum EExchangeSourceType {
173
  EX_SRC_TYPE_STB_JOIN_SCAN = 1,
174
  EX_SRC_TYPE_VSTB_SCAN,
175
  EX_SRC_TYPE_VSTB_WIN_SCAN,
176
  EX_SRC_TYPE_VSTB_AGG_SCAN,
177
  EX_SRC_TYPE_VSTB_TAG_SCAN,
178
  EX_SRC_TYPE_VTB_WIN_SCAN,
179
  EX_SRC_TYPE_VSTB_TS_SCAN,
180
  EX_SRC_TYPE_VSTB_INTERVAL_SCAN,
181
  EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN,
182
} EExchangeSourceType;
183

184
typedef enum {
185
  DYN_TYPE_EXCHANGE_PARAM = 1,
186
  NOTIFY_TYPE_EXCHANGE_PARAM,
187
} EExchangeGetParamType;
188

189
typedef struct SExchangeOperatorBasicParam {
190
  EExchangeGetParamType paramType;
191
  /* dynamic scan params */
192
  int32_t               vgId;
193
  int32_t               srcOpType;
194
  bool                  tableSeq;
195
  SArray*               uidList;
196
  EExchangeSourceType   type;
197
  bool                  isNewDeployed; // used with newDeployedSrc
198
  bool                  isNewParam;
199
  uint64_t              groupid;
200
  SOrgTbInfo*           orgTbInfo;
201
  SArray*               batchOrgTbInfo; // SArray<SOrgTbInfo>
202
  SArray*               tagList;
203
  STimeWindow           window;
204
  SDownstreamSourceNode newDeployedSrc; // used with isNewDeployed
205
  /* notify scan params */
206
  TSKEY notifyTs;
207
} SExchangeOperatorBasicParam;
208

209
// Exchange operator parameter carrying a batch of basic params in a hash container.
// NOTE(review): begins with the same `bool multiParams` member as SExchangeOperatorParam; presumably the
// flag tells the two layouts apart when accessed through a common pointer - confirm against the users.
typedef struct SExchangeOperatorBatchParam {
210
  bool       multiParams;
211
  SSHashObj* pBatchs;  // values are SExchangeOperatorBasicParam
212
} SExchangeOperatorBatchParam;
213

214
// Exchange operator parameter carrying a single basic param embedded by value.
// NOTE(review): begins with the same `bool multiParams` member as SExchangeOperatorBatchParam; presumably
// the flag tells the two layouts apart when accessed through a common pointer - confirm against the users.
typedef struct SExchangeOperatorParam {
215
  bool                        multiParams;
216
  SExchangeOperatorBasicParam basic;
217
} SExchangeOperatorParam;
218

219
typedef struct SExchangeSrcIndex {
220
  int32_t srcIdx;
221
  int32_t inUseIdx;
222
} SExchangeSrcIndex;
223

224
// Runtime state of an exchange operator.
typedef struct SExchangeInfo {
225
  bool       isExchange;  // KEEP IT FIRST: this member must stay at offset 0 of the struct
226
  int64_t    seqId;
227
  SArray*    pSources;
228
  SSHashObj* pHashSources;
229
  SArray*    pSourceDataInfo;
230
  tsem_t     ready;
231
  void*      pTransporter;
232

233
  // SArray<SSDataBlock*>: result block list, used to keep the multiple blocks
234
  // passed by the downstream operator
235
  SArray*      pResultBlockList;
236
  SArray*      pRecycledBlocks;  // pool of small data blocks, kept to avoid repeatedly creating and destroying them
237
  SSDataBlock* pDummyBlock;      // dummy block, does not keep data
238
  bool         seqLoadData;      // load data sequentially or not, false by default
239
  bool         dynamicOp;
240
  bool         dynTbname;         // %%tbname for stream
241
  int32_t      current;
242
  SLoadRemoteDataInfo loadInfo;
243
  int64_t             self;
244
  SLimitInfo          limitInfo;
245
  int64_t             openedTs;  // execution start timestamp, todo: move to SLoadRemoteDataInfo
246
  char*               pTaskId;
247
  SArray*             pFetchRpcHandles;
248
  bool                notifyToSend;  // a notify STEP DONE message needs to be sent
249
  TSKEY               notifyTs;      // notify timestamp
250
} SExchangeInfo;
251

252
typedef struct SScanInfo {
253
  int32_t numOfAsc;
254
  int32_t numOfDesc;
255
} SScanInfo;
256

257
typedef struct SSampleExecInfo {
258
  double   sampleRatio;  // data block sample ratio, 1 by default
259
  uint32_t seed;         // random seed value
260
} SSampleExecInfo;
261

262
enum {
263
  TABLE_SCAN__TABLE_ORDER = 1,
264
  TABLE_SCAN__BLOCK_ORDER = 2,
265
};
266

267
// Progress states used when counting empty tables (see the `countState` member of STableScanInfo).
typedef enum ETableCountState {
268
  TABLE_COUNT_STATE_NONE = 0,       // before the scan starts
269
  TABLE_COUNT_STATE_SCAN = 1,       // current group is being scanned
270
  TABLE_COUNT_STATE_PROCESSED = 2,  // current group has been processed
271
  TABLE_COUNT_STATE_END = 3,        // finished, or no need to process
272
} ETableCountState;
273

274
struct SAggSupporter {
275
  SSHashObj*     pResultRowHashTable;  // quick locate the window object for each result
276
  char*          keyBuf;               // window key buffer
277
  SDiskbasedBuf* pResultBuf;           // query result buffer based on blocked-wised disk file
278
  int32_t        resultRowSize;  // the result buffer size for each result row, with the meta data size for each row
279
  int32_t        currentPageId;  // current write page id
280
};
281

282
typedef struct {
283
  // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
284
  // current data block needs to be loaded.
285
  SInterval      interval;
286
  SAggSupporter* pAggSup;
287
  SExprSupp*     pExprSup;  // expr supporter of aggregate operator
288
} SAggOptrPushDownInfo;
289

290
typedef struct STableMetaCacheInfo {
291
  SLRUCache* pTableMetaEntryCache;  // 100 by default
292
  uint64_t   metaFetch;
293
  uint64_t   cacheHit;
294
} STableMetaCacheInfo;
295

296
typedef struct STableScanBase {
297
  STsdbReader*           dataReader;
298
  SFileBlockLoadRecorder readRecorder;
299
  SQueryTableDataCond    cond;
300
  SQueryTableDataCond    orgCond; // use for virtual super table scan
301
  SAggOptrPushDownInfo   pdInfo;
302
  SColMatchInfo          matchInfo;
303
  SReadHandle            readHandle;
304
  SExprSupp              pseudoSup;
305
  STableMetaCacheInfo    metaCache;
306
  int32_t                scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
307
  int32_t                dataBlockLoadFlag;
308
  SLimitInfo             limitInfo;
309
  // there are more than one table list exists in one task, if only one vnode exists.
310
  STableListInfo* pTableListInfo;
311
  TsdReader       readerAPI;
312
} STableScanBase;
313

314
typedef struct STableScanInfo {
315
  STableScanBase  base;
316
  SScanInfo       scanInfo;
317
  int32_t         scanTimes;
318
  SSDataBlock*    pResBlock;
319
  SHashObj*       pIgnoreTables;
320
  SSampleExecInfo sample;           // sample execution info
321
  int32_t         tableStartIndex;  // current group scan start
322
  int32_t         tableEndIndex;    // current group scan end
323
  int32_t         currentGroupId;
324
  int32_t         currentTable;
325
  int8_t          scanMode;
326
  int8_t          assignBlockUid;
327
  uint8_t         countState;  // empty table count state
328
  bool            hasGroupByTag;
329
  bool            filesetDelimited;
330
  bool            needCountEmptyTable;
331
  // for virtual super table scan
332
  SSDataBlock*    pOrgBlock;
333
  bool            ignoreTag;
334
  bool            virtualStableScan;
335
  SHashObj*       readerCache;
336
  bool            newReader;
337
  SArray*         pBlockColMap;
338
  // for virtual super table batch scan
339
  int32_t         lastBatchIdx;
340
  int32_t         currentBatchIdx;
341
  STimeWindow     lastTimeWindow;
342
  SArray*         lastColArray;
343
  SArray*         lastBlockColArray;
344
  SArray*         pBatchColMap;  // SArray<SOrgTbInfo>
345
  STimeWindow     cachedTimeWindow;
346
  SArray*         cachedTagList;
347
  uint64_t        cachedGroupId;
348
} STableScanInfo;
349

350
typedef enum ESubTableInputType {
351
  SUB_TABLE_MEM_BLOCK,
352
  SUB_TABLE_EXT_PAGES,
353
} ESubTableInputType;
354

355
typedef struct STmsSubTableInput {
356
  STsdbReader*        pReader;
357
  SQueryTableDataCond tblCond;
358
  STableKeyInfo*      pKeyInfo;
359
  bool                bInMemReader;
360
  ESubTableInputType  type;
361
  SSDataBlock*        pReaderBlock;
362

363
  SArray*      aBlockPages;
364
  SSDataBlock* pPageBlock;
365
  int32_t      pageIdx;
366

367
  int32_t      rowIdx;
368
  int64_t*     aTs;
369
  SSDataBlock* pInputBlock;
370
} STmsSubTableInput;
371

372
typedef struct SBlockOrderInfo SBlockOrderInfo;
373
typedef struct STmsSubTablesMergeInfo {
374
  SBlockOrderInfo* pTsOrderInfo;
375
  SBlockOrderInfo* pPkOrderInfo;
376

377
  int32_t                 numSubTables;
378
  STmsSubTableInput*      aInputs;
379
  SMultiwayMergeTreeInfo* pTree;
380
  int32_t                 numSubTablesCompleted;
381

382
  int32_t        numTableBlocksInMem;
383
  SDiskbasedBuf* pBlocksBuf;
384

385
  int32_t numInMemReaders;
386
} STmsSubTablesMergeInfo;
387

388
typedef struct STableMergeScanInfo {
389
  int32_t         tableStartIndex;
390
  int32_t         tableEndIndex;
391
  bool            hasGroupId;
392
  uint64_t        groupId;
393
  STableScanBase  base;
394
  int32_t         bufPageSize;
395
  uint32_t        sortBufSize;  // max buffer size for in-memory sort
396
  SArray*         pSortInfo;
397
  SSortHandle*    pSortHandle;
398
  SSDataBlock*    pSortInputBlock;
399
  SSDataBlock*    pReaderBlock;
400
  int64_t         startTs;  // sort start time
401
  SLimitInfo      limitInfo;
402
  int64_t         numOfRows;
403
  SScanInfo       scanInfo;
404
  int32_t         scanTimes;
405
  int32_t         readIdx;
406
  SSDataBlock*    pResBlock;
407
  SSampleExecInfo sample;         // sample execution info
408
  SSHashObj*      mTableNumRows;  // uid->num of table rows
409
  SHashObj*       mSkipTables;
410
  int64_t         mergeLimit;
411
  SSortExecInfo   sortExecInfo;
412
  bool            needCountEmptyTable;
413
  bool            bGroupProcessed;  // the group return data means processed
414
  bool            filesetDelimited;
415
  bool            bNewFilesetEvent;
416
  bool            bNextDurationBlockEvent;
417
  int32_t         numNextDurationBlocks;
418
  SSDataBlock*    nextDurationBlocks[2];
419
  bool            rtnNextDurationBlocks;
420
  int32_t         nextDurationBlocksIdx;
421
  bool            bSortRowId;
422

423
  STmsSubTablesMergeInfo* pSubTablesMergeInfo;
424
} STableMergeScanInfo;
425

426
typedef struct STagScanFilterContext {
427
  SHashObj* colHash;
428
  int32_t   index;
429
  SArray*   cInfoList;
430
  int32_t   code;
431
} STagScanFilterContext;
432

433
typedef struct STagScanInfo {
434
  SColumnInfo*          pCols;
435
  SSDataBlock*          pRes;
436
  SColMatchInfo         matchInfo;
437
  int32_t               curPos;
438
  SReadHandle           readHandle;
439
  STableListInfo*       pTableListInfo;
440
  uint64_t              suid;
441
  void*                 pCtbCursor;
442
  SNode*                pTagCond;
443
  SNode*                pTagIndexCond;
444
  STagScanFilterContext filterCtx;
445
  SArray*               aUidTags;     // SArray<STUidTagInfo>
446
  SArray*               aFilterIdxs;  // SArray<int32_t>
447
  SStorageAPI*          pStorageAPI;
448
  SLimitInfo            limitInfo;
449
} STagScanInfo;
450

451

452
enum {
453
  PROJECT_RETRIEVE_CONTINUE = 0x1,
454
  PROJECT_RETRIEVE_DONE = 0x2,
455
};
456

457
typedef struct SPartitionBySupporter {
458
  SArray* pGroupCols;     // group by columns, SArray<SColumn>
459
  SArray* pGroupColVals;  // current group column values, SArray<SGroupKeys>
460
  char*   keyBuf;         // group by keys for hash
461
  bool    needCalc;       // partition by column
462
} SPartitionBySupporter;
463

464
typedef struct SPartitionDataInfo {
465
  uint64_t groupId;
466
  char*    tbname;
467
  SArray*  rowIds;
468
} SPartitionDataInfo;
469

470
typedef struct STimeWindowAggSupp {
471
  TSKEY           maxTs;
472
  TSKEY           minTs;
473
  SColumnInfoData timeWindowData;  // query time window info for scalar function execution.
474
} STimeWindowAggSupp;
475

476
typedef struct STmqQueryScanInfo {
477
  SExprInfo*        pPseudoExpr;
478
  int32_t           numOfPseudoExpr;
479
  SExprSupp         tagCalSup;
480
  int32_t           primaryTsIndex;  // primary time stamp slot id
481
  SReadHandle       readHandle;
482
  SColMatchInfo     matchInfo;
483
  SHashObj*         pCol2SlotId;
484

485
  SSDataBlock* pRes;         // result SSDataBlock
486
  STqReader*   tqReader;
487

488
  STableListInfo* pTableListInfo;
489

490
  struct SOperatorInfo* pTmqScanOp;
491
  struct SOperatorInfo* pTableScanOp;
492
  // status for tmq
493
  SNodeList* pGroupTags;
494
  SNode*     pTagCond;
495
  SNode*     pTagIndexCond;
496
  SStoreTqReader    readerFn;
497
} STmqQueryScanInfo;
498

499
typedef struct {
500
  struct SVnode*       vnode;  // todo remove this
501
  SSDataBlock          pRes;   // result SSDataBlock
502
  STsdbReader*         dataReader;
503
  struct SSnapContext* sContext;
504
  SStorageAPI*         pAPI;
505
  STableListInfo*      pTableListInfo;
506
} STmqRawScanInfo;
507

508
typedef struct STableCountScanSupp {
509
  int16_t dbNameSlotId;
510
  int16_t stbNameSlotId;
511
  int16_t tbCountSlotId;
512
  bool    groupByDbName;
513
  bool    groupByStbName;
514
  char    dbNameFilter[TSDB_DB_NAME_LEN];
515
  char    stbNameFilter[TSDB_TABLE_NAME_LEN];
516
} STableCountScanSupp;
517

518
typedef struct SOptrBasicInfo {
519
  SResultRowInfo resultRowInfo;
520
  SSDataBlock*   pRes;
521
  bool           mergeResultBlock;
522
  int32_t        inputTsOrder;
523
  int32_t        outputTsOrder;
524
} SOptrBasicInfo;
525

526
typedef struct SIntervalAggOperatorInfo {
527
  SOptrBasicInfo     binfo;              // basic info
528
  SAggSupporter      aggSup;             // aggregate supporter
529
  SExprSupp          scalarSupp;         // supporter for perform scalar function
530
  SGroupResInfo      groupResInfo;       // multiple results build supporter
531
  SInterval          interval;           // interval info
532
  int32_t            primaryTsIndex;     // primary time stamp slot id from result of downstream operator.
533
  STimeWindow        win;                // query time range
534
  bool               timeWindowInterpo;  // interpolation needed or not
535
  SArray*            pInterpCols;        // interpolation columns
536
  EOPTR_EXEC_MODEL   execModel;          // operator execution model [batch model|stream model]
537
  STimeWindowAggSupp twAggSup;
538
  SArray*            pPrevValues;  //  SArray<SGroupKeys> used to keep the previous not null value for interpolation.
539
  bool               cleanGroupResInfo;
540
  struct SOperatorInfo* pOperator;
541
  // for limit optimization
542
  bool          limited;
543
  int64_t       limit;
544
  bool          slimited;
545
  int64_t       slimit;
546
  uint64_t      curGroupId;  // initialize to UINT64_MAX
547
  uint64_t      handledGroupNum;
548
  BoundedQueue* pBQ;
549
} SIntervalAggOperatorInfo;
550

551
typedef struct SMergeAlignedIntervalAggOperatorInfo {
552
  SIntervalAggOperatorInfo* intervalAggOperatorInfo;
553

554
  uint64_t     groupId;  // current groupId
555
  int64_t      curTs;    // current ts
556
  SSDataBlock* prefetchedBlock;
557
  SResultRow*  pResultRow;
558
} SMergeAlignedIntervalAggOperatorInfo;
559

560
typedef struct SOpCheckPointInfo {
561
  uint16_t  checkPointId;
562
  SHashObj* children;  // key:child id
563
} SOpCheckPointInfo;
564

565
typedef struct SDataGroupInfo {
566
  uint64_t groupId;
567
  int64_t  numOfRows;
568
  SArray*  pPageList;
569
  SArray*  blockForNotLoaded;   // SSDataBlock that data is not loaded
570
  int32_t  offsetForNotLoaded;  // read offset for SSDataBlock that data is not loaded
571
} SDataGroupInfo;
572

573
typedef struct SWindowRowsSup {
574
  STimeWindow win;
575
  TSKEY       prevTs;  // previous timestamp, used for window aggregation
576
  int32_t     startRowIndex;
577
  int32_t     numOfRows;
578
  uint64_t    groupId;
579
  uint32_t    numNullRows;  // number of continuous rows with null state col
580
  TSKEY       lastTs;  // last row's timestamp, used for checking duplicated ts
581
} SWindowRowsSup;
582

583
// Returns true when the recorded count of continuous rows with a null state column (numNullRows) is
584
// greater than zero. The state window operator needs to handle these rows specially.
585
// pRowSup must not be NULL: it is dereferenced without a check.
static inline bool hasContinuousNullRows(SWindowRowsSup* pRowSup) {
2,147,483,647✔
586
  return pRowSup->numNullRows > 0;
2,147,483,647✔
587
}
588

589
// Clears numNullRows. Call on initialization, or when a row with a non-null state column is found.
590
// pRowSup must not be NULL: it is dereferenced without a check.
static inline void resetNumNullRows(SWindowRowsSup* pRowSup) {
2,147,483,647✔
591
  pRowSup->numNullRows = 0;
2,147,483,647✔
592
}
2,147,483,647✔
593

594
// Resets the per-window row bookkeeping in *pRowSup to its initial state; a NULL pointer is ignored.
// After the call: win.skey, win.ekey, startRowIndex, groupId, numOfRows and numNullRows are 0 and
// prevTs is INT64_MIN.
// NOTE(review): the `lastTs` member of SWindowRowsSup is the only member left untouched here - confirm
// that keeping the previous value across a reset is intentional.
static inline void resetWindowRowsSup(SWindowRowsSup* pRowSup) {
13,308,356✔
595
  if (NULL == pRowSup) {
13,308,356✔
UNCOV
596
    return;
×
597
  }
598

599
  pRowSup->win.skey = pRowSup->win.ekey = 0;
13,308,356✔
600
  pRowSup->prevTs = INT64_MIN;  // INT64_MIN, not 0, is the reset value of prevTs
13,307,284✔
601
  pRowSup->startRowIndex = pRowSup->groupId = 0;
13,307,284✔
602
  pRowSup->numOfRows = pRowSup->numNullRows = 0;
13,307,820✔
603
}
604

605
typedef int32_t (*AggImplFn)(struct SOperatorInfo* pOperator, SSDataBlock* pBlock);
606

607
typedef struct SSessionAggOperatorInfo {
608
  SOptrBasicInfo        binfo;
609
  SAggSupporter         aggSup;
610
  SExprSupp             scalarSupp;  // supporter for perform scalar function
611
  SGroupResInfo         groupResInfo;
612
  SWindowRowsSup        winSup;
613
  bool                  reptScan;  // next round scan
614
  int64_t               gap;       // session window gap
615
  int32_t               tsSlotId;  // primary timestamp slot id
616
  STimeWindowAggSupp    twAggSup;
617
  struct SOperatorInfo* pOperator;
618
  bool                  cleanGroupResInfo;
619
} SSessionAggOperatorInfo;
620

621
typedef struct SStateWindowOperatorInfo {
622
  SOptrBasicInfo        binfo;
623
  SAggSupporter         aggSup;
624
  SExprSupp             scalarSup;
625
  SGroupResInfo         groupResInfo;
626
  SWindowRowsSup        winSup;
627
  SColumn               stateCol;
628
  bool                  hasKey;    // has key means the state window has started
629
  SStateKeys            stateKey;
630
  int32_t               tsSlotId;  // primary timestamp column slot id
631
  STimeWindowAggSupp    twAggSup;
632
  struct SOperatorInfo* pOperator;
633
  bool                  cleanGroupResInfo;
634
  STrueForInfo          trueForInfo;
635
  EStateWinExtendOption extendOption;
636
} SStateWindowOperatorInfo;
637

638

639
typedef struct SEventWindowOperatorInfo {
640
  SOptrBasicInfo     binfo;
641
  SAggSupporter      aggSup;
642
  SExprSupp          scalarSup;
643
  SWindowRowsSup     winSup;
644
  int32_t            tsSlotId;  // primary timestamp column slot id
645
  STimeWindowAggSupp twAggSup;
646
  uint64_t           groupId;  // current group id, used to identify the data block from different groups
647
  SFilterInfo*       pStartCondInfo;
648
  SFilterInfo*       pEndCondInfo;
649
  bool               inWindow;
650
  SResultRow*        pRow;
651
  SSDataBlock*       pPreDataBlock;
652
  struct SOperatorInfo*     pOperator;
653
  STrueForInfo              trueForInfo;
654
} SEventWindowOperatorInfo;
655

656
// Test, set and clear the OP_OPENED bit(s) in the `status` member of an operator.
// `_optr` is a pointer; OPTR_IS_OPENED is true only when every bit of OP_OPENED is set in status.
#define OPTR_IS_OPENED(_optr)  (((_optr)->status & OP_OPENED) == OP_OPENED)
657
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
658
#define OPTR_CLR_OPENED(_optr) ((_optr)->status &= ~OP_OPENED)
659

660
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
661

662
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName,
663
                                   SExecTaskInfo* pTaskInfo);
664
void    cleanupQueriedTableScanInfo(void* p);
665

666
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
667
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
668

669
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore);
670
void checkIndefRowsFuncs(SExprSupp* pSup);
671
void    cleanupExprSupp(SExprSupp* pSup);
672
void    cleanupExprSuppWithoutFilter(SExprSupp* pSupp);
673

674
void     cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
675
                                    SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap);
676
void     cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
677
                           SAggSupporter *pAggSup, bool cleanHashmap);
678
void     cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
679
                                      SGroupResInfo* pGroupResInfo);
680

681
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
682
                   const char* pkey, void* pState, SFunctionStateStore* pStore);
683
void    cleanupAggSup(SAggSupporter* pAggSup);
684

685
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
686

687
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
688
                            SDiskbasedBuf* pBuf);
689

690
/**
691
 * @brief Copy data from the hash table, instead of copying from SGroupResInfo's pRow
692
 */
693
void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
694
                              SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup);
695
void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
696
                        SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, STrueForInfo *pTrueForInfo);
697

698
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
699
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
700
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
701
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
702
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
703

704
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
705
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
706

707
int32_t setFunctionResultOutput(struct SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
708
                             int32_t numOfExprs);
709
int32_t      setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);                             
710
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan);
711
void    updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
712
                             struct SOperatorInfo* pOperator);
713

714
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
715
int32_t     getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz);
716

717
extern void doDestroyExchangeOperatorInfo(void* param);
718

719
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo, SColumnInfoData** pRet);
720
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
721
                               int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache);
722

723
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
724
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
725
                            int32_t* rowEntryInfoOffset);
726
void    clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
727

728
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
729
                                   int32_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
730
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
731

732
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
733
                              int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams);
734
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
735
                                        SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
736
                                        const void* pExtraParams, bool doSelectFunc, bool hasIndefRowsFunc);
737

738
int32_t setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
739
                          bool createDummyCol);
740

741
int32_t checkForQueryBuf(size_t numOfTables);
742

743
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle);
744

745
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
746
                                int32_t order);
747
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
748
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
749
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
750
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
751
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
752
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
753
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
754
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
755

756
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
757

758
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
759
                        SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
760

761
bool    groupbyTbname(SNodeList* pGroupList);
762
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
763
                               int64_t* pData);
764
SExprInfo*   createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
765

766
// Prototypes only. Both take rowEntryOffset as a const int32_t*, so the offsets are
// read-only for these functions.
// NOTE(review): presumably rowEntryOffset[i] locates expression i's entry inside pRow — confirm.
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
767
                                 SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
768
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
769

770
// Stream-operator state and group-result helper prototypes; definitions are not visible here.
// SOperatorInfo is referenced with an explicit `struct` tag in these declarations.
void    streamOpReleaseState(struct SOperatorInfo* pOperator);
771
void    streamOpReloadState(struct SOperatorInfo* pOperator);
772
void    clearGroupResInfo(SGroupResInfo* pGroupResInfo);
773
// Returns int32_t (likely a status code — TODO confirm).
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
774
                        SSDataBlock* pResultBlock, SFunctionStateStore* pStore);
775
// pAllWins is const (input only); pMaxWins is mutable (presumably the output array — TODO confirm).
int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
776
void    initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
777
// Session-window helper prototypes; definitions are not visible in this header chunk.

// pKey is const (input); pHashKey is mutable (presumably the derived key output — TODO confirm).
void    getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
778
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
779
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed);
780
// ppWinUpdated is a pointer-to-pointer, so the callee can replace the caller's hash object
// (whether it does is not visible here — TODO confirm).
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar);
781
// Has the (const void*, const void*) -> int shape of a qsort-style comparator.
// NOTE(review): presumably intended as a `compar` argument such as the one above — confirm.
int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2);
782
void    removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins);
783
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
784
                           int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
785
                           struct SOperatorInfo* pOperator, int64_t winDelta);
786
void    setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo);
787
// winInfo and key below are passed by value (struct copies), unlike the pointer parameters elsewhere.
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated);
788
int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key);
789
// Ite is a void**; NOTE(review): presumably an iterator cursor kept across calls — confirm.
void    doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite,
790
                               SGroupResInfo* pGroupResInfo);
791
void    doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
792
                             SSDataBlock* pBlock, SArray* pSessionKeys);
793
// Miscellaneous stream/window helpers and expression predicates; definitions are not visible here.
void    resetWinRange(STimeWindow* winRange);
794
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
795
void    resetUnCloseSessionWinInfo(SSHashObj* winMap);
796
void    setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
797
// NOTE(review): "Flused" looks like a misspelling of "Flushed"; the name is part of the
// interface, so it is left unchanged. Takes void*, presumably for use as a generic
// destructor callback — TODO confirm.
void    destroyFlusedPos(void* pRes);
798
// Boolean predicates over a single SExprInfo; the classification rules live in the implementation.
bool    isIrowtsPseudoColumn(SExprInfo* pExprInfo);
799
bool    isIsfilledPseudoColumn(SExprInfo* pExprInfo);
800
bool    isInterpFunc(SExprInfo* pExprInfo);
801
bool    isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo);
802

803
// Paired encode/decode prototypes for three types. Each encoder takes `void** buf` and returns
// int32_t; each decoder takes `void* buf` and returns void*.
// NOTE(review): presumably encoders return the encoded length and advance *buf, and decoders
// return the position after the decoded bytes — TODO confirm against the implementation.
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
804
void*   decodeSSessionKey(void* buf, SSessionKey* key);
805
// The SResultWindowInfo pair takes an extra outLen argument on both sides.
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen);
806
void*   decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen);
807
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup);
808
void*   decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup);
809

810
// Operator-parameter helper prototypes; definitions are not visible in this header chunk.

// Takes void*, presumably for use as a generic destructor callback — TODO confirm.
void    destroyOperatorParamValue(void* pValues);
811
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc);
812
// The three builders below return int32_t and take ppRes as SOperatorParam**, which looks like
// an out-parameter for the newly built param — TODO confirm; ownership is not visible here.
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq);
813
// Extended variant: same leading parameters plus pMap, window, isNewParam and type.
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type);
814
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
815
                                          int32_t srcOpType, TSKEY notifyTs);
816
void    freeExchangeGetBasicOperatorParam(void* pParam);
817
void    freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree);
818
// pResBlock is SSDataBlock**, presumably receiving the fetched block — TODO confirm.
int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
819
                                       SSDataBlock** pResBlock);
820

821
// Delete-result bookkeeping prototypes; all return int32_t (likely status codes — TODO confirm).
// saveDeleteInfo takes the session key by value.
int32_t saveDeleteInfo(SArray* pWins, SSessionKey key);
822
int32_t copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted);
823
int32_t copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest);
824

825
// Boolean window/value predicates; exact conditions live in the implementation.
// The first two take all pointer arguments as const, so they do not modify their inputs.
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo);
826
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd);
827
bool compareVal(const char* v, const SStateKeys* pKey);
828
// Unlike the predicates above, both arguments here are non-const pointers.
bool inWinRange(STimeWindow* range, STimeWindow* cur);
829

830
// Window advance, filtering and null-handling prototypes; definitions are not visible here.

// pNext is a non-const STimeWindow*; NOTE(review): presumably updated in place to the next
// window, with the int32_t return being a row position — TODO confirm.
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
831
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
832
// The filter-result column p is const; pBlock is mutable (presumably trimmed in place — TODO confirm).
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
833
// NOTE(review): "Igore" looks like a misspelling of "Ignore"; the name is part of the
// interface, so it is left unchanged.
bool    getIgoreNullRes(SExprSupp* pExprSup);
834
bool    checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull);
835
// Returns a STrueForInfo pointer for the operator; ownership/lifetime is not visible here.
STrueForInfo* getTrueForInfo(struct SOperatorInfo* pOperator);
836

837
// Teardown/reset prototypes; definitions are not visible in this header chunk.
// destroyTmqScanOperatorInfo takes void*, presumably for use as a generic destructor
// callback — TODO confirm.
void    destroyTmqScanOperatorInfo(void* param);
838
void    resetBasicOperatorState(SOptrBasicInfo* pBasicInfo);
839

840
// Prototype only. size is unsigned (uint32_t); returns int32_t (likely a status code — TODO confirm).
// NOTE(review): presumably reserves `size` bytes in pResultBuf for pWindowRes — confirm.
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
841

842
// Closes the extern "C" block opened near the top of this header for C++ translation units.
#ifdef __cplusplus
843
}
844
#endif  // __cplusplus
845

846
// End of the include guard opened with #ifndef TDENGINE_EXECUTORINT_H.
#endif  // TDENGINE_EXECUTORINT_H
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc