• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.24
/source/libs/function/src/functionMgt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "functionMgt.h"
17

18
#include "builtins.h"
19
#include "builtinsimpl.h"
20
#include "functionMgtInt.h"
21
#include "taos.h"
22
#include "taoserror.h"
23
#include "thash.h"
24
#include "tudf.h"
25

26
typedef struct SFuncMgtService {
27
  SHashObj* pFuncNameHashTable;
28
} SFuncMgtService;
29

30
static SFuncMgtService gFunMgtService;
31
static TdThreadOnce    functionHashTableInit = PTHREAD_ONCE_INIT;
32
static int32_t         initFunctionCode = 0;
33

34
static void doInitFunctionTable() {
15,720✔
35
  gFunMgtService.pFuncNameHashTable =
15,720✔
36
      taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
15,720✔
37
  if (NULL == gFunMgtService.pFuncNameHashTable) {
15,720!
38
    initFunctionCode = terrno;
×
39
    return;
×
40
  }
41

42
  for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
2,735,280✔
43
    if (TSDB_CODE_SUCCESS != taosHashPut(gFunMgtService.pFuncNameHashTable, funcMgtBuiltins[i].name,
2,719,560!
44
                                         strlen(funcMgtBuiltins[i].name), &i, sizeof(int32_t))) {
2,719,560✔
45
      initFunctionCode = terrno;
×
46
      return;
×
47
    }
48
  }
49
}
50

51
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
296,392,952✔
52
  if (fmIsUserDefinedFunc(funcId)) {
296,392,952✔
53
    return FUNC_MGT_AGG_FUNC == classification
54
               ? FUNC_AGGREGATE_UDF_ID == funcId
55
               : (FUNC_MGT_SCALAR_FUNC == classification ? FUNC_SCALAR_UDF_ID == funcId : false);
58,202✔
56
  }
57
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
296,340,291!
58
    return false;
69,466,739✔
59
  }
60
  return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
226,873,552✔
61
}
62

63
int32_t fmFuncMgtInit() {
15,720✔
64
  (void)taosThreadOnce(&functionHashTableInit, doInitFunctionTable);
15,720✔
65
  return initFunctionCode;
15,720✔
66
}
67

68
// Resolves pFunc->functionName to a builtin, fills in funcId/funcType and runs
// the builtin's translate callback.
//
// The name is looked up in the hash table when it exists; otherwise the
// builtin table is scanned linearly. Returns the translate callback's result,
// or TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION when the name is unknown.
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) {
  int32_t   builtinId = -1;
  SHashObj* pTable = gFunMgtService.pFuncNameHashTable;

  if (NULL != pTable) {
    const int32_t* pId = taosHashGet(pTable, pFunc->functionName, strlen(pFunc->functionName));
    if (NULL != pId) {
      builtinId = *pId;
    }
  } else {
    for (int32_t idx = 0; idx < funcMgtBuiltinsNum; ++idx) {
      if (0 == strcmp(funcMgtBuiltins[idx].name, pFunc->functionName)) {
        builtinId = idx;
        break;
      }
    }
  }

  if (builtinId < 0) {
    return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
  }

  pFunc->funcId = builtinId;
  pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
  return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, pMsg, msgLen);
}
87

88
// Estimates how many rows pFunc produces.
//
// A builtin with an estimateReturnRowsFunc callback decides for itself.
// Otherwise the result is FUNC_RETURN_ROWS_INDEFINITE for indefinite-rows or
// multi-rows functions and FUNC_RETURN_ROWS_NORMAL for everything else.
EFuncReturnRows fmGetFuncReturnRows(SFunctionNode* pFunc) {
  const int32_t funcId = pFunc->funcId;

  // Only consult the builtin table for ids that actually index into it; UDF or
  // invalid ids fall through to the classification checks, which already
  // tolerate them.
  const bool isBuiltin = !fmIsUserDefinedFunc(funcId) && funcId >= 0 && funcId < funcMgtBuiltinsNum;
  if (isBuiltin && NULL != funcMgtBuiltins[funcId].estimateReturnRowsFunc) {
    return funcMgtBuiltins[funcId].estimateReturnRowsFunc(pFunc);
  }

  return (fmIsIndefiniteRowsFunc(funcId) || fmIsMultiRowsFunc(funcId)) ? FUNC_RETURN_ROWS_INDEFINITE
                                                                       : FUNC_RETURN_ROWS_NORMAL;
}
95

96
bool fmIsBuiltinFunc(const char* pFunc) {
21✔
97
  return NULL != taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc, strlen(pFunc));
21✔
98
}
99

100
EFunctionType fmGetFuncType(const char* pFunc) {
2,982,009✔
101
  void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc, strlen(pFunc));
2,982,009✔
102
  if (NULL != pVal) {
2,982,044✔
103
    return funcMgtBuiltins[*(int32_t*)pVal].type;
2,981,831✔
104
  }
105
  return FUNCTION_TYPE_UDF;
213✔
106
}
107

108
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
399,027✔
109
  if (fmIsUserDefinedFunc(pFunc->funcId) || pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
399,027!
UNCOV
110
    return FUNC_DATA_REQUIRED_DATA_LOAD;
×
111
  }
112
  if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
399,088!
113
    return FUNC_DATA_REQUIRED_DATA_LOAD;
×
114
  }
115
  return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
399,088✔
116
}
117

118
// Decides, per data block, whether the function needs the block's data loaded.
//
// - UDFs and ids outside the builtin table: FUNC_DATA_REQUIRED_DATA_LOAD.
// - "_group_key" / "_select_value": FUNC_DATA_REQUIRED_NOT_LOAD.
// - Builtins without a dynDataRequiredFunc callback: FUNC_DATA_REQUIRED_DATA_LOAD.
// - Otherwise: whatever the builtin's dynDataRequiredFunc(pRes, pBlockInfo) returns.
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo) {
  if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
    return FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  const char* name = funcMgtBuiltins[funcId].name;
  if ((strcmp(name, "_group_key") == 0) || (strcmp(name, "_select_value") == 0)) {
    return FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) {
    return FUNC_DATA_REQUIRED_DATA_LOAD;
  }
  return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pBlockInfo);
}
135

136
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
6,924,448✔
137
  if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
6,924,448!
138
    return TSDB_CODE_FAILED;
×
139
  }
140
  pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;
6,927,579✔
141
  pFpSet->init = funcMgtBuiltins[funcId].initFunc;
6,927,579✔
142
  pFpSet->process = funcMgtBuiltins[funcId].processFunc;
6,927,579✔
143
  pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
6,927,579✔
144
  pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;
6,927,579✔
145
  pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow;
6,927,579✔
146
  pFpSet->cleanup = funcMgtBuiltins[funcId].cleanupFunc;
6,927,579✔
147
  return TSDB_CODE_SUCCESS;
6,927,579✔
148
}
149

150
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
32✔
151
#ifdef USE_UDF
152
  if (!fmIsUserDefinedFunc(funcId)) {
32!
153
    return TSDB_CODE_FAILED;
×
154
  }
155
  pFpSet->getEnv = udfAggGetEnv;
32✔
156
  pFpSet->init = udfAggInit;
32✔
157
  pFpSet->process = udfAggProcess;
32✔
158
  pFpSet->finalize = udfAggFinalize;
32✔
159
  return TSDB_CODE_SUCCESS;
32✔
160
#else
161
  TAOS_RETURN(TSDB_CODE_OPS_NOT_SUPPORT);
162
#endif
163
}
164

165
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
9,783,342✔
166
  if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
9,783,342!
UNCOV
167
    return TSDB_CODE_FAILED;
×
168
  }
169
  pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc;
9,783,819✔
170
  pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;
9,783,819✔
171
  return TSDB_CODE_SUCCESS;
9,783,819✔
172
}
173

174
bool fmIsAggFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_AGG_FUNC); }
20,383,215✔
175

176
bool fmIsScalarFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC); }
12,889,385✔
177

178
bool fmIsVectorFunc(int32_t funcId) { return !fmIsScalarFunc(funcId) && !fmIsPseudoColumnFunc(funcId); }
6,206,569✔
179

180
bool fmIsSelectColsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SELECT_COLS_FUNC); }
432,315✔
181

182
bool fmIsSelectFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SELECT_FUNC); }
30,884,971✔
183

184
bool fmIsTimelineFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_TIMELINE_FUNC); }
6,000,470✔
185

186
bool fmIsDateTimeFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_DATETIME_FUNC); }
3,117,675✔
187

188
bool fmIsPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PSEUDO_COLUMN_FUNC); }
31,191,834✔
189

190
bool fmIsScanPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCAN_PC_FUNC); }
18,941,171✔
191

192
bool fmIsWindowPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_WINDOW_PC_FUNC); }
14,985,892✔
193

194
bool fmIsWindowClauseFunc(int32_t funcId) { return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId); }
414,681✔
195

196
bool fmIsIndefiniteRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INDEFINITE_ROWS_FUNC); }
14,034,264✔
197

198
bool fmIsSpecialDataRequiredFunc(int32_t funcId) {
782,122✔
199
  return isSpecificClassifyFunc(funcId, FUNC_MGT_SPECIAL_DATA_REQUIRED);
782,122✔
200
}
201

202
bool fmIsDynamicScanOptimizedFunc(int32_t funcId) {
241,612✔
203
  return isSpecificClassifyFunc(funcId, FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED);
241,612✔
204
}
205

206
bool fmIsMultiResFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC); }
6,032,493✔
207

208
bool fmIsRepeatScanFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_REPEAT_SCAN_FUNC); }
7,149,100✔
209

210
bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; }
335,845,234✔
211

212
bool fmIsForbidFillFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_FILL_FUNC); }
3,127,492✔
213

214
bool fmIsIntervalInterpoFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERVAL_INTERPO_FUNC); }
24,924,586✔
215

216
bool fmIsForbidStreamFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_STREAM_FUNC); }
3,118,702✔
217

218
bool fmIsSystemInfoFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SYSTEM_INFO_FUNC); }
3,146,712✔
219

220
bool fmIsImplicitTsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_IMPLICIT_TS_FUNC); }
59,346,197✔
221

222
bool fmIsClientPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_CLIENT_PC_FUNC); }
3,143,988✔
223

224
bool fmIsMultiRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_ROWS_FUNC); }
6,043,240✔
225

226
bool fmIsKeepOrderFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_KEEP_ORDER_FUNC); }
1,204,844✔
227

228
bool fmIsCumulativeFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_CUMULATIVE_FUNC); }
447,411✔
229

230
bool fmIsForbidSysTableFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_SYSTABLE_FUNC); }
3,118,693✔
231

232
bool fmIsInterpFunc(int32_t funcId) {
6,203,930✔
233
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
6,203,930!
234
    return false;
381✔
235
  }
236
  return FUNCTION_TYPE_INTERP == funcMgtBuiltins[funcId].type;
6,203,549✔
237
}
238

239
bool fmIsInterpPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERP_PC_FUNC); }
6,042,285✔
240

241
bool fmIsForecastFunc(int32_t funcId) {
5,967,816✔
242
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
5,967,816!
243
    return false;
384✔
244
  }
245
  return FUNCTION_TYPE_FORECAST == funcMgtBuiltins[funcId].type;
5,967,432✔
246
}
247

248
bool fmIsForecastPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORECAST_PC_FUNC); }
6,052,470✔
249

250
bool fmIsLastRowFunc(int32_t funcId) {
97,449✔
251
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
97,449!
252
    return false;
×
253
  }
254
  return FUNCTION_TYPE_LAST_ROW == funcMgtBuiltins[funcId].type;
97,449✔
255
}
256

UNCOV
257
bool fmIsLastFunc(int32_t funcId) {
×
UNCOV
258
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
×
259
    return false;
×
260
  }
UNCOV
261
  return FUNCTION_TYPE_LAST == funcMgtBuiltins[funcId].type;
×
262
}
263

264
bool fmIsNotNullOutputFunc(int32_t funcId) {
10,174,519✔
265
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
10,174,519!
UNCOV
266
    return false;
×
267
  }
268
  return FUNCTION_TYPE_LAST == funcMgtBuiltins[funcId].type ||
20,144,182✔
269
         FUNCTION_TYPE_LAST_PARTIAL == funcMgtBuiltins[funcId].type ||
9,966,735✔
270
         FUNCTION_TYPE_LAST_MERGE == funcMgtBuiltins[funcId].type ||
9,365,295✔
271
         FUNCTION_TYPE_FIRST == funcMgtBuiltins[funcId].type ||
9,064,934✔
272
         FUNCTION_TYPE_FIRST_PARTIAL == funcMgtBuiltins[funcId].type ||
8,884,040✔
273
         FUNCTION_TYPE_FIRST_MERGE == funcMgtBuiltins[funcId].type ||
8,649,786✔
274
         FUNCTION_TYPE_COUNT == funcMgtBuiltins[funcId].type ||
8,532,515✔
275
         FUNCTION_TYPE_HYPERLOGLOG == funcMgtBuiltins[funcId].type ||
6,730,643✔
276
         FUNCTION_TYPE_HYPERLOGLOG_PARTIAL == funcMgtBuiltins[funcId].type ||
26,810,024✔
277
         FUNCTION_TYPE_HYPERLOGLOG_MERGE == funcMgtBuiltins[funcId].type;
6,665,842✔
278
}
279

280
bool fmIsSelectValueFunc(int32_t funcId) {
696,352✔
281
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
696,352!
282
    return false;
×
283
  }
284
  return FUNCTION_TYPE_SELECT_VALUE == funcMgtBuiltins[funcId].type;
696,352✔
285
}
286

287
bool fmIsGroupKeyFunc(int32_t funcId) {
187,070,448✔
288
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
187,070,448!
289
    return false;
×
290
  }
291
  return FUNCTION_TYPE_GROUP_KEY == funcMgtBuiltins[funcId].type;
187,081,701✔
292
}
293

294
bool fmisSelectGroupConstValueFunc(int32_t funcId) {
75✔
295
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
75!
296
    return false;
×
297
  }
298
  return FUNCTION_TYPE_GROUP_CONST_VALUE == funcMgtBuiltins[funcId].type;
75✔
299
}
300

301
bool fmIsElapsedFunc(int32_t funcId) {
13,991,424✔
302
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
13,991,424!
UNCOV
303
    return false;
×
304
  }
305
  return FUNCTION_TYPE_ELAPSED == funcMgtBuiltins[funcId].type;
13,991,474✔
306
}
307

308
bool fmIsBlockDistFunc(int32_t funcId) {
3,117,688✔
309
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
3,117,688!
310
    return false;
158✔
311
  }
312
  return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type;
3,117,530✔
313
}
314

315
bool fmIsDBUsageFunc(int32_t funcId) {
3,117,664✔
316
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
3,117,664!
317
    return false;
167✔
318
  }
319
  return FUNCTION_TYPE_DB_USAGE == funcMgtBuiltins[funcId].type;
3,117,497✔
320
}
321

322
bool fmIsProcessByRowFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PROCESS_BY_ROW); }
4,611,017✔
323

324
bool fmIsIgnoreNullFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_IGNORE_NULL_FUNC); }
2,537✔
325

326
void fmFuncMgtDestroy() {
15,721✔
327
  void* m = gFunMgtService.pFuncNameHashTable;
15,721✔
328
  if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
15,721!
329
    taosHashCleanup(m);
15,720✔
330
  }
331
}
15,721✔
332

333
#ifdef BUILD_NO_CALL
334
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet) {
335
  if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
336
    return TSDB_CODE_FAILED;
337
  }
338
  pFpSet->process = funcMgtBuiltins[funcId].invertFunc;
339
  return TSDB_CODE_SUCCESS;
340
}
341

342
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet) {
343
  if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
344
    return TSDB_CODE_FAILED;
345
  }
346
  pFpSet->process = funcMgtBuiltins[funcId].processFunc;
347
  return TSDB_CODE_SUCCESS;
348
}
349

350
bool fmIsInvertible(int32_t funcId) {
351
  bool res = false;
352
  switch (funcMgtBuiltins[funcId].type) {
353
    case FUNCTION_TYPE_COUNT:
354
    case FUNCTION_TYPE_SUM:
355
    case FUNCTION_TYPE_STDDEV:
356
    case FUNCTION_TYPE_AVG:
357
    case FUNCTION_TYPE_WSTART:
358
    case FUNCTION_TYPE_WEND:
359
    case FUNCTION_TYPE_WDURATION:
360
      res = true;
361
      break;
362
    default:
363
      break;
364
  }
365
  return res;
366
}
367
#endif
368

369
// function has same input/output type
370
bool fmIsSameInOutType(int32_t funcId) {
321,880✔
371
  bool res = false;
321,880✔
372
  switch (funcMgtBuiltins[funcId].type) {
321,880✔
373
    case FUNCTION_TYPE_MAX:
109,948✔
374
    case FUNCTION_TYPE_MIN:
375
    case FUNCTION_TYPE_TOP:
376
    case FUNCTION_TYPE_BOTTOM:
377
    case FUNCTION_TYPE_FIRST:
378
    case FUNCTION_TYPE_LAST:
379
    case FUNCTION_TYPE_SAMPLE:
380
    case FUNCTION_TYPE_TAIL:
381
    case FUNCTION_TYPE_UNIQUE:
382
      res = true;
109,948✔
383
      break;
109,948✔
384
    default:
211,932✔
385
      break;
211,932✔
386
  }
387
  return res;
321,880✔
388
}
389

390
bool fmIsConstantResFunc(SFunctionNode* pFunc) {
63,499✔
391
  SNode* pNode;
392
  FOREACH(pNode, pFunc->pParameterList) {
64,129✔
393
    if (nodeType(pNode) != QUERY_NODE_VALUE) {
3,647✔
394
      return false;
3,017✔
395
    }
396
  }
397
  return true;
60,482✔
398
}
399

400
bool fmIsSkipScanCheckFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SKIP_SCAN_CHECK_FUNC); }
241,586✔
401

402
bool fmIsPrimaryKeyFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PRIMARY_KEY_FUNC); }
2,556,293✔
403
void getLastCacheDataType(SDataType* pType, int32_t pkBytes) {
3,130✔
404
  // TODO: do it later.
405
  pType->bytes = getFirstLastInfoSize(pType->bytes, pkBytes) + VARSTR_HEADER_SIZE;
3,130✔
406
  pType->type = TSDB_DATA_TYPE_BINARY;
3,130✔
407
}
3,130✔
408

409
static int32_t getFuncInfo(SFunctionNode* pFunc) {
1,027,014✔
410
  char msg[128] = {0};
1,027,014✔
411
  return fmGetFuncInfo(pFunc, msg, sizeof(msg));
1,027,014✔
412
}
413

414
// Allocates a function node named pName, attaches pParameterList to it and
// resolves it against the builtin table.
//
// On success *ppFunc holds the new node, which now references pParameterList.
// If node allocation fails, the allocator's code is returned. If resolution
// fails, the node is destroyed with the parameter list detached first (so the
// list is not destroyed along with it), *ppFunc is reset to NULL and the
// resolution error is returned.
int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** ppFunc) {
  int32_t        code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)ppFunc);
  SFunctionNode* pNew = *ppFunc;
  if (NULL == pNew) {
    return code;
  }

  (void)snprintf(pNew->functionName, sizeof(pNew->functionName), "%s", pName);
  pNew->pParameterList = pParameterList;

  code = getFuncInfo(pNew);
  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  pNew->pParameterList = NULL;
  nodesDestroyNode((SNode*)pNew);
  *ppFunc = NULL;
  return code;
}
430

431
// For a LAST_MERGE function, overrides the result type with the one of the
// source function; every other function type is left untouched.
static void resetOutputChangedFunc(SFunctionNode *pFunc, const SFunctionNode* pSrcFunc) {
  const bool isLastMerge = (FUNCTION_TYPE_LAST_MERGE == funcMgtBuiltins[pFunc->funcId].type);
  if (isLastMerge) {
    pFunc->node.resType = pSrcFunc->node.resType;
  }
}
437

438
int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** ppFunc) {
916,581✔
439
  int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)ppFunc);
916,581✔
440
  if (NULL == *ppFunc) {
916,581!
UNCOV
441
    return code;
×
442
  }
443

444
  (*ppFunc)->hasPk = pSrcFunc->hasPk;
916,581✔
445
  (*ppFunc)->pkBytes = pSrcFunc->pkBytes;
916,581✔
446
  (*ppFunc)->pSrcFuncRef = pSrcFunc;
916,581✔
447

448
  (void)snprintf((*ppFunc)->functionName, sizeof((*ppFunc)->functionName), "%s", pName);
916,581✔
449
  (*ppFunc)->pParameterList = pParameterList;
916,581✔
450
  code = getFuncInfo((*ppFunc));
916,581✔
451
  if (TSDB_CODE_SUCCESS != code) {
916,577!
UNCOV
452
    (*ppFunc)->pParameterList = NULL;
×
UNCOV
453
    nodesDestroyNode((SNode*)*ppFunc);
×
UNCOV
454
    *ppFunc = NULL;
×
UNCOV
455
    return code;
×
456
  }
457
  resetOutputChangedFunc(*ppFunc, pSrcFunc);
916,577✔
458
  (*ppFunc)->node.relatedTo = pSrcFunc->node.relatedTo;
916,578✔
459
  (*ppFunc)->node.bindExprID = pSrcFunc->node.bindExprID;
916,578✔
460
  return code;
916,578✔
461
}
462

463
static int32_t createColumnByFunc(const SFunctionNode* pFunc, SColumnNode** ppCol) {
594,280✔
464
  int32_t code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)ppCol);
594,280✔
465
  if (NULL == *ppCol) {
594,282!
UNCOV
466
    return code;
×
467
  }
468
  tstrncpy((*ppCol)->colName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
594,282✔
469
  (*ppCol)->node.resType = pFunc->node.resType;
594,282✔
470
  return TSDB_CODE_SUCCESS;
594,282✔
471
}
472

473
bool fmIsDistExecFunc(int32_t funcId) {
1,594,847✔
474
  if (fmIsUserDefinedFunc(funcId)) {
1,594,847✔
475
    return false;
24✔
476
  }
477
  if (!fmIsVectorFunc(funcId)) {
1,594,882!
UNCOV
478
    return true;
×
479
  }
480
  return (NULL != funcMgtBuiltins[funcId].pPartialFunc && NULL != funcMgtBuiltins[funcId].pMergeFunc);
1,594,911!
481
}
482

483
static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNode** pPartialFunc) {
321,882✔
484
  SNodeList* pParameterList = NULL;
321,882✔
485
  int32_t    code = nodesCloneList(pSrcFunc->pParameterList, &pParameterList);
321,882✔
486
  if (NULL == pParameterList) {
321,883!
UNCOV
487
    return code;
×
488
  }
489
  code =
490
      createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pSrcFunc, pParameterList, pPartialFunc);
321,883✔
491
  if (TSDB_CODE_SUCCESS != code) {
321,880!
UNCOV
492
    nodesDestroyList(pParameterList);
×
493
    return code;
×
494
  }
495
  (*pPartialFunc)->hasOriginalFunc = true;
321,880✔
496
  (*pPartialFunc)->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId;
321,880✔
497
  char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0};
321,880✔
498

499
  int32_t len = tsnprintf(name, sizeof(name), "%s.%p", (*pPartialFunc)->functionName, pSrcFunc);
321,880✔
500
  if (taosHashBinary(name, len) < 0) {
321,883!
UNCOV
501
    return TSDB_CODE_FAILED;
×
502
  }
503
  tstrncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN);
321,883✔
504
  return TSDB_CODE_SUCCESS;
321,883✔
505
}
506

507
static int32_t createMergeFuncPara(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
594,280✔
508
                                   SNodeList** pParameterList) {
509
  SNode*  pRes = NULL;
594,280✔
510
  int32_t code = createColumnByFunc(pPartialFunc, (SColumnNode**)&pRes);
594,280✔
511
  if (TSDB_CODE_SUCCESS != code) {
594,279!
UNCOV
512
    return code;
×
513
  }
514
  if (NULL != funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc) {
594,279✔
515
    return funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc(pSrcFunc->pParameterList, pRes, pParameterList);
4,684✔
516
  } else {
517
    return nodesListMakeStrictAppend(pParameterList, pRes);
589,595✔
518
  }
519
}
520

521
// Builds the intermediate-stage function for pSrcFunc.
//
// The parameters come from createMergeFuncPara(); the function name is the
// builtin's pMiddleFunc when it has one, otherwise its pMergeFunc. The new
// node reuses the alias of pPartialFunc. On failure the parameter list is
// destroyed and *pMidFunc is not written.
static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
                                 SFunctionNode** pMidFunc) {
  SNodeList*     pMergeParams = NULL;
  SFunctionNode* pCreated = NULL;

  int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pMergeParams);
  if (TSDB_CODE_SUCCESS == code) {
    const SBuiltinFuncDefinition* pDef = &funcMgtBuiltins[pSrcFunc->funcId];
    const char* pStageName = (NULL != pDef->pMiddleFunc) ? pDef->pMiddleFunc : pDef->pMergeFunc;
    code = createFunctionWithSrcFunc(pStageName, pSrcFunc, pMergeParams, &pCreated);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pMergeParams);
    return code;
  }

  tstrncpy(pCreated->node.aliasName, pPartialFunc->node.aliasName, TSDB_COL_NAME_LEN);
  *pMidFunc = pCreated;
  return code;
}
545

546
static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
321,880✔
547
                                   SFunctionNode** pMergeFunc) {
548
  SNodeList*     pParameterList = NULL;
321,880✔
549
  SFunctionNode* pFunc = NULL;
321,880✔
550

551
  int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
321,880✔
552
  if (TSDB_CODE_SUCCESS == code) {
321,882!
553
    code = createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pSrcFunc, pParameterList, &pFunc);
321,882✔
554
  }
555
  if (TSDB_CODE_SUCCESS == code) {
321,883✔
556
    pFunc->hasOriginalFunc = true;
321,882✔
557
    pFunc->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId;
321,882✔
558
    // overwrite function restype set by translate function
559
    if (fmIsSameInOutType(pSrcFunc->funcId)) {
321,882✔
560
      pFunc->node.resType = pSrcFunc->node.resType;
109,948✔
561
    }
562
    tstrncpy(pFunc->node.aliasName, pSrcFunc->node.aliasName, TSDB_COL_NAME_LEN);
321,880✔
563
  }
564

565
  if (TSDB_CODE_SUCCESS == code) {
321,881!
566
    *pMergeFunc = pFunc;
321,881✔
567
  } else {
568
    nodesDestroyList(pParameterList);
×
569
  }
570
  return code;
321,881✔
571
}
572

573
// Splits pFunc into its distributed-execution stages.
//
// @param pFunc         source function; must be distributable (fmIsDistExecFunc)
// @param pPartialFunc  [out] partial-stage function
// @param pMidFunc      [out] optional (may be NULL) intermediate-stage function
// @param pMergeFunc    [out] merge-stage function
// @return TSDB_CODE_SUCCESS, or an error code. On error every output that was
//         supplied is destroyed and left as NULL, so callers never see a
//         dangling or half-built node.
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc,
                        SFunctionNode** pMergeFunc) {
  if (!fmIsDistExecFunc(pFunc->funcId)) {
    return TSDB_CODE_FAILED;
  }

  // Start from a known state: the error path below destroys whatever the
  // outputs hold, and not every stage is reached when an earlier one fails.
  *pPartialFunc = NULL;
  *pMergeFunc = NULL;
  if (pMidFunc) {
    *pMidFunc = NULL;
  }

  int32_t code = createPartialFunction(pFunc, pPartialFunc);
  if (TSDB_CODE_SUCCESS == code && NULL == *pPartialFunc) {
    // createPartialFunction() returns the clone call's code when no parameter
    // list was produced; if that code is success there is no partial node and
    // the later stages, which dereference it, must not run.
    code = TSDB_CODE_FAILED;
  }
  if (TSDB_CODE_SUCCESS == code && pMidFunc) {
    code = createMidFunction(pFunc, *pPartialFunc, pMidFunc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)*pPartialFunc);
    *pPartialFunc = NULL;
    if (pMidFunc) {
      nodesDestroyNode((SNode*)*pMidFunc);
      *pMidFunc = NULL;
    }
    nodesDestroyNode((SNode*)*pMergeFunc);
    *pMergeFunc = NULL;
  }

  return code;
}
595

596
char* fmGetFuncName(int32_t funcId) {
2✔
597
  if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
2!
UNCOV
598
    return taosStrdup("invalid function");
×
599
  }
600
  return taosStrdup(funcMgtBuiltins[funcId].name);
2!
601
}
602

603
/// @param [out] pStateFunc, not changed if an error occurred or no state func needs to be created
604
/// @retval 0 on success, otherwise an error code
605
static int32_t fmCreateStateFunc(const SFunctionNode* pFunc, SFunctionNode** pStateFunc) {
33,088✔
606
  if (funcMgtBuiltins[pFunc->funcId].pStateFunc) {
33,088!
607
    SNodeList* pParams = NULL;
33,088✔
608
    int32_t    code = nodesCloneList(pFunc->pParameterList, &pParams);
33,088✔
609
    if (!pParams) return code;
33,088!
610
    code = createFunction(funcMgtBuiltins[pFunc->funcId].pStateFunc, pParams, pStateFunc);
33,088✔
611
    if (TSDB_CODE_SUCCESS != code) {
33,088!
UNCOV
612
      nodesDestroyList(pParams);
×
UNCOV
613
      return code;
×
614
    }
615
    tstrncpy((*pStateFunc)->node.aliasName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
33,088✔
616
    tstrncpy((*pStateFunc)->node.userAlias, pFunc->node.userAlias, TSDB_COL_NAME_LEN);
33,088✔
617
  }
618
  return TSDB_CODE_SUCCESS;
33,088✔
619
}
620

621
bool fmIsTSMASupportedFunc(func_id_t funcId) {
617,841✔
622
  return isSpecificClassifyFunc(funcId, FUNC_MGT_TSMA_FUNC) &&
1,212,600✔
623
         !isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_STREAM_FUNC);
594,759!
624
}
625

626
// Replaces, in place, every TSMA-supported function in pFuncs with its state
// function (when the builtin defines one).
//
// Each entry is first resolved with fmGetFuncInfo(). Processing stops at the
// first error, which is returned; entries already replaced stay replaced.
// Returns TSDB_CODE_SUCCESS when the loop body never runs.
int32_t fmCreateStateFuncs(SNodeList* pFuncs) {
  // Must be initialized: when the loop runs zero times nothing else assigns it.
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pNode;
  char    buf[128] = {0};
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    code = fmGetFuncInfo(pFunc, buf, sizeof(buf));
    if (code) break;
    if (fmIsTSMASupportedFunc(pFunc->funcId)) {
      SFunctionNode* pNewFunc = NULL;
      code = fmCreateStateFunc(pFunc, &pNewFunc);
      if (code) {
        // error
        break;
      } else if (!pNewFunc) {
        // no need state func
        continue;
      } else {
        REPLACE_NODE(pNewFunc);
        nodesDestroyNode(pNode);
      }
    }
  }
  return code;
}
651

652
static int32_t fmCreateStateMergeFunc(SFunctionNode* pFunc, SFunctionNode** pStateMergeFunc) {
428✔
653
  if (funcMgtBuiltins[pFunc->funcId].pMergeFunc) {
428✔
654
    SNodeList* pParams = NULL;
416✔
655
    int32_t    code = nodesCloneList(pFunc->pParameterList, &pParams);
416✔
656
    if (!pParams) return code;
416!
657
    code = createFunctionWithSrcFunc(funcMgtBuiltins[pFunc->funcId].pMergeFunc, pFunc, pParams, pStateMergeFunc);
416✔
658
    if (TSDB_CODE_SUCCESS != code) {
416!
UNCOV
659
      nodesDestroyList(pParams);
×
UNCOV
660
      return code;
×
661
    }
662
    tstrncpy((*pStateMergeFunc)->node.aliasName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
416✔
663
    tstrncpy((*pStateMergeFunc)->node.userAlias, pFunc->node.userAlias, TSDB_COL_NAME_LEN);
416✔
664
  }
665
  return TSDB_CODE_SUCCESS;
428✔
666
}
667

668
// Replaces, in place, every TSMA-supported function in pFuncs with its
// state-merge function (when the builtin defines a merge function).
//
// Processing stops at the first error, which is returned; entries already
// replaced stay replaced. Returns TSDB_CODE_SUCCESS when no entry needed
// processing.
int32_t fmCreateStateMergeFuncs(SNodeList* pFuncs) {
  // Must be initialized: it is only assigned inside the TSMA-supported branch,
  // which is skipped entirely when no entry qualifies.
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pNode;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    if (fmIsTSMASupportedFunc(pFunc->funcId)) {
      SFunctionNode* pNewFunc = NULL;
      code = fmCreateStateMergeFunc(pFunc, &pNewFunc);
      if (code) {
        // error
        break;
      } else if (!pNewFunc) {
        // no state merge func
        continue;
      } else {
        REPLACE_NODE(pNewFunc);
        nodesDestroyNode(pNode);
      }
    }
  }
  return code;
}
691

692
int32_t fmGetFuncId(const char* name) {
83,518✔
693
  if (NULL != gFunMgtService.pFuncNameHashTable) {
83,518!
694
    void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, name, strlen(name));
83,518✔
695
    if (NULL != pVal) {
83,518!
696
      return *(int32_t*)pVal;
83,518✔
697
    }
UNCOV
698
    return -1;
×
699
  }
UNCOV
700
  for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
×
UNCOV
701
    if (0 == strcmp(funcMgtBuiltins[i].name, name)) {
×
UNCOV
702
      return i;
×
703
    }
704
  }
UNCOV
705
  return -1;
×
706
}
707

708
bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) {
16,719,700✔
709
  const SBuiltinFuncDefinition* pFunc = &funcMgtBuiltins[funcId];
16,719,700✔
710
  const SBuiltinFuncDefinition* pStateFunc = &funcMgtBuiltins[stateFuncId];
16,719,700✔
711
  if (!pFunc->pStateFunc) {
16,719,700!
UNCOV
712
    return false;
×
713
  }
714
  if (strcmp(pFunc->pStateFunc, pStateFunc->name) == 0) return true;
16,719,700✔
715
  int32_t stateMergeFuncId = fmGetFuncId(pFunc->pStateFunc);
82,058✔
716
  if (stateMergeFuncId == -1) {
82,058!
UNCOV
717
    return false;
×
718
  }
719
  const SBuiltinFuncDefinition* pStateMergeFunc = &funcMgtBuiltins[stateMergeFuncId];
82,058✔
720
  return strcmp(pStateFunc->name, pStateMergeFunc->pMergeFunc) == 0;
82,058✔
721
}
722

723
bool fmIsCountLikeFunc(int32_t funcId) {
1,110,816✔
724
  return isSpecificClassifyFunc(funcId, FUNC_MGT_COUNT_LIKE_FUNC);
1,110,816✔
725
}
726

727
bool fmIsRowTsOriginFunc(int32_t funcId) {
17,279✔
728
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
17,279!
UNCOV
729
    return false;
×
730
  }
731
  return FUNCTION_TYPE_IROWTS_ORIGIN == funcMgtBuiltins[funcId].type;
17,279✔
732
}
733

UNCOV
734
bool fmIsGroupIdFunc(int32_t funcId) {
×
UNCOV
735
  if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
×
UNCOV
736
    return false;
×
737
  }
UNCOV
738
  return FUNCTION_TYPE_GROUP_ID == funcMgtBuiltins[funcId].type;
×
739
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc