• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4935

22 Jan 2026 06:38AM UTC coverage: 66.708% (+0.02%) from 66.691%
#4935

push

travis-ci

web-flow
merge: from main to 3.0 #34371

121 of 271 new or added lines in 17 files covered. (44.65%)

9066 existing lines in 149 files now uncovered.

203884 of 305637 relevant lines covered (66.71%)

125811266.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.13
/source/libs/function/src/udfd.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#ifdef USE_UDF
17
// clang-format off
18
#include "uv.h"
19
#include "os.h"
20
#include "fnLog.h"
21
#include "thash.h"
22

23
#include "tudf.h"
24
#include "tudfInt.h"
25
#include "version.h"
26

27
#include "tdatablock.h"
28
#include "tdataformat.h"
29
#include "tglobal.h"
30
#include "tmsg.h"
31
#include "trpc.h"
32
#include "tmisce.h"
33
#include "tversion.h"
34
// clang-format on
35

36
#define UDFD_MAX_SCRIPT_PLUGINS 64
37
#define UDFD_MAX_SCRIPT_TYPE    1
38
#define UDFD_MAX_PLUGIN_FUNCS   9
39

40
typedef struct SUdfCPluginCtx {
41
  uv_lib_t lib;
42

43
  TUdfScalarProcFunc scalarProcFunc;
44

45
  TUdfAggStartFunc   aggStartFunc;
46
  TUdfAggProcessFunc aggProcFunc;
47
  TUdfAggFinishFunc  aggFinishFunc;
48

49
  TUdfInitFunc    initFunc;
50
  TUdfDestroyFunc destroyFunc;
51
} SUdfCPluginCtx;
52

53
int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; }
2,168✔
54

55
int32_t udfdCPluginClose() { return 0; }
2,168✔
56

57
int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
104,356✔
58
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);
313,068✔
59
  char  initFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
104,356✔
60
  char *initSuffix = "_init";
104,356✔
61
  snprintf(initFuncName, sizeof(initFuncName), "%s%s", udfName, initSuffix);
104,356✔
62
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)));
104,356✔
63

64
  char  destroyFuncName[TSDB_FUNC_NAME_LEN + 9] = {0};
103,804✔
65
  char *destroySuffix = "_destroy";
103,804✔
66
  snprintf(destroyFuncName, sizeof(destroyFuncName), "%s%s", udfName, destroySuffix);
103,804✔
67
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)));
103,804✔
68
  return 0;
103,804✔
69
}
70

71
int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
12,523✔
72
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);
37,569✔
73
  char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
12,523✔
74
  snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); 
12,523✔
75
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)));
12,523✔
76

77
  char  startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
12,523✔
78
  char *startSuffix = "_start";
12,523✔
79
  snprintf(startFuncName, sizeof(startFuncName), "%s%s", processFuncName, startSuffix);
12,523✔
80
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)));
12,523✔
81

82
  char  finishFuncName[TSDB_FUNC_NAME_LEN + 8] = {0};
11,902✔
83
  char *finishSuffix = "_finish";
11,902✔
84
  snprintf(finishFuncName, sizeof(finishFuncName), "%s%s", processFuncName, finishSuffix);
11,902✔
85
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)));
11,902✔
86

87
  return 0;
11,902✔
88
}
89

90
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
104,356✔
91
  TAOS_UDF_CHECK_PTR_RCODE(udf, pUdfCtx);
313,068✔
92
  int32_t         err = 0;
104,356✔
93
  SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx));
104,356✔
94
  if (NULL == udfCtx) {
104,356✔
95
    return terrno;
×
96
  }
97
  err = uv_dlopen(udf->path, &udfCtx->lib);
104,356✔
98
  if (err != 0) {
104,356✔
99
    fnError("can not load library %s. error: %s, %s", udf->path, uv_strerror(err), udfCtx->lib.errmsg);
×
100
    taosMemoryFree(udfCtx);
×
101
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
×
102
  }
103
  const char *udfName = udf->name;
104,356✔
104

105
  err = udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName);
104,356✔
106
  if (err != 0) {
104,356✔
107
    fnError("can not load init/destroy functions. error: %d", err);
552✔
108
    err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
552✔
109
    goto _exit;
552✔
110
  }
111

112
  if (udf->funcType == UDF_FUNC_TYPE_SCALAR) {
103,804✔
113
    char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
91,281✔
114
    snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
91,281✔
115
    if (uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)) != 0) {
91,281✔
116
      fnError("can not load library function %s. error: %s", processFuncName, uv_strerror(err));
×
117
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
×
118
      goto _exit;
×
119
    }
120
  } else if (udf->funcType == UDF_FUNC_TYPE_AGG) {
12,523✔
121
    err = udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName);
12,523✔
122
    if (err != 0) {
12,523✔
123
      fnError("can not load aggregation functions. error: %d", err);
621✔
124
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
621✔
125
      goto _exit;
621✔
126
    }
127
  }
128

129
  if (udfCtx->initFunc) {
103,183✔
130
    err = (udfCtx->initFunc)();
103,183✔
131
    if (err != 0) {
103,183✔
132
      fnError("udf init function failed. error: %d", err);
×
133
      goto _exit;
×
134
    }
135
  }
136
  *pUdfCtx = udfCtx;
103,183✔
137
  return 0;
103,183✔
138
_exit:
1,173✔
139
  uv_dlclose(&udfCtx->lib);
1,173✔
140
  taosMemoryFree(udfCtx);
1,173✔
141
  return err;
1,173✔
142
}
143

144
int32_t udfdCPluginUdfDestroy(void *udfCtx) {
104,356✔
145
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx);
207,539✔
146
  SUdfCPluginCtx *ctx = udfCtx;
103,183✔
147
  int32_t         code = 0;
103,183✔
148
  if (ctx->destroyFunc) {
103,183✔
149
    code = (ctx->destroyFunc)();
103,183✔
150
  }
151
  uv_dlclose(&ctx->lib);
103,183✔
152
  taosMemoryFree(ctx);
103,183✔
153
  return code;
103,183✔
154
}
155

156
int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) {
1,879,515✔
157
  TAOS_UDF_CHECK_PTR_RCODE(block, resultCol, udfCtx);
7,518,914✔
158
  SUdfCPluginCtx *ctx = udfCtx;
1,879,515✔
159
  if (ctx->scalarProcFunc) {
1,879,515✔
160
    return ctx->scalarProcFunc(block, resultCol);
1,879,820✔
161
  } else {
162
    fnError("udfd c plugin scalar proc not implemented");
×
163
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
×
164
  }
165
}
166

167
int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) {
188,125✔
168
  TAOS_UDF_CHECK_PTR_RCODE(buf, udfCtx);
564,375✔
169
  SUdfCPluginCtx *ctx = udfCtx;
188,125✔
170
  if (ctx->aggStartFunc) {
188,125✔
171
    return ctx->aggStartFunc(buf);
188,125✔
172
  } else {
173
    fnError("udfd c plugin aggregation start not implemented");
×
174
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
×
175
  }
176
  return 0;
177
}
178

179
int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) {
444,098✔
180
  TAOS_UDF_CHECK_PTR_RCODE(block, interBuf, newInterBuf, udfCtx);
2,220,490✔
181
  SUdfCPluginCtx *ctx = udfCtx;
444,098✔
182
  if (ctx->aggProcFunc) {
444,098✔
183
    return ctx->aggProcFunc(block, interBuf, newInterBuf);
444,098✔
184
  } else {
185
    fnError("udfd c plugin aggregation process not implemented");
×
186
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
×
187
  }
188
}
189

190
// int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
191
//                                void *udfCtx) {
192
//   TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx);
193
//   SUdfCPluginCtx *ctx = udfCtx;
194
//   if (ctx->aggMergeFunc) {
195
//     return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
196
//   } else {
197
//     fnError("udfd c plugin aggregation merge not implemented");
198
//     return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
199
//   }
200
// }
201

202
int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) {
186,814✔
203
  TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx);
747,256✔
204
  SUdfCPluginCtx *ctx = udfCtx;
186,814✔
205
  if (ctx->aggFinishFunc) {
186,814✔
206
    return ctx->aggFinishFunc(buf, resultData);
186,814✔
207
  } else {
208
    fnError("udfd c plugin aggregation finish not implemented");
×
209
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
×
210
  }
211
  return 0;
212
}
213

214
// for c, the function pointer are filled directly and libloaded = true;
215
// for others, dlopen/dlsym to find function pointers
216
typedef struct SUdfScriptPlugin {
217
  int8_t scriptType;
218

219
  char     libPath[PATH_MAX];
220
  bool     libLoaded;
221
  uv_lib_t lib;
222

223
  TScriptUdfScalarProcFunc udfScalarProcFunc;
224
  TScriptUdfAggStartFunc   udfAggStartFunc;
225
  TScriptUdfAggProcessFunc udfAggProcFunc;
226
  TScriptUdfAggMergeFunc   udfAggMergeFunc;
227
  TScriptUdfAggFinishFunc  udfAggFinishFunc;
228

229
  TScriptUdfInitFunc    udfInitFunc;
230
  TScriptUdfDestoryFunc udfDestroyFunc;
231

232
  TScriptOpenFunc  openFunc;
233
  TScriptCloseFunc closeFunc;
234
} SUdfScriptPlugin;
235

236
typedef struct SUdfdContext {
237
  uv_loop_t  *loop;
238
  uv_pipe_t   ctrlPipe;
239
  uv_signal_t intrSignal;
240
  char        listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
241
  uv_pipe_t   listeningPipe;
242

243
  void     *clientRpc;
244
  SCorEpSet mgmtEp;
245

246
  uv_mutex_t udfsMutex;
247
  SHashObj  *udfsHash;
248

249
  uv_mutex_t        scriptPluginsMutex;
250
  SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS];
251

252
  SArray *residentFuncs;
253

254
  char udfDataDir[PATH_MAX];
255
  bool printVersion;
256
} SUdfdContext;
257

258
SUdfdContext global;
259

260
struct SUdfdUvConn;
261
struct SUvUdfWork;
262

263
typedef struct SUdfdUvConn {
264
  uv_stream_t *client;
265
  char        *inputBuf;
266
  int32_t      inputLen;
267
  int32_t      inputCap;
268
  int32_t      inputTotal;
269

270
  struct SUvUdfWork *pWorkList;  // head of work list
271
} SUdfdUvConn;
272

273
typedef struct SUvUdfWork {
274
  SUdfdUvConn *conn;
275
  uv_buf_t     input;
276
  uv_buf_t     output;
277

278
  struct SUvUdfWork *pWorkNext;
279
} SUvUdfWork;
280

281
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState;
282

283
typedef struct SUdf {
284
  char    name[TSDB_FUNC_NAME_LEN + 1];
285
  int32_t version;
286
  int64_t createdTime;
287

288
  int8_t  funcType;
289
  int8_t  scriptType;
290
  int8_t  outputType;
291
  int32_t outputLen;
292
  int32_t bufSize;
293

294
  char path[PATH_MAX];
295

296
  int32_t    refCount;
297
  EUdfState  state;
298
  uv_mutex_t lock;
299
  uv_cond_t  condReady;
300
  bool       resident;
301

302
  SUdfScriptPlugin *scriptPlugin;
303
  void             *scriptUdfCtx;
304

305
  int64_t lastFetchTime;  // last fetch time in milliseconds
306
  bool    expired;
307
} SUdf;
308

309
typedef struct SUdfcFuncHandle {
310
  SUdf *udf;
311
} SUdfcFuncHandle;
312

313
typedef enum EUdfdRpcReqRspType {
314
  UDFD_RPC_MNODE_CONNECT = 0,
315
  UDFD_RPC_RETRIVE_FUNC,
316
} EUdfdRpcReqRspType;
317

318
typedef struct SUdfdRpcSendRecvInfo {
319
  EUdfdRpcReqRspType rpcType;
320
  int32_t            code;
321
  void              *param;
322
  uv_sem_t           resultSem;
323
} SUdfdRpcSendRecvInfo;
324

325
static void    udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
326
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
327
static int32_t udfdConnectToMnode();
328
static bool    udfdRpcRfp(int32_t code, tmsg_t msgType);
329
static int     initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
330
static int32_t udfdOpenClientRpc();
331
static void    udfdCloseClientRpc();
332

333
static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
334
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
335
static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
336
static void udfdProcessRequest(uv_work_t *req);
337
static void udfdOnWrite(uv_write_t *req, int status);
338
static void udfdSendResponse(uv_work_t *work, int status);
339
static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
340
static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe);
341
static void udfdHandleRequest(SUdfdUvConn *conn);
342
static void udfdPipeCloseCb(uv_handle_t *pipe);
343
static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
241,270✔
344
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
345
static void udfdOnNewConnection(uv_stream_t *server, int status);
346

347
static void    udfdIntrSignalHandler(uv_signal_t *handle, int signum);
348
static void    removeListeningPipe();
349

350
static void    udfdPrintVersion();
351
static int32_t udfdParseArgs(int32_t argc, char *argv[]);
352
static int32_t udfdInitLog();
353

354
static void    udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
355
static void    udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
356
static int32_t udfdUvInit();
357
static void    udfdCloseWalkCb(uv_handle_t *handle, void *arg);
358
static void    udfdRun();
359
static void    udfdConnectMnodeThreadFunc(void *args);
360

361
int32_t udfdNewUdf(SUdf **pUdf,  const char *udfName);
362
void  udfdGetFuncBodyPath(const SUdf *udf, char *path);
363

364
int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
2,168✔
365
  TAOS_UDF_CHECK_PTR_RCODE(plugin);
4,336✔
366
  plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
2,168✔
367
  plugin->openFunc = udfdCPluginOpen;
2,168✔
368
  plugin->closeFunc = udfdCPluginClose;
2,168✔
369
  plugin->udfInitFunc = udfdCPluginUdfInit;
2,168✔
370
  plugin->udfDestroyFunc = udfdCPluginUdfDestroy;
2,168✔
371
  plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc;
2,168✔
372
  plugin->udfAggStartFunc = udfdCPluginUdfAggStart;
2,168✔
373
  plugin->udfAggProcFunc = udfdCPluginUdfAggProc;
2,168✔
374
  // plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge;
375
  plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish;
2,168✔
376

377
  SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}};
2,168✔
378
  int32_t           err = plugin->openFunc(items, 1);
2,168✔
379
  if (err != 0) return err;
2,168✔
380
  return 0;
2,168✔
381
}
382

383
int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) {
69✔
384
  TAOS_UDF_CHECK_PTR_RCODE(libPath, pLib, funcName, func);
345✔
385
  int err = uv_dlopen(libPath, pLib);
69✔
386
  if (err != 0) {
69✔
387
    fnError("can not load library %s. error: %s, %s", libPath, uv_strerror(err), pLib->errmsg);
×
388
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
×
389
  }
390

391
  for (int i = 0; i < numOfFuncs; ++i) {
690✔
392
    err = uv_dlsym(pLib, funcName[i], func[i]);
621✔
393
    if (err != 0) {
621✔
394
      fnError("load library function failed. lib %s function %s", libPath, funcName[i]);
×
395
    }
396
  }
397
  return 0;
69✔
398
}
399

400
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
69✔
401
  TAOS_UDF_CHECK_PTR_RCODE(plugin);
138✔
402
  plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
69✔
403
  // todo: windows support
404
  snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so");
69✔
405
  plugin->libLoaded = false;
69✔
406
  const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen",         "pyClose",         "pyUdfInit",
69✔
407
                                                 "pyUdfDestroy",   "pyUdfScalarProc", "pyUdfAggStart",
408
                                                 "pyUdfAggFinish", "pyUdfAggProc",    "pyUdfAggMerge"};
409
  void      **funcs[UDFD_MAX_PLUGIN_FUNCS] = {
69✔
410
           (void **)&plugin->openFunc,         (void **)&plugin->closeFunc,         (void **)&plugin->udfInitFunc,
69✔
411
           (void **)&plugin->udfDestroyFunc,   (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc,
69✔
412
           (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc,    (void **)&plugin->udfAggMergeFunc};
69✔
413
  int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS);
69✔
414
  if (err != 0) {
69✔
415
    fnError("can not load python plugin. lib path %s", plugin->libPath);
×
416
    return err;
×
417
  }
418

419
  if (plugin->openFunc) {
69✔
420
    int16_t lenPythonPath =
69✔
421
        strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1;  // global.udfDataDir:tsUdfdLdLibPath
69✔
422
    char *pythonPath = taosMemoryMalloc(lenPythonPath);
69✔
423
    if(pythonPath == NULL) {
69✔
424
      uv_dlclose(&plugin->lib);
×
425
      return terrno;
×
426
    }
427
#ifdef WINDOWS
428
    snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath);
429
#else
430
    snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath);
69✔
431
#endif
432
    SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}};
69✔
433
    err = plugin->openFunc(items, 2);
69✔
434
    taosMemoryFree(pythonPath);
69✔
435
  }
436
  if (err != 0) {
69✔
437
    fnError("udf script python plugin open func failed. error: %d", err);
×
438
    uv_dlclose(&plugin->lib);
×
439
    return err;
×
440
  }
441
  plugin->libLoaded = true;
69✔
442

443
  return 0;
69✔
444
}
445

446
void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
2,168✔
447
  TAOS_UDF_CHECK_PTR_RVOID(plugin);
4,336✔
448
  if (plugin->closeFunc) {
2,168✔
449
    if (plugin->closeFunc() != 0) {
2,168✔
450
      fnError("udf script c plugin close func failed.line:%d", __LINE__);
×
451
    }
452
  }
453
  plugin->openFunc = NULL;
2,168✔
454
  plugin->closeFunc = NULL;
2,168✔
455
  plugin->udfInitFunc = NULL;
2,168✔
456
  plugin->udfDestroyFunc = NULL;
2,168✔
457
  plugin->udfScalarProcFunc = NULL;
2,168✔
458
  plugin->udfAggStartFunc = NULL;
2,168✔
459
  plugin->udfAggProcFunc = NULL;
2,168✔
460
  plugin->udfAggMergeFunc = NULL;
2,168✔
461
  plugin->udfAggFinishFunc = NULL;
2,168✔
462
  return;
2,168✔
463
}
464

465
void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
69✔
466
  TAOS_UDF_CHECK_PTR_RVOID(plugin);
138✔
467
  if (plugin->closeFunc) {
69✔
468
    if (plugin->closeFunc() != 0) {
69✔
469
      fnError("udf script python plugin close func failed.line:%d", __LINE__);
×
470
    }
471
  }
472
  uv_dlclose(&plugin->lib);
69✔
473
  if (plugin->libLoaded) {
69✔
474
    plugin->libLoaded = false;
69✔
475
  }
476
  plugin->openFunc = NULL;
69✔
477
  plugin->closeFunc = NULL;
69✔
478
  plugin->udfInitFunc = NULL;
69✔
479
  plugin->udfDestroyFunc = NULL;
69✔
480
  plugin->udfScalarProcFunc = NULL;
69✔
481
  plugin->udfAggStartFunc = NULL;
69✔
482
  plugin->udfAggProcFunc = NULL;
69✔
483
  plugin->udfAggMergeFunc = NULL;
69✔
484
  plugin->udfAggFinishFunc = NULL;
69✔
485
}
486

487
int32_t udfdInitScriptPlugin(int8_t scriptType) {
2,237✔
488
  SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
2,237✔
489
  if (plugin == NULL) {
2,237✔
490
    return terrno;
×
491
  }
492
  int32_t err = 0;
2,237✔
493
  switch (scriptType) {
2,237✔
494
    case TSDB_FUNC_SCRIPT_BIN_LIB:
2,168✔
495
      err = udfdInitializeCPlugin(plugin);
2,168✔
496
      if (err != 0) {
2,168✔
497
        fnError("udf script c plugin init failed. error: %d", err);
×
498
        taosMemoryFree(plugin);
×
499
        return err;
×
500
      }
501
      break;
2,168✔
502
    case TSDB_FUNC_SCRIPT_PYTHON: {
69✔
503
      err = udfdInitializePythonPlugin(plugin);
69✔
504
      if (err != 0) {
69✔
505
        fnError("udf script python plugin init failed. error: %d", err);
×
506
        taosMemoryFree(plugin);
×
507
        return err;
×
508
      }
509
      break;
69✔
510
    }
511
    default:
×
512
      fnError("udf script type %d not supported", scriptType);
×
513
      taosMemoryFree(plugin);
×
514
      return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
×
515
  }
516

517
  global.scriptPlugins[scriptType] = plugin;
2,237✔
518
  return TSDB_CODE_SUCCESS;
2,237✔
519
}
520

521
void udfdDeinitScriptPlugins() {
578,758✔
522
  SUdfScriptPlugin *plugin = NULL;
578,758✔
523
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON];
578,758✔
524
  if (plugin != NULL) {
578,758✔
525
    udfdDeinitPythonPlugin(plugin);
69✔
526
    taosMemoryFree(plugin);
69✔
527
    global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON] = NULL;
69✔
528
  }
529

530
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB];
578,758✔
531
  if (plugin != NULL) {
578,758✔
532
    udfdDeinitCPlugin(plugin);
2,168✔
533
    taosMemoryFree(plugin);
2,168✔
534
    global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB] = NULL;
2,168✔
535
  }
536
  return;
578,758✔
537
}
538

539
void udfdProcessRequest(uv_work_t *req) {
3,252,663✔
540
  TAOS_UDF_CHECK_PTR_RVOID(req);
6,505,326✔
541
  SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
3,252,663✔
542
  if (uvUdf == NULL) {
3,252,663✔
543
    fnError("udf work is NULL");
×
544
    return;
×
545
  }
546
  SUdfRequest request = {0};
3,252,663✔
547
  if(decodeUdfRequest(uvUdf->input.base, &request) == NULL)
3,252,663✔
548
  {
UNCOV
549
    taosMemoryFreeClear(uvUdf->input.base);
×
UNCOV
550
    fnError("udf request decode failed");
×
551
    return;
×
552
  }
553

554
  switch (request.type) {
3,251,862✔
555
    case UDF_TASK_SETUP: {
241,270✔
556
      udfdProcessSetupRequest(uvUdf, &request);
241,270✔
557
      break;
241,270✔
558
    }
559

560
    case UDF_TASK_CALL: {
2,770,495✔
561
      udfdProcessCallRequest(uvUdf, &request);
2,770,495✔
562
      break;
2,770,991✔
563
    }
564
    case UDF_TASK_TEARDOWN: {
240,097✔
565
      udfdProcessTeardownRequest(uvUdf, &request);
240,097✔
566
      break;
240,097✔
567
    }
568
    default: {
×
569
      break;
×
570
    }
571
  }
572
}
573

574
static void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
120,571✔
575
  udfInfo->bufSize = udf->bufSize;
120,571✔
576
  if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
120,571✔
577
    udfInfo->funcType = UDF_FUNC_TYPE_AGG;
16,387✔
578
  } else if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
104,184✔
579
    udfInfo->funcType = UDF_FUNC_TYPE_SCALAR;
104,184✔
580
  }
581
  udfInfo->name = udf->name;
120,571✔
582
  udfInfo->version = udf->version;
120,571✔
583
  udfInfo->createdTime = udf->createdTime;
120,571✔
584
  udfInfo->outputLen = udf->outputLen;
120,571✔
585
  udfInfo->outputType = udf->outputType;
120,571✔
586
  udfInfo->path = udf->path;
120,571✔
587
  udfInfo->scriptType = udf->scriptType;
120,571✔
588
}
120,571✔
589

590
static int32_t udfdInitUdf(char *udfName, SUdf *udf) {
120,571✔
591
  TAOS_UDF_CHECK_PTR_RCODE(udfName, udf);
361,713✔
592
  int32_t err = 0;
120,571✔
593
  err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf);
120,571✔
594
  if (err != 0) {
120,571✔
595
    fnError("can not retrieve udf from mnode. udf name %s", udfName);
×
596
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
×
597
  }
598
  if (udf->scriptType > UDFD_MAX_SCRIPT_TYPE) {
120,571✔
599
    fnError("udf name %s script type %d not supported", udfName, udf->scriptType);
×
600
    return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
×
601
  }
602

603
  uv_mutex_lock(&global.scriptPluginsMutex);
120,571✔
604
  SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType];
120,571✔
605
  if (scriptPlugin == NULL) {
120,571✔
606
    err = udfdInitScriptPlugin(udf->scriptType);
2,237✔
607
    if (err != 0) {
2,237✔
608
      fnError("udf name %s init script plugin failed. error %d", udfName, err);
×
609
      uv_mutex_unlock(&global.scriptPluginsMutex);
×
610
      return err;
×
611
    }
612
  }
613
  uv_mutex_unlock(&global.scriptPluginsMutex);
120,571✔
614
  udf->scriptPlugin = global.scriptPlugins[udf->scriptType];
120,571✔
615

616
  SScriptUdfInfo info = {0};
120,571✔
617
  convertUdf2UdfInfo(udf, &info);
120,571✔
618
  err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx);
120,571✔
619
  if (err != 0) {
120,571✔
620
    fnError("udf name %s init failed. error %d", udfName, err);
1,173✔
621
    return err;
1,173✔
622
  }
623

624
  fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx);
119,398✔
625
  return 0;
119,398✔
626
}
627

628
int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) {
120,571✔
629
  TAOS_UDF_CHECK_PTR_RCODE(pUdf, udfName);
361,713✔
630
  SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
120,571✔
631
  if (NULL == udfNew) {
120,571✔
632
    return terrno;
×
633
  }
634
  udfNew->refCount = 1;
120,571✔
635
  udfNew->lastFetchTime = taosGetTimestampMs();
120,571✔
636
  tstrncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);
120,571✔
637

638
  udfNew->state = UDF_STATE_INIT;
120,571✔
639
  if (uv_mutex_init(&udfNew->lock) != 0) return TSDB_CODE_UDF_UV_EXEC_FAILURE;
120,571✔
640
  if (uv_cond_init(&udfNew->condReady) != 0) return TSDB_CODE_UDF_UV_EXEC_FAILURE;
120,571✔
641

642
  udfNew->resident = false;
120,571✔
643
  udfNew->expired = false;
120,571✔
644
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
122,192✔
645
    char *funcName = taosArrayGet(global.residentFuncs, i);
4,086✔
646
    if (strcmp(udfName, funcName) == 0) {
4,086✔
647
      udfNew->resident = true;
2,465✔
648
      break;
2,465✔
649
    }
650
  }
651
  *pUdf =  udfNew;
120,571✔
652

653
  fnTrace("udf new succeeded. name %s(%p)", udfNew->name, udfNew);
120,571✔
654

655
  return 0;
120,571✔
656
}
657

658
void udfdFreeUdf(void *pData) {
118,106✔
659
  SUdf *pSudf = (SUdf *)pData;
118,106✔
660
  if (pSudf == NULL) {
118,106✔
661
    return;
×
662
  }
663

664
  if (pSudf->scriptPlugin != NULL) {
118,106✔
665
    if(pSudf->scriptPlugin->udfDestroyFunc(pSudf->scriptUdfCtx) != 0) {
118,106✔
666
      fnError("udfdFreeUdf: udfd destroy udf %s failed", pSudf->name);
1,173✔
667
    }
668
  }
669

670
  uv_mutex_destroy(&pSudf->lock);
118,106✔
671
  uv_cond_destroy(&pSudf->condReady);
118,106✔
672

673
  fnTrace("udf free succeeded. name %s(%p)", pSudf->name, pSudf);
118,106✔
674

675
  taosMemoryFree(pSudf);
118,106✔
676
}
677

678
int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) {
241,270✔
679
  TAOS_UDF_CHECK_PTR_RCODE(ppUdf, udfName);
723,810✔
680
  uv_mutex_lock(&global.udfsMutex);
241,270✔
681
  SUdf  **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
241,270✔
682
  int64_t currTime = taosGetTimestampMs();
241,270✔
683
  bool    expired = false;
241,270✔
684
  if (pUdfHash) {
241,270✔
685
    expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000;  // 10s
120,699✔
686
    if (!expired) {
120,699✔
687
      ++(*pUdfHash)->refCount;
120,699✔
688
      *ppUdf = *pUdfHash;
120,699✔
689
      uv_mutex_unlock(&global.udfsMutex);
120,699✔
690
      fnInfo("udfd reuse existing udf. udf  %s udf version %d, udf created time %" PRIx64, (*ppUdf)->name, (*ppUdf)->version,
120,699✔
691
             (*ppUdf)->createdTime);
692
      return 0;
120,699✔
693
    } else {
UNCOV
694
      (*pUdfHash)->expired = true;
×
UNCOV
695
      fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
×
696
             (*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime);
UNCOV
697
      if(taosHashRemove(global.udfsHash, udfName, strlen(udfName)) != 0) {
×
UNCOV
698
        fnError("udfdGetOrCreateUdf: udfd remove udf %s failed", udfName);
×
699
      }
700
    }
701
  }
702

703
  int32_t code = udfdNewUdf(ppUdf, udfName);
120,571✔
704
  if(code != 0) {
120,571✔
705
    uv_mutex_unlock(&global.udfsMutex);
×
UNCOV
706
    return code;
×
707
  }
708

709
  if ((code = taosHashPut(global.udfsHash, udfName, strlen(udfName), ppUdf, POINTER_BYTES)) != 0) {
120,571✔
UNCOV
710
    uv_mutex_unlock(&global.udfsMutex);
×
UNCOV
711
    return code;
×
712
  }
713
  uv_mutex_unlock(&global.udfsMutex);
120,571✔
714

715
  return 0;
120,571✔
716
}
717

718
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
241,270✔
719
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
723,810✔
720
  // TODO: tracable id from client. connect, setup, call, teardown
721
  fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
241,270✔
722

723
  SUdfSetupRequest *setup = &request->setup;
241,270✔
724
  int32_t           code = TSDB_CODE_SUCCESS;
241,270✔
725
  SUdf *udf = NULL;
241,270✔
726

727
  code = udfdGetOrCreateUdf(&udf, setup->udfName);
241,270✔
728
  if(code != 0) {
241,270✔
UNCOV
729
    fnError("udfdGetOrCreateUdf failed. udf name %s", setup->udfName);
×
UNCOV
730
    goto _send;
×
731
  }
732
  uv_mutex_lock(&udf->lock);
241,270✔
733
  if (udf->state == UDF_STATE_INIT) {
241,270✔
734
    udf->state = UDF_STATE_LOADING;
120,571✔
735
    code = udfdInitUdf(setup->udfName, udf);
120,571✔
736
    if (code == 0) {
120,571✔
737
      udf->state = UDF_STATE_READY;
119,398✔
738
    } else {
739
      udf->state = UDF_STATE_INIT;
1,173✔
740
    }
741
    uv_cond_broadcast(&udf->condReady);
120,571✔
742
    uv_mutex_unlock(&udf->lock);
120,571✔
743
  } else {
744
    while (udf->state == UDF_STATE_LOADING) {
120,699✔
UNCOV
745
      uv_cond_wait(&udf->condReady, &udf->lock);
×
746
    }
747
    uv_mutex_unlock(&udf->lock);
120,699✔
748
  }
749

750
  SUdfcFuncHandle *handle = NULL;
241,270✔
751
  if (!code) {
241,270✔
752
    handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
240,097✔
753
    if (handle == NULL) {
240,097✔
UNCOV
754
      fnError("udfdProcessSetupRequest: malloc failed.");
×
UNCOV
755
      code = terrno;
×
756
    } else {
757
      handle->udf = udf;
240,097✔
758
    }
759
  }
760

761
_send:
137,292✔
762
  ;
763
  SUdfResponse rsp;
136,119✔
764
  rsp.seqNum = request->seqNum;
241,270✔
765
  rsp.type = request->type;
241,270✔
766
  rsp.code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
241,270✔
767
  rsp.setupRsp.udfHandle = (int64_t)(handle);
241,270✔
768
  rsp.setupRsp.outputType = udf->outputType;
241,270✔
769
  rsp.setupRsp.bytes = udf->outputLen;
241,270✔
770
  rsp.setupRsp.bufSize = udf->bufSize;
241,270✔
771

772
  int32_t len = encodeUdfResponse(NULL, &rsp);
241,270✔
773
  if(len < 0) {
241,270✔
UNCOV
774
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
×
UNCOV
775
    code = terrno;
×
776
    goto _exit;
×
777
  }
778
  rsp.msgLen = len;
241,270✔
779
  void *bufBegin = taosMemoryMalloc(len);
241,270✔
780
  if(bufBegin == NULL) {
241,270✔
781
    fnError("udfdProcessSetupRequest: malloc failed. len %d", len);
×
782
    code = terrno;
×
783
    goto _exit;
×
784
  }
785

786
  void *buf = bufBegin;
241,270✔
787
  if(encodeUdfResponse(&buf, &rsp) < 0) {
241,270✔
UNCOV
788
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
×
UNCOV
789
    taosMemoryFree(bufBegin);
×
UNCOV
790
    code = terrno;
×
UNCOV
791
    goto _exit;
×
792
  }
793
  
794
  uvUdf->output = uv_buf_init(bufBegin, len);
241,270✔
795

796
  taosMemoryFreeClear(uvUdf->input.base);
241,270✔
797

798
_exit:
136,119✔
799
  if (code) {
241,270✔
800
    uv_mutex_lock(&global.udfsMutex);
1,173✔
801
    int32_t removeCode = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
1,173✔
802
    if (removeCode) {
1,173✔
803
      fnError("udf name %s remove from hash failed/setup, err:%0x %s", udf->name, removeCode, tstrerror(removeCode));
×
804
    }
805
    uv_mutex_unlock(&global.udfsMutex);
1,173✔
806

807
    fnError("udf free: setup failed. name %s(%p) err:%0x %s", udf->name, udf, code, tstrerror(code));
1,173✔
808

809
    udfdFreeUdf(udf);
1,173✔
810
  }
811
}
812

813
static int32_t checkUDFScalaResult(SSDataBlock *block, SUdfColumn *output) {
1,903,719✔
814
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
1,903,719✔
UNCOV
815
    return TSDB_CODE_SUCCESS;
×
816
  }
817
  if (output->colData.numOfRows != block->info.rows) {
1,903,719✔
UNCOV
818
    fnError("udf scala result num of rows %d not equal to input rows %" PRId64, output->colData.numOfRows, block->info.rows);
×
UNCOV
819
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
×
820
  }
821

822
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_BYROW) {
1,903,719✔
UNCOV
823
    for (int32_t i = 0; i < output->colData.numOfRows; ++i) {
×
824
      if (!udfColDataIsNull(output, i)) {
×
UNCOV
825
        if (IS_VAR_DATA_TYPE(output->colMeta.type)) {
×
UNCOV
826
          TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.payload != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
×
827
          TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.varOffsets[i] >= 0 &&
×
828
                                       output->colData.varLenCol.varOffsets[i] < output->colData.varLenCol.payloadLen,
829
                                   TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
830
        } else {
UNCOV
831
          TAOS_UDF_CHECK_CONDITION(
×
832
              output->colMeta.bytes * output->colData.numOfRows <= output->colData.fixLenCol.dataLen,
833
              TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
UNCOV
834
          break;
×
835
        }
836
      }
837
    }
838
  }
839

840
  return TSDB_CODE_SUCCESS;
1,903,719✔
841
}
842

843
static int32_t checkUDFAggResult(SSDataBlock *block, SUdfInterBuf *output) {
483,221✔
844
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
483,221✔
UNCOV
845
    return TSDB_CODE_SUCCESS;
×
846
  }
847
  if (output->numOfResult != 1 && output->numOfResult != 0) {
483,221✔
UNCOV
848
    fnError("udf agg result num of rows %d not equal to 1", output->numOfResult);
×
UNCOV
849
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
×
850
  }
851
  TAOS_UDF_CHECK_CONDITION(output->buf != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
483,221✔
852
  TAOS_UDF_CHECK_CONDITION(output->bufLen > 0, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
483,221✔
853
  return TSDB_CODE_SUCCESS;
483,221✔
854
}
855

856
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
2,770,434✔
857
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
8,311,294✔
858
  SUdfCallRequest *call = &request->call;
2,770,434✔
859
  fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle,
2,770,434✔
860
          request->seqNum);
861
  SUdfcFuncHandle  *handle = (SUdfcFuncHandle *)(call->udfHandle);
2,771,166✔
862
  SUdf             *udf = handle->udf;
2,771,166✔
863
  SUdfResponse      response = {0};
2,771,166✔
864
  SUdfResponse     *rsp = &response;
2,771,166✔
865
  SUdfCallResponse *subRsp = &rsp->callRsp;
2,771,166✔
866

867
  int32_t code = TSDB_CODE_SUCCESS;
2,771,166✔
868
  switch (call->callType) {
2,771,166✔
869
    case TSDB_UDF_CALL_SCALA_PROC: {
1,907,072✔
870
      SUdfColumn output = {0};
1,907,072✔
871
      output.colMeta.bytes = udf->outputLen;
1,907,072✔
872
      output.colMeta.type = udf->outputType;
1,907,072✔
873
      output.colMeta.precision = 0;
1,907,072✔
874
      output.colMeta.scale = 0;
1,907,072✔
875
      if (udfColEnsureCapacity(&output, call->block.info.rows) == TSDB_CODE_SUCCESS) {
3,814,144✔
876
        SUdfDataBlock input = {0};
1,907,255✔
877
        code = convertDataBlockToUdfDataBlock(&call->block, &input);
1,907,255✔
878
        if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
1,905,372✔
879
        freeUdfDataDataBlock(&input);
1,904,823✔
880
        if (code == TSDB_CODE_SUCCESS) code = checkUDFScalaResult(&call->block, &output);
1,906,592✔
881
        if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
1,904,762✔
882
      }
883
      freeUdfColumn(&output);
1,906,531✔
884
      break;
1,907,019✔
885
    }
886
    case TSDB_UDF_CALL_AGG_INIT: {
191,092✔
887
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
191,092✔
888
      if (outBuf.buf != NULL) {
191,092✔
889
        code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx);
191,092✔
890
      } else {
UNCOV
891
        code = terrno;
×
892
      }
893
      subRsp->resultBuf = outBuf;
191,092✔
894
      break;
191,092✔
895
    }
896
    case TSDB_UDF_CALL_AGG_PROC: {
483,221✔
897
      SUdfDataBlock input = {0};
483,221✔
898
      if (convertDataBlockToUdfDataBlock(&call->block, &input) == TSDB_CODE_SUCCESS) {
483,221✔
899
        SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
483,221✔
900
        if (outBuf.buf != NULL) {
483,221✔
901
          code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx);
483,221✔
902
          freeUdfInterBuf(&call->interBuf);
483,221✔
903
          if (code == TSDB_CODE_SUCCESS) code = checkUDFAggResult(&call->block, &outBuf);
483,221✔
904
          subRsp->resultBuf = outBuf;
483,221✔
905
        } else {
UNCOV
906
          code = terrno;
×
907
        }
908
      }
909
      freeUdfDataDataBlock(&input);
483,221✔
910

911
      break;
483,221✔
912
    }
913
    // case TSDB_UDF_CALL_AGG_MERGE: {
914
    //   SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
915
    //   if (outBuf.buf != NULL) {
916
    //     code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx);
917
    //     freeUdfInterBuf(&call->interBuf);
918
    //     freeUdfInterBuf(&call->interBuf2);
919
    //     subRsp->resultBuf = outBuf;
920
    //   } else {
921
    //     code = terrno;
922
    //   }
923
    // 
924
    //   break;
925
    // }
926
    case TSDB_UDF_CALL_AGG_FIN: {
189,781✔
927
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
189,781✔
928
      if (outBuf.buf != NULL) {
189,781✔
929
        code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx);
189,781✔
930
        freeUdfInterBuf(&call->interBuf);
189,781✔
931
        subRsp->resultBuf = outBuf;
189,781✔
932
      } else {
UNCOV
933
        code = terrno;
×
934
      }
935

936
      break;
189,781✔
937
    }
UNCOV
938
    default:
×
939
      break;
×
940
  }
941

942
  rsp->seqNum = request->seqNum;
2,771,113✔
943
  rsp->type = request->type;
2,771,113✔
944
  rsp->code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
2,771,113✔
945
  subRsp->callType = call->callType;
2,771,113✔
946

947
  int32_t len = encodeUdfResponse(NULL, rsp);
2,771,113✔
948
  if(len < 0) {
2,767,087✔
UNCOV
949
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
×
UNCOV
950
    goto _exit;
×
951
  }
952
  rsp->msgLen = len;
2,767,087✔
953
  void *bufBegin = taosMemoryMalloc(len);
2,767,087✔
954
  if (bufBegin == NULL) {
2,770,198✔
UNCOV
955
    fnError("udfdProcessCallRequest: malloc failed. len %d", len);
×
UNCOV
956
    goto _exit;
×
957
  }
958
  void *buf = bufBegin;
2,770,198✔
959
  if(encodeUdfResponse(&buf, rsp) < 0) {
2,770,198✔
UNCOV
960
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
×
UNCOV
961
    taosMemoryFree(bufBegin);
×
UNCOV
962
    goto _exit;
×
963
  }
964

965
  uvUdf->output = uv_buf_init(bufBegin, len);
2,768,185✔
966

967
_exit:
2,770,503✔
968
  switch (call->callType) {
2,770,503✔
969
    case TSDB_UDF_CALL_SCALA_PROC: {
1,906,409✔
970
      blockDataFreeRes(&call->block);
1,906,409✔
971
      blockDataFreeRes(&subRsp->resultData);
1,905,738✔
972
      break;
1,907,080✔
973
    }
974
    case TSDB_UDF_CALL_AGG_INIT: {
191,092✔
975
      freeUdfInterBuf(&subRsp->resultBuf);
191,092✔
976
      break;
191,092✔
977
    }
978
    case TSDB_UDF_CALL_AGG_PROC: {
483,221✔
979
      blockDataFreeRes(&call->block);
483,221✔
980
      freeUdfInterBuf(&subRsp->resultBuf);
483,221✔
981
      break;
483,099✔
982
    }
983
    // case TSDB_UDF_CALL_AGG_MERGE: {
984
    //   freeUdfInterBuf(&subRsp->resultBuf);
985
    //   break;
986
    // }
987
    case TSDB_UDF_CALL_AGG_FIN: {
189,781✔
988
      freeUdfInterBuf(&subRsp->resultBuf);
189,781✔
989
      break;
189,781✔
990
    }
UNCOV
991
    default:
×
UNCOV
992
      break;
×
993
  }
994

995
  taosMemoryFreeClear(uvUdf->input.base);
2,771,052✔
996
  return;
2,771,052✔
997
}
998

999
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
240,097✔
1000
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
720,291✔
1001
  SUdfTeardownRequest *teardown = &request->teardown;
240,097✔
1002
  fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
240,097✔
1003
  SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
240,097✔
1004
  SUdf            *udf = handle->udf;
240,097✔
1005
  bool             unloadUdf = false;
240,097✔
1006
  int32_t          code = TSDB_CODE_SUCCESS;
240,097✔
1007

1008
  uv_mutex_lock(&global.udfsMutex);
240,097✔
1009
  udf->refCount--;
240,097✔
1010
  if (udf->refCount == 0 && (!udf->resident || udf->expired)) {
240,097✔
1011
    unloadUdf = true;
116,933✔
1012
    code = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
116,933✔
1013
    if (code != 0) {
116,933✔
UNCOV
1014
      fnError("udf name %s remove from hash failed, err:%0x %s", udf->name, code, tstrerror(code));
×
UNCOV
1015
      uv_mutex_unlock(&global.udfsMutex);
×
UNCOV
1016
      goto _send;
×
1017
    }
1018
  }
1019
  uv_mutex_unlock(&global.udfsMutex);
240,097✔
1020
  if (unloadUdf) {
240,097✔
1021
    fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx));
116,933✔
1022

1023
    udfdFreeUdf(udf);
116,933✔
1024
  }
1025

1026
_send:
123,164✔
1027
  taosMemoryFree(handle);
240,097✔
1028
  SUdfResponse  response = {0};
240,097✔
1029
  SUdfResponse *rsp = &response;
240,097✔
1030
  rsp->seqNum = request->seqNum;
240,097✔
1031
  rsp->type = request->type;
240,097✔
1032
  rsp->code = code;
240,097✔
1033
  int32_t len = encodeUdfResponse(NULL, rsp);
240,097✔
1034
  if (len < 0) {
240,097✔
UNCOV
1035
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
×
UNCOV
1036
    return;
×
1037
  }
1038
  rsp->msgLen = len;
240,097✔
1039
  void *bufBegin = taosMemoryMalloc(len);
240,097✔
1040
  if(bufBegin == NULL) {
240,097✔
UNCOV
1041
    fnError("udfdProcessTeardownRequest: malloc failed. len %d", len);
×
UNCOV
1042
    return;
×
1043
  }
1044
  void *buf = bufBegin;
240,097✔
1045
  if (encodeUdfResponse(&buf, rsp) < 0) {
240,097✔
UNCOV
1046
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
×
UNCOV
1047
    taosMemoryFree(bufBegin);
×
UNCOV
1048
    return;
×
1049
  }
1050
  uvUdf->output = uv_buf_init(bufBegin, len);
240,097✔
1051

1052
  taosMemoryFree(uvUdf->input.base);
240,097✔
1053
  return;
240,097✔
1054
}
1055

1056
void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
120,571✔
1057
  TAOS_UDF_CHECK_PTR_RVOID(udf, path);
361,713✔
1058
  if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
120,571✔
1059
#ifdef WINDOWS
1060
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
1061
#else
1062
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
104,356✔
1063
             udf->createdTime);
104,356✔
1064
#endif
1065
  } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
16,215✔
1066
#ifdef WINDOWS
1067
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
1068
#else
1069
    snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
16,215✔
1070
#endif
1071
  } else {
1072
#ifdef WINDOWS
1073
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
1074
#else
UNCOV
1075
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
×
1076
#endif
1077
  }
1078
}
1079

1080
int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
120,571✔
1081
  TAOS_UDF_CHECK_PTR_RCODE(pFuncInfo, udf);
361,713✔
1082
  if (!osDataSpaceAvailable()) {
120,571✔
UNCOV
1083
    terrno = TSDB_CODE_NO_DISKSPACE;
×
UNCOV
1084
    fnError("udfd create shared library failed since %s", terrstr());
×
UNCOV
1085
    return terrno;
×
1086
  }
1087

1088
  char path[PATH_MAX] = {0};
120,571✔
1089
  udfdGetFuncBodyPath(udf, path);
120,571✔
1090
  bool fileExist = !(taosStatFile(path, NULL, NULL, NULL) < 0);
120,571✔
1091
  if (fileExist) {
120,571✔
1092
    tstrncpy(udf->path, path, PATH_MAX);
113,665✔
1093
    fnInfo("udfd func body file. reuse existing file %s", path);
113,665✔
1094
    return TSDB_CODE_SUCCESS;
113,665✔
1095
  }
1096

1097
  TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
6,906✔
1098
  if (file == NULL) {
6,906✔
UNCOV
1099
    fnError("udfd write udf shared library: %s failed, error: %d %s", path, ERRNO, strerror(ERRNO));
×
UNCOV
1100
    return TSDB_CODE_FILE_CORRUPTED;
×
1101
  }
1102
  int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
6,906✔
1103
  if (count != pFuncInfo->codeSize) {
6,906✔
1104
    fnError("udfd write udf shared library failed");
×
UNCOV
1105
    return TSDB_CODE_FILE_CORRUPTED;
×
1106
  }
1107
  if(taosCloseFile(&file) != 0) {
6,906✔
UNCOV
1108
    fnError("udfdSaveFuncBodyToFile, udfd close file failed");
×
1109
    return TSDB_CODE_FILE_CORRUPTED;
×
1110
  }
1111

1112
  tstrncpy(udf->path, path, PATH_MAX);
6,906✔
1113
  return TSDB_CODE_SUCCESS;
6,906✔
1114
}
1115

1116
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
120,571✔
1117
  TAOS_UDF_CHECK_PTR_RVOID(parent, pMsg);
361,713✔
1118
  SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
120,571✔
1119

1120
  if (pEpSet) {
120,571✔
1121
    if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
×
1122
      updateEpSet_s(&global.mgmtEp, pEpSet);
×
1123
    }
1124
  }
1125

1126
  if (pMsg->code != TSDB_CODE_SUCCESS) {
120,571✔
UNCOV
1127
    fnError("udfd rpc error, code:%s", tstrerror(pMsg->code));
×
1128
    msgInfo->code = pMsg->code;
×
1129
    goto _return;
×
1130
  }
1131

1132
  if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
120,571✔
1133
    SConnectRsp connectRsp = {0};
×
1134
    if(tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp) < 0){
×
UNCOV
1135
      fnError("udfd deserialize connect response failed");
×
1136
      goto _return;
×
1137
    }
1138

UNCOV
1139
    int32_t now = taosGetTimestampSec();
×
1140
    int32_t delta = abs(now - connectRsp.svrTimestamp);
×
1141
    if (delta > 900) {
×
UNCOV
1142
      msgInfo->code = TSDB_CODE_TIME_UNSYNCED;
×
UNCOV
1143
      goto _return;
×
1144
    }
1145

UNCOV
1146
    if (connectRsp.epSet.numOfEps == 0) {
×
UNCOV
1147
      msgInfo->code = TSDB_CODE_APP_ERROR;
×
UNCOV
1148
      goto _return;
×
1149
    }
1150

UNCOV
1151
    if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
×
UNCOV
1152
      updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
×
1153
    }
UNCOV
1154
    msgInfo->code = 0;
×
1155
  } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
120,571✔
1156
    SRetrieveFuncRsp retrieveRsp = {0};
120,571✔
1157
    if(tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp) < 0){
120,571✔
UNCOV
1158
      fnError("udfd deserialize retrieve func response failed");
×
UNCOV
1159
      goto _return;
×
1160
    }
1161

1162
    SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
120,571✔
1163
    SUdf      *udf = msgInfo->param;
120,571✔
1164
    udf->funcType = pFuncInfo->funcType;
120,571✔
1165
    udf->scriptType = pFuncInfo->scriptType;
120,571✔
1166
    udf->outputType = pFuncInfo->outputType;
120,571✔
1167
    udf->outputLen = pFuncInfo->outputLen;
120,571✔
1168
    udf->bufSize = pFuncInfo->bufSize;
120,571✔
1169

1170
    SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0);
120,571✔
1171
    udf->version = pFuncExtraInfo->funcVersion;
120,571✔
1172
    udf->createdTime = pFuncExtraInfo->funcCreatedTime;
120,571✔
1173
    msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf);
120,571✔
1174
    if (msgInfo->code != 0) {
120,571✔
UNCOV
1175
      udf->lastFetchTime = 0;
×
1176
    }
1177
    tFreeSFuncInfo(pFuncInfo);
120,571✔
1178
    taosArrayDestroy(retrieveRsp.pFuncInfos);
120,571✔
1179
    taosArrayDestroy(retrieveRsp.pFuncExtraInfos);
120,571✔
1180
  }
1181

1182
_return:
×
1183
  rpcFreeCont(pMsg->pCont);
120,571✔
1184
  uv_sem_post(&msgInfo->resultSem);
120,571✔
1185
  return;
120,571✔
1186
}
1187

1188
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
120,571✔
1189
  TAOS_UDF_CHECK_PTR_RCODE(clientRpc, udfName, udf);
482,284✔
1190
  SRetrieveFuncReq retrieveReq = {0};
120,571✔
1191
  retrieveReq.numOfFuncs = 1;
120,571✔
1192
  retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
120,571✔
1193
  if(taosArrayPush(retrieveReq.pFuncNames, udfName) == NULL) {
241,142✔
UNCOV
1194
    taosArrayDestroy(retrieveReq.pFuncNames);
×
1195
    return terrno;
×
1196
  }
1197

1198
  int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
120,571✔
1199
  if(contLen < 0) {
120,571✔
1200
    taosArrayDestroy(retrieveReq.pFuncNames);
×
1201
    return terrno;
×
1202
  }
1203
  void   *pReq = rpcMallocCont(contLen);
120,571✔
1204
  if(tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq)  < 0) {
120,571✔
UNCOV
1205
    taosArrayDestroy(retrieveReq.pFuncNames);
×
UNCOV
1206
    rpcFreeCont(pReq);
×
UNCOV
1207
    return terrno;
×
1208
  }
1209
  taosArrayDestroy(retrieveReq.pFuncNames);
120,571✔
1210

1211
  SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
120,571✔
1212
  if(NULL == msgInfo) {
120,571✔
UNCOV
1213
    return terrno;
×
1214
  }
1215
  msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
120,571✔
1216
  msgInfo->param = udf;
120,571✔
1217
  if(uv_sem_init(&msgInfo->resultSem, 0)  != 0) {
120,571✔
UNCOV
1218
    taosMemoryFree(msgInfo);
×
1219
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1220
  }
1221

1222
  SRpcMsg rpcMsg = {0};
120,571✔
1223
  rpcMsg.pCont = pReq;
120,571✔
1224
  rpcMsg.contLen = contLen;
120,571✔
1225
  rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
120,571✔
1226
  rpcMsg.info.ahandle = msgInfo;
120,571✔
1227
  int32_t code = rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
120,571✔
1228
  if (code == 0) {
120,571✔
1229
    uv_sem_wait(&msgInfo->resultSem);
120,571✔
1230
    uv_sem_destroy(&msgInfo->resultSem);
120,571✔
1231
    code = msgInfo->code;
120,571✔
1232
  }
1233
  taosMemoryFree(msgInfo);
120,571✔
1234
  return code;
120,571✔
1235
}
1236

UNCOV
1237
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
×
UNCOV
1238
  if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
×
UNCOV
1239
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING ||
×
UNCOV
1240
      code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
×
UNCOV
1241
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
×
UNCOV
1242
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY) {
×
1243
      return false;
×
1244
    }
UNCOV
1245
    return true;
×
1246
  } else {
UNCOV
1247
    return false;
×
1248
  }
1249
}
1250

1251
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
578,758✔
1252
  pEpSet->version = 0;
578,758✔
1253

1254
  // init mnode ip set
1255
  SEpSet *mgmtEpSet = &(pEpSet->epSet);
578,758✔
1256
  mgmtEpSet->numOfEps = 0;
578,758✔
1257
  mgmtEpSet->inUse = 0;
578,758✔
1258

1259
  if (firstEp && firstEp[0] != 0) {
578,758✔
1260
    if (strlen(firstEp) >= TSDB_EP_LEN) {
578,758✔
UNCOV
1261
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1262
      return -1;
×
1263
    }
1264

1265
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
578,758✔
1266
    if (code != TSDB_CODE_SUCCESS) {
578,758✔
UNCOV
1267
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1268
      return terrno;
×
1269
    }
1270

1271
    mgmtEpSet->numOfEps++;
578,758✔
1272
  }
1273

1274
  if (secondEp && secondEp[0] != 0) {
578,758✔
1275
    if (strlen(secondEp) >= TSDB_EP_LEN) {
578,758✔
UNCOV
1276
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1277
      return -1;
×
1278
    }
1279

1280
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
578,758✔
1281
    if (code != TSDB_CODE_SUCCESS) {
578,758✔
UNCOV
1282
      fnError("invalid ep %s", secondEp);
×
1283
    } else {
1284
      mgmtEpSet->numOfEps++;
578,758✔
1285
    }
1286
  }
1287

1288
  if (mgmtEpSet->numOfEps == 0) {
578,758✔
UNCOV
1289
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1290
    return -1;
×
1291
  }
1292

1293
  return 0;
578,758✔
1294
}
1295

1296
int32_t udfdOpenClientRpc() {
578,758✔
1297
  SRpcInit rpcInit = {0};
578,758✔
1298
  rpcInit.label = "UDFD";
578,758✔
1299
  rpcInit.numOfThreads = 1;
578,758✔
1300
  rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
578,758✔
1301
  rpcInit.sessions = 1024;
578,758✔
1302
  rpcInit.connType = TAOS_CONN_CLIENT;
578,758✔
1303
  rpcInit.idleTime = tsShellActivityTimer * 1000;
578,758✔
1304
  rpcInit.user = TSDB_DEFAULT_USER;
578,758✔
1305
  rpcInit.parent = &global;
578,758✔
1306
  rpcInit.rfp = udfdRpcRfp;
578,758✔
1307
  rpcInit.compressSize = tsCompressMsgSize;
578,758✔
1308

1309
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
578,758✔
1310
  connLimitNum = TMAX(connLimitNum, 10);
578,758✔
1311
  connLimitNum = TMIN(connLimitNum, 500);
578,758✔
1312
  rpcInit.connLimitNum = connLimitNum;
578,758✔
1313
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
578,758✔
1314

1315
  rpcInit.enableSSL = tsEnableTLS;
578,758✔
1316
  rpcInit.enableSasl = tsEnableSasl;
578,758✔
1317
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
578,758✔
1318
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
578,758✔
1319
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
578,758✔
1320
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
578,758✔
1321
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
578,758✔
1322
  TAOS_CHECK_RETURN(taosVersionStrToInt(td_version, &rpcInit.compatibilityVer));
578,758✔
1323

1324
  global.clientRpc = rpcOpen(&rpcInit);
578,758✔
1325
  if (global.clientRpc == NULL) {
578,758✔
UNCOV
1326
    fnError("failed to init dnode rpc client");
×
UNCOV
1327
    return terrno;
×
1328
  }
1329
  return 0;
578,758✔
1330
}
1331

1332
void udfdCloseClientRpc() {
578,758✔
1333
  fnInfo("udfd begin closing rpc");
578,758✔
1334
  rpcClose(global.clientRpc);
578,758✔
1335
  fnInfo("udfd finish closing rpc");
578,758✔
1336
}
578,758✔
1337

1338
void udfdOnWrite(uv_write_t *req, int status) {
3,252,907✔
1339
  TAOS_UDF_CHECK_PTR_RVOID(req);
6,505,814✔
1340
  SUvUdfWork *work = (SUvUdfWork *)req->data;
3,252,907✔
1341
  if (status < 0) {
3,252,907✔
UNCOV
1342
    fnError("udfd send response error, length:%zu code:%s", work->output.len, uv_err_name(status));
×
1343
  }
1344
  // remove work from the connection work list
1345
  if (work->conn != NULL) {
3,252,907✔
1346
    SUvUdfWork **ppWork;
1347
    for (ppWork = &work->conn->pWorkList; *ppWork && (*ppWork != work); ppWork = &((*ppWork)->pWorkNext)) {
5,977,000✔
1348
    }
1349
    if (*ppWork == work) {
3,252,907✔
1350
      *ppWork = work->pWorkNext;
3,252,907✔
1351
    } else {
UNCOV
1352
      fnError("work not in conn any more");
×
1353
    }
1354
  }
1355
  taosMemoryFree(work->output.base);
3,252,907✔
1356
  taosMemoryFree(work);
3,252,907✔
1357
  taosMemoryFree(req);
3,252,907✔
1358
}
1359

1360
void udfdSendResponse(uv_work_t *work, int status) {
3,252,907✔
1361
  TAOS_UDF_CHECK_PTR_RVOID(work);
6,505,814✔
1362
  SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
3,252,907✔
1363

1364
  if (udfWork->conn != NULL) {
3,252,907✔
1365
    uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
3,252,907✔
1366
    if(write_req == NULL) {
3,252,907✔
UNCOV
1367
      fnError("udfd send response error, malloc failed");
×
UNCOV
1368
      taosMemoryFree(work);
×
UNCOV
1369
      return;
×
1370
    }
1371
    write_req->data = udfWork;
3,252,907✔
1372
    int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
3,252,907✔
1373
    if (code != 0) {
3,252,907✔
UNCOV
1374
      fnError("udfd send response error %s", uv_strerror(code));
×
UNCOV
1375
      taosMemoryFree(write_req);
×
1376
   }
1377
  }
1378
  taosMemoryFree(work);
3,252,907✔
1379
}
1380

1381
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
9,663,389✔
1382
  TAOS_UDF_CHECK_PTR_RVOID(handle, buf);
28,990,167✔
1383
  SUdfdUvConn *ctx = handle->data;
9,663,389✔
1384
  int32_t      msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
9,663,389✔
1385
  if (ctx->inputCap == 0) {
9,663,389✔
1386
    ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
3,494,177✔
1387
    if (ctx->inputBuf) {
3,494,177✔
1388
      ctx->inputLen = 0;
3,494,177✔
1389
      ctx->inputCap = msgHeadSize;
3,494,177✔
1390
      ctx->inputTotal = -1;
3,494,177✔
1391

1392
      buf->base = ctx->inputBuf;
3,494,177✔
1393
      buf->len = ctx->inputCap;
3,494,177✔
1394
    } else {
UNCOV
1395
      fnError("udfd can not allocate enough memory") buf->base = NULL;
×
UNCOV
1396
      buf->len = 0;
×
1397
    }
1398
  } else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) {
6,169,212✔
1399
    buf->base = ctx->inputBuf + ctx->inputLen;
2,913,683✔
1400
    buf->len = msgHeadSize - ctx->inputLen;
2,913,683✔
1401
  } else {
1402
    ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
3,255,529✔
1403
    void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap);
3,255,529✔
1404
    if (inputBuf) {
3,255,529✔
1405
      ctx->inputBuf = inputBuf;
3,255,529✔
1406
      buf->base = ctx->inputBuf + ctx->inputLen;
3,255,529✔
1407
      buf->len = ctx->inputCap - ctx->inputLen;
3,255,529✔
1408
    } else {
UNCOV
1409
      fnError("udfd can not allocate enough memory") buf->base = NULL;
×
UNCOV
1410
      buf->len = 0;
×
1411
    }
1412
  }
1413
}
1414

1415
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
6,508,436✔
1416
  if (pipe == NULL) {
6,508,436✔
UNCOV
1417
    fnError("udfd pipe is NULL, LINE:%d", __LINE__);
×
UNCOV
1418
    return false;
×
1419
  }
1420
  if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
6,508,436✔
1421
    pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
3,252,907✔
1422
  }
1423
  if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) {
6,508,436✔
1424
    fnDebug("receive request complete. length %d", pipe->inputLen);
3,252,907✔
1425
    return true;
3,252,907✔
1426
  }
1427
  return false;
3,255,529✔
1428
}
1429

1430
void udfdHandleRequest(SUdfdUvConn *conn) {
3,252,907✔
1431
  TAOS_UDF_CHECK_PTR_RVOID(conn);
6,505,814✔
1432
  char   *inputBuf = conn->inputBuf;
3,252,907✔
1433
  int32_t inputLen = conn->inputLen;
3,252,907✔
1434

1435
  uv_work_t  *work = taosMemoryMalloc(sizeof(uv_work_t));
3,252,907✔
1436
  if(work == NULL) {
3,252,907✔
UNCOV
1437
    fnError("udfd malloc work failed");
×
UNCOV
1438
    return;
×
1439
  }
1440
  SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
3,252,907✔
1441
  if(udfWork == NULL) {
3,252,907✔
UNCOV
1442
    fnError("udfd malloc udf work failed");
×
UNCOV
1443
    taosMemoryFree(work);
×
UNCOV
1444
    return;
×
1445
  }
1446
  udfWork->conn = conn;
3,252,907✔
1447
  udfWork->pWorkNext = conn->pWorkList;
3,252,907✔
1448
  conn->pWorkList = udfWork;
3,252,907✔
1449
  udfWork->input = uv_buf_init(inputBuf, inputLen);
3,252,907✔
1450
  conn->inputBuf = NULL;
3,252,907✔
1451
  conn->inputLen = 0;
3,252,907✔
1452
  conn->inputCap = 0;
3,252,907✔
1453
  conn->inputTotal = -1;
3,252,907✔
1454
  work->data = udfWork;
3,252,907✔
1455
  if(uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse) != 0)
3,252,907✔
1456
  {
UNCOV
1457
    fnError("udfd queue work failed");
×
UNCOV
1458
    taosMemoryFree(work);
×
UNCOV
1459
    taosMemoryFree(udfWork);
×
1460
  }
1461
}
1462

1463
void udfdPipeCloseCb(uv_handle_t *pipe) {
241,270✔
1464
  TAOS_UDF_CHECK_PTR_RVOID(pipe);
482,540✔
1465
  SUdfdUvConn *conn = pipe->data;
241,270✔
1466
  SUvUdfWork  *pWork = conn->pWorkList;
241,270✔
1467
  while (pWork != NULL) {
241,270✔
UNCOV
1468
    pWork->conn = NULL;
×
UNCOV
1469
    pWork = pWork->pWorkNext;
×
1470
  }
1471

1472
  taosMemoryFree(conn->client);
241,270✔
1473
  taosMemoryFree(conn->inputBuf);
241,270✔
1474
  taosMemoryFree(conn);
241,270✔
1475
}
1476

1477
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
9,663,389✔
1478
  TAOS_UDF_CHECK_PTR_RVOID(client, buf);
28,990,167✔
1479
  fnDebug("udfd read %zd bytes from client", nread);
9,663,389✔
1480
  if (nread == 0) return;
9,663,389✔
1481

1482
  SUdfdUvConn *conn = client->data;
6,749,706✔
1483

1484
  if (nread > 0) {
6,749,706✔
1485
    conn->inputLen += nread;
6,508,436✔
1486
    if (isUdfdUvMsgComplete(conn)) {
6,508,436✔
1487
      udfdHandleRequest(conn);
3,252,907✔
1488
    } else {
1489
      // log error or continue;
1490
    }
1491
    return;
6,508,436✔
1492
  }
1493

1494
  if (nread < 0) {
241,270✔
1495
    if (nread == UV_EOF) {
241,270✔
1496
      fnInfo("udfd pipe read EOF");
241,270✔
1497
    } else {
UNCOV
1498
      fnError("Receive error %s", uv_err_name(nread));
×
1499
    }
1500
    udfdUvHandleError(conn);
241,270✔
1501
  }
1502
}
1503

1504
void udfdOnNewConnection(uv_stream_t *server, int status) {
241,270✔
1505
  TAOS_UDF_CHECK_PTR_RVOID(server);
482,540✔
1506
  if (status < 0) {
241,270✔
UNCOV
1507
    fnError("udfd new connection error, code:%s", uv_strerror(status));
×
1508
    return;
×
1509
  }
1510
  int32_t code = 0;
241,270✔
1511

1512
  uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
241,270✔
1513
  if(client == NULL) {
241,270✔
UNCOV
1514
    fnError("udfd pipe malloc failed");
×
UNCOV
1515
    return;
×
1516
  }
1517
  code = uv_pipe_init(global.loop, client, 0);
241,270✔
1518
  if (code) {
241,270✔
UNCOV
1519
    fnError("udfd pipe init error %s", uv_strerror(code));
×
1520
    taosMemoryFree(client);
×
1521
    return;
×
1522
  }
1523
  if (uv_accept(server, (uv_stream_t *)client) == 0) {
241,270✔
1524
    SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
241,270✔
1525
    if(ctx == NULL) {
241,270✔
UNCOV
1526
      fnError("udfd conn malloc failed");
×
1527
      goto _exit;
×
1528
    }
1529
    ctx->pWorkList = NULL;
241,270✔
1530
    ctx->client = (uv_stream_t *)client;
241,270✔
1531
    ctx->inputBuf = 0;
241,270✔
1532
    ctx->inputLen = 0;
241,270✔
1533
    ctx->inputCap = 0;
241,270✔
1534
    client->data = ctx;
241,270✔
1535
    ctx->client = (uv_stream_t *)client;
241,270✔
1536
    code = uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
241,270✔
1537
    if (code) {
241,270✔
1538
      fnError("udfd read start error %s", uv_strerror(code));
×
UNCOV
1539
      udfdUvHandleError(ctx);
×
1540
      taosMemoryFree(ctx);
×
1541
      taosMemoryFree(client);
×
1542
    }
1543
    return;
241,270✔
1544
  }
UNCOV
1545
_exit:
×
UNCOV
1546
    uv_close((uv_handle_t *)client, NULL);
×
UNCOV
1547
    taosMemoryFree(client);
×
1548
}
1549

UNCOV
1550
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
×
UNCOV
1551
  TAOS_UDF_CHECK_PTR_RVOID(handle);
×
UNCOV
1552
  fnInfo("udfd signal received: %d\n", signum);
×
UNCOV
1553
  uv_fs_t req;
×
UNCOV
1554
  int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
×
UNCOV
1555
  if(code) {
×
UNCOV
1556
    fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(code), __LINE__);
×
1557
  }
UNCOV
1558
  code = uv_signal_stop(handle);
×
UNCOV
1559
  if(code) {
×
UNCOV
1560
    fnError("stop signal handler failed, reason:%s", uv_strerror(code));
×
1561
  }
UNCOV
1562
  uv_stop(global.loop);
×
1563
}
1564

1565
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
580,141✔
1566
  for (int32_t i = 1; i < argc; ++i) {
1,159,360✔
1567
    if (strcmp(argv[i], "-c") == 0) {
580,141✔
1568
      if (i < argc - 1) {
579,680✔
1569
        if (strlen(argv[++i]) >= PATH_MAX) {
579,219✔
1570
          (void)printf("config file path overflow");
461✔
1571
          return -1;
461✔
1572
        }
1573
        tstrncpy(configDir, argv[i], PATH_MAX);
578,758✔
1574
      } else {
1575
        (void)printf("'-c' requires a parameter, default is %s\n", configDir);
461✔
1576
        return -1;
461✔
1577
      }
1578
    } else if (strcmp(argv[i], "-V") == 0) {
461✔
1579
      global.printVersion = true;
461✔
1580
    } else {
1581
    }
1582
  }
1583

1584
  return 0;
579,219✔
1585
}
1586

1587
static void udfdPrintVersion() {
461✔
1588
  (void)printf("%sudf version: %s compatible_version: %s\n", CUS_PROMPT, td_version, td_compatible_version);
461✔
1589
  (void)printf("git: %s\n", td_gitinfo);
461✔
1590
  (void)printf("build: %s\n", td_buildinfo);
461✔
1591
}
461✔
1592

1593
static int32_t udfdInitLog() {
578,758✔
1594
  const char *logName = "udfdlog";
578,758✔
1595
  TAOS_CHECK_RETURN(taosInitLogOutput(&logName));
578,758✔
1596
  return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
578,758✔
1597
}
1598

1599
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
578,758✔
1600
  TAOS_UDF_CHECK_PTR_RVOID(buf);
1,157,516✔
1601
  buf->base = taosMemoryMalloc(suggested_size);
578,758✔
1602
  if (buf->base == NULL) {
578,758✔
UNCOV
1603
    fnError("udfd ctrl pipe alloc buffer failed");
×
UNCOV
1604
    return;
×
1605
  }
1606
  buf->len = suggested_size;
578,758✔
1607
}
1608

1609
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
578,758✔
1610
  TAOS_UDF_CHECK_PTR_RVOID(q, buf);
1,736,274✔
1611
  if (nread < 0) {
578,758✔
1612
    fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
578,758✔
1613
    taosMemoryFree(buf->base);
578,758✔
1614
    uv_close((uv_handle_t *)q, NULL);
578,758✔
1615
    uv_stop(global.loop);
578,758✔
1616
    return;
578,758✔
1617
  }
UNCOV
1618
  fnError("udfd ctrl pipe read %zu bytes", nread);
×
UNCOV
1619
  taosMemoryFree(buf->base);
×
1620
}
1621

1622
static void removeListeningPipe() {
1,157,516✔
1623
  uv_fs_t req;
1,148,568✔
1624
  int     err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
1,157,516✔
1625
  uv_fs_req_cleanup(&req);
1,157,516✔
1626
  if(err) {
1,157,516✔
1627
    fnInfo("remove listening pipe %s : %s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__);
1,157,063✔
1628
  }
1629
}
1,157,516✔
1630

1631
static int32_t udfdUvInit() {
578,758✔
1632
  TAOS_CHECK_RETURN(uv_loop_init(global.loop));
578,758✔
1633

1634
  if (tsStartUdfd) {  // udfd is started by taosd, which shall exit when taosd exit
578,758✔
1635
    TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1));
578,758✔
1636
    TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0));
578,758✔
1637
    TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb));
578,758✔
1638
  }
1639
  getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
578,758✔
1640

1641
  removeListeningPipe();
578,758✔
1642

1643
  TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.listeningPipe, 0));
578,758✔
1644

1645
  TAOS_CHECK_RETURN(uv_signal_init(global.loop, &global.intrSignal));
578,758✔
1646
  TAOS_CHECK_RETURN(uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT));
578,758✔
1647

1648
  int r;
1649
  fnInfo("bind to pipe %s", global.listenPipeName);
578,758✔
1650
  if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
578,758✔
UNCOV
1651
    fnError("Bind error %s", uv_err_name(r));
×
UNCOV
1652
    removeListeningPipe();
×
UNCOV
1653
    return -2;
×
1654
  }
1655
  if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
578,758✔
UNCOV
1656
    fnError("Listen error %s", uv_err_name(r));
×
UNCOV
1657
    removeListeningPipe();
×
UNCOV
1658
    return -3;
×
1659
  }
1660
  return 0;
578,758✔
1661
}
1662

1663
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
1,157,516✔
1664
  if (!uv_is_closing(handle)) {
1,157,516✔
1665
    uv_close(handle, NULL);
1,157,516✔
1666
  }
1667
}
1,157,516✔
1668

1669
static int32_t udfdGlobalDataInit() {
578,758✔
1670
  uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
578,758✔
1671
  if (loop == NULL) {
578,758✔
1672
    fnError("udfd init uv loop failed, mem overflow");
×
UNCOV
1673
    return terrno;
×
1674
  }
1675
  global.loop = loop;
578,758✔
1676

1677
  if (uv_mutex_init(&global.scriptPluginsMutex) != 0) {
578,758✔
UNCOV
1678
    fnError("udfd init script plugins mutex failed");
×
UNCOV
1679
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1680
  }
1681

1682
  global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
578,758✔
1683
  if (global.udfsHash == NULL) {
578,758✔
UNCOV
1684
    return terrno;
×
1685
  }
1686
  // taosHashSetFreeFp(global.udfsHash, udfdFreeUdf);
1687

1688
  if (uv_mutex_init(&global.udfsMutex) != 0) {
578,758✔
UNCOV
1689
    fnError("udfd init udfs mutex failed");
×
UNCOV
1690
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1691
  }
1692

1693
  return 0;
578,758✔
1694
}
1695

1696
static void udfdGlobalDataDeinit() {
578,758✔
1697
  uv_mutex_destroy(&global.udfsMutex);
578,758✔
1698
  uv_mutex_destroy(&global.scriptPluginsMutex);
578,758✔
1699
  taosMemoryFreeClear(global.loop);
578,758✔
1700
  fnInfo("udfd global data deinit");
578,758✔
1701
}
578,758✔
1702

1703
static void udfdRun() {
578,758✔
1704
  fnInfo("start udfd event loop");
578,758✔
1705
  int32_t code = uv_run(global.loop, UV_RUN_DEFAULT);
578,758✔
1706
  if(code != 0) {
578,758✔
1707
    fnError("udfd event loop still has active handles or requests.");
578,758✔
1708
  }
1709
  fnInfo("udfd event loop stopped.");
578,758✔
1710

1711
  (void)uv_loop_close(global.loop);
578,758✔
1712

1713
  uv_walk(global.loop, udfdCloseWalkCb, NULL);
578,758✔
1714
  code = uv_run(global.loop, UV_RUN_DEFAULT);
578,758✔
1715
  if(code != 0) {
578,758✔
UNCOV
1716
    fnError("udfd event loop still has active handles or requests.");
×
1717
  }
1718
  (void)uv_loop_close(global.loop);
578,758✔
1719
}
578,758✔
1720

1721
int32_t udfdInitResidentFuncs() {
578,758✔
1722
  if (strlen(tsUdfdResFuncs) == 0) {
578,758✔
1723
    return TSDB_CODE_SUCCESS;
576,752✔
1724
  }
1725

1726
  global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
2,006✔
1727
  char *pSave = tsUdfdResFuncs;
2,006✔
1728
  char *token;
1729
  while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
6,018✔
1730
    char func[TSDB_FUNC_NAME_LEN + 1] = {0};
4,012✔
1731
    tstrncpy(func, token, TSDB_FUNC_NAME_LEN);
4,012✔
1732
    fnInfo("udfd add resident function %s", func);
4,012✔
1733
    if(taosArrayPush(global.residentFuncs, func) == NULL)
8,024✔
1734
    {
UNCOV
1735
      taosArrayDestroy(global.residentFuncs);
×
UNCOV
1736
      return terrno;
×
1737
    }
1738
  }
1739

1740
  return TSDB_CODE_SUCCESS;
2,006✔
1741
}
1742

1743
void udfdDeinitResidentFuncs() {
578,758✔
1744
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
582,770✔
1745
    char  *funcName = taosArrayGet(global.residentFuncs, i);
4,012✔
1746
    SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
4,012✔
1747
    if (udfInHash) {
4,012✔
1748
      SUdf   *udf = *udfInHash;
2,465✔
1749
      int32_t code = 0;
2,465✔
1750
      if (udf->scriptPlugin->udfDestroyFunc) {
2,465✔
1751
        code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
2,465✔
1752
        fnDebug("udfd %s destroy function returns %d", funcName, code);
2,465✔
1753
      }
1754
      if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0)
2,465✔
1755
      {
UNCOV
1756
        fnError("udfd remove resident function %s failed", funcName);
×
1757
      }
1758
      taosMemoryFree(udf);
2,465✔
1759
    }
1760
  }
1761
  taosHashCleanup(global.udfsHash);
578,758✔
1762
  taosArrayDestroy(global.residentFuncs);
578,758✔
1763
  fnInfo("udfd resident functions are deinit");
578,758✔
1764
}
578,758✔
1765

1766
int32_t udfdCreateUdfSourceDir() {
578,758✔
1767
  snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir);
578,758✔
1768
  int32_t code = taosMkDir(global.udfDataDir);
578,758✔
1769
  if (code != TSDB_CODE_SUCCESS) {
578,758✔
UNCOV
1770
    snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir);
×
UNCOV
1771
    code = taosMkDir(global.udfDataDir);
×
1772
  }
1773
  fnInfo("udfd create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code));
578,758✔
1774

1775
  return code;
578,758✔
1776
}
1777

1778
void udfdDestroyUdfSourceDir() {
578,758✔
1779
  fnInfo("destory udf source directory %s", global.udfDataDir);
578,758✔
1780
  taosRemoveDir(global.udfDataDir);
578,758✔
1781
}
578,758✔
1782

1783
int main(int argc, char *argv[]) {
580,141✔
1784
  int  code = 0;
580,141✔
1785
  bool logInitialized = false;
580,141✔
1786
  bool cfgInitialized = false;
580,141✔
1787
  bool openClientRpcFinished = false;
580,141✔
1788
  bool residentFuncsInited = false;
580,141✔
1789
  bool udfSourceDirInited = false;
580,141✔
1790
  bool globalDataInited = false;
580,141✔
1791
  taosSetSkipKeyCheckMode();
580,141✔
1792

1793
  if (!taosCheckSystemIsLittleEnd()) {
580,141✔
UNCOV
1794
    (void)printf("failed to start since on non-little-end machines\n");
×
UNCOV
1795
    return -1;
×
1796
  }
1797

1798
  if (udfdParseArgs(argc, argv) != 0) {
580,141✔
1799
    (void)printf("failed to start since parse args error\n");
922✔
1800
    return -1;
922✔
1801
  }
1802

1803
  if (global.printVersion) {
579,219✔
1804
    udfdPrintVersion();
461✔
1805
    return 0;
461✔
1806
  }
1807

1808
  if (udfdInitLog() != 0) {
578,758✔
1809
    // ignore create log failed, because this error no matter
UNCOV
1810
    (void)printf("failed to init udfd log.");
×
1811
  } else {
1812
    logInitialized = true;  // log is initialized
578,758✔
1813
  }
1814

1815
  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
578,758✔
UNCOV
1816
    fnError("failed to start since read config error");
×
UNCOV
1817
    code = -2;
×
UNCOV
1818
    goto _exit;
×
1819
  }
1820
  cfgInitialized = true;  // cfg is initialized
578,758✔
1821
  fnInfo("udfd start with config file %s", configDir);
578,758✔
1822

1823
  if (initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp) != 0) {
578,758✔
UNCOV
1824
    fnError("init ep set from cfg failed");
×
UNCOV
1825
    code = -3;
×
UNCOV
1826
    goto _exit;
×
1827
  }
1828
  fnInfo("udfd start with mnode ep %s", global.mgmtEp.epSet.eps[0].fqdn);
578,758✔
1829
  if (udfdOpenClientRpc() != 0) {
578,758✔
1830
    fnError("open rpc connection to mnode failed");
×
UNCOV
1831
    code = -4;
×
UNCOV
1832
    goto _exit;
×
1833
  }
1834
  fnInfo("udfd rpc client is opened");
578,758✔
1835
  openClientRpcFinished = true;  // rpc is opened
578,758✔
1836

1837
  if (udfdCreateUdfSourceDir() != 0) {
578,758✔
1838
    fnError("create udf source directory failed");
×
UNCOV
1839
    code = -5;
×
UNCOV
1840
    goto _exit;
×
1841
  }
1842
  udfSourceDirInited = true;  // udf source dir is created
578,758✔
1843
  fnInfo("udfd udf source directory is created");
578,758✔
1844

1845
  if (udfdGlobalDataInit() != 0) {
578,758✔
UNCOV
1846
    fnError("init global data failed");
×
UNCOV
1847
    code = -6;
×
UNCOV
1848
    goto _exit;
×
1849
  }
1850
  globalDataInited = true;  // global data is inited
578,758✔
1851
  fnInfo("udfd global data is inited");
578,758✔
1852

1853
  if (udfdUvInit() != 0) {
578,758✔
UNCOV
1854
    fnError("uv init failure");
×
UNCOV
1855
    code = -7;
×
UNCOV
1856
    goto _exit;
×
1857
  }
1858
  fnInfo("udfd uv is inited");
578,758✔
1859

1860
  if (udfdInitResidentFuncs() != 0) {
578,758✔
UNCOV
1861
    fnError("init resident functions failed");
×
UNCOV
1862
    code = -8;
×
UNCOV
1863
    goto _exit;
×
1864
  }
1865
  residentFuncsInited = true;  // resident functions are inited
578,758✔
1866
  fnInfo("udfd resident functions are inited");
578,758✔
1867

1868
  udfdRun();
578,758✔
1869
  fnInfo("udfd exit normally");
578,758✔
1870

1871
  removeListeningPipe();
578,758✔
1872

1873
_exit:
578,758✔
1874
  if (residentFuncsInited) {
578,758✔
1875
    udfdDeinitResidentFuncs();
578,758✔
1876
  }
1877
  udfdDeinitScriptPlugins();
578,758✔
1878
  if (globalDataInited) {
578,758✔
1879
    udfdGlobalDataDeinit();
578,758✔
1880
  }
1881
  if (udfSourceDirInited) {
578,758✔
1882
    udfdDestroyUdfSourceDir();
578,758✔
1883
  }
1884
  if (openClientRpcFinished) {
578,758✔
1885
    udfdCloseClientRpc();
578,758✔
1886
  }
1887
  if (cfgInitialized) {
578,758✔
1888
    taosCleanupCfg();
578,758✔
1889
  }
1890
  if (logInitialized) {
578,758✔
1891
    taosCloseLog();
578,758✔
1892
  }
1893

1894
  return code;
578,758✔
1895
}
1896
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc