• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.87
/source/libs/function/src/udfd.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#ifdef USE_UDF
17
// clang-format off
18
#include "uv.h"
19
#include "os.h"
20
#include "fnLog.h"
21
#include "thash.h"
22

23
#include "tudf.h"
24
#include "tudfInt.h"
25
#include "version.h"
26

27
#include "tdatablock.h"
28
#include "tdataformat.h"
29
#include "tglobal.h"
30
#include "tmsg.h"
31
#include "trpc.h"
32
#include "tmisce.h"
33
#include "tversion.h"
34
#include "dmUtil.h"
35
// clang-format on
36

37
#define UDFD_MAX_SCRIPT_PLUGINS 64
38
#define UDFD_MAX_SCRIPT_TYPE    1
39
#define UDFD_MAX_PLUGIN_FUNCS   9
40

41
// Per-UDF state of the C (shared library) plugin: the loaded library plus the
// entry points resolved from it. Allocated zeroed in udfdCPluginUdfInit and
// released in udfdCPluginUdfDestroy.
typedef struct SUdfCPluginCtx {
42
  uv_lib_t lib;  // library opened with uv_dlopen from the UDF's path
43

44
  TUdfScalarProcFunc scalarProcFunc;  // symbol "<name>"; resolved only for scalar UDFs
45

46
  TUdfAggStartFunc   aggStartFunc;   // symbol "<name>_start"; resolved only for aggregate UDFs
47
  TUdfAggProcessFunc aggProcFunc;    // symbol "<name>"; resolved only for aggregate UDFs
48
  TUdfAggFinishFunc  aggFinishFunc;  // symbol "<name>_finish"; resolved only for aggregate UDFs
49

50
  TUdfInitFunc    initFunc;     // symbol "<name>_init"; called once after loading
51
  TUdfDestroyFunc destroyFunc;  // symbol "<name>_destroy"; called before the library is closed
52
} SUdfCPluginCtx;
53

54
// "Open" hook of the C plugin. It ignores the environment items and always
// reports success.
int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) {
  (void)items;
  (void)numItems;
  return 0;
}
1,917✔
55

56
// "Close" hook of the C plugin. There is nothing to tear down, so it always
// reports success.
int32_t udfdCPluginClose() {
  const int32_t ok = 0;
  return ok;
}
1,917✔
57

58
// Resolves the "<udfName>_init" and "<udfName>_destroy" symbols from the
// library already opened in udfCtx and stores them in udfCtx->initFunc and
// udfCtx->destroyFunc.
//
// Returns 0 when both symbols are found. A failing uv_dlsym is handed to
// TAOS_CHECK_RETURN (presumably returning its non-zero result - TODO confirm
// against the macro definition).
int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);

  // "_init" plus the terminating NUL needs 6 bytes beyond the name.
  char initSym[TSDB_FUNC_NAME_LEN + 6] = {0};
  snprintf(initSym, sizeof(initSym), "%s%s", udfName, "_init");
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, initSym, (void **)(&udfCtx->initFunc)));

  // "_destroy" plus the terminating NUL needs 9 bytes beyond the name.
  char destroySym[TSDB_FUNC_NAME_LEN + 9] = {0};
  snprintf(destroySym, sizeof(destroySym), "%s%s", udfName, "_destroy");
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, destroySym, (void **)(&udfCtx->destroyFunc)));

  return 0;
}
71

72
// Resolves the three entry points of an aggregate C UDF from the library
// already opened in udfCtx: "<udfName>" (process), "<udfName>_start" and
// "<udfName>_finish".
//
// Returns 0 when all three symbols are found. A failing uv_dlsym is handed to
// TAOS_CHECK_RETURN (presumably returning its non-zero result - TODO confirm
// against the macro definition).
int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);

  // The process function carries the bare UDF name. The bounded copy is also
  // the base for the suffixed symbol names below.
  char baseName[TSDB_FUNC_NAME_LEN] = {0};
  snprintf(baseName, sizeof(baseName), "%s", udfName);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, baseName, (void **)(&udfCtx->aggProcFunc)));

  char startSym[TSDB_FUNC_NAME_LEN + 7] = {0};
  snprintf(startSym, sizeof(startSym), "%s%s", baseName, "_start");
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, startSym, (void **)(&udfCtx->aggStartFunc)));

  char finishSym[TSDB_FUNC_NAME_LEN + 8] = {0};
  snprintf(finishSym, sizeof(finishSym), "%s%s", baseName, "_finish");
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, finishSym, (void **)(&udfCtx->aggFinishFunc)));

  return 0;
}
90

91
// Loads a C UDF: opens the shared library at udf->path, resolves the
// init/destroy symbols and the scalar or aggregate entry points (depending on
// udf->funcType), then runs the UDF's own init function.
//
// On success stores the newly allocated SUdfCPluginCtx in *pUdfCtx and
// returns 0; the context is released by udfdCPluginUdfDestroy.
// On failure nothing is stored in *pUdfCtx, the library is closed and the
// context freed. The return value is terrno when the allocation fails,
// TSDB_CODE_UDF_LOAD_UDF_FAILURE for any loading/symbol error, or the
// non-zero value returned by the UDF's init function.
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(udf, pUdfCtx);
  int32_t         err = 0;
  SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx));
  if (NULL == udfCtx) {
    return terrno;
  }

  // uv_dlopen does not return a libuv error code, so the reason has to come
  // from uv_dlerror rather than uv_strerror. Going through _exit also calls
  // uv_dlclose, which releases the error message buffer of the failed open.
  if (uv_dlopen(udf->path, &udfCtx->lib) != 0) {
    fnError("can not load library %s. error: %s", udf->path, uv_dlerror(&udfCtx->lib));
    err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
    goto _exit;
  }
  const char *udfName = udf->name;

  err = udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName);
  if (err != 0) {
    fnError("can not load init/destroy functions. error: %d", err);
    err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
    goto _exit;
  }

  if (udf->funcType == UDF_FUNC_TYPE_SCALAR) {
    char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
    snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
    if (uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)) != 0) {
      // err is still 0 here, so report the dl error text instead of uv_strerror(err).
      fnError("can not load library function %s. error: %s", processFuncName, uv_dlerror(&udfCtx->lib));
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
      goto _exit;
    }
  } else if (udf->funcType == UDF_FUNC_TYPE_AGG) {
    err = udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName);
    if (err != 0) {
      fnError("can not load aggregation functions. error: %d", err);
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
      goto _exit;
    }
  }

  if (udfCtx->initFunc) {
    err = (udfCtx->initFunc)();
    if (err != 0) {
      fnError("udf init function failed. error: %d", err);
      goto _exit;
    }
  }
  *pUdfCtx = udfCtx;
  return 0;

_exit:
  uv_dlclose(&udfCtx->lib);
  taosMemoryFree(udfCtx);
  return err;
}
144

145
int32_t udfdCPluginUdfDestroy(void *udfCtx) {
40,732✔
146
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx);
80,376✔
147
  SUdfCPluginCtx *ctx = udfCtx;
39,644✔
148
  int32_t         code = 0;
39,644✔
149
  if (ctx->destroyFunc) {
39,644✔
150
    code = (ctx->destroyFunc)();
39,644✔
151
  }
152
  uv_dlclose(&ctx->lib);
39,644✔
153
  taosMemoryFree(ctx);
39,644✔
154
  return code;
39,644✔
155
}
156

157
// Runs the scalar entry point of a C UDF on one data block, writing into
// resultCol. Returns the UDF's own result, or
// TSDB_CODE_UDF_FUNC_EXEC_FAILURE when no scalar function was loaded.
int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(block, resultCol, udfCtx);
  SUdfCPluginCtx *pluginCtx = (SUdfCPluginCtx *)udfCtx;

  if (pluginCtx->scalarProcFunc == NULL) {
    fnError("udfd c plugin scalar proc not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->scalarProcFunc(block, resultCol);
}
167

168
// Runs the "start" entry point of an aggregate C UDF on the intermediate
// buffer. Returns the UDF's own result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE
// when no start function was loaded.
int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(buf, udfCtx);
  SUdfCPluginCtx *pluginCtx = (SUdfCPluginCtx *)udfCtx;

  if (pluginCtx->aggStartFunc == NULL) {
    fnError("udfd c plugin aggregation start not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggStartFunc(buf);
}
179

180
// Runs the "process" entry point of an aggregate C UDF: consumes one data
// block together with the current intermediate buffer and produces the new
// intermediate buffer. Returns the UDF's own result, or
// TSDB_CODE_UDF_FUNC_EXEC_FAILURE when no process function was loaded.
int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(block, interBuf, newInterBuf, udfCtx);
  SUdfCPluginCtx *pluginCtx = (SUdfCPluginCtx *)udfCtx;

  if (pluginCtx->aggProcFunc == NULL) {
    fnError("udfd c plugin aggregation process not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggProcFunc(block, interBuf, newInterBuf);
}
190

191
// int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
192
//                                void *udfCtx) {
193
//   TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx);
194
//   SUdfCPluginCtx *ctx = udfCtx;
195
//   if (ctx->aggMergeFunc) {
196
//     return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
197
//   } else {
198
//     fnError("udfd c plugin aggregation merge not implemented");
199
//     return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
200
//   }
201
// }
202

203
// Runs the "finish" entry point of an aggregate C UDF, turning the
// intermediate buffer into the final result. Returns the UDF's own result, or
// TSDB_CODE_UDF_FUNC_EXEC_FAILURE when no finish function was loaded.
int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx);
  SUdfCPluginCtx *pluginCtx = (SUdfCPluginCtx *)udfCtx;

  if (pluginCtx->aggFinishFunc == NULL) {
    fnError("udfd c plugin aggregation finish not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggFinishFunc(buf, resultData);
}
214

215
// for c, the function pointers are filled directly and libLoaded = true;
216
// for others, dlopen/dlsym to find function pointers
217
// One script-language plugin (C shared libraries or Python): a table of entry
// points that udfd calls to open/close the plugin and to init, run and
// destroy individual UDFs.
typedef struct SUdfScriptPlugin {
218
  int8_t scriptType;  // TSDB_FUNC_SCRIPT_BIN_LIB or TSDB_FUNC_SCRIPT_PYTHON
219

220
  char     libPath[PATH_MAX];  // plugin library path; set by the Python plugin initializer
221
  bool     libLoaded;          // set to true once the Python plugin library is loaded and opened
222
  uv_lib_t lib;                // library handle of the plugin itself (Python plugin)
223

224
  TScriptUdfScalarProcFunc udfScalarProcFunc;
225
  TScriptUdfAggStartFunc   udfAggStartFunc;
226
  TScriptUdfAggProcessFunc udfAggProcFunc;
227
  TScriptUdfAggMergeFunc   udfAggMergeFunc;  // not set by the C plugin
228
  TScriptUdfAggFinishFunc  udfAggFinishFunc;
229

230
  TScriptUdfInitFunc    udfInitFunc;     // creates the per-UDF context
231
  TScriptUdfDestoryFunc udfDestroyFunc;  // releases the per-UDF context
232

233
  TScriptOpenFunc  openFunc;   // called once with environment items when the plugin is initialized
234
  TScriptCloseFunc closeFunc;  // called once when the plugin is deinitialized
235
} SUdfScriptPlugin;
236

237
// Process-wide state of the udfd daemon (see the single instance `global`).
typedef struct SUdfdContext {
238
  uv_loop_t  *loop;
239
  uv_pipe_t   ctrlPipe;
240
  uv_signal_t intrSignal;
241
  char        listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
242
  uv_pipe_t   listeningPipe;
243

244
  void     *clientRpc;  // RPC client passed to udfdFillUdfInfoFromMNode
245
  SCorEpSet mgmtEp;
246

247
  uv_mutex_t udfsMutex;  // held around lookups/updates of udfsHash
248
  SHashObj  *udfsHash;   // UDF name -> SUdf* of the currently cached UDFs
249

250
  uv_mutex_t        scriptPluginsMutex;  // held while a plugin slot is lazily initialized
251
  SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS];  // indexed by script type; NULL until first use
252

253
  SArray *residentFuncs;  // function names; a new UDF whose name matches is marked resident
254

255
  char udfDataDir[PATH_MAX];  // prepended to the PYTHONPATH handed to the Python plugin
256
  bool printVersion;
257
} SUdfdContext;
258

259
// The single daemon-wide context instance used throughout this file.
SUdfdContext global;
260

261
struct SUdfdUvConn;
262
struct SUvUdfWork;
263

264
// State of one client connection to udfd.
// NOTE(review): the input* fields look like a receive buffer with its
// length/capacity bookkeeping - confirm against the pipe read code.
typedef struct SUdfdUvConn {
265
  uv_stream_t *client;  // stream of the connected client; closed by udfdUvHandleError
266
  char        *inputBuf;
267
  int32_t      inputLen;
268
  int32_t      inputCap;
269
  int32_t      inputTotal;
270

271
  struct SUvUdfWork *pWorkList;  // head of work list
272
} SUdfdUvConn;
273

274
// One unit of request work attached to a connection.
typedef struct SUvUdfWork {
275
  SUdfdUvConn *conn;
276
  uv_buf_t     input;   // encoded request; decoded by udfdProcessRequest
277
  uv_buf_t     output;  // encoded response, filled in by the request handlers
278

279
  struct SUvUdfWork *pWorkNext;  // next entry in the connection's work list
280
} SUvUdfWork;
281

282
// Load state of an SUdf: INIT until a setup request starts loading it,
// LOADING while udfdInitUdf runs, READY once it has been initialized.
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState;
283

284
// One cached user-defined function: its metadata, load state and the plugin
// context used to execute it.
typedef struct SUdf {
285
  char    name[TSDB_FUNC_NAME_LEN + 1];
286
  int32_t version;
287
  int64_t createdTime;
288

289
  int8_t  funcType;    // TSDB_FUNC_TYPE_SCALAR or TSDB_FUNC_TYPE_AGGREGATE
290
  int8_t  scriptType;  // index into global.scriptPlugins
291
  int8_t  outputType;
292
  int32_t outputLen;
293
  int32_t bufSize;
294

295
  char path[PATH_MAX];  // passed to the plugin's init function as the UDF body path
296

297
  int32_t    refCount;   // starts at 1; incremented each time the cached UDF is reused
298
  EUdfState  state;
299
  uv_mutex_t lock;       // protects state while the UDF is being loaded
300
  uv_cond_t  condReady;  // broadcast when loading finishes (successfully or not)
301
  bool       resident;   // true when the name is listed in global.residentFuncs
302

303
  SUdfScriptPlugin *scriptPlugin;  // plugin that executes this UDF
304
  void             *scriptUdfCtx;  // per-UDF context created by the plugin's init function
305

306
  int64_t lastFetchTime;  // last fetch time in milliseconds
307
  bool    expired;        // set when the cached entry is older than 10s at lookup time
308
} SUdf;
309

310
// Handle created for each successful setup request; its address is returned
// to the client as the UDF handle.
typedef struct SUdfcFuncHandle {
311
  SUdf *udf;
312
} SUdfcFuncHandle;
313

314
// Kind of RPC exchange recorded in SUdfdRpcSendRecvInfo.
// NOTE(review): presumably "connect to mnode" vs "retrieve function" - the
// code using these values is not visible here, confirm there.
typedef enum EUdfdRpcReqRspType {
315
  UDFD_RPC_MNODE_CONNECT = 0,
316
  UDFD_RPC_RETRIVE_FUNC,
317
} EUdfdRpcReqRspType;
318

319
// Bookkeeping for one RPC exchange.
// NOTE(review): resultSem presumably lets the sender wait for the response
// callback - confirm against udfdProcessRpcRsp, which is not visible here.
typedef struct SUdfdRpcSendRecvInfo {
320
  EUdfdRpcReqRspType rpcType;
321
  int32_t            code;
322
  void              *param;
323
  uv_sem_t           resultSem;
324
} SUdfdRpcSendRecvInfo;
325

326
static void    udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
327
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
328
static int32_t udfdConnectToMnode();
329
static bool    udfdRpcRfp(int32_t code, tmsg_t msgType);
330
static int     initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
331
static int32_t udfdOpenClientRpc();
332
static void    udfdCloseClientRpc();
333

334
static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
335
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
336
static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
337
static void udfdProcessRequest(uv_work_t *req);
338
static void udfdOnWrite(uv_write_t *req, int status);
339
static void udfdSendResponse(uv_work_t *work, int status);
340
static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
341
static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe);
342
static void udfdHandleRequest(SUdfdUvConn *conn);
343
static void udfdPipeCloseCb(uv_handle_t *pipe);
344
// Closes the client stream of a connection; udfdPipeCloseCb runs once the
// close completes.
static void udfdUvHandleError(SUdfdUvConn *conn) {
  uv_handle_t *clientHandle = (uv_handle_t *)conn->client;
  uv_close(clientHandle, udfdPipeCloseCb);
}
127,516✔
345
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
346
static void udfdOnNewConnection(uv_stream_t *server, int status);
347

348
static void    udfdIntrSignalHandler(uv_signal_t *handle, int signum);
349
static void    removeListeningPipe();
350

351
static void    udfdPrintVersion();
352
static int32_t udfdParseArgs(int32_t argc, char *argv[]);
353
static int32_t udfdInitLog();
354

355
static void    udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
356
static void    udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
357
static int32_t udfdUvInit();
358
static void    udfdCloseWalkCb(uv_handle_t *handle, void *arg);
359
static void    udfdRun();
360
static void    udfdConnectMnodeThreadFunc(void *args);
361

362
int32_t udfdNewUdf(SUdf **pUdf,  const char *udfName);
363
void  udfdGetFuncBodyPath(const SUdf *udf, char *path);
364

365
// Fills a plugin descriptor with the built-in C (shared library) entry points
// and calls the plugin's open function with LD_LIBRARY_PATH taken from
// tsUdfdLdLibPath. Returns the open function's result.
int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RCODE(plugin);

  plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;

  // Plugin life cycle.
  plugin->openFunc = udfdCPluginOpen;
  plugin->closeFunc = udfdCPluginClose;

  // Per-UDF life cycle.
  plugin->udfInitFunc = udfdCPluginUdfInit;
  plugin->udfDestroyFunc = udfdCPluginUdfDestroy;

  // Execution entry points.
  plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc;
  plugin->udfAggStartFunc = udfdCPluginUdfAggStart;
  plugin->udfAggProcFunc = udfdCPluginUdfAggProc;
  // plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge;
  plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish;

  SScriptUdfEnvItem envItems[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}};
  return plugin->openFunc(envItems, 1);
}
383

384
// Opens the shared library at libPath into pLib and resolves numOfFuncs
// symbols: funcName[i] is looked up and its address stored through func[i].
//
// Returns TSDB_CODE_UDF_LOAD_UDF_FAILURE when the library cannot be opened,
// 0 otherwise. Symbol lookup is best effort: a missing symbol is logged and
// the remaining symbols are still resolved, so callers must check the
// individual function pointers before using them.
int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) {
  TAOS_UDF_CHECK_PTR_RCODE(libPath, pLib, funcName, func);

  // uv_dlopen does not return a libuv error code, so uv_strerror would print
  // an unrelated message; the actual reason is available through uv_dlerror.
  if (uv_dlopen(libPath, pLib) != 0) {
    fnError("can not load library %s. error: %s", libPath, uv_dlerror(pLib));
    uv_dlclose(pLib);  // releases the error message kept in pLib
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }

  for (int i = 0; i < numOfFuncs; ++i) {
    if (uv_dlsym(pLib, funcName[i], func[i]) != 0) {
      fnError("load library function failed. lib %s function %s. error: %s", libPath, funcName[i],
              uv_dlerror(pLib));
    }
  }
  return 0;
}
400

401
// Initializes the Python plugin: loads libtaospyudf.so, resolves its entry
// points into the plugin descriptor and calls the plugin's open function with
// PYTHONPATH ("<udfDataDir><sep><tsUdfdLdLibPath>") and LOGDIR.
//
// Returns 0 on success and sets plugin->libLoaded. On failure the library is
// closed again and the load error, terrno (allocation failure) or the open
// function's error is returned.
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RCODE(plugin);
  plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
  // todo: windows support
  snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so");
  plugin->libLoaded = false;
  const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen",         "pyClose",         "pyUdfInit",
                                                 "pyUdfDestroy",   "pyUdfScalarProc", "pyUdfAggStart",
                                                 "pyUdfAggFinish", "pyUdfAggProc",    "pyUdfAggMerge"};
  void      **funcs[UDFD_MAX_PLUGIN_FUNCS] = {
           (void **)&plugin->openFunc,         (void **)&plugin->closeFunc,         (void **)&plugin->udfInitFunc,
           (void **)&plugin->udfDestroyFunc,   (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc,
           (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc,    (void **)&plugin->udfAggMergeFunc};
  int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS);
  if (err != 0) {
    fnError("can not load python plugin. lib path %s", plugin->libPath);
    return err;
  }

  if (plugin->openFunc) {
    // size_t instead of int16_t: the sum of two path lengths must not be
    // narrowed before it is used as the allocation and formatting size.
    // One byte for the separator, one for the terminating NUL.
    size_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1;
    char  *pythonPath = taosMemoryMalloc(lenPythonPath);
    if (pythonPath == NULL) {
      uv_dlclose(&plugin->lib);
      return terrno;
    }
#ifdef WINDOWS
    snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath);
#else
    snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath);
#endif
    SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}};
    err = plugin->openFunc(items, 2);
    taosMemoryFree(pythonPath);
  }
  if (err != 0) {
    fnError("udf script python plugin open func failed. error: %d", err);
    uv_dlclose(&plugin->lib);
    return err;
  }
  plugin->libLoaded = true;

  return 0;
}
446

447
// Shuts down the C plugin: calls its close function (logging a failure) and
// clears every entry point in the descriptor. The descriptor itself is not
// freed here.
void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RVOID(plugin);

  if (plugin->closeFunc != NULL && plugin->closeFunc() != 0) {
    fnError("udf script c plugin close func failed.line:%d", __LINE__);
  }

  // Plugin life cycle.
  plugin->openFunc = NULL;
  plugin->closeFunc = NULL;

  // Per-UDF life cycle.
  plugin->udfInitFunc = NULL;
  plugin->udfDestroyFunc = NULL;

  // Execution entry points.
  plugin->udfScalarProcFunc = NULL;
  plugin->udfAggStartFunc = NULL;
  plugin->udfAggProcFunc = NULL;
  plugin->udfAggMergeFunc = NULL;
  plugin->udfAggFinishFunc = NULL;
}
465

466
// Shuts down the Python plugin: calls its close function (logging a failure),
// closes the plugin library, marks it unloaded and clears every entry point
// in the descriptor. The descriptor itself is not freed here.
void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RVOID(plugin);

  if (plugin->closeFunc != NULL && plugin->closeFunc() != 0) {
    fnError("udf script python plugin close func failed.line:%d", __LINE__);
  }

  uv_dlclose(&plugin->lib);
  plugin->libLoaded = false;

  // Plugin life cycle.
  plugin->openFunc = NULL;
  plugin->closeFunc = NULL;

  // Per-UDF life cycle.
  plugin->udfInitFunc = NULL;
  plugin->udfDestroyFunc = NULL;

  // Execution entry points.
  plugin->udfScalarProcFunc = NULL;
  plugin->udfAggStartFunc = NULL;
  plugin->udfAggProcFunc = NULL;
  plugin->udfAggMergeFunc = NULL;
  plugin->udfAggFinishFunc = NULL;
}
487

488
int32_t udfdInitScriptPlugin(int8_t scriptType) {
1,981✔
489
  SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
1,981✔
490
  if (plugin == NULL) {
1,981✔
491
    return terrno;
×
492
  }
493
  int32_t err = 0;
1,981✔
494
  switch (scriptType) {
1,981✔
495
    case TSDB_FUNC_SCRIPT_BIN_LIB:
1,917✔
496
      err = udfdInitializeCPlugin(plugin);
1,917✔
497
      if (err != 0) {
1,917✔
498
        fnError("udf script c plugin init failed. error: %d", err);
×
499
        taosMemoryFree(plugin);
×
500
        return err;
×
501
      }
502
      break;
1,917✔
503
    case TSDB_FUNC_SCRIPT_PYTHON: {
64✔
504
      err = udfdInitializePythonPlugin(plugin);
64✔
505
      if (err != 0) {
64✔
506
        fnError("udf script python plugin init failed. error: %d", err);
×
507
        taosMemoryFree(plugin);
×
508
        return err;
×
509
      }
510
      break;
64✔
511
    }
512
    default:
×
513
      fnError("udf script type %d not supported", scriptType);
×
514
      taosMemoryFree(plugin);
×
515
      return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
×
516
  }
517

518
  global.scriptPlugins[scriptType] = plugin;
1,981✔
519
  return TSDB_CODE_SUCCESS;
1,981✔
520
}
521

522
void udfdDeinitScriptPlugins() {
661,832✔
523
  SUdfScriptPlugin *plugin = NULL;
661,832✔
524
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON];
661,832✔
525
  if (plugin != NULL) {
661,832✔
526
    udfdDeinitPythonPlugin(plugin);
64✔
527
    taosMemoryFree(plugin);
64✔
528
    global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON] = NULL;
64✔
529
  }
530

531
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB];
661,832✔
532
  if (plugin != NULL) {
661,832✔
533
    udfdDeinitCPlugin(plugin);
1,917✔
534
    taosMemoryFree(plugin);
1,917✔
535
    global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB] = NULL;
1,917✔
536
  }
537
  return;
661,832✔
538
}
539

540
// Work callback for one queued request: decodes the request stored in the
// work item's input buffer and dispatches it to the setup, call or teardown
// handler. When decoding fails the input buffer is released and nothing is
// dispatched; unknown request types are ignored.
void udfdProcessRequest(uv_work_t *req) {
  TAOS_UDF_CHECK_PTR_RVOID(req);

  SUvUdfWork *work = (SUvUdfWork *)(req->data);
  if (work == NULL) {
    fnError("udf work is NULL");
    return;
  }

  SUdfRequest decoded = {0};
  if (decodeUdfRequest(work->input.base, &decoded) == NULL) {
    taosMemoryFreeClear(work->input.base);
    fnError("udf request decode failed");
    return;
  }

  if (decoded.type == UDF_TASK_SETUP) {
    udfdProcessSetupRequest(work, &decoded);
  } else if (decoded.type == UDF_TASK_CALL) {
    udfdProcessCallRequest(work, &decoded);
  } else if (decoded.type == UDF_TASK_TEARDOWN) {
    udfdProcessTeardownRequest(work, &decoded);
  }
}
574

575
// Copies the fields a script plugin needs from the daemon's SUdf into an
// SScriptUdfInfo. name and path are shared pointers into udf, not copies.
// funcType is only written for aggregate and scalar UDFs; any other value
// leaves udfInfo->funcType untouched.
static void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
  // Identity.
  udfInfo->name = udf->name;
  udfInfo->version = udf->version;
  udfInfo->createdTime = udf->createdTime;
  udfInfo->path = udf->path;
  udfInfo->scriptType = udf->scriptType;

  // Function kind, translated from the TSDB_ to the UDF_ constants.
  if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
    udfInfo->funcType = UDF_FUNC_TYPE_AGG;
  } else if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
    udfInfo->funcType = UDF_FUNC_TYPE_SCALAR;
  }

  // Output description.
  udfInfo->outputType = udf->outputType;
  udfInfo->outputLen = udf->outputLen;
  udfInfo->bufSize = udf->bufSize;
}
56,220✔
590

591
// Loads one UDF: fetches its metadata from the mnode into udf, makes sure the
// plugin for its script type is initialized (created lazily on first use) and
// asks that plugin to create the per-UDF context in udf->scriptUdfCtx.
//
// Returns 0 on success, TSDB_CODE_UDF_LOAD_UDF_FAILURE when the metadata
// cannot be retrieved, TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED for an out-of-range
// script type, or the error reported by the plugin initialization / the
// plugin's UDF init function.
static int32_t udfdInitUdf(char *udfName, SUdf *udf) {
  TAOS_UDF_CHECK_PTR_RCODE(udfName, udf);
  int32_t err = 0;
  err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf);
  if (err != 0) {
    fnError("can not retrieve udf from mnode. udf name %s", udfName);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }
  // scriptType is a signed 8-bit value used as an array index below, so a
  // negative value has to be rejected as well as a too-large one.
  if (udf->scriptType < 0 || udf->scriptType > UDFD_MAX_SCRIPT_TYPE) {
    fnError("udf name %s script type %d not supported", udfName, udf->scriptType);
    return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
  }

  uv_mutex_lock(&global.scriptPluginsMutex);
  if (global.scriptPlugins[udf->scriptType] == NULL) {
    err = udfdInitScriptPlugin(udf->scriptType);
    if (err != 0) {
      fnError("udf name %s init script plugin failed. error %d", udfName, err);
      uv_mutex_unlock(&global.scriptPluginsMutex);
      return err;
    }
  }
  // Read the slot while the mutex is still held so the pointer is consistent
  // with the initialization above.
  udf->scriptPlugin = global.scriptPlugins[udf->scriptType];
  uv_mutex_unlock(&global.scriptPluginsMutex);

  SScriptUdfInfo info = {0};
  convertUdf2UdfInfo(udf, &info);
  err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx);
  if (err != 0) {
    fnError("udf name %s init failed. error %d", udfName, err);
    return err;
  }

  fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx);
  return 0;
}
628

629
// Allocates a new SUdf for udfName in state UDF_STATE_INIT with a reference
// count of 1, the current time as lastFetchTime, and the resident flag set
// when the name appears in global.residentFuncs.
//
// On success stores the object in *pUdf and returns 0. Returns terrno when
// the allocation fails and TSDB_CODE_UDF_UV_EXEC_FAILURE when the mutex or
// condition variable cannot be created; in both cases *pUdf is left
// untouched and nothing is leaked.
int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(pUdf, udfName);
  SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
  if (NULL == udfNew) {
    return terrno;
  }
  udfNew->refCount = 1;
  udfNew->lastFetchTime = taosGetTimestampMs();
  tstrncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);

  udfNew->state = UDF_STATE_INIT;
  if (uv_mutex_init(&udfNew->lock) != 0) {
    taosMemoryFree(udfNew);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }
  if (uv_cond_init(&udfNew->condReady) != 0) {
    // The mutex was already created; release it together with the object.
    uv_mutex_destroy(&udfNew->lock);
    taosMemoryFree(udfNew);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  udfNew->resident = false;
  udfNew->expired = false;
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
    char *funcName = taosArrayGet(global.residentFuncs, i);
    if (strcmp(udfName, funcName) == 0) {
      udfNew->resident = true;
      break;
    }
  }
  *pUdf = udfNew;

  fnTrace("udf new succeeded. name %s(%p)", udfNew->name, udfNew);

  return 0;
}
658

659
void udfdFreeUdf(void *pData) {
54,373✔
660
  SUdf *pSudf = (SUdf *)pData;
54,373✔
661
  if (pSudf == NULL) {
54,373✔
662
    return;
×
663
  }
664

665
  if (pSudf->scriptPlugin != NULL) {
54,373✔
666
    if(pSudf->scriptPlugin->udfDestroyFunc(pSudf->scriptUdfCtx) != 0) {
54,373✔
667
      fnError("udfdFreeUdf: udfd destroy udf %s failed", pSudf->name);
1,088✔
668
    }
669
  }
670

671
  uv_mutex_destroy(&pSudf->lock);
54,373✔
672
  uv_cond_destroy(&pSudf->condReady);
54,373✔
673

674
  fnTrace("udf free succeeded. name %s(%p)", pSudf->name, pSudf);
54,373✔
675

676
  taosMemoryFree(pSudf);
54,373✔
677
}
678

679
// Looks up udfName in global.udfsHash under global.udfsMutex.
//  - A cached entry fetched at most 10 seconds ago is reused: its reference
//    count is incremented and it is returned through *ppUdf.
//  - An older entry is marked expired and removed from the hash (the object
//    itself is not freed here), then handled like a miss.
//  - On a miss a new SUdf is created and inserted into the hash.
// Returns 0 on success, otherwise the error from udfdNewUdf or taosHashPut.
int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(ppUdf, udfName);
  uv_mutex_lock(&global.udfsMutex);

  SUdf  **ppCached = taosHashGet(global.udfsHash, udfName, strlen(udfName));
  int64_t now = taosGetTimestampMs();
  if (ppCached != NULL) {
    SUdf *cached = *ppCached;
    if (now - cached->lastFetchTime <= 10 * 1000) {  // still fresh (10s)
      ++cached->refCount;
      *ppUdf = cached;
      uv_mutex_unlock(&global.udfsMutex);
      fnInfo("udfd reuse existing udf. udf  %s udf version %d, udf created time %" PRIx64, (*ppUdf)->name, (*ppUdf)->version,
             (*ppUdf)->createdTime);
      return 0;
    }

    cached->expired = true;
    fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
           cached->name, cached->version, cached->createdTime);
    if (taosHashRemove(global.udfsHash, udfName, strlen(udfName)) != 0) {
      fnError("udfdGetOrCreateUdf: udfd remove udf %s failed", udfName);
    }
  }

  int32_t rc = udfdNewUdf(ppUdf, udfName);
  if (rc == 0) {
    rc = taosHashPut(global.udfsHash, udfName, strlen(udfName), ppUdf, POINTER_BYTES);
  }
  uv_mutex_unlock(&global.udfsMutex);

  return rc;
}
718

719
// Handles a setup request: finds or creates the SUdf named in the request,
// loads it if this is the first user (other callers wait until loading
// finishes), allocates a handle for the client and encodes the response into
// uvUdf->output.
//
// The response code is TSDB_CODE_UDF_FUNC_EXEC_FAILURE for any failure. On
// failure the UDF is removed from global.udfsHash and freed, and a handle
// that was already allocated is released. When the response itself cannot be
// encoded, uvUdf->output is left untouched.
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
  // TODO: traceable id from client. connect, setup, call, teardown
  fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);

  SUdfSetupRequest *setup = &request->setup;
  int32_t           code = TSDB_CODE_SUCCESS;
  SUdf             *udf = NULL;
  // Declared (and initialized) before the first goto so that every path
  // reaching _send/_exit sees defined values.
  SUdfcFuncHandle  *handle = NULL;
  SUdfResponse      rsp = {0};
  int32_t           len = 0;
  void             *bufBegin = NULL;
  void             *buf = NULL;

  code = udfdGetOrCreateUdf(&udf, setup->udfName);
  if (code != 0) {
    fnError("udfdGetOrCreateUdf failed. udf name %s", setup->udfName);
    goto _send;
  }
  uv_mutex_lock(&udf->lock);
  if (udf->state == UDF_STATE_INIT) {
    udf->state = UDF_STATE_LOADING;
    code = udfdInitUdf(setup->udfName, udf);
    if (code == 0) {
      udf->state = UDF_STATE_READY;
    } else {
      udf->state = UDF_STATE_INIT;
    }
    uv_cond_broadcast(&udf->condReady);
    uv_mutex_unlock(&udf->lock);
  } else {
    while (udf->state == UDF_STATE_LOADING) {
      uv_cond_wait(&udf->condReady, &udf->lock);
    }
    uv_mutex_unlock(&udf->lock);
  }

  if (!code) {
    handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
    if (handle == NULL) {
      fnError("udfdProcessSetupRequest: malloc failed.");
      code = terrno;
    } else {
      handle->udf = udf;
    }
  }

_send:
  rsp.seqNum = request->seqNum;
  rsp.type = request->type;
  rsp.code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
  rsp.setupRsp.udfHandle = (int64_t)(handle);
  // udf is NULL when udfdGetOrCreateUdf could not create it; the output
  // description then stays zeroed.
  if (udf != NULL) {
    rsp.setupRsp.outputType = udf->outputType;
    rsp.setupRsp.bytes = udf->outputLen;
    rsp.setupRsp.bufSize = udf->bufSize;
  }

  len = encodeUdfResponse(NULL, &rsp);
  if (len < 0) {
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
    code = terrno;
    goto _exit;
  }
  rsp.msgLen = len;
  bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessSetupRequest: malloc failed. len %d", len);
    code = terrno;
    goto _exit;
  }

  buf = bufBegin;
  if (encodeUdfResponse(&buf, &rsp) < 0) {
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    code = terrno;
    goto _exit;
  }

  uvUdf->output = uv_buf_init(bufBegin, len);

  taosMemoryFreeClear(uvUdf->input.base);

_exit:
  if (code) {
    // The handle only reaches the client inside a successful response, so on
    // any failure it is still owned here.
    if (handle != NULL) {
      taosMemoryFree(handle);
    }
    if (udf != NULL) {
      // NOTE(review): the UDF is removed and freed regardless of its
      // refCount; if this setup reused a cached UDF, other handles may still
      // point at it - confirm whether a refCount check is needed here.
      uv_mutex_lock(&global.udfsMutex);
      int32_t removeCode = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
      if (removeCode) {
        fnError("udf name %s remove from hash failed/setup, err:%0x %s", udf->name, removeCode, tstrerror(removeCode));
      }
      uv_mutex_unlock(&global.udfsMutex);

      fnError("udf free: setup failed. name %s(%p) err:%0x %s", udf->name, udf, code, tstrerror(code));

      udfdFreeUdf(udf);
    }
  }
}
813

814
/*
 * Sanity-check the output column of a scalar UDF call against its input block.
 *
 * What is checked depends on tsSafetyCheckLevel:
 *   - NEVER : nothing, the result is accepted as is.
 *   - other : the output must contain exactly as many rows as the input block.
 *   - BYROW : in addition, non-null rows are inspected. For variable-length
 *             types the payload must exist and each row offset must lie inside
 *             it; for fixed-length types the total size is compared against
 *             dataLen once, at the first non-null row.
 *
 * Returns TSDB_CODE_SUCCESS when the result passes, otherwise
 * TSDB_CODE_UDF_FUNC_EXEC_FAILURE.
 */
static int32_t checkUDFScalaResult(SSDataBlock *block, SUdfColumn *output) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }

  if (output->colData.numOfRows != block->info.rows) {
    fnError("udf scala result num of rows %d not equal to input rows %" PRId64, output->colData.numOfRows, block->info.rows);
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }

  // Only the BYROW level looks inside the column data.
  if (tsSafetyCheckLevel != TSDB_SAFETY_CHECK_LEVELL_BYROW) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < output->colData.numOfRows; ++i) {
    if (udfColDataIsNull(output, i)) {
      continue;
    }

    if (!IS_VAR_DATA_TYPE(output->colMeta.type)) {
      // Fixed-length data: the size check does not depend on the row, so one pass is enough.
      TAOS_UDF_CHECK_CONDITION(
          output->colMeta.bytes * output->colData.numOfRows <= output->colData.fixLenCol.dataLen,
          TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
      break;
    }

    TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.payload != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
    TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.varOffsets[i] >= 0 &&
                                 output->colData.varLenCol.varOffsets[i] < output->colData.varLenCol.payloadLen,
                             TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
  }

  return TSDB_CODE_SUCCESS;
}
843

844
/*
 * Sanity-check the intermediate buffer returned by an aggregate UDF step.
 *
 * Skipped entirely when tsSafetyCheckLevel is NEVER. Otherwise the buffer must
 * report zero or one result, must point at memory and must have a positive
 * length. The input block is not consulted.
 *
 * Returns TSDB_CODE_SUCCESS when the buffer passes, otherwise
 * TSDB_CODE_UDF_FUNC_EXEC_FAILURE.
 */
static int32_t checkUDFAggResult(SSDataBlock *block, SUdfInterBuf *output) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }

  const bool resultCountOk = (output->numOfResult == 0) || (output->numOfResult == 1);
  if (!resultCountOk) {
    fnError("udf agg result num of rows %d not equal to 1", output->numOfResult);
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }

  TAOS_UDF_CHECK_CONDITION(output->buf != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
  TAOS_UDF_CHECK_CONDITION(output->bufLen > 0, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);

  return TSDB_CODE_SUCCESS;
}
856

857
/*
 * Execute one UDF call request and stage the encoded response in uvUdf->output.
 *
 * The call type selects which plugin entry point runs (scalar, aggregate
 * start / process / finish). Any non-zero result of the dispatch is reported to
 * the client as TSDB_CODE_UDF_FUNC_EXEC_FAILURE. Request-owned resources (the
 * input block, the incoming intermediate buffer, the request bytes in
 * uvUdf->input) and the response payload are released before returning.
 *
 * If the response cannot be encoded or allocated, uvUdf->output is left
 * untouched.
 *
 * @param uvUdf    work item carrying the raw request bytes and receiving the response
 * @param request  decoded request; request->call is consumed by this function
 */
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
  SUdfCallRequest *call = &request->call;
  fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle,
          request->seqNum);
  SUdfcFuncHandle  *handle = (SUdfcFuncHandle *)(call->udfHandle);
  SUdf             *udf = handle->udf;
  SUdfResponse      response = {0};
  SUdfResponse     *rsp = &response;
  SUdfCallResponse *subRsp = &rsp->callRsp;

  int32_t code = TSDB_CODE_SUCCESS;
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
      SUdfColumn output = {0};
      output.colMeta.bytes = udf->outputLen;
      output.colMeta.type = udf->outputType;
      output.colMeta.precision = 0;
      output.colMeta.scale = 0;
      // Keep the capacity error: dropping it would answer the client with
      // "success" and an empty result.
      code = udfColEnsureCapacity(&output, call->block.info.rows);
      if (code == TSDB_CODE_SUCCESS) {
        SUdfDataBlock input = {0};
        code = convertDataBlockToUdfDataBlock(&call->block, &input);
        if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
        freeUdfDataDataBlock(&input);
        if (code == TSDB_CODE_SUCCESS) code = checkUDFScalaResult(&call->block, &output);
        if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
      }
      freeUdfColumn(&output);
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx);
      } else {
        code = terrno;
      }
      subRsp->resultBuf = outBuf;
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      SUdfDataBlock input = {0};
      // A failed conversion must surface as an error, not as an empty success.
      code = convertDataBlockToUdfDataBlock(&call->block, &input);
      if (code == TSDB_CODE_SUCCESS) {
        SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
        if (outBuf.buf != NULL) {
          code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx);
          if (code == TSDB_CODE_SUCCESS) code = checkUDFAggResult(&call->block, &outBuf);
          subRsp->resultBuf = outBuf;
        } else {
          code = terrno;
        }
      }
      // The incoming intermediate buffer is released on every path, including
      // the conversion / allocation failures that used to leak it.
      freeUdfInterBuf(&call->interBuf);
      freeUdfDataDataBlock(&input);
      break;
    }
    // TSDB_UDF_CALL_AGG_MERGE is not handled by this function.
    case TSDB_UDF_CALL_AGG_FIN: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx);
        subRsp->resultBuf = outBuf;
      } else {
        code = terrno;
      }
      // Released whether or not the output buffer could be allocated.
      freeUdfInterBuf(&call->interBuf);
      break;
    }
    default: {
      // Nothing was executed, so do not pretend the call succeeded.
      fnError("udfdProcessCallRequest: unsupported call type %d", call->callType);
      code = TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
      break;
    }
  }

  rsp->seqNum = request->seqNum;
  rsp->type = request->type;
  rsp->code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
  subRsp->callType = call->callType;

  // First pass computes the encoded size, second pass writes into the buffer.
  int32_t len = encodeUdfResponse(NULL, rsp);
  if (len < 0) {
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
    goto _exit;
  }
  rsp->msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessCallRequest: malloc failed. len %d", len);
    goto _exit;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, rsp) < 0) {
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    goto _exit;
  }

  uvUdf->output = uv_buf_init(bufBegin, len);

_exit:
  // The response has been serialized (or abandoned); drop what the call owned.
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
      blockDataFreeRes(&call->block);
      blockDataFreeRes(&subRsp->resultData);
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      blockDataFreeRes(&call->block);
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_FIN: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    default:
      break;
  }

  taosMemoryFreeClear(uvUdf->input.base);
  return;
}
999

1000
/*
 * Handle a teardown request: drop one reference on the UDF behind the handle,
 * unload the UDF when this was the last reference (and it is not resident, or
 * it is expired), free the handle, and stage the encoded response in
 * uvUdf->output.
 *
 * The raw request bytes in uvUdf->input are released on every exit path; if the
 * response cannot be encoded or allocated, uvUdf->output is left untouched.
 *
 * @param uvUdf    work item carrying the raw request bytes and receiving the response
 * @param request  decoded request; request->teardown identifies the handle
 */
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
  SUdfTeardownRequest *teardown = &request->teardown;
  fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
  SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
  SUdf            *udf = handle->udf;
  bool             unloadUdf = false;
  int32_t          code = TSDB_CODE_SUCCESS;

  // The reference count and the name -> udf hash are both guarded by udfsMutex.
  uv_mutex_lock(&global.udfsMutex);
  udf->refCount--;
  if (udf->refCount == 0 && (!udf->resident || udf->expired)) {
    unloadUdf = true;
    code = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
    if (code != 0) {
      fnError("udf name %s remove from hash failed, err:%0x %s", udf->name, code, tstrerror(code));
      uv_mutex_unlock(&global.udfsMutex);
      goto _send;
    }
  }
  uv_mutex_unlock(&global.udfsMutex);
  if (unloadUdf) {
    fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx));

    udfdFreeUdf(udf);
  }

_send:
  taosMemoryFree(handle);
  SUdfResponse  response = {0};
  SUdfResponse *rsp = &response;
  rsp->seqNum = request->seqNum;
  rsp->type = request->type;
  rsp->code = code;

  // First pass computes the encoded size, second pass writes into the buffer.
  int32_t len = encodeUdfResponse(NULL, rsp);
  if (len < 0) {
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
    goto _exit;
  }
  rsp->msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessTeardownRequest: malloc failed. len %d", len);
    goto _exit;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, rsp) < 0) {
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    goto _exit;
  }
  uvUdf->output = uv_buf_init(bufBegin, len);

_exit:
  // Release the request bytes on failure paths too, and clear the pointer so
  // nothing can free it a second time.
  taosMemoryFreeClear(uvUdf->input.base);
  return;
}
1056

1057
void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
56,220✔
1058
  TAOS_UDF_CHECK_PTR_RVOID(udf, path);
168,660✔
1059
  if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
56,220✔
1060
#ifdef WINDOWS
1061
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
1062
#else
1063
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
40,732✔
1064
             udf->createdTime);
40,732✔
1065
#endif
1066
  } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
15,488✔
1067
#ifdef WINDOWS
1068
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
1069
#else
1070
    snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
15,488✔
1071
#endif
1072
  } else {
1073
#ifdef WINDOWS
1074
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
1075
#else
1076
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
×
1077
#endif
1078
  }
1079
}
1080

1081
/*
 * Persist the body of a UDF to the path computed by udfdGetFuncBodyPath() and
 * record that path in udf->path.
 *
 * If a file already exists at the target path it is reused without rewriting.
 *
 * @param pFuncInfo  function info whose pCode / codeSize hold the body to write
 * @param udf        UDF whose name / version / creation time determine the path
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_NO_DISKSPACE (via terrno)
 *         when no data space is available; TSDB_CODE_FILE_CORRUPTED when the
 *         file cannot be opened, fully written or closed.
 */
int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
  TAOS_UDF_CHECK_PTR_RCODE(pFuncInfo, udf);
  if (!osDataSpaceAvailable()) {
    terrno = TSDB_CODE_NO_DISKSPACE;
    fnError("udfd create shared library failed since %s", terrstr());
    return terrno;
  }

  char path[PATH_MAX] = {0};
  udfdGetFuncBodyPath(udf, path);
  bool fileExist = !(taosStatFile(path, NULL, NULL, NULL) < 0);
  if (fileExist) {
    tstrncpy(udf->path, path, PATH_MAX);
    fnInfo("udfd func body file. reuse existing file %s", path);
    return TSDB_CODE_SUCCESS;
  }

  TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
  if (file == NULL) {
    fnError("udfd write udf shared library: %s failed, error: %d %s", path, ERRNO, strerror(ERRNO));
    return TSDB_CODE_FILE_CORRUPTED;
  }
  int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
  if (count != pFuncInfo->codeSize) {
    fnError("udfd write udf shared library failed");
    // Close the handle before bailing out; it used to leak on this path.
    // NOTE(review): the partially written file stays on disk and would be
    // picked up by the "reuse existing file" branch on the next attempt;
    // consider removing it here.
    if (taosCloseFile(&file) != 0) {
      fnError("udfdSaveFuncBodyToFile, udfd close file failed");
    }
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (taosCloseFile(&file) != 0) {
    fnError("udfdSaveFuncBodyToFile, udfd close file failed");
    return TSDB_CODE_FILE_CORRUPTED;
  }

  tstrncpy(udf->path, path, PATH_MAX);
  return TSDB_CODE_SUCCESS;
}
1116

1117
/*
 * RPC response callback for requests issued by udfd to the mnode.
 *
 * The request context travels in pMsg->info.ahandle. The outcome is written to
 * msgInfo->code and the waiting requester is released through
 * msgInfo->resultSem. The response content is always freed here.
 *
 *   - UDFD_RPC_MNODE_CONNECT: validates the clock skew (at most 900 s) and the
 *     returned endpoint set, and refreshes global.mgmtEp when it changed.
 *   - UDFD_RPC_RETRIVE_FUNC : copies the function metadata into the SUdf passed
 *     as msgInfo->param and writes the function body to disk.
 *
 * @param parent  opaque owner registered with the rpc client (only null-checked)
 * @param pMsg    response message
 * @param pEpSet  updated endpoint set reported by the rpc layer, may be NULL
 */
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
  TAOS_UDF_CHECK_PTR_RVOID(parent, pMsg);
  SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
  if (msgInfo == NULL) {
    // Without a context there is nobody to notify; just drop the payload.
    fnError("udfd rpc response without request context");
    rpcFreeCont(pMsg->pCont);
    return;
  }

  if (pEpSet) {
    if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&global.mgmtEp, pEpSet);
    }
  }

  if (pMsg->code != TSDB_CODE_SUCCESS) {
    fnError("udfd rpc error, code:%s", tstrerror(pMsg->code));
    msgInfo->code = pMsg->code;
    goto _return;
  }

  if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
    SConnectRsp connectRsp = {0};
    if (tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp) < 0) {
      fnError("udfd deserialize connect response failed");
      // Report the failure; leaving code untouched would look like success to the waiter.
      msgInfo->code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
      goto _return;
    }

    int32_t now = taosGetTimestampSec();
    int32_t delta = abs(now - connectRsp.svrTimestamp);
    if (delta > 900) {
      msgInfo->code = TSDB_CODE_TIME_UNSYNCED;
      goto _return;
    }

    if (connectRsp.epSet.numOfEps == 0) {
      msgInfo->code = TSDB_CODE_APP_ERROR;
      goto _return;
    }

    if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
      updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
    }
    msgInfo->code = 0;
  } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
    SRetrieveFuncRsp retrieveRsp = {0};
    if (tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp) < 0) {
      fnError("udfd deserialize retrieve func response failed");
      msgInfo->code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
      goto _return;
    }

    SFuncInfo      *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
    SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0);
    SUdf           *udf = msgInfo->param;

    if (pFuncInfo == NULL || pFuncExtraInfo == NULL || udf == NULL) {
      // An empty response (or a missing target) cannot be applied.
      fnError("udfd retrieve func response carries no function info");
      msgInfo->code = TSDB_CODE_APP_ERROR;
    } else {
      udf->funcType = pFuncInfo->funcType;
      udf->scriptType = pFuncInfo->scriptType;
      udf->outputType = pFuncInfo->outputType;
      udf->outputLen = pFuncInfo->outputLen;
      udf->bufSize = pFuncInfo->bufSize;

      udf->version = pFuncExtraInfo->funcVersion;
      udf->createdTime = pFuncExtraInfo->funcCreatedTime;
      msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf);
      if (msgInfo->code != 0) {
        udf->lastFetchTime = 0;
      }
    }

    if (pFuncInfo != NULL) {
      tFreeSFuncInfo(pFuncInfo);
    }
    taosArrayDestroy(retrieveRsp.pFuncInfos);
    taosArrayDestroy(retrieveRsp.pFuncExtraInfos);
  }

_return:
  rpcFreeCont(pMsg->pCont);
  uv_sem_post(&msgInfo->resultSem);
  return;
}
1188

1189
/*
 * Fetch the definition of `udfName` from the mnode and fill `udf` with it.
 *
 * Sends a TDMT_MND_RETRIEVE_FUNC request through `clientRpc` and blocks on a
 * semaphore until the response callback (which receives `udf` through the
 * request context) has stored the outcome.
 *
 * @param clientRpc  rpc client used to send the request
 * @param udfName    name of the function to retrieve
 * @param udf        destination structure, handed to the response callback
 * @return 0 on success, otherwise the error reported while building or sending
 *         the request, or the code stored by the response callback.
 */
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
  TAOS_UDF_CHECK_PTR_RCODE(clientRpc, udfName, udf);
  SRetrieveFuncReq retrieveReq = {0};
  retrieveReq.numOfFuncs = 1;
  retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
  if (retrieveReq.pFuncNames == NULL) {
    return terrno;
  }
  if (taosArrayPush(retrieveReq.pFuncNames, udfName) == NULL) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }

  // First call sizes the request, second call serializes it into the rpc buffer.
  int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
  if (contLen < 0) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }
  if (tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq) < 0) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    rpcFreeCont(pReq);
    return terrno;
  }
  taosArrayDestroy(retrieveReq.pFuncNames);

  SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
  if (NULL == msgInfo) {
    rpcFreeCont(pReq);  // not handed to the rpc layer yet, so it is still ours
    return terrno;
  }
  msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
  msgInfo->param = udf;
  if (uv_sem_init(&msgInfo->resultSem, 0) != 0) {
    rpcFreeCont(pReq);
    taosMemoryFree(msgInfo);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  SRpcMsg rpcMsg = {0};
  rpcMsg.pCont = pReq;
  rpcMsg.contLen = contLen;
  rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
  rpcMsg.info.ahandle = msgInfo;
  int32_t code = rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
  if (code == 0) {
    // The response callback posts the semaphore after storing the result code.
    uv_sem_wait(&msgInfo->resultSem);
    code = msgInfo->code;
  }
  // Destroy the semaphore whether or not the request could be sent.
  uv_sem_destroy(&msgInfo->resultSem);
  taosMemoryFree(msgInfo);
  return code;
}
1237

1238
/*
 * Predicate installed as the rpc client's `rfp` hook.
 *
 * Returns true only when `code` is one of the listed network / leadership /
 * start-stop conditions and `msgType` is not one of the scheduler
 * query / fetch / notify messages; returns false in every other case.
 */
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
  const bool transientCode =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING ||
      code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!transientCode) {
    return false;
  }

  const bool schedulerMsg = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                            msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH ||
                            msgType == TDMT_SCH_TASK_NOTIFY;
  return !schedulerMsg;
}
1251

1252
/*
 * Initialise the mnode endpoint set from the configured first / second
 * endpoints.
 *
 * Each non-empty endpoint string must be shorter than TSDB_EP_LEN. A first
 * endpoint that cannot be parsed is an error; a second endpoint that cannot be
 * parsed is only logged and skipped. At least one endpoint must end up in the
 * set.
 *
 * @return 0 on success. On failure terrno is set to TSDB_CODE_TSC_INVALID_FQDN
 *         and a non-zero value is returned (terrno when the first endpoint
 *         fails to parse, -1 otherwise).
 */
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  SEpSet *epSet = &(pEpSet->epSet);

  pEpSet->version = 0;
  epSet->numOfEps = 0;
  epSet->inUse = 0;

  const bool hasFirst = (firstEp != NULL) && (firstEp[0] != 0);
  if (hasFirst) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    if (taosGetFqdnPortFromEp(firstEp, &epSet->eps[0]) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    epSet->numOfEps++;
  }

  const bool hasSecond = (secondEp != NULL) && (secondEp[0] != 0);
  if (hasSecond) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // The second endpoint lands in the next free slot, which is slot 0 when no
    // first endpoint was configured.
    if (taosGetFqdnPortFromEp(secondEp, &epSet->eps[epSet->numOfEps]) == TSDB_CODE_SUCCESS) {
      epSet->numOfEps++;
    } else {
      fnError("invalid ep %s", secondEp);
    }
  }

  if (epSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}
1296

1297
int32_t udfdOpenClientRpc() {
661,832✔
1298
  SRpcInit rpcInit = {0};
661,832✔
1299
  rpcInit.label = "UDFD";
661,832✔
1300
  rpcInit.numOfThreads = 1;
661,832✔
1301
  rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
661,832✔
1302
  rpcInit.sessions = 1024;
661,832✔
1303
  rpcInit.connType = TAOS_CONN_CLIENT;
661,832✔
1304
  rpcInit.idleTime = tsShellActivityTimer * 1000;
661,832✔
1305
  rpcInit.user = TSDB_DEFAULT_USER;
661,832✔
1306
  rpcInit.parent = &global;
661,832✔
1307
  rpcInit.rfp = udfdRpcRfp;
661,832✔
1308
  rpcInit.compressSize = tsCompressMsgSize;
661,832✔
1309

1310
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
661,832✔
1311
  connLimitNum = TMAX(connLimitNum, 10);
661,832✔
1312
  connLimitNum = TMIN(connLimitNum, 500);
661,832✔
1313
  rpcInit.connLimitNum = connLimitNum;
661,832✔
1314
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
661,832✔
1315

1316
  rpcInit.enableSSL = tsEnableTLS;
661,832✔
1317
  rpcInit.enableSasl = tsEnableSasl;
661,832✔
1318
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
661,832✔
1319
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
661,832✔
1320
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
661,832✔
1321
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
661,832✔
1322
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
661,832✔
1323
  TAOS_CHECK_RETURN(taosVersionStrToInt(td_version, &rpcInit.compatibilityVer));
661,832✔
1324

1325
  global.clientRpc = rpcOpen(&rpcInit);
661,832✔
1326
  if (global.clientRpc == NULL) {
661,832✔
1327
    fnError("failed to init dnode rpc client");
×
1328
    return terrno;
×
1329
  }
1330
  return 0;
661,832✔
1331
}
1332

1333
void udfdCloseClientRpc() {
661,832✔
1334
  fnInfo("udfd begin closing rpc");
661,832✔
1335
  rpcClose(global.clientRpc);
661,832✔
1336
  fnInfo("udfd finish closing rpc");
661,832✔
1337
}
661,832✔
1338

1339
/*
 * Completion callback of the uv_write() that sends a response.
 *
 * Logs a failed write, unlinks the work item from its connection's work list
 * (when the connection still exists) and releases the response buffer, the
 * work item and the write request.
 */
void udfdOnWrite(uv_write_t *req, int status) {
  TAOS_UDF_CHECK_PTR_RVOID(req);
  SUvUdfWork *work = (SUvUdfWork *)req->data;

  if (status < 0) {
    fnError("udfd send response error, length:%zu code:%s", work->output.len, uv_err_name(status));
  }

  if (work->conn != NULL) {
    // Walk the singly linked list by link address so the head needs no special case.
    SUvUdfWork **link = &work->conn->pWorkList;
    while (*link != NULL && *link != work) {
      link = &((*link)->pWorkNext);
    }

    if (*link != work) {
      fnError("work not in conn any more");
    } else {
      *link = work->pWorkNext;
    }
  }

  taosMemoryFree(work->output.base);
  taosMemoryFree(work);
  taosMemoryFree(req);
}
1360

1361
void udfdSendResponse(uv_work_t *work, int status) {
1,192,100✔
1362
  TAOS_UDF_CHECK_PTR_RVOID(work);
2,384,200✔
1363
  SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
1,192,100✔
1364

1365
  if (udfWork->conn != NULL) {
1,192,100✔
1366
    uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
1,192,100✔
1367
    if(write_req == NULL) {
1,192,100✔
1368
      fnError("udfd send response error, malloc failed");
×
1369
      taosMemoryFree(work);
×
1370
      return;
×
1371
    }
1372
    write_req->data = udfWork;
1,192,100✔
1373
    int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
1,192,100✔
1374
    if (code != 0) {
1,192,100✔
1375
      fnError("udfd send response error %s", uv_strerror(code));
×
1376
      taosMemoryFree(write_req);
×
1377
   }
1378
  }
1379
  taosMemoryFree(work);
1,192,100✔
1380
}
1381

1382
/*
 * libuv allocation callback for a client pipe.
 *
 * A message starts with a fixed header of sizeof(int32_t) + sizeof(int64_t)
 * bytes whose leading int32 is the total message length. The callback hands
 * libuv the part of the connection's input buffer that is still missing:
 *   1. no buffer yet        -> allocate room for the header;
 *   2. header incomplete    -> continue filling the header;
 *   3. total length known   -> grow the buffer to the total length and fill
 *                              the remainder.
 * On allocation failure an empty buffer (NULL / 0) is returned to libuv.
 *
 * @param handle         pipe handle; handle->data is the SUdfdUvConn
 * @param suggestedSize  size suggested by libuv (ignored)
 * @param buf            receives the region libuv may read into
 */
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(handle, buf);
  SUdfdUvConn *ctx = handle->data;
  int32_t      msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (ctx->inputCap == 0) {
    ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
    if (ctx->inputBuf) {
      ctx->inputLen = 0;
      ctx->inputCap = msgHeadSize;
      ctx->inputTotal = -1;

      buf->base = ctx->inputBuf;
      buf->len = ctx->inputCap;
    } else {
      fnError("udfd can not allocate enough memory");
      buf->base = NULL;
      buf->len = 0;
    }
  } else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) {
    buf->base = ctx->inputBuf + ctx->inputLen;
    buf->len = msgHeadSize - ctx->inputLen;
  } else {
    // Commit the new capacity only once the buffer really has that size, so a
    // failed realloc leaves the connection state consistent.
    int32_t newCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
    void   *inputBuf = taosMemoryRealloc(ctx->inputBuf, newCap);
    if (inputBuf) {
      ctx->inputBuf = inputBuf;
      ctx->inputCap = newCap;
      buf->base = ctx->inputBuf + ctx->inputLen;
      buf->len = ctx->inputCap - ctx->inputLen;
    } else {
      fnError("udfd can not allocate enough memory");
      buf->base = NULL;
      buf->len = 0;
    }
  }
}
1415

1416
/*
 * Tell whether the connection's input buffer holds one complete message.
 *
 * As soon as at least sizeof(int32_t) bytes have arrived, the leading int32 of
 * the buffer is latched into inputTotal as the total message length. The
 * message is complete when the received length, the buffer capacity and the
 * total length all agree.
 *
 * @return true when a complete message is buffered, false otherwise (including
 *         when `pipe` is NULL).
 */
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
  if (pipe == NULL) {
    fnError("udfd pipe is NULL, LINE:%d", __LINE__);
    return false;
  }

  const bool totalUnknown = (pipe->inputTotal == -1);
  if (totalUnknown && pipe->inputLen >= sizeof(int32_t)) {
    pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
  }

  const bool bufferFull = (pipe->inputLen == pipe->inputCap);
  const bool sizedToMessage = (pipe->inputTotal == pipe->inputCap);
  if (!(bufferFull && sizedToMessage)) {
    return false;
  }

  fnDebug("receive request complete. length %d", pipe->inputLen);
  return true;
}
1430

1431
/*
 * Hand a completely received request over to the libuv thread pool.
 *
 * Ownership of the connection's input buffer moves to a new work item, the
 * connection is reset so it can receive the next message, and the work item is
 * queued with udfdProcessRequest as the worker and udfdSendResponse as the
 * completion callback.
 *
 * @param conn  connection whose input buffer holds one complete request
 */
void udfdHandleRequest(SUdfdUvConn *conn) {
  TAOS_UDF_CHECK_PTR_RVOID(conn);
  char   *inputBuf = conn->inputBuf;
  int32_t inputLen = conn->inputLen;

  uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
  if (work == NULL) {
    fnError("udfd malloc work failed");
    return;
  }
  // Zero-initialise the work item: its output buffer is written and freed
  // later even when the worker fails before producing a response.
  SUvUdfWork *udfWork = taosMemoryCalloc(1, sizeof(SUvUdfWork));
  if (udfWork == NULL) {
    fnError("udfd malloc udf work failed");
    taosMemoryFree(work);
    return;
  }
  udfWork->conn = conn;
  udfWork->input = uv_buf_init(inputBuf, inputLen);
  udfWork->output = uv_buf_init(NULL, 0);

  // Push onto the head of the connection's list of in-flight work.
  udfWork->pWorkNext = conn->pWorkList;
  conn->pWorkList = udfWork;

  conn->inputBuf = NULL;
  conn->inputLen = 0;
  conn->inputCap = 0;
  conn->inputTotal = -1;

  work->data = udfWork;
  if (uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse) != 0) {
    fnError("udfd queue work failed");
    // Undo the push above so the list does not keep a pointer to freed memory,
    // and release the request bytes that nobody will process.
    conn->pWorkList = udfWork->pWorkNext;
    taosMemoryFree(udfWork->input.base);
    taosMemoryFree(udfWork);
    taosMemoryFree(work);
  }
}
1463

1464
/*
 * Close callback of a client pipe.
 *
 * Work items still in flight are detached from the connection (their conn
 * pointer is cleared), then the pipe handle, any partially received input and
 * the connection object are freed.
 */
void udfdPipeCloseCb(uv_handle_t *pipe) {
  TAOS_UDF_CHECK_PTR_RVOID(pipe);
  SUdfdUvConn *conn = pipe->data;

  for (SUvUdfWork *pending = conn->pWorkList; pending != NULL; pending = pending->pWorkNext) {
    pending->conn = NULL;
  }

  taosMemoryFree(conn->client);
  taosMemoryFree(conn->inputBuf);
  taosMemoryFree(conn);
}
1477

1478
/*
 * libuv read callback of a client pipe.
 *
 *   nread == 0 : nothing to do.
 *   nread  > 0 : account for the new bytes and dispatch the request once the
 *                message is complete.
 *   nread  < 0 : EOF or read error; the connection is passed to
 *                udfdUvHandleError().
 */
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(client, buf);
  fnDebug("udfd read %zd bytes from client", nread);
  if (nread == 0) {
    return;
  }

  SUdfdUvConn *conn = client->data;

  if (nread < 0) {
    if (nread == UV_EOF) {
      fnInfo("udfd pipe read EOF");
    } else {
      fnError("Receive error %s", uv_err_name(nread));
    }
    udfdUvHandleError(conn);
    return;
  }

  // The bytes were written straight into conn->inputBuf by the alloc callback.
  conn->inputLen += nread;
  if (isUdfdUvMsgComplete(conn)) {
    udfdHandleRequest(conn);
  }
}
1504

1505
void udfdOnNewConnection(uv_stream_t *server, int status) {
127,516✔
1506
  TAOS_UDF_CHECK_PTR_RVOID(server);
255,032✔
1507
  if (status < 0) {
127,516✔
1508
    fnError("udfd new connection error, code:%s", uv_strerror(status));
×
1509
    return;
×
1510
  }
1511
  int32_t code = 0;
127,516✔
1512

1513
  uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
127,516✔
1514
  if(client == NULL) {
127,516✔
1515
    fnError("udfd pipe malloc failed");
×
1516
    return;
×
1517
  }
1518
  code = uv_pipe_init(global.loop, client, 0);
127,516✔
1519
  if (code) {
127,516✔
1520
    fnError("udfd pipe init error %s", uv_strerror(code));
×
1521
    taosMemoryFree(client);
×
1522
    return;
×
1523
  }
1524
  if (uv_accept(server, (uv_stream_t *)client) == 0) {
127,516✔
1525
    SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
127,516✔
1526
    if(ctx == NULL) {
127,516✔
1527
      fnError("udfd conn malloc failed");
×
1528
      goto _exit;
×
1529
    }
1530
    ctx->pWorkList = NULL;
127,516✔
1531
    ctx->client = (uv_stream_t *)client;
127,516✔
1532
    ctx->inputBuf = 0;
127,516✔
1533
    ctx->inputLen = 0;
127,516✔
1534
    ctx->inputCap = 0;
127,516✔
1535
    client->data = ctx;
127,516✔
1536
    ctx->client = (uv_stream_t *)client;
127,516✔
1537
    code = uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
127,516✔
1538
    if (code) {
127,516✔
1539
      fnError("udfd read start error %s", uv_strerror(code));
×
1540
      udfdUvHandleError(ctx);
×
1541
      taosMemoryFree(ctx);
×
1542
      taosMemoryFree(client);
×
1543
    }
1544
    return;
127,516✔
1545
  }
1546
_exit:
×
1547
    uv_close((uv_handle_t *)client, NULL);
×
1548
    taosMemoryFree(client);
×
1549
}
1550

1551
/*
 * Signal callback: remove the listening pipe file, stop watching the signal
 * and stop the event loop.
 *
 * @param handle  signal handle that fired
 * @param signum  signal number (logged only)
 */
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
  TAOS_UDF_CHECK_PTR_RVOID(handle);
  fnInfo("udfd signal received: %d\n", signum);
  uv_fs_t req;
  // With a NULL callback the unlink runs synchronously and returns its result.
  int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
  if (code) {
    fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(code), __LINE__);
  }
  // Every uv_fs_t request must be cleaned up once it is finished.
  uv_fs_req_cleanup(&req);

  code = uv_signal_stop(handle);
  if (code) {
    fnError("stop signal handler failed, reason:%s", uv_strerror(code));
  }
  uv_stop(global.loop);
}
1565

1566
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
662,024✔
1567
  for (int32_t i = 1; i < argc; ++i) {
1,323,920✔
1568
    if (strcmp(argv[i], "-c") == 0) {
662,024✔
1569
      if (i < argc - 1) {
661,960✔
1570
        if (strlen(argv[++i]) >= PATH_MAX) {
661,896✔
1571
          (void)printf("config file path overflow");
64✔
1572
          return -1;
64✔
1573
        }
1574
        tstrncpy(configDir, argv[i], PATH_MAX);
661,832✔
1575
      } else {
1576
        (void)printf("'-c' requires a parameter, default is %s\n", configDir);
64✔
1577
        return -1;
64✔
1578
      }
1579
    } else if (strcmp(argv[i], "-V") == 0) {
64✔
1580
      global.printVersion = true;
64✔
1581
    } else {
1582
    }
1583
  }
1584

1585
  return 0;
661,896✔
1586
}
1587

1588
static void udfdPrintVersion() {
64✔
1589
  (void)printf("%sudf version: %s compatible_version: %s\n", CUS_PROMPT, td_version, td_compatible_version);
64✔
1590
  (void)printf("git: %s\n", td_gitinfo);
64✔
1591
  (void)printf("build: %s\n", td_buildinfo);
64✔
1592
}
64✔
1593

1594
static int32_t udfdInitLog() {
661,832✔
1595
  const char *logName = "udfdlog";
661,832✔
1596
  TAOS_CHECK_RETURN(taosInitLogOutput(&logName));
661,832✔
1597
  return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
661,832✔
1598
}
1599

1600
// libuv allocation callback for the control pipe.
// Allocates suggested_size bytes for the upcoming read. On allocation
// failure buf->base is left NULL, buf->len is not touched, and an error is
// logged.
//
// handle:         handle the read belongs to (unused here).
// suggested_size: buffer size proposed by libuv.
// buf:            output buffer descriptor (validated by TAOS_UDF_CHECK_PTR_RVOID).
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(buf);

  char *mem = taosMemoryMalloc(suggested_size);
  buf->base = mem;
  if (mem != NULL) {
    buf->len = suggested_size;
    return;
  }

  fnError("udfd ctrl pipe alloc buffer failed");
}
1609

1610
// libuv read callback for the control pipe.
// A negative nread (error or EOF) closes the pipe and stops the event loop,
// which is how udfd shuts down when the control pipe goes away. Any data
// that does arrive is only logged and then discarded.
//
// q:     the control pipe stream.
// nread: bytes read, or a negative libuv error code.
// buf:   buffer handed out by udfdCtrlAllocBufCb; always freed here.
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(q, buf);
  if (nread < 0) {
    fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t *)q, NULL);
    uv_stop(global.loop);
    return;
  }
  // nread is a signed ssize_t, so it must be printed with %zd (not %zu).
  fnError("udfd ctrl pipe read %zd bytes", nread);
  taosMemoryFree(buf->base);
}
1622

1623
static void removeListeningPipe() {
1,323,664✔
1624
  uv_fs_t req;
1,311,586✔
1625
  int     err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
1,323,664✔
1626
  uv_fs_req_cleanup(&req);
1,323,664✔
1627
  if(err) {
1,323,664✔
1628
    fnInfo("remove listening pipe %s : %s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__);
1,323,600✔
1629
  }
1630
}
1,323,664✔
1631

1632
static int32_t udfdUvInit() {
661,832✔
1633
  TAOS_CHECK_RETURN(uv_loop_init(global.loop));
661,832✔
1634

1635
  if (tsStartUdfd) {  // udfd is started by taosd, which shall exit when taosd exit
661,832✔
1636
    TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1));
661,832✔
1637
    TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0));
661,832✔
1638
    TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb));
661,832✔
1639
  }
1640
  getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
661,832✔
1641

1642
  removeListeningPipe();
661,832✔
1643

1644
  TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.listeningPipe, 0));
661,832✔
1645

1646
  TAOS_CHECK_RETURN(uv_signal_init(global.loop, &global.intrSignal));
661,832✔
1647
  TAOS_CHECK_RETURN(uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT));
661,832✔
1648

1649
  int r;
1650
  fnInfo("bind to pipe %s", global.listenPipeName);
661,832✔
1651
  if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
661,832✔
1652
    fnError("Bind error %s", uv_err_name(r));
×
1653
    removeListeningPipe();
×
1654
    return -2;
×
1655
  }
1656
  if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
661,832✔
1657
    fnError("Listen error %s", uv_err_name(r));
×
1658
    removeListeningPipe();
×
1659
    return -3;
×
1660
  }
1661
  return 0;
661,832✔
1662
}
1663

1664
// uv_walk callback: requests a close for every handle that is not already
// closing. arg is unused.
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
1,323,664✔
1669

1670
static int32_t udfdGlobalDataInit() {
661,832✔
1671
  uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
661,832✔
1672
  if (loop == NULL) {
661,832✔
1673
    fnError("udfd init uv loop failed, mem overflow");
×
1674
    return terrno;
×
1675
  }
1676
  global.loop = loop;
661,832✔
1677

1678
  if (uv_mutex_init(&global.scriptPluginsMutex) != 0) {
661,832✔
1679
    fnError("udfd init script plugins mutex failed");
×
1680
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1681
  }
1682

1683
  global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
661,832✔
1684
  if (global.udfsHash == NULL) {
661,832✔
1685
    return terrno;
×
1686
  }
1687
  // taosHashSetFreeFp(global.udfsHash, udfdFreeUdf);
1688

1689
  if (uv_mutex_init(&global.udfsMutex) != 0) {
661,832✔
1690
    fnError("udfd init udfs mutex failed");
×
1691
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1692
  }
1693

1694
  return 0;
661,832✔
1695
}
1696

1697
static void udfdGlobalDataDeinit() {
661,832✔
1698
  uv_mutex_destroy(&global.udfsMutex);
661,832✔
1699
  uv_mutex_destroy(&global.scriptPluginsMutex);
661,832✔
1700
  taosMemoryFreeClear(global.loop);
661,832✔
1701
  fnInfo("udfd global data deinit");
661,832✔
1702
}
661,832✔
1703

1704
static void udfdRun() {
661,832✔
1705
  fnInfo("start udfd event loop");
661,832✔
1706
  int32_t code = uv_run(global.loop, UV_RUN_DEFAULT);
661,832✔
1707
  if(code != 0) {
661,832✔
1708
    fnError("udfd event loop still has active handles or requests.");
661,832✔
1709
  }
1710
  fnInfo("udfd event loop stopped.");
661,832✔
1711

1712
  (void)uv_loop_close(global.loop);
661,832✔
1713

1714
  uv_walk(global.loop, udfdCloseWalkCb, NULL);
661,832✔
1715
  code = uv_run(global.loop, UV_RUN_DEFAULT);
661,832✔
1716
  if(code != 0) {
661,832✔
1717
    fnError("udfd event loop still has active handles or requests.");
×
1718
  }
1719
  (void)uv_loop_close(global.loop);
661,832✔
1720
}
661,832✔
1721

1722
int32_t udfdInitResidentFuncs() {
661,832✔
1723
  if (strlen(tsUdfdResFuncs) == 0) {
661,832✔
1724
    return TSDB_CODE_SUCCESS;
660,058✔
1725
  }
1726

1727
  global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
1,774✔
1728
  char *pSave = tsUdfdResFuncs;
1,774✔
1729
  char *token;
1730
  while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
5,322✔
1731
    char func[TSDB_FUNC_NAME_LEN + 1] = {0};
3,548✔
1732
    tstrncpy(func, token, TSDB_FUNC_NAME_LEN);
3,548✔
1733
    fnInfo("udfd add resident function %s", func);
3,548✔
1734
    if(taosArrayPush(global.residentFuncs, func) == NULL)
7,096✔
1735
    {
1736
      taosArrayDestroy(global.residentFuncs);
×
1737
      return terrno;
×
1738
    }
1739
  }
1740

1741
  return TSDB_CODE_SUCCESS;
1,774✔
1742
}
1743

1744
void udfdDeinitResidentFuncs() {
661,832✔
1745
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
665,380✔
1746
    char  *funcName = taosArrayGet(global.residentFuncs, i);
3,548✔
1747
    SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
3,548✔
1748
    if (udfInHash) {
3,548✔
1749
      SUdf   *udf = *udfInHash;
1,847✔
1750
      int32_t code = 0;
1,847✔
1751
      if (udf->scriptPlugin->udfDestroyFunc) {
1,847✔
1752
        code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
1,847✔
1753
        fnDebug("udfd %s destroy function returns %d", funcName, code);
1,847✔
1754
      }
1755
      if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0)
1,847✔
1756
      {
1757
        fnError("udfd remove resident function %s failed", funcName);
×
1758
      }
1759
      taosMemoryFree(udf);
1,847✔
1760
    }
1761
  }
1762
  taosHashCleanup(global.udfsHash);
661,832✔
1763
  taosArrayDestroy(global.residentFuncs);
661,832✔
1764
  fnInfo("udfd resident functions are deinit");
661,832✔
1765
}
661,832✔
1766

1767
int32_t udfdCreateUdfSourceDir() {
661,832✔
1768
  snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir);
661,832✔
1769
  int32_t code = taosMkDir(global.udfDataDir);
661,832✔
1770
  if (code != TSDB_CODE_SUCCESS) {
661,832✔
1771
    snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir);
×
1772
    code = taosMkDir(global.udfDataDir);
×
1773
  }
1774
  fnInfo("udfd create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code));
661,832✔
1775

1776
  return code;
661,832✔
1777
}
1778

1779
void udfdDestroyUdfSourceDir() {
661,832✔
1780
  fnInfo("destory udf source directory %s", global.udfDataDir);
661,832✔
1781
  taosRemoveDir(global.udfDataDir);
661,832✔
1782
}
661,832✔
1783

1784
int main(int argc, char *argv[]) {
662,024✔
1785
  int  code = 0;
662,024✔
1786
  bool logInitialized = false;
662,024✔
1787
  bool cfgInitialized = false;
662,024✔
1788
  bool openClientRpcFinished = false;
662,024✔
1789
  bool residentFuncsInited = false;
662,024✔
1790
  bool udfSourceDirInited = false;
662,024✔
1791
  bool globalDataInited = false;
662,024✔
1792
  taosSetSkipKeyCheckMode();
662,024✔
1793

1794
  if (!taosCheckSystemIsLittleEnd()) {
662,024✔
1795
    (void)printf("failed to start since on non-little-end machines\n");
×
1796
    return -1;
×
1797
  }
1798

1799
  if (udfdParseArgs(argc, argv) != 0) {
662,024✔
1800
    (void)printf("failed to start since parse args error\n");
128✔
1801
    return -1;
128✔
1802
  }
1803

1804
  if (global.printVersion) {
661,896✔
1805
    udfdPrintVersion();
64✔
1806
    return 0;
64✔
1807
  }
1808

1809
  if (udfdInitLog() != 0) {
661,832✔
1810
    // ignore create log failed, because this error no matter
1811
    (void)printf("failed to init udfd log.");
×
1812
  } else {
1813
    logInitialized = true;  // log is initialized
661,832✔
1814
  }
1815

1816
  if ((code = taosPreLoadCfg(configDir, NULL, NULL, NULL, NULL, 0)) != 0) {
661,832✔
1817
    fnError("failed to start since pre load config error");
×
1818
    goto _exit;
×
1819
  }
1820

1821
  if ((code = dmGetEncryptKey()) != 0) {
661,832✔
1822
    fnError("failed to start since failed to get encrypt key");
×
1823
    goto _exit;
×
1824
  }
1825

1826
  if ((code = tryLoadCfgFromDataDir(tsCfg)) != 0) {
661,832✔
1827
    fnError("failed to start since try load config from data dir error");
×
1828
    goto _exit;
×
1829
  }
1830

1831
  if ((code = taosApplyCfg(configDir, NULL, NULL, NULL, NULL, 0)) != 0) {
661,832✔
1832
    fnError("failed to start since apply config error");
×
1833
    goto _exit;
×
1834
  }
1835
  cfgInitialized = true;  // cfg is initialized
661,832✔
1836
  fnInfo("udfd start with config file %s", configDir);
661,832✔
1837

1838
  if (initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp) != 0) {
661,832✔
1839
    fnError("init ep set from cfg failed");
×
1840
    code = -3;
×
1841
    goto _exit;
×
1842
  }
1843
  fnInfo("udfd start with mnode ep %s", global.mgmtEp.epSet.eps[0].fqdn);
661,832✔
1844
  if (udfdOpenClientRpc() != 0) {
661,832✔
1845
    fnError("open rpc connection to mnode failed");
×
1846
    code = -4;
×
1847
    goto _exit;
×
1848
  }
1849
  fnInfo("udfd rpc client is opened");
661,832✔
1850
  openClientRpcFinished = true;  // rpc is opened
661,832✔
1851

1852
  if (udfdCreateUdfSourceDir() != 0) {
661,832✔
1853
    fnError("create udf source directory failed");
×
1854
    code = -5;
×
1855
    goto _exit;
×
1856
  }
1857
  udfSourceDirInited = true;  // udf source dir is created
661,832✔
1858
  fnInfo("udfd udf source directory is created");
661,832✔
1859

1860
  if (udfdGlobalDataInit() != 0) {
661,832✔
1861
    fnError("init global data failed");
×
1862
    code = -6;
×
1863
    goto _exit;
×
1864
  }
1865
  globalDataInited = true;  // global data is inited
661,832✔
1866
  fnInfo("udfd global data is inited");
661,832✔
1867

1868
  if (udfdUvInit() != 0) {
661,832✔
1869
    fnError("uv init failure");
×
1870
    code = -7;
×
1871
    goto _exit;
×
1872
  }
1873
  fnInfo("udfd uv is inited");
661,832✔
1874

1875
  if (udfdInitResidentFuncs() != 0) {
661,832✔
1876
    fnError("init resident functions failed");
×
1877
    code = -8;
×
1878
    goto _exit;
×
1879
  }
1880
  residentFuncsInited = true;  // resident functions are inited
661,832✔
1881
  fnInfo("udfd resident functions are inited");
661,832✔
1882

1883
  udfdRun();
661,832✔
1884
  fnInfo("udfd exit normally");
661,832✔
1885

1886
  removeListeningPipe();
661,832✔
1887

1888
_exit:
661,832✔
1889
  if (residentFuncsInited) {
661,832✔
1890
    udfdDeinitResidentFuncs();
661,832✔
1891
  }
1892
  udfdDeinitScriptPlugins();
661,832✔
1893
  if (globalDataInited) {
661,832✔
1894
    udfdGlobalDataDeinit();
661,832✔
1895
  }
1896
  if (udfSourceDirInited) {
661,832✔
1897
    udfdDestroyUdfSourceDir();
661,832✔
1898
  }
1899
  if (openClientRpcFinished) {
661,832✔
1900
    udfdCloseClientRpc();
661,832✔
1901
  }
1902
  if (cfgInitialized) {
661,832✔
1903
    taosCleanupCfg();
661,832✔
1904
  }
1905
  if (logInitialized) {
661,832✔
1906
    taosCloseLog();
661,832✔
1907
  }
1908

1909
  return code;
661,832✔
1910
}
1911
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc