• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3599

08 Feb 2025 11:23AM UTC coverage: 1.77% (-61.6%) from 63.396%
#3599

push

travis-ci

web-flow
Merge pull request #29712 from taosdata/fix/TD-33652-3.0

fix: reduce write rows from 30w to 3w

3776 of 278949 branches covered (1.35%)

Branch coverage included in aggregate %.

6012 of 274147 relevant lines covered (2.19%)

1642.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/include/libs/function/function.h
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#ifndef TDENGINE_FUNCTION_H
17
#define TDENGINE_FUNCTION_H
18

19
#ifdef __cplusplus
20
extern "C" {
21
#endif
22

23
#include "tcommon.h"
24
#include "tsimplehash.h"
25
#include "tvariant.h"
26
#include "functionResInfo.h"
27

28
struct SqlFunctionCtx;
29
struct SResultRowEntryInfo;
30

31
struct SFunctionNode;
32
struct SExprSupp;
33
typedef struct SScalarParam SScalarParam;
34
typedef struct SStreamState SStreamState;
35

36
typedef struct SFuncExecEnv {
37
  int32_t calcMemSize;
38
} SFuncExecEnv;
39

40
typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
41
typedef void (*FExecCleanUp)(struct SqlFunctionCtx* pCtx);
42
typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
43
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
44
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
45
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
46
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
47
typedef int32_t (*FExecDecode)(struct SqlFunctionCtx *pCtx, const char *buf, struct SResultRowEntryInfo *pResultCellInfo, int32_t version);
48
typedef int32_t (*processFuncByRow)(SArray* pCtx);  // array of SqlFunctionCtx
49

50
typedef struct SScalarFuncExecFuncs {
51
  FExecGetEnv        getEnv;
52
  FScalarExecProcess process;
53
} SScalarFuncExecFuncs;
54

55
typedef struct SFuncExecFuncs {
56
  FExecGetEnv      getEnv;
57
  FExecInit        init;
58
  FExecProcess     process;
59
  FExecFinalize    finalize;
60
  FExecCombine     combine;
61
  FExecCleanUp     cleanup;
62
  FExecDecode      decode;
63
  processFuncByRow processFuncByRow;
64
} SFuncExecFuncs;
65

66
#define MAX_INTERVAL_TIME_WINDOW 10000000  // maximum allowed time windows in final results
67

68
#define TOP_BOTTOM_QUERY_LIMIT    100
69
#define FUNCTIONS_NAME_MAX_LENGTH 32
70

71
#define FUNCTION_RESULT_INFO_VERSION 1
72

73
typedef struct SResultRowEntryInfo {
74
  bool     initialized : 1;  // output buffer has been initialized
75
  bool     complete : 1;     // query has completed
76
  uint8_t  isNullRes : 6;    // the result is null
77
  uint16_t numOfRes;         // num of output result in current buffer. NOT NULL RESULT
78
} SResultRowEntryInfo;
79

80
// determine the real data need to calculated the result
81
enum {
82
  BLK_DATA_NOT_LOAD = 0x0,
83
  BLK_DATA_SMA_LOAD = 0x1,
84
  BLK_DATA_DATA_LOAD = 0x3,
85
  BLK_DATA_FILTEROUT = 0x4,  // discard current data block since it is not qualified for filter
86
};
87

88
enum {
89
  MAIN_SCAN = 0x0u,
90
  REVERSE_SCAN = 0x1u,  // todo remove it
91
  PRE_SCAN = 0x2u,      // pre-scan belongs to the main scan and occurs before main scan
92
};
93

94
struct SPoint1;
95
struct SqlFunctionCtx;
96
struct SResultRowEntryInfo;
97

98
// for selectivity query, the corresponding tag value is assigned if the data is qualified
99
typedef struct SSubsidiaryResInfo {
100
  int16_t                 num;
101
  int32_t                 rowLen;
102
  char                   *buf;  // serialize data buffer
103
  struct SqlFunctionCtx **pCtx;
104
} SSubsidiaryResInfo;
105

106
typedef struct SResultDataInfo {
107
  int16_t  precision;
108
  int16_t  scale;
109
  int16_t  type;
110
  uint16_t bytes;
111
  int32_t  interBufSize;
112
} SResultDataInfo;
113

114
#define GET_RES_INFO(ctx)        ((ctx)->resultInfo)
115
#define GET_ROWCELL_INTERBUF(_c) ((void *)((char *)(_c) + sizeof(SResultRowEntryInfo)))
116

117
typedef struct SInputColumnInfoData {
118
  int32_t           totalRows;        // total rows in current columnar data
119
  int32_t           startRowIndex;    // handle started row index
120
  int64_t           numOfRows;        // the number of rows needs to be handled
121
  bool              blankFill;        // fill blank data to block for empty table
122
  int32_t           numOfInputCols;   // PTS is not included
123
  bool              colDataSMAIsSet;  // if agg is set or not
124
  SColumnInfoData  *pPTS;             // primary timestamp column
125
  SColumnInfoData  *pPrimaryKey;      // primary key column
126
  SColumnInfoData **pData;
127
  SColumnDataAgg  **pColumnDataAgg;
128
  uint64_t uid;  // table uid, used to set the tag value when building the final query result for selectivity functions.
129
} SInputColumnInfoData;
130

131
typedef struct SSerializeDataHandle {
132
  struct SDiskbasedBuf *pBuf;
133
  int32_t               currentPage;
134
  SStreamState          *pState;
135
} SSerializeDataHandle;
136

137
// incremental state storage
138

139
typedef struct SBackendCfWrapper {
140
  void          *rocksdb;
141
  void         **pHandle;
142
  void          *writeOpts;
143
  void          *readOpts;
144
  void         **cfOpts;
145
  void          *dbOpt;
146
  void          *param;
147
  void          *env;
148
  SListNode     *pComparNode;
149
  void          *pBackend;
150
  void          *compactFactory;
151
  TdThreadRwlock rwLock;
152
  bool           remove;
153
  int64_t        backendId;
154
  char           idstr[64];
155
} SBackendCfWrapper;
156

157
typedef struct STdbState {
158
  SBackendCfWrapper *pBackendCfWrapper;
159
  int64_t            backendCfWrapperId;
160
  char               idstr[64];
161

162
  struct SStreamTask *pOwner;
163
  void               *db;
164
  void               *pStateDb;
165
  void               *pFuncStateDb;
166
  void               *pFillStateDb;  // todo refactor
167
  void               *pSessionStateDb;
168
  void               *pParNameDb;
169
  void               *pParTagDb;
170
  void               *txn;
171
} STdbState;
172

173
typedef struct SResultRowStore {
174
  int32_t (*resultRowPut)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
175
  int32_t (*resultRowGet)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
176
} SResultRowStore;
177

178
struct SStreamState {
179
  STdbState               *pTdbState;
180
  struct SStreamFileState *pFileState;
181
  int32_t                  number;
182
  SSHashObj               *parNameMap;
183
  int32_t                  taskId;
184
  int64_t                  streamId;
185
  int64_t                  streamBackendRid;
186
  int8_t                   dump;
187
  int32_t                  tsIndex;
188
  SResultRowStore          pResultRowStore;
189
  struct SExprSupp        *pExprSupp;
190
};
191

192
typedef struct SFunctionStateStore {
193
  int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
194
  int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen);
195
} SFunctionStateStore;
196

197
typedef struct SFuncInputRow {
198
  TSKEY ts;
199
  bool isDataNull;
200
  char* pData;
201
  char* pPk;
202

203
  SSDataBlock* block; // prev row block or src block
204
  int32_t rowIndex; // prev row block ? 0 : rowIndex in srcBlock
205

206
  //TODO:
207
  // int32_t startOffset; // for diff, derivative
208
  // SPoint1 startPoint; // for twa
209
} SFuncInputRow;
210

211
typedef struct SFuncInputRowIter {
212
  bool  hasPrev;
213
 
214
  SInputColumnInfoData* pInput;
215
  SColumnInfoData* pDataCol;
216
  SColumnInfoData* pPkCol;
217
  TSKEY* tsList;
218
  int32_t rowIndex;
219
  int32_t inputEndIndex;
220
  SSDataBlock* pSrcBlock;
221

222
  TSKEY prevBlockTsEnd;
223
  bool prevIsDataNull;
224
  char* pPrevData;
225
  char* pPrevPk;
226
  SSDataBlock* pPrevRowBlock; // pre one row block
227

228
  uint64_t groupId;
229
  bool hasGroupId;
230

231
  bool finalRow;
232
} SFuncInputRowIter;
233

234
// sql function runtime context
235
typedef struct SqlFunctionCtx {
236
  SInputColumnInfoData input;
237
  SResultDataInfo      resDataInfo;
238
  uint32_t             order;          // data block scanner order: asc|desc
239
  uint8_t              isPseudoFunc;   // denote current function is pseudo function or not [added for perf reason]
240
  uint8_t              isNotNullFunc;  // not return null value.
241
  uint8_t              scanFlag;       // record current running step, default: 0
242
  int16_t              functionId;     // function id
243
  char                *pOutput;        // final result output buffer, point to sdata->data
244
  // input parameter, e.g., top(k, 20), the number of results of top query is kept in param
245
  SFunctParam *param;
246
  // corresponding output buffer for timestamp of each result, e.g., diff/csum
247
  SColumnInfoData     *pTsOutput;
248
  int32_t              numOfParams;
249
  int32_t              offset;
250
  SResultRowEntryInfo *resultInfo;
251
  SSubsidiaryResInfo   subsidiaries;
252
  SPoint1              start;
253
  SPoint1              end;
254
  SFuncExecFuncs       fpSet;
255
  SScalarFuncExecFuncs sfp;
256
  struct SExprInfo    *pExpr;
257
  struct SSDataBlock  *pSrcBlock;
258
  struct SSDataBlock  *pDstBlock;  // used by indefinite rows function to set selectivity
259
  SSerializeDataHandle saveHandle;
260
  int32_t              exprIdx;
261
  char                *udfName;
262
  SFunctionStateStore *pStore;
263
  bool                 hasPrimaryKey;
264
  SFuncInputRowIter    rowIter;
265
  bool                 bInputFinished;
266
  bool                 hasWindowOrGroup; // denote that the function is used with time window or group
267
  bool                 needCleanup; // denote that the function need to be cleaned up
268
} SqlFunctionCtx;
269

270
typedef struct tExprNode {
271
  int32_t nodeType;
272
  union {
273
    struct {                                                          // function node
274
      char                  functionName[FUNCTIONS_NAME_MAX_LENGTH];  // todo refactor
275
      int32_t               functionId;
276
      int32_t               num;
277
      struct SFunctionNode *pFunctNode;
278
      int32_t               functionType;
279
    } _function;
280

281
    struct {
282
      struct SNode *pRootNode;
283
    } _optrRoot;
284
  };
285
} tExprNode;
286

287
struct SScalarParam {
288
  bool             colAlloced;
289
  SColumnInfoData *columnData;
290
  SHashObj        *pHashFilter;
291
  SHashObj        *pHashFilterOthers;
292
  int32_t          hashValueType;
293
  void            *param;  // other parameter, such as meta handle from vnode, to extract table name/tag value
294
  int32_t          numOfRows;
295
  int32_t          numOfQualified;  // number of qualified elements in the final results
296
  timezone_t       tz;
297
  void            *charsetCxt;
298
};
299

300
static inline void setTzCharset(SScalarParam* param, timezone_t tz, void* charsetCxt){
×
301
  if (param == NULL) return;
×
302
  param->tz = tz;
×
303
  param->charsetCxt = charsetCxt;
×
304
}
305

306
#define cleanupResultRowEntry(p)  p->initialized = false
307
#define isRowEntryCompleted(p)   (p->complete)
308
#define isRowEntryInitialized(p) (p->initialized)
309

310
typedef struct SPoint {
311
  int64_t key;
312
  void   *val;
313
} SPoint;
314

315
void taosGetLinearInterpolationVal(SPoint *point, int32_t outputType, SPoint *point1, SPoint *point2,
316
                                      int32_t inputType);
317

318
#define LEASTSQUARES_DOUBLE_ITEM_LENGTH 25
319
#define LEASTSQUARES_BUFF_LENGTH 128
320
#define DOUBLE_PRECISION_DIGITS "16e"
321

322
#ifdef __cplusplus
323
}
324
#endif
325

326
#endif  // TDENGINE_FUNCTION_H
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc