• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3525

10 Nov 2024 03:50AM UTC coverage: 60.818% (-0.08%) from 60.898%
#3525

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

118634 of 249004 branches covered (47.64%)

Branch coverage included in aggregate %.

136 of 169 new or added lines in 23 files covered. (80.47%)

542 existing lines in 129 files now uncovered.

199071 of 273386 relevant lines covered (72.82%)

15691647.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.35
/source/libs/function/src/udfd.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
// clang-format off
17
#include "uv.h"
18
#include "os.h"
19
#include "fnLog.h"
20
#include "thash.h"
21

22
#include "tudf.h"
23
#include "tudfInt.h"
24
#include "version.h"
25

26
#include "tdatablock.h"
27
#include "tdataformat.h"
28
#include "tglobal.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "tmisce.h"
32
#include "tversion.h"
33
// clang-format on
34

35
#define UDFD_MAX_SCRIPT_PLUGINS 64
36
#define UDFD_MAX_SCRIPT_TYPE    1
37
#define UDFD_MAX_PLUGIN_FUNCS   9
38

39
typedef struct SUdfCPluginCtx {
40
  uv_lib_t lib;
41

42
  TUdfScalarProcFunc scalarProcFunc;
43

44
  TUdfAggStartFunc   aggStartFunc;
45
  TUdfAggProcessFunc aggProcFunc;
46
  TUdfAggFinishFunc  aggFinishFunc;
47
  TUdfAggMergeFunc   aggMergeFunc;
48

49
  TUdfInitFunc    initFunc;
50
  TUdfDestroyFunc destroyFunc;
51
} SUdfCPluginCtx;
52

53
/* Plugin-level open hook of the C plugin: nothing to set up, always reports success. */
int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) {
  return 0;
}
13✔
54

55
/* Plugin-level close hook of the C plugin: nothing to tear down, always reports success. */
int32_t udfdCPluginClose() {
  return 0;
}
13✔
56

57
/*
 * Resolve the "<udfName>_init" and "<udfName>_destroy" symbols from the
 * already opened library in udfCtx and store them in the context.
 *
 * Returns 0 on success; a failing uv_dlsym() result is returned through
 * TAOS_CHECK_RETURN.
 */
int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
  // room for the name plus "_init" and the terminator
  char initSymbol[TSDB_FUNC_NAME_LEN + 6] = {0};
  snprintf(initSymbol, sizeof(initSymbol), "%s%s", udfName, "_init");
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, initSymbol, (void **)(&udfCtx->initFunc)));

  // room for the name plus "_destroy" and the terminator
  char destroySymbol[TSDB_FUNC_NAME_LEN + 9] = {0};
  snprintf(destroySymbol, sizeof(destroySymbol), "%s%s", udfName, "_destroy");
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, destroySymbol, (void **)(&udfCtx->destroyFunc)));

  return 0;
}
69

70
/*
 * Resolve the aggregate entry points of a C UDF from the library in udfCtx:
 * "<udfName>" (process), "<udfName>_start", "<udfName>_finish" and
 * "<udfName>_merge".
 *
 * The first three are mandatory: a failing uv_dlsym() result is returned
 * through TAOS_CHECK_RETURN. The merge function is optional; when it is
 * missing the failure is only logged and 0 is returned.
 */
int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
  char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
  snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)));

  char  startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
  char *startSuffix = "_start";
  snprintf(startFuncName, sizeof(startFuncName), "%s%s", processFuncName, startSuffix);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)));

  char  finishFuncName[TSDB_FUNC_NAME_LEN + 8] = {0};
  char *finishSuffix = "_finish";
  snprintf(finishFuncName, sizeof(finishFuncName), "%s%s", processFuncName, finishSuffix);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)));

  char  mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
  char *mergeSuffix = "_merge";
  snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix);
  int ret = uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc));
  if (ret != 0) {
    // uv_dlsym() only reports -1; the actual reason is available through uv_dlerror(),
    // not uv_strerror().
    fnInfo("uv_dlsym function %s. error: %s", mergeFuncName, uv_dlerror(&udfCtx->lib));
    // make the "not implemented" state explicit for udfdCPluginUdfAggMerge()
    udfCtx->aggMergeFunc = NULL;
  }
  return 0;
}
94

95
/*
 * Create the C plugin context for one UDF: open the shared library at
 * udf->path, resolve the init/destroy functions and the scalar or aggregate
 * entry points (depending on udf->funcType), then run the UDF's init function.
 *
 * On success the new context is stored in *pUdfCtx and 0 is returned.
 * On failure *pUdfCtx is left untouched, the library is closed and the
 * context is freed. The return value is terrno when the allocation fails,
 * TSDB_CODE_UDF_LOAD_UDF_FAILURE when the library or a required symbol cannot
 * be loaded, or the non-zero value returned by the UDF's init function.
 */
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
  int32_t         err = 0;
  SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx));
  if (NULL == udfCtx) {
    return terrno;
  }
  err = uv_dlopen(udf->path, &udfCtx->lib);
  if (err != 0) {
    // uv_dlopen() only returns -1; the reason has to be read with uv_dlerror()
    fnError("can not load library %s. error: %s", udf->path, uv_dlerror(&udfCtx->lib));
    // releases the error message kept inside the uv_lib_t
    uv_dlclose(&udfCtx->lib);
    taosMemoryFree(udfCtx);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }
  const char *udfName = udf->name;

  err = udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName);
  if (err != 0) {
    fnError("can not load init/destroy functions. error: %d", err);
    err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
    goto _exit;
  }

  if (udf->funcType == UDF_FUNC_TYPE_SCALAR) {
    char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
    snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
    err = uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc));
    if (err != 0) {
      fnError("can not load library function %s. error: %s", processFuncName, uv_dlerror(&udfCtx->lib));
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
      goto _exit;
    }
  } else if (udf->funcType == UDF_FUNC_TYPE_AGG) {
    err = udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName);
    if (err != 0) {
      fnError("can not load aggregation functions. error: %d", err);
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
      goto _exit;
    }
  }

  if (udfCtx->initFunc) {
    err = (udfCtx->initFunc)();
    if (err != 0) {
      fnError("udf init function failed. error: %d", err);
      goto _exit;
    }
  }
  *pUdfCtx = udfCtx;
  return 0;

_exit:
  uv_dlclose(&udfCtx->lib);
  taosMemoryFree(udfCtx);
  return err;
}
147

148
int32_t udfdCPluginUdfDestroy(void *udfCtx) {
615✔
149
  SUdfCPluginCtx *ctx = udfCtx;
615✔
150
  int32_t         code = 0;
615✔
151
  if (ctx->destroyFunc) {
615!
152
    code = (ctx->destroyFunc)();
615✔
153
  }
154
  uv_dlclose(&ctx->lib);
615✔
155
  taosMemoryFree(ctx);
615✔
156
  return code;
615✔
157
}
158

159
/*
 * Run the scalar entry point of a C UDF on one input block.
 * Returns the UDF's own result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the
 * context has no scalar function.
 */
int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) {
  SUdfCPluginCtx *plugin = udfCtx;
  if (!plugin->scalarProcFunc) {
    fnError("udfd c plugin scalar proc not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return plugin->scalarProcFunc(block, resultCol);
}
168

169
/*
 * Run the aggregate "start" entry point of a C UDF on the given buffer.
 * Returns the UDF's own result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the
 * context has no start function.
 */
int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) {
  SUdfCPluginCtx *plugin = udfCtx;
  if (!plugin->aggStartFunc) {
    fnError("udfd c plugin aggregation start not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return plugin->aggStartFunc(buf);
}
179

180
/*
 * Run the aggregate "process" entry point of a C UDF on one input block.
 * Returns the UDF's own result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the
 * context has no process function.
 */
int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) {
  SUdfCPluginCtx *plugin = udfCtx;
  if (!plugin->aggProcFunc) {
    fnError("udfd c plugin aggregation process not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return plugin->aggProcFunc(block, interBuf, newInterBuf);
}
189

190
/*
 * Run the aggregate "merge" entry point of a C UDF on two input buffers.
 * Returns the UDF's own result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the
 * context has no merge function.
 */
int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
                               void *udfCtx) {
  SUdfCPluginCtx *plugin = udfCtx;
  if (!plugin->aggMergeFunc) {
    fnError("udfd c plugin aggregation merge not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return plugin->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
}
200

201
/*
 * Run the aggregate "finish" entry point of a C UDF.
 * Returns the UDF's own result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the
 * context has no finish function.
 */
int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) {
  SUdfCPluginCtx *plugin = udfCtx;
  if (!plugin->aggFinishFunc) {
    fnError("udfd c plugin aggregation finish not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return plugin->aggFinishFunc(buf, resultData);
}
211

212
// for c, the function pointer are filled directly and libloaded = true;
213
// for others, dlopen/dlsym to find function pointers
214
typedef struct SUdfScriptPlugin {
215
  int8_t scriptType;
216

217
  char     libPath[PATH_MAX];
218
  bool     libLoaded;
219
  uv_lib_t lib;
220

221
  TScriptUdfScalarProcFunc udfScalarProcFunc;
222
  TScriptUdfAggStartFunc   udfAggStartFunc;
223
  TScriptUdfAggProcessFunc udfAggProcFunc;
224
  TScriptUdfAggMergeFunc   udfAggMergeFunc;
225
  TScriptUdfAggFinishFunc  udfAggFinishFunc;
226

227
  TScriptUdfInitFunc    udfInitFunc;
228
  TScriptUdfDestoryFunc udfDestroyFunc;
229

230
  TScriptOpenFunc  openFunc;
231
  TScriptCloseFunc closeFunc;
232
} SUdfScriptPlugin;
233

234
typedef struct SUdfdContext {
235
  uv_loop_t  *loop;
236
  uv_pipe_t   ctrlPipe;
237
  uv_signal_t intrSignal;
238
  char        listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
239
  uv_pipe_t   listeningPipe;
240

241
  void     *clientRpc;
242
  SCorEpSet mgmtEp;
243

244
  uv_mutex_t udfsMutex;
245
  SHashObj  *udfsHash;
246

247
  uv_mutex_t        scriptPluginsMutex;
248
  SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS];
249

250
  SArray *residentFuncs;
251

252
  char udfDataDir[PATH_MAX];
253
  bool printVersion;
254
} SUdfdContext;
255

256
SUdfdContext global;
257

258
struct SUdfdUvConn;
259
struct SUvUdfWork;
260

261
typedef struct SUdfdUvConn {
262
  uv_stream_t *client;
263
  char        *inputBuf;
264
  int32_t      inputLen;
265
  int32_t      inputCap;
266
  int32_t      inputTotal;
267

268
  struct SUvUdfWork *pWorkList;  // head of work list
269
} SUdfdUvConn;
270

271
typedef struct SUvUdfWork {
272
  SUdfdUvConn *conn;
273
  uv_buf_t     input;
274
  uv_buf_t     output;
275

276
  struct SUvUdfWork *pWorkNext;
277
} SUvUdfWork;
278

279
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState;
280

281
typedef struct SUdf {
282
  char    name[TSDB_FUNC_NAME_LEN + 1];
283
  int32_t version;
284
  int64_t createdTime;
285

286
  int8_t  funcType;
287
  int8_t  scriptType;
288
  int8_t  outputType;
289
  int32_t outputLen;
290
  int32_t bufSize;
291

292
  char path[PATH_MAX];
293

294
  int32_t    refCount;
295
  EUdfState  state;
296
  uv_mutex_t lock;
297
  uv_cond_t  condReady;
298
  bool       resident;
299

300
  SUdfScriptPlugin *scriptPlugin;
301
  void             *scriptUdfCtx;
302

303
  int64_t lastFetchTime;  // last fetch time in milliseconds
304
  bool    expired;
305
} SUdf;
306

307
typedef struct SUdfcFuncHandle {
308
  SUdf *udf;
309
} SUdfcFuncHandle;
310

311
typedef enum EUdfdRpcReqRspType {
312
  UDFD_RPC_MNODE_CONNECT = 0,
313
  UDFD_RPC_RETRIVE_FUNC,
314
} EUdfdRpcReqRspType;
315

316
typedef struct SUdfdRpcSendRecvInfo {
317
  EUdfdRpcReqRspType rpcType;
318
  int32_t            code;
319
  void              *param;
320
  uv_sem_t           resultSem;
321
} SUdfdRpcSendRecvInfo;
322

323
static void    udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
324
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
325
static int32_t udfdConnectToMnode();
326
static bool    udfdRpcRfp(int32_t code, tmsg_t msgType);
327
static int     initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
328
static int32_t udfdOpenClientRpc();
329
static void    udfdCloseClientRpc();
330

331
static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
332
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
333
static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
334
static void udfdProcessRequest(uv_work_t *req);
335
static void udfdOnWrite(uv_write_t *req, int status);
336
static void udfdSendResponse(uv_work_t *work, int status);
337
static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
338
static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe);
339
static void udfdHandleRequest(SUdfdUvConn *conn);
340
static void udfdPipeCloseCb(uv_handle_t *pipe);
341
static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
901✔
342
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
343
static void udfdOnNewConnection(uv_stream_t *server, int status);
344

345
static void    udfdIntrSignalHandler(uv_signal_t *handle, int signum);
346
static void    removeListeningPipe();
347

348
static void    udfdPrintVersion();
349
static int32_t udfdParseArgs(int32_t argc, char *argv[]);
350
static int32_t udfdInitLog();
351

352
static void    udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
353
static void    udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
354
static int32_t udfdUvInit();
355
static void    udfdCloseWalkCb(uv_handle_t *handle, void *arg);
356
static void    udfdRun();
357
static void    udfdConnectMnodeThreadFunc(void *args);
358

359
int32_t udfdNewUdf(SUdf **pUdf,  const char *udfName);
360
void  udfdGetFuncBodyPath(const SUdf *udf, char *path);
361

362
/*
 * Fill a plugin descriptor for the C (binary library) script type by wiring
 * every hook to the udfdCPlugin* functions of this file, then invoke the
 * plugin's open hook with the LD_LIBRARY_PATH setting.
 *
 * Returns the result of the open hook (0 on success).
 */
int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
  plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;

  // plugin-level hooks
  plugin->openFunc = udfdCPluginOpen;
  plugin->closeFunc = udfdCPluginClose;

  // per-UDF hooks
  plugin->udfInitFunc = udfdCPluginUdfInit;
  plugin->udfDestroyFunc = udfdCPluginUdfDestroy;

  // per-call hooks
  plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc;
  plugin->udfAggStartFunc = udfdCPluginUdfAggStart;
  plugin->udfAggProcFunc = udfdCPluginUdfAggProc;
  plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge;
  plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish;

  SScriptUdfEnvItem envItems[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}};
  return plugin->openFunc(envItems, 1);
}
379

380
/*
 * Open the shared library libPath into pLib and resolve numOfFuncs symbols:
 * funcName[i] is looked up and stored through func[i].
 *
 * Returns TSDB_CODE_UDF_LOAD_UDF_FAILURE when the library cannot be opened.
 * A symbol that cannot be resolved is only logged (best effort); the function
 * still returns 0, so callers must check the individual pointers they need.
 */
int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) {
  int err = uv_dlopen(libPath, pLib);
  if (err != 0) {
    // uv_dlopen() only returns -1; the reason has to be read with uv_dlerror()
    fnError("can not load library %s. error: %s", libPath, uv_dlerror(pLib));
    // releases the error message kept inside the uv_lib_t
    uv_dlclose(pLib);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }

  for (int i = 0; i < numOfFuncs; ++i) {
    err = uv_dlsym(pLib, funcName[i], func[i]);
    if (err != 0) {
      fnError("load library function failed. lib %s function %s", libPath, funcName[i]);
    }
  }
  return 0;
}
395

396
/*
 * Fill a plugin descriptor for the Python script type: load
 * "libtaospyudf.so", resolve its entry points and call its open function
 * with PYTHONPATH ("<udfDataDir><sep><tsUdfdLdLibPath>") and LOGDIR.
 *
 * Returns 0 on success (plugin->libLoaded is then true). On failure returns
 * the error from loading the library, terrno when the PYTHONPATH buffer
 * cannot be allocated, or the non-zero result of the open function; in the
 * latter two cases the library is closed again.
 */
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
  plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
  // todo: windows support
  snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so");
  plugin->libLoaded = false;
  // funcName[i] is resolved into funcs[i]; the two arrays must stay in the same order
  const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen",         "pyClose",         "pyUdfInit",
                                                 "pyUdfDestroy",   "pyUdfScalarProc", "pyUdfAggStart",
                                                 "pyUdfAggFinish", "pyUdfAggProc",    "pyUdfAggMerge"};
  void      **funcs[UDFD_MAX_PLUGIN_FUNCS] = {
           (void **)&plugin->openFunc,         (void **)&plugin->closeFunc,         (void **)&plugin->udfInitFunc,
           (void **)&plugin->udfDestroyFunc,   (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc,
           (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc,    (void **)&plugin->udfAggMergeFunc};
  int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS);
  if (err != 0) {
    fnError("can not load python plugin. lib path %s", plugin->libPath);
    return err;
  }

  if (plugin->openFunc) {
    // both paths, the separator and the terminating NUL; kept in size_t so the
    // sum of two strlen() results is never narrowed
    size_t lenPythonPath =
        strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1;  // global.udfDataDir:tsUdfdLdLibPath
    char *pythonPath = taosMemoryMalloc(lenPythonPath);
    if (pythonPath == NULL) {
      uv_dlclose(&plugin->lib);
      return terrno;
    }
#ifdef WINDOWS
    snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath);
#else
    snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath);
#endif
    SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}};
    err = plugin->openFunc(items, 2);
    taosMemoryFree(pythonPath);
  }
  if (err != 0) {
    fnError("udf script python plugin open func failed. error: %d", err);
    uv_dlclose(&plugin->lib);
    return err;
  }
  plugin->libLoaded = true;

  return 0;
}
440

441
/*
 * Shut down the C plugin: call its close hook (a failure is only logged)
 * and clear every function pointer of the descriptor.
 */
void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
  if (plugin->closeFunc && plugin->closeFunc() != 0) {
    fnError("udf script c plugin close func failed.line:%d", __LINE__);
  }

  // plugin-level hooks
  plugin->openFunc = NULL;
  plugin->closeFunc = NULL;

  // per-UDF hooks
  plugin->udfInitFunc = NULL;
  plugin->udfDestroyFunc = NULL;

  // per-call hooks
  plugin->udfScalarProcFunc = NULL;
  plugin->udfAggStartFunc = NULL;
  plugin->udfAggProcFunc = NULL;
  plugin->udfAggMergeFunc = NULL;
  plugin->udfAggFinishFunc = NULL;
}
458

459
/*
 * Shut down the Python plugin: call its close hook (a failure is only
 * logged), close the plugin library, mark it as not loaded and clear every
 * function pointer of the descriptor.
 */
void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
  if (plugin->closeFunc && plugin->closeFunc() != 0) {
    fnError("udf script python plugin close func failed.line:%d", __LINE__);
  }

  uv_dlclose(&plugin->lib);
  plugin->libLoaded = false;

  // plugin-level hooks
  plugin->openFunc = NULL;
  plugin->closeFunc = NULL;

  // per-UDF hooks
  plugin->udfInitFunc = NULL;
  plugin->udfDestroyFunc = NULL;

  // per-call hooks
  plugin->udfScalarProcFunc = NULL;
  plugin->udfAggStartFunc = NULL;
  plugin->udfAggProcFunc = NULL;
  plugin->udfAggMergeFunc = NULL;
  plugin->udfAggFinishFunc = NULL;
}
2✔
479

480
int32_t udfdInitScriptPlugin(int8_t scriptType) {
15✔
481
  SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
15✔
482
  if (plugin == NULL) {
15!
483
    return terrno;
×
484
  }
485
  int32_t err = 0;
15✔
486
  switch (scriptType) {
15!
487
    case TSDB_FUNC_SCRIPT_BIN_LIB:
13✔
488
      err = udfdInitializeCPlugin(plugin);
13✔
489
      if (err != 0) {
13!
490
        fnError("udf script c plugin init failed. error: %d", err);
×
491
        taosMemoryFree(plugin);
×
492
        return err;
×
493
      }
494
      break;
13✔
495
    case TSDB_FUNC_SCRIPT_PYTHON: {
2✔
496
      err = udfdInitializePythonPlugin(plugin);
2✔
497
      if (err != 0) {
2!
498
        fnError("udf script python plugin init failed. error: %d", err);
×
499
        taosMemoryFree(plugin);
×
500
        return err;
×
501
      }
502
      break;
2✔
503
    }
504
    default:
×
505
      fnError("udf script type %d not supported", scriptType);
×
506
      taosMemoryFree(plugin);
×
507
      return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
×
508
  }
509

510
  global.scriptPlugins[scriptType] = plugin;
15✔
511
  return TSDB_CODE_SUCCESS;
15✔
512
}
513

514
void udfdDeinitScriptPlugins() {
2,515✔
515
  SUdfScriptPlugin *plugin = NULL;
2,515✔
516
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON];
2,515✔
517
  if (plugin != NULL) {
2,515✔
518
    udfdDeinitPythonPlugin(plugin);
2✔
519
    taosMemoryFree(plugin);
2✔
520
  }
521

522
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB];
2,515✔
523
  if (plugin != NULL) {
2,515✔
524
    udfdDeinitCPlugin(plugin);
13✔
525
    taosMemoryFree(plugin);
13✔
526
  }
527
  return;
2,515✔
528
}
529

530
/*
 * Work callback: decode the request stored in the work item's input buffer
 * and dispatch it to the setup / call / teardown handler.
 *
 * When decoding fails the input buffer is freed, the failure is logged and
 * nothing else is done. Unknown request types are ignored.
 */
void udfdProcessRequest(uv_work_t *req) {
  SUvUdfWork *work = (SUvUdfWork *)(req->data);
  SUdfRequest request = {0};

  if (decodeUdfRequest(work->input.base, &request) == NULL) {
    taosMemoryFree(work->input.base);
    fnError("udf request decode failed");
    return;
  }

  if (request.type == UDF_TASK_SETUP) {
    udfdProcessSetupRequest(work, &request);
  } else if (request.type == UDF_TASK_CALL) {
    udfdProcessCallRequest(work, &request);
  } else if (request.type == UDF_TASK_TEARDOWN) {
    udfdProcessTeardownRequest(work, &request);
  }
}
559

560
/*
 * Copy the description of a UDF into the SScriptUdfInfo handed to a script
 * plugin. name and path are not duplicated: udfInfo points into udf.
 * funcType is translated from the TSDB_FUNC_TYPE_* value to the
 * UDF_FUNC_TYPE_* value and is left untouched for any other function type.
 */
void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
  // identity
  udfInfo->name = udf->name;
  udfInfo->version = udf->version;
  udfInfo->createdTime = udf->createdTime;

  // kind of function and script
  if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
    udfInfo->funcType = UDF_FUNC_TYPE_AGG;
  } else if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
    udfInfo->funcType = UDF_FUNC_TYPE_SCALAR;
  }
  udfInfo->scriptType = udf->scriptType;

  // output description and buffer size
  udfInfo->outputType = udf->outputType;
  udfInfo->outputLen = udf->outputLen;
  udfInfo->bufSize = udf->bufSize;

  udfInfo->path = udf->path;
}
802✔
575

576
/*
 * Load one UDF: fetch its description from the mnode, make sure the script
 * plugin for its script type exists (creating it on first use, under
 * global.scriptPluginsMutex), then let the plugin create the UDF context.
 *
 * Returns 0 on success. Returns TSDB_CODE_UDF_LOAD_UDF_FAILURE when the
 * description cannot be fetched or the plugin has no init function,
 * TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED when the script type is out of range,
 * or the error from the plugin initialization / the plugin's udfInitFunc.
 */
int32_t udfdInitUdf(char *udfName, SUdf *udf) {
  int32_t err = 0;
  err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf);
  if (err != 0) {
    fnError("can not retrieve udf from mnode. udf name %s", udfName);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }
  // scriptType is a signed 8-bit value used as an array index below, so both
  // ends of the range have to be checked
  if (udf->scriptType < 0 || udf->scriptType > UDFD_MAX_SCRIPT_TYPE) {
    fnError("udf name %s script type %d not supported", udfName, udf->scriptType);
    return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
  }

  uv_mutex_lock(&global.scriptPluginsMutex);
  SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType];
  if (scriptPlugin == NULL) {
    err = udfdInitScriptPlugin(udf->scriptType);
    if (err != 0) {
      fnError("udf name %s init script plugin failed. error %d", udfName, err);
      uv_mutex_unlock(&global.scriptPluginsMutex);
      return err;
    }
    // pick up the freshly published plugin while the mutex is still held
    scriptPlugin = global.scriptPlugins[udf->scriptType];
  }
  uv_mutex_unlock(&global.scriptPluginsMutex);
  udf->scriptPlugin = scriptPlugin;

  // udfdLoadSharedLib() only logs unresolved symbols, so the hook may be missing
  if (udf->scriptPlugin == NULL || udf->scriptPlugin->udfInitFunc == NULL) {
    fnError("udf name %s init failed. error %d", udfName, TSDB_CODE_UDF_LOAD_UDF_FAILURE);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }

  SScriptUdfInfo info = {0};
  convertUdf2UdfInfo(udf, &info);
  err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx);
  if (err != 0) {
    fnError("udf name %s init failed. error %d", udfName, err);
    return err;
  }

  fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx);
  return 0;
}
612

613
/*
 * Allocate a new SUdf named udfName in state UDF_STATE_INIT with a reference
 * count of 1, and mark it resident when the name is listed in
 * global.residentFuncs.
 *
 * On success stores the object in *pUdf and returns 0. Returns terrno when
 * the allocation fails, or TSDB_CODE_UDF_UV_EXEC_FAILURE when the mutex or
 * condition variable cannot be initialized; in both cases nothing is leaked
 * and *pUdf is left untouched.
 */
int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) {
  SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
  if (NULL == udfNew) {
    return terrno;
  }
  udfNew->refCount = 1;
  udfNew->lastFetchTime = taosGetTimestampMs();
  tstrncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);

  udfNew->state = UDF_STATE_INIT;
  if (uv_mutex_init(&udfNew->lock) != 0) {
    taosMemoryFree(udfNew);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }
  if (uv_cond_init(&udfNew->condReady) != 0) {
    // undo the mutex initialized just above before releasing the object
    uv_mutex_destroy(&udfNew->lock);
    taosMemoryFree(udfNew);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  udfNew->resident = false;
  udfNew->expired = false;
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
    char *funcName = taosArrayGet(global.residentFuncs, i);
    if (strcmp(udfName, funcName) == 0) {
      udfNew->resident = true;
      break;
    }
  }
  *pUdf = udfNew;
  return 0;
}
638

639
void udfdFreeUdf(void *pData) {
×
640
  SUdf *pSudf = (SUdf *)pData;
×
641
  if (pSudf == NULL) {
×
642
    return;
×
643
  }
644

645
  if (pSudf->scriptPlugin != NULL) {
×
646
    if(pSudf->scriptPlugin->udfDestroyFunc(pSudf->scriptUdfCtx) != 0) {
×
647
      fnError("udfdFreeUdf: udfd destroy udf %s failed", pSudf->name);
×
648
    }
649
  }
650

651
  uv_mutex_destroy(&pSudf->lock);
×
652
  uv_cond_destroy(&pSudf->condReady);
×
653
  taosMemoryFree(pSudf);
×
654
}
655

656
/*
 * Look up udfName in global.udfsHash (under global.udfsMutex).
 *
 * A cached entry fetched less than 10 seconds ago is reused: its reference
 * count is incremented and it is returned in *ppUdf. An older entry is
 * marked expired and removed from the hash. When nothing is reused, a new
 * SUdf is created with udfdNewUdf() and inserted into the hash.
 *
 * Returns 0 on success, or the error of udfdNewUdf() / taosHashPut().
 * NOTE(review): when taosHashPut() fails, *ppUdf still refers to the newly
 * created object, which is not freed here - confirm who owns it in that case.
 */
int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) {
  uv_mutex_lock(&global.udfsMutex);

  SUdf  **ppCached = taosHashGet(global.udfsHash, udfName, strlen(udfName));
  int64_t now = taosGetTimestampMs();

  if (ppCached) {
    SUdf *cached = *ppCached;
    bool  stale = now - cached->lastFetchTime > 10 * 1000;  // 10s

    if (!stale) {
      ++cached->refCount;
      *ppUdf = cached;
      uv_mutex_unlock(&global.udfsMutex);
      fnInfo("udfd reuse existing udf. udf  %s udf version %d, udf created time %" PRIx64, cached->name, cached->version,
             cached->createdTime);
      return 0;
    }

    cached->expired = true;
    fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
           cached->name, cached->version, cached->createdTime);
    if (taosHashRemove(global.udfsHash, udfName, strlen(udfName)) != 0) {
      fnError("udfdGetOrCreateUdf: udfd remove udf %s failed", udfName);
    }
  }

  int32_t rc = udfdNewUdf(ppUdf, udfName);
  if (rc == 0) {
    // the hash stores the SUdf pointer itself, keyed by the UDF name
    rc = taosHashPut(global.udfsHash, udfName, strlen(udfName), ppUdf, POINTER_BYTES);
  }

  uv_mutex_unlock(&global.udfsMutex);
  return rc;
}
694

695
/*
 * Handle a setup request: obtain (or create) the UDF named in the request,
 * load it if it is still in the INIT state (or wait while another request is
 * loading it), allocate a handle for the client and encode the response into
 * uvUdf->output. The request's input buffer is freed once the response has
 * been encoded.
 *
 * Any failure is reported to the client as TSDB_CODE_UDF_FUNC_EXEC_FAILURE.
 * When the UDF or the handle could not be obtained, the corresponding
 * response fields are left zero instead of being read through a NULL or
 * uninitialized pointer.
 */
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  // TODO: tracable id from client. connect, setup, call, teardown
  fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);

  SUdfSetupRequest *setup = &request->setup;
  int32_t           code = TSDB_CODE_SUCCESS;
  SUdf             *udf = NULL;
  // declared before the first goto so that _send never reads an indeterminate value
  SUdfcFuncHandle  *handle = NULL;

  code = udfdGetOrCreateUdf(&udf, setup->udfName);
  if (code != 0) {
    fnError("udfdGetOrCreateUdf failed. udf name %s", setup->udfName);
    goto _send;
  }

  uv_mutex_lock(&udf->lock);
  if (udf->state == UDF_STATE_INIT) {
    udf->state = UDF_STATE_LOADING;
    code = udfdInitUdf(setup->udfName, udf);
    if (code == 0) {
      udf->state = UDF_STATE_READY;
    } else {
      udf->state = UDF_STATE_INIT;
    }
    uv_cond_broadcast(&udf->condReady);
    uv_mutex_unlock(&udf->lock);
  } else {
    while (udf->state == UDF_STATE_LOADING) {
      uv_cond_wait(&udf->condReady, &udf->lock);
    }
    uv_mutex_unlock(&udf->lock);
  }

  handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
  if (handle == NULL) {
    fnError("udfdProcessSetupRequest: malloc failed.");
    code = terrno;
  } else {
    handle->udf = udf;
  }

_send:
  ;
  SUdfResponse rsp = {0};
  rsp.seqNum = request->seqNum;
  rsp.type = request->type;
  rsp.code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
  rsp.setupRsp.udfHandle = (int64_t)(handle);
  if (udf != NULL) {
    rsp.setupRsp.outputType = udf->outputType;
    rsp.setupRsp.bytes = udf->outputLen;
    rsp.setupRsp.bufSize = udf->bufSize;
  }

  // first pass with a NULL buffer only computes the encoded length
  int32_t len = encodeUdfResponse(NULL, &rsp);
  if (len < 0) {
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
    return;
  }
  rsp.msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessSetupRequest: malloc failed. len %d", len);
    return;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, &rsp) < 0) {
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    return;
  }

  uvUdf->output = uv_buf_init(bufBegin, len);

  taosMemoryFree(uvUdf->input.base);
  return;
}
766

767
// Execute one UDF call (scalar, or an aggregate init/proc/merge/finish step)
// and store the encoded response in uvUdf->output.
//
// uvUdf   - work item; its input buffer is released here and its output
//           buffer is filled with the encoded response on success.
// request - decoded request; request->call selects the call type and carries
//           the input block / intermediate buffers.
//
// Any non-zero code from the conversion helpers or the plugin is reported to
// the client as TSDB_CODE_UDF_FUNC_EXEC_FAILURE.
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  SUdfCallRequest *call = &request->call;
  fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle,
          request->seqNum);
  SUdfcFuncHandle  *handle = (SUdfcFuncHandle *)(call->udfHandle);
  SUdf             *udf = handle->udf;
  SUdfResponse      response = {0};
  SUdfResponse     *rsp = &response;
  SUdfCallResponse *subRsp = &rsp->callRsp;

  // Give the output a defined (empty) value so the failure paths below never
  // leave an indeterminate buffer behind for the sender.
  uvUdf->output = uv_buf_init(NULL, 0);

  int32_t code = TSDB_CODE_SUCCESS;
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
      SUdfColumn output = {0};
      output.colMeta.bytes = udf->outputLen;
      output.colMeta.type = udf->outputType;
      output.colMeta.precision = 0;
      output.colMeta.scale = 0;
      // Keep the capacity error instead of silently answering with success.
      code = udfColEnsureCapacity(&output, call->block.info.rows);
      if (code == TSDB_CODE_SUCCESS) {
        SUdfDataBlock input = {0};
        code = convertDataBlockToUdfDataBlock(&call->block, &input);
        if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
        freeUdfDataDataBlock(&input);
        if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
      }
      freeUdfColumn(&output);
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx);
      } else {
        code = terrno;
      }
      subRsp->resultBuf = outBuf;
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      SUdfDataBlock input = {0};
      // Propagate a conversion failure rather than reporting success.
      code = convertDataBlockToUdfDataBlock(&call->block, &input);
      if (code == TSDB_CODE_SUCCESS) {
        SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
        if (outBuf.buf != NULL) {
          code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx);
          subRsp->resultBuf = outBuf;
        } else {
          code = terrno;
        }
      }
      // The incoming intermediate buffer is released on every path, not only
      // after a successful plugin call.
      freeUdfInterBuf(&call->interBuf);
      freeUdfDataDataBlock(&input);

      break;
    }
    case TSDB_UDF_CALL_AGG_MERGE: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx);
        subRsp->resultBuf = outBuf;
      } else {
        code = terrno;
      }
      freeUdfInterBuf(&call->interBuf);
      freeUdfInterBuf(&call->interBuf2);

      break;
    }
    case TSDB_UDF_CALL_AGG_FIN: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx);
        subRsp->resultBuf = outBuf;
      } else {
        code = terrno;
      }
      freeUdfInterBuf(&call->interBuf);

      break;
    }
    default:
      break;
  }

  rsp->seqNum = request->seqNum;
  rsp->type = request->type;
  rsp->code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
  subRsp->callType = call->callType;

  // First pass computes the encoded length, second pass writes the bytes.
  int32_t len = encodeUdfResponse(NULL, rsp);
  if (len < 0) {
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
    goto _exit;
  }
  rsp->msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessCallRequest: malloc failed. len %d", len);
    goto _exit;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, rsp) < 0) {
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    goto _exit;
  }

  uvUdf->output = uv_buf_init(bufBegin, len);

_exit:
  // Release per-call resources that were only needed to build the response.
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
      blockDataFreeRes(&call->block);
      blockDataFreeRes(&subRsp->resultData);
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      blockDataFreeRes(&call->block);
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_MERGE: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_FIN: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    default:
      break;
  }

  taosMemoryFree(uvUdf->input.base);
  return;
}
906

907
// Handle a teardown request: drop one reference on the UDF behind the handle,
// unload the UDF when it is no longer needed, free the handle and store the
// encoded response in uvUdf->output.
//
// uvUdf   - work item; its input buffer is released here on every path.
// request - decoded request; request->teardown.udfHandle identifies the handle.
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  SUdfTeardownRequest *teardown = &request->teardown;
  fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
  SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
  SUdf            *udf = handle->udf;
  bool             unloadUdf = false;
  int32_t          code = TSDB_CODE_SUCCESS;

  // Defined (empty) output so that an encode failure never leaves an
  // indeterminate buffer for the sender.
  uvUdf->output = uv_buf_init(NULL, 0);

  uv_mutex_lock(&global.udfsMutex);
  udf->refCount--;
  // Unload once unreferenced, unless the UDF is resident and not expired.
  if (udf->refCount == 0 && (!udf->resident || udf->expired)) {
    unloadUdf = true;
    code = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
    if (code != 0) {
      fnError("udf name %s remove from hash failed, err:%0x %s", udf->name, code, tstrerror(code));
      uv_mutex_unlock(&global.udfsMutex);
      goto _send;
    }
  }
  uv_mutex_unlock(&global.udfsMutex);
  if (unloadUdf) {
    fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx));
    uv_cond_destroy(&udf->condReady);
    uv_mutex_destroy(&udf->lock);
    code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
    fnDebug("udfd destroy function returns %d", code);
    taosMemoryFree(udf);
  }

_send:
  taosMemoryFree(handle);
  SUdfResponse  response = {0};
  SUdfResponse *rsp = &response;
  rsp->seqNum = request->seqNum;
  rsp->type = request->type;
  rsp->code = code;
  // First pass computes the encoded length, second pass writes the bytes.
  int32_t len = encodeUdfResponse(NULL, rsp);
  if (len < 0) {
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
    goto _exit;
  }
  rsp->msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessTeardownRequest: malloc failed. len %d", len);
    goto _exit;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, rsp) < 0) {
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    goto _exit;
  }
  uvUdf->output = uv_buf_init(bufBegin, len);

_exit:
  // The request buffer is released on success and on every failure path.
  taosMemoryFree(uvUdf->input.base);
  return;
}
965

966
void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
802✔
967
  if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
802✔
968
#ifdef WINDOWS
969
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
970
#else
971
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
643✔
972
             udf->createdTime);
643✔
973
#endif
974
  } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
159!
975
#ifdef WINDOWS
976
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
977
#else
978
    snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
159✔
979
#endif
980
  } else {
981
#ifdef WINDOWS
982
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
983
#else
984
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
×
985
#endif
986
  }
987
}
802✔
988

989
// Persist the body of a UDF to its file under the udf data directory and
// record the resulting path in udf->path.
//
// pFuncInfo - function info holding the body (pCode / codeSize).
// udf       - UDF whose name/version/createdTime determine the file name.
//
// Returns TSDB_CODE_SUCCESS when the file already exists or was written
// completely, TSDB_CODE_NO_DISKSPACE (via terrno) when no data space is
// available, and TSDB_CODE_FILE_CORRUPTED on open/write/close failure.
int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
  if (!osDataSpaceAvailable()) {
    terrno = TSDB_CODE_NO_DISKSPACE;
    fnError("udfd create shared library failed since %s", terrstr());
    return terrno;
  }

  char path[PATH_MAX] = {0};
  udfdGetFuncBodyPath(udf, path);
  bool fileExist = !(taosStatFile(path, NULL, NULL, NULL) < 0);
  if (fileExist) {
    tstrncpy(udf->path, path, PATH_MAX);
    fnInfo("udfd func body file. reuse existing file %s", path);
    return TSDB_CODE_SUCCESS;
  }

  TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
  if (file == NULL) {
    // Report the number and the text of the same error (errno).
    fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
    return TSDB_CODE_FILE_CORRUPTED;
  }
  int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
  if (count != pFuncInfo->codeSize) {
    fnError("udfd write udf shared library failed");
    // Close the handle and remove the partial file: the exists-check above
    // would otherwise reuse a truncated body on the next call.
    (void)taosCloseFile(&file);
    (void)taosRemoveFile(path);
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (taosCloseFile(&file) != 0) {
    fnError("udfdSaveFuncBodyToFile, udfd close file failed");
    // The content may not have reached the disk; do not leave it for reuse.
    (void)taosRemoveFile(path);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  tstrncpy(udf->path, path, PATH_MAX);
  return TSDB_CODE_SUCCESS;
}
1023

1024
// RPC response callback for requests sent by udfd to the mnode.
//
// parent - opaque parent pointer supplied by the rpc layer (unused here).
// pMsg   - response message; pMsg->info.ahandle is the SUdfdRpcSendRecvInfo
//          of the waiting sender, pMsg->pCont is released here.
// pEpSet - optional updated endpoint set.
//
// The outcome is stored in msgInfo->code and the waiting sender is woken via
// msgInfo->resultSem on every path.
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;

  if (pEpSet) {
    if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&global.mgmtEp, pEpSet);
    }
  }

  if (pMsg->code != TSDB_CODE_SUCCESS) {
    fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
    msgInfo->code = pMsg->code;
    goto _return;
  }

  if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
    SConnectRsp connectRsp = {0};
    int32_t     ret = tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
    if (ret < 0) {
      fnError("udfd deserialize connect response failed");
      // Report the failure; otherwise the waiter would see code 0.
      msgInfo->code = ret;
      goto _return;
    }

    // Reject the connection when client and server clocks differ too much.
    int32_t now = taosGetTimestampSec();
    int32_t delta = abs(now - connectRsp.svrTimestamp);
    if (delta > 900) {
      msgInfo->code = TSDB_CODE_TIME_UNSYNCED;
      goto _return;
    }

    if (connectRsp.epSet.numOfEps == 0) {
      msgInfo->code = TSDB_CODE_APP_ERROR;
      goto _return;
    }

    if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
      updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
    }
    msgInfo->code = 0;
  } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
    SRetrieveFuncRsp retrieveRsp = {0};
    int32_t          ret = tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
    if (ret < 0) {
      fnError("udfd deserialize retrieve func response failed");
      // Report the failure; otherwise the waiter would see code 0.
      msgInfo->code = ret;
      goto _return;
    }

    // Exactly one function was requested; make sure the response really
    // carries one entry in each array before dereferencing element 0.
    SFuncInfo      *pFuncInfo = NULL;
    SFuncExtraInfo *pFuncExtraInfo = NULL;
    if (taosArrayGetSize(retrieveRsp.pFuncInfos) > 0) {
      pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
    }
    if (taosArrayGetSize(retrieveRsp.pFuncExtraInfos) > 0) {
      pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0);
    }
    if (pFuncInfo == NULL || pFuncExtraInfo == NULL) {
      fnError("udfd retrieve func response contains no function info");
      msgInfo->code = TSDB_CODE_APP_ERROR;
      if (pFuncInfo != NULL) {
        tFreeSFuncInfo(pFuncInfo);
      }
      taosArrayDestroy(retrieveRsp.pFuncInfos);
      taosArrayDestroy(retrieveRsp.pFuncExtraInfos);
      goto _return;
    }

    SUdf *udf = msgInfo->param;
    udf->funcType = pFuncInfo->funcType;
    udf->scriptType = pFuncInfo->scriptType;
    udf->outputType = pFuncInfo->outputType;
    udf->outputLen = pFuncInfo->outputLen;
    udf->bufSize = pFuncInfo->bufSize;

    udf->version = pFuncExtraInfo->funcVersion;
    udf->createdTime = pFuncExtraInfo->funcCreatedTime;
    msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf);
    if (msgInfo->code != 0) {
      udf->lastFetchTime = 0;
    }
    tFreeSFuncInfo(pFuncInfo);
    taosArrayDestroy(retrieveRsp.pFuncInfos);
    taosArrayDestroy(retrieveRsp.pFuncExtraInfos);
  }

_return:
  rpcFreeCont(pMsg->pCont);
  uv_sem_post(&msgInfo->resultSem);
  return;
}
1094

1095
// Ask the mnode for the definition of `udfName` and wait for the response.
// The response callback (installed on the rpc client) fills `udf` and stores
// the outcome in the send/recv info shared through rpcMsg.info.ahandle.
//
// clientRpc - rpc client handle used to send the request.
// udfName   - name of the function to retrieve.
// udf       - UDF object handed to the response callback as its parameter.
//
// Returns 0 on success, otherwise the error of the failing step.
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
  SRetrieveFuncReq retrieveReq = {0};
  retrieveReq.numOfFuncs = 1;
  retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
  if (retrieveReq.pFuncNames == NULL) {
    return terrno;
  }
  if (taosArrayPush(retrieveReq.pFuncNames, udfName) == NULL) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }

  // First pass computes the serialized length, second pass writes it.
  int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
  if (contLen < 0) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq) < 0) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    rpcFreeCont(pReq);
    return terrno;
  }
  taosArrayDestroy(retrieveReq.pFuncNames);

  SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
  if (NULL == msgInfo) {
    // The request body has not been handed to the rpc layer yet.
    rpcFreeCont(pReq);
    return terrno;
  }
  msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
  msgInfo->param = udf;
  if (uv_sem_init(&msgInfo->resultSem, 0) != 0) {
    rpcFreeCont(pReq);
    taosMemoryFree(msgInfo);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  SRpcMsg rpcMsg = {0};
  rpcMsg.pCont = pReq;
  rpcMsg.contLen = contLen;
  rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
  rpcMsg.info.ahandle = msgInfo;
  int32_t code = rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
  if (code == 0) {
    // Block until the response callback posts the semaphore.
    uv_sem_wait(&msgInfo->resultSem);
    code = msgInfo->code;
  }
  // The semaphore was initialized above, so destroy it whether or not the
  // request could be sent.
  uv_sem_destroy(&msgInfo->resultSem);
  taosMemoryFree(msgInfo);
  return code;
}
1142

1143
// Retry filter installed on the rpc client: returns true when a request that
// failed with `code` should be retried. Only a fixed set of error codes is
// retriable, and scheduler query/fetch/notify messages are never retried.
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
  const bool retriableCode =
      code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
      code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING ||
      code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!retriableCode) {
    return false;
  }

  const bool schedulerMsg = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                            msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH ||
                            msgType == TDMT_SCH_TASK_NOTIFY;
  return !schedulerMsg;
}
1156

1157
// Initialize the mnode endpoint set from the configured first/second
// endpoints. Empty or NULL endpoints are skipped; an unparsable second
// endpoint is only logged. Returns 0 when at least one endpoint was added;
// otherwise terrno is set to TSDB_CODE_TSC_INVALID_FQDN and a non-zero value
// is returned.
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  SEpSet *mgmtEpSet = &(pEpSet->epSet);

  // Start from an empty endpoint set.
  pEpSet->version = 0;
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  const bool hasFirst = (firstEp != NULL) && (firstEp[0] != 0);
  if (hasFirst) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    if (taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    mgmtEpSet->numOfEps++;
  }

  const bool hasSecond = (secondEp != NULL) && (secondEp[0] != 0);
  if (hasSecond) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // The second endpoint goes into the next free slot.
    int32_t parseCode = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (parseCode == TSDB_CODE_SUCCESS) {
      mgmtEpSet->numOfEps++;
    } else {
      fnError("invalid ep %s", secondEp);
    }
  }

  if (mgmtEpSet->numOfEps != 0) {
    return 0;
  }

  terrno = TSDB_CODE_TSC_INVALID_FQDN;
  return -1;
}
1201

1202
int32_t udfdOpenClientRpc() {
2,515✔
1203
  SRpcInit rpcInit = {0};
2,515✔
1204
  rpcInit.label = "UDFD";
2,515✔
1205
  rpcInit.numOfThreads = 1;
2,515✔
1206
  rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
2,515✔
1207
  rpcInit.sessions = 1024;
2,515✔
1208
  rpcInit.connType = TAOS_CONN_CLIENT;
2,515✔
1209
  rpcInit.idleTime = tsShellActivityTimer * 1000;
2,515✔
1210
  rpcInit.user = TSDB_DEFAULT_USER;
2,515✔
1211
  rpcInit.parent = &global;
2,515✔
1212
  rpcInit.rfp = udfdRpcRfp;
2,515✔
1213
  rpcInit.compressSize = tsCompressMsgSize;
2,515✔
1214

1215
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
2,515✔
1216
  connLimitNum = TMAX(connLimitNum, 10);
2,515✔
1217
  connLimitNum = TMIN(connLimitNum, 500);
2,515✔
1218
  rpcInit.connLimitNum = connLimitNum;
2,515✔
1219
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
2,515✔
1220
  TAOS_CHECK_RETURN(taosVersionStrToInt(td_version, &rpcInit.compatibilityVer));
2,515!
1221
  global.clientRpc = rpcOpen(&rpcInit);
2,515✔
1222
  if (global.clientRpc == NULL) {
2,515!
1223
    fnError("failed to init dnode rpc client");
×
1224
    return terrno;
×
1225
  }
1226
  return 0;
2,515✔
1227
}
1228

1229
void udfdCloseClientRpc() {
2,515✔
1230
  fnInfo("udfd begin closing rpc");
2,515!
1231
  rpcClose(global.clientRpc);
2,515✔
1232
  fnInfo("udfd finish closing rpc");
2,515!
1233
}
2,515✔
1234

1235
// Write-completion callback for a response. Logs a failed write, unlinks the
// finished work item from its connection's pending list (when the connection
// still exists) and releases the response buffer, the work item and the
// write request.
void udfdOnWrite(uv_write_t *req, int status) {
  SUvUdfWork *finished = (SUvUdfWork *)req->data;
  if (status < 0) {
    fnError("udfd send response error, length: %zu code: %s", finished->output.len, uv_err_name(status));
  }

  // remove work from the connection work list
  if (finished->conn != NULL) {
    // Walk the singly linked list through the address of each link so the
    // head and interior nodes are unlinked the same way.
    SUvUdfWork **link = &finished->conn->pWorkList;
    while (*link != NULL && *link != finished) {
      link = &((*link)->pWorkNext);
    }

    if (*link != finished) {
      fnError("work not in conn any more");
    } else {
      *link = finished->pWorkNext;
    }
  }

  taosMemoryFree(finished->output.base);
  taosMemoryFree(finished);
  taosMemoryFree(req);
}
33,717✔
1255

1256
void udfdSendResponse(uv_work_t *work, int status) {
33,717✔
1257
  SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
33,717✔
1258

1259
  if (udfWork->conn != NULL) {
33,717!
1260
    uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
33,717✔
1261
    if(write_req == NULL) {
33,717!
1262
      fnError("udfd send response error, malloc failed");
×
1263
      taosMemoryFree(work);
×
1264
      return;
×
1265
    }
1266
    write_req->data = udfWork;
33,717✔
1267
    int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
33,717✔
1268
    if (code != 0) {
33,717!
1269
      fnError("udfd send response error %s", uv_strerror(code));
×
1270
      taosMemoryFree(write_req);
×
1271
   }
1272
  }
1273
  taosMemoryFree(work);
33,717✔
1274
}
1275

1276
// libuv allocation callback for client pipes. Hands libuv the part of the
// connection's input buffer that still has to be filled:
//   - no buffer yet: allocate room for the message header;
//   - header incomplete: the remaining header bytes;
//   - otherwise: grow the buffer to the announced total size and hand out
//     the remaining space.
// On allocation failure an empty buffer (NULL base, zero length) is returned.
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SUdfdUvConn *conn = handle->data;
  int32_t      headSize = sizeof(int32_t) + sizeof(int64_t);

  if (conn->inputCap == 0) {
    conn->inputBuf = taosMemoryMalloc(headSize);
    if (conn->inputBuf == NULL) {
      fnError("udfd can not allocate enough memory");
      buf->base = NULL;
      buf->len = 0;
      return;
    }
    conn->inputLen = 0;
    conn->inputCap = headSize;
    conn->inputTotal = -1;  // total message size not known yet

    buf->base = conn->inputBuf;
    buf->len = conn->inputCap;
    return;
  }

  if (conn->inputTotal == -1 && conn->inputLen < headSize) {
    buf->base = conn->inputBuf + conn->inputLen;
    buf->len = headSize - conn->inputLen;
    return;
  }

  if (conn->inputTotal > conn->inputCap) {
    conn->inputCap = conn->inputTotal;
  }
  void *grown = taosMemoryRealloc(conn->inputBuf, conn->inputCap);
  if (grown == NULL) {
    fnError("udfd can not allocate enough memory");
    buf->base = NULL;
    buf->len = 0;
    return;
  }
  conn->inputBuf = grown;
  buf->base = conn->inputBuf + conn->inputLen;
  buf->len = conn->inputCap - conn->inputLen;
}
97,199✔
1308

1309
// Report whether the connection's input buffer holds one complete message.
// As a side effect, the announced total size is read from the first int32 of
// the buffer once enough bytes have arrived.
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
  if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
    pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
  }

  // Complete means: the buffer is full and is exactly as large as announced.
  const bool bufferFull = (pipe->inputLen == pipe->inputCap);
  const bool sizeMatches = (pipe->inputTotal == pipe->inputCap);
  if (!(bufferFull && sizeMatches)) {
    return false;
  }

  fnDebug("receive request complete. length %d", pipe->inputLen);
  return true;
}
1319

1320
// Turn the complete message held in the connection's input buffer into a work
// item and queue it on the libuv thread pool (udfdProcessRequest runs the
// request, udfdSendResponse sends the result).
//
// Ownership of the input buffer moves from the connection to the work item,
// and the connection's input state is reset for the next message. If the
// bookkeeping allocations fail, the connection is left untouched.
void udfdHandleRequest(SUdfdUvConn *conn) {
  char   *inputBuf = conn->inputBuf;
  int32_t inputLen = conn->inputLen;

  uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
  if (work == NULL) {
    fnError("udfd malloc work failed");
    return;
  }
  SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
  if (udfWork == NULL) {
    fnError("udfd malloc udf work failed");
    taosMemoryFree(work);
    return;
  }
  udfWork->conn = conn;
  udfWork->pWorkNext = conn->pWorkList;
  conn->pWorkList = udfWork;
  udfWork->input = uv_buf_init(inputBuf, inputLen);
  // The item comes from malloc; give the response buffer a defined empty
  // value so it is safe to inspect or free even if processing never sets it.
  udfWork->output = uv_buf_init(NULL, 0);
  conn->inputBuf = NULL;
  conn->inputLen = 0;
  conn->inputCap = 0;
  conn->inputTotal = -1;
  work->data = udfWork;
  if (uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse) != 0) {
    fnError("udfd queue work failed");
    // Undo the bookkeeping: the item was pushed at the head of the list, so
    // unlink it there before freeing, and release the message buffer that no
    // request handler will free any more.
    conn->pWorkList = udfWork->pWorkNext;
    taosMemoryFree(udfWork->input.base);
    taosMemoryFree(work);
    taosMemoryFree(udfWork);
  }
}
1351

1352
// Close callback of a client pipe. Detaches every still-pending work item
// from the connection (so later completion code sees conn == NULL) and then
// frees the pipe handle, the input buffer and the connection itself.
void udfdPipeCloseCb(uv_handle_t *pipe) {
  SUdfdUvConn *conn = pipe->data;

  for (SUvUdfWork *pending = conn->pWorkList; pending != NULL; pending = pending->pWorkNext) {
    pending->conn = NULL;
  }

  taosMemoryFree(conn->client);
  taosMemoryFree(conn->inputBuf);
  taosMemoryFree(conn);
}
901✔
1364

1365
// Read callback of a client pipe. Accumulates received bytes on the
// connection and dispatches the request once a full message has arrived;
// on EOF or a read error the connection is handed to udfdUvHandleError.
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnDebug("udfd read %zd bytes from client", nread);
  if (nread == 0) {
    return;
  }

  SUdfdUvConn *conn = client->data;

  if (nread < 0) {
    if (nread != UV_EOF) {
      fnError("Receive error %s", uv_err_name(nread));
    } else {
      fnInfo("udfd pipe read EOF");
    }
    udfdUvHandleError(conn);
    return;
  }

  // nread > 0: the bytes were written straight into the connection buffer.
  conn->inputLen += nread;
  if (isUdfdUvMsgComplete(conn)) {
    udfdHandleRequest(conn);
  }
}
1390

1391
void udfdOnNewConnection(uv_stream_t *server, int status) {
901✔
1392
  if (status < 0) {
901!
1393
    fnError("udfd new connection error. code: %s", uv_strerror(status));
×
1394
    return;
×
1395
  }
1396
  int32_t code = 0;
901✔
1397

1398
  uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
901✔
1399
  if(client == NULL) {
901!
1400
    fnError("udfd pipe malloc failed");
×
1401
    return;
×
1402
  }
1403
  code = uv_pipe_init(global.loop, client, 0);
901✔
1404
  if (code) {
901!
1405
    fnError("udfd pipe init error %s", uv_strerror(code));
×
1406
    taosMemoryFree(client);
×
1407
    return;
×
1408
  }
1409
  if (uv_accept(server, (uv_stream_t *)client) == 0) {
901!
1410
    SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
901✔
1411
    if(ctx == NULL) {
901!
1412
      fnError("udfd conn malloc failed");
×
1413
      goto _exit;
×
1414
    }
1415
    ctx->pWorkList = NULL;
901✔
1416
    ctx->client = (uv_stream_t *)client;
901✔
1417
    ctx->inputBuf = 0;
901✔
1418
    ctx->inputLen = 0;
901✔
1419
    ctx->inputCap = 0;
901✔
1420
    client->data = ctx;
901✔
1421
    ctx->client = (uv_stream_t *)client;
901✔
1422
    code = uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
901✔
1423
    if (code) {
901!
1424
      fnError("udfd read start error %s", uv_strerror(code));
×
1425
      udfdUvHandleError(ctx);
×
1426
      taosMemoryFree(ctx);
×
1427
      taosMemoryFree(client);
×
1428
    }
1429
    return;
901✔
1430
  }
1431
_exit:
×
1432
    uv_close((uv_handle_t *)client, NULL);
×
1433
    taosMemoryFree(client);
×
1434
}
1435

1436
// Signal callback (registered for SIGINT): removes the listening pipe file,
// stops the signal watcher and stops the event loop.
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
  fnInfo("udfd signal received: %d\n", signum);
  uv_fs_t req;
  // Synchronous unlink (no callback); the request must be cleaned up after.
  int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
  uv_fs_req_cleanup(&req);
  if (code) {
    fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(code), __LINE__);
  }
  code = uv_signal_stop(handle);
  if (code) {
    fnError("stop signal handler failed, reason:%s", uv_strerror(code));
  }
  uv_stop(global.loop);
}
×
1449

1450
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
2,515✔
1451
  for (int32_t i = 1; i < argc; ++i) {
5,030✔
1452
    if (strcmp(argv[i], "-c") == 0) {
2,515!
1453
      if (i < argc - 1) {
2,515!
1454
        if (strlen(argv[++i]) >= PATH_MAX) {
2,515!
1455
          (void)printf("config file path overflow");
×
1456
          return -1;
×
1457
        }
1458
        tstrncpy(configDir, argv[i], PATH_MAX);
2,515✔
1459
      } else {
1460
        (void)printf("'-c' requires a parameter, default is %s\n", configDir);
×
1461
        return -1;
×
1462
      }
1463
    } else if (strcmp(argv[i], "-V") == 0) {
×
1464
      global.printVersion = true;
×
1465
    } else {
1466
    }
1467
  }
1468

1469
  return 0;
2,515✔
1470
}
1471

1472
static void udfdPrintVersion() {
×
1473
  (void)printf("udfd version: %s compatible_version: %s\n", td_version, td_compatible_version);
×
1474
  (void)printf("git: %s\n", td_gitinfo);
×
1475
  (void)printf("build: %s\n", td_buildinfo);
×
1476
}
×
1477

1478
static int32_t udfdInitLog() {
2,515✔
1479
  char logName[12] = {0};
2,515✔
1480
  snprintf(logName, sizeof(logName), "%slog", "udfd");
2,515✔
1481
  return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
2,515✔
1482
}
1483

1484
// libuv allocation callback for the control pipe: allocates a buffer of the
// size libuv suggests. On allocation failure an empty buffer (NULL base,
// zero length) is returned, matching the error path of udfdAllocBuffer.
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
  buf->base = taosMemoryMalloc(suggested_size);
  if (buf->base == NULL) {
    fnError("udfd ctrl pipe alloc buffer failed");
    buf->len = 0;
    return;
  }
  buf->len = suggested_size;
}
1492

1493
// Read callback of the control pipe. Any read error (including EOF) closes
// the pipe and stops the event loop; data that does arrive is only logged and
// discarded. The buffer handed out by the allocation callback is freed on
// every path.
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
  if (nread < 0) {
    // libuv error codes are ints delivered through the ssize_t parameter.
    fnError("udfd ctrl pipe read error. %s", uv_err_name((int)nread));
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t *)q, NULL);
    uv_stop(global.loop);
    return;
  }
  // nread is signed (ssize_t), so it is printed with %zd.
  fnError("udfd ctrl pipe read %zd bytes", nread);
  taosMemoryFree(buf->base);
}
1504

1505
static void removeListeningPipe() {
5,030✔
1506
  uv_fs_t req;
1507
  int     err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
5,030✔
1508
  uv_fs_req_cleanup(&req);
5,030✔
1509
  if(err) {
5,030✔
1510
    fnInfo("remove listening pipe %s : %s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__);
5,011!
1511
  }
1512
}
5,030✔
1513

1514
static int32_t udfdUvInit() {
2,515✔
1515
  TAOS_CHECK_RETURN(uv_loop_init(global.loop));
2,515!
1516

1517
  if (tsStartUdfd) {  // udfd is started by taosd, which shall exit when taosd exit
2,515!
1518
    TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1));
2,515!
1519
    TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0));
2,515!
1520
    TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb));
2,515!
1521
  }
1522
  getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
2,515✔
1523

1524
  removeListeningPipe();
2,515✔
1525

1526
  TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.listeningPipe, 0));
2,515!
1527

1528
  TAOS_CHECK_RETURN(uv_signal_init(global.loop, &global.intrSignal));
2,515!
1529
  TAOS_CHECK_RETURN(uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT));
2,515!
1530

1531
  int r;
1532
  fnInfo("bind to pipe %s", global.listenPipeName);
2,515!
1533
  if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
2,515!
1534
    fnError("Bind error %s", uv_err_name(r));
×
1535
    removeListeningPipe();
×
1536
    return -2;
×
1537
  }
1538
  if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
2,515!
1539
    fnError("Listen error %s", uv_err_name(r));
×
1540
    removeListeningPipe();
×
1541
    return -3;
×
1542
  }
1543
  return 0;
2,515✔
1544
}
1545

1546
// uv_walk callback: request closing of every handle that is not already
// closing.
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
5,030✔
1551

1552
static int32_t udfdGlobalDataInit() {
2,515✔
1553
  uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
2,515✔
1554
  if (loop == NULL) {
2,515!
1555
    fnError("udfd init uv loop failed, mem overflow");
×
1556
    return terrno;
×
1557
  }
1558
  global.loop = loop;
2,515✔
1559

1560
  if (uv_mutex_init(&global.scriptPluginsMutex) != 0) {
2,515!
1561
    fnError("udfd init script plugins mutex failed");
×
1562
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1563
  }
1564

1565
  global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
2,515✔
1566
  if (global.udfsHash == NULL) {
2,515!
1567
    return terrno;
×
1568
  }
1569
  // taosHashSetFreeFp(global.udfsHash, udfdFreeUdf);
1570

1571
  if (uv_mutex_init(&global.udfsMutex) != 0) {
2,515!
1572
    fnError("udfd init udfs mutex failed");
×
1573
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1574
  }
1575

1576
  return 0;
2,515✔
1577
}
1578

1579
static void udfdGlobalDataDeinit() {
2,515✔
1580
  taosHashCleanup(global.udfsHash);
2,515✔
1581
  uv_mutex_destroy(&global.udfsMutex);
2,515✔
1582
  uv_mutex_destroy(&global.scriptPluginsMutex);
2,515✔
1583
  taosMemoryFree(global.loop);
2,515✔
1584
  fnInfo("udfd global data deinit");
2,515!
1585
}
2,515✔
1586

1587
static void udfdRun() {
2,515✔
1588
  fnInfo("start udfd event loop");
2,515!
1589
  int32_t code = uv_run(global.loop, UV_RUN_DEFAULT);
2,515✔
1590
  if(code != 0) {
2,515!
1591
    fnError("udfd event loop still has active handles or requests.");
2,515!
1592
  }
1593
  fnInfo("udfd event loop stopped.");
2,515!
1594

1595
  (void)uv_loop_close(global.loop);
2,515✔
1596

1597
  uv_walk(global.loop, udfdCloseWalkCb, NULL);
2,515✔
1598
  code = uv_run(global.loop, UV_RUN_DEFAULT);
2,515✔
1599
  if(code != 0) {
2,515!
1600
    fnError("udfd event loop still has active handles or requests.");
×
1601
  }
1602
  (void)uv_loop_close(global.loop);
2,515✔
1603
}
2,515✔
1604

1605
int32_t udfdInitResidentFuncs() {
2,515✔
1606
  if (strlen(tsUdfdResFuncs) == 0) {
2,515!
1607
    return TSDB_CODE_SUCCESS;
2,515✔
1608
  }
1609

1610
  global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
×
1611
  char *pSave = tsUdfdResFuncs;
×
1612
  char *token;
1613
  while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
×
1614
    char func[TSDB_FUNC_NAME_LEN + 1] = {0};
×
1615
    tstrncpy(func, token, TSDB_FUNC_NAME_LEN);
×
1616
    fnInfo("udfd add resident function %s", func);
×
1617
    if(taosArrayPush(global.residentFuncs, func) == NULL)
×
1618
    {
1619
      taosArrayDestroy(global.residentFuncs);
×
1620
      return terrno;
×
1621
    }
1622
  }
1623

1624
  return TSDB_CODE_SUCCESS;
×
1625
}
1626

1627
void udfdDeinitResidentFuncs() {
2,515✔
1628
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
2,515!
1629
    char  *funcName = taosArrayGet(global.residentFuncs, i);
×
1630
    SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
×
1631
    if (udfInHash) {
×
1632
      SUdf   *udf = *udfInHash;
×
1633
      int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
×
1634
      fnDebug("udfd destroy function returns %d", code);
×
1635
      if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0)
×
1636
      {
1637
        fnError("udfd remove resident function %s failed", funcName);
×
1638
      }
1639
      taosMemoryFree(udf);
×
1640
    }
1641
  }
1642
  taosArrayDestroy(global.residentFuncs);
2,515✔
1643
  fnInfo("udfd resident functions are deinit");
2,515!
1644
}
2,515✔
1645

1646
int32_t udfdCreateUdfSourceDir() {
2,515✔
1647
  snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir);
2,515✔
1648
  int32_t code = taosMkDir(global.udfDataDir);
2,515✔
1649
  if (code != TSDB_CODE_SUCCESS) {
2,515!
1650
    snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir);
×
1651
    code = taosMkDir(global.udfDataDir);
×
1652
  }
1653
  fnInfo("udfd create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code));
2,515!
1654

1655
  return code;
2,515✔
1656
}
1657

1658
void udfdDestroyUdfSourceDir() {
2,515✔
1659
  fnInfo("destory udf source directory %s", global.udfDataDir);
2,515!
1660
  taosRemoveDir(global.udfDataDir);
2,515✔
1661
}
2,515✔
1662

1663
int main(int argc, char *argv[]) {
2,515✔
1664
  int  code = 0;
2,515✔
1665
  bool logInitialized = false;
2,515✔
1666
  bool cfgInitialized = false;
2,515✔
1667
  bool openClientRpcFinished = false;
2,515✔
1668
  bool residentFuncsInited = false;
2,515✔
1669
  bool udfSourceDirInited = false;
2,515✔
1670
  bool globalDataInited = false;
2,515✔
1671

1672
  if (!taosCheckSystemIsLittleEnd()) {
2,515!
1673
    (void)printf("failed to start since on non-little-end machines\n");
×
1674
    return -1;
×
1675
  }
1676

1677
  if (udfdParseArgs(argc, argv) != 0) {
2,515!
1678
    (void)printf("failed to start since parse args error\n");
×
1679
    return -1;
×
1680
  }
1681

1682
  if (global.printVersion) {
2,515!
1683
    udfdPrintVersion();
×
1684
    return 0;
×
1685
  }
1686

1687
  if (udfdInitLog() != 0) {
2,515!
1688
    // ignore create log failed, because this error no matter
1689
    (void)printf("failed to init udfd log.");
×
1690
  } else {
1691
    logInitialized = true;  // log is initialized
2,515✔
1692
  }
1693

1694
  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
2,515!
1695
    fnError("failed to start since read config error");
×
1696
    code = -2;
×
1697
    goto _exit;
×
1698
  }
1699
  cfgInitialized = true;  // cfg is initialized
2,515✔
1700
  fnInfo("udfd start with config file %s", configDir);
2,515!
1701

1702
  if (initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp) != 0) {
2,515!
1703
    fnError("init ep set from cfg failed");
×
1704
    code = -3;
×
1705
    goto _exit;
×
1706
  }
1707
  fnInfo("udfd start with mnode ep %s", global.mgmtEp.epSet.eps[0].fqdn);
2,515!
1708
  if (udfdOpenClientRpc() != 0) {
2,515!
1709
    fnError("open rpc connection to mnode failed");
×
1710
    code = -4;
×
1711
    goto _exit;
×
1712
  }
1713
  fnInfo("udfd rpc client is opened");
2,515!
1714
  openClientRpcFinished = true;  // rpc is opened
2,515✔
1715

1716
  if (udfdCreateUdfSourceDir() != 0) {
2,515!
1717
    fnError("create udf source directory failed");
×
1718
    code = -5;
×
1719
    goto _exit;
×
1720
  }
1721
  udfSourceDirInited = true;  // udf source dir is created
2,515✔
1722
  fnInfo("udfd udf source directory is created");
2,515!
1723

1724
  if (udfdGlobalDataInit() != 0) {
2,515!
1725
    fnError("init global data failed");
×
1726
    code = -6;
×
1727
    goto _exit;
×
1728
  }
1729
  globalDataInited = true;  // global data is inited
2,515✔
1730
  fnInfo("udfd global data is inited");
2,515!
1731

1732
  if (udfdUvInit() != 0) {
2,515!
1733
    fnError("uv init failure");
×
1734
    code = -7;
×
1735
    goto _exit;
×
1736
  }
1737
  fnInfo("udfd uv is inited");
2,515!
1738

1739
  if (udfdInitResidentFuncs() != 0) {
2,515!
1740
    fnError("init resident functions failed");
×
1741
    code = -8;
×
1742
    goto _exit;
×
1743
  }
1744
  residentFuncsInited = true;  // resident functions are inited
2,515✔
1745
  fnInfo("udfd resident functions are inited");
2,515!
1746

1747
  udfdRun();
2,515✔
1748
  fnInfo("udfd exit normally");
2,515!
1749

1750
  removeListeningPipe();
2,515✔
1751
  udfdDeinitScriptPlugins();
2,515✔
1752

1753
_exit:
2,515✔
1754
  if (globalDataInited) {
2,515!
1755
    udfdGlobalDataDeinit();
2,515✔
1756
  }
1757
  if (residentFuncsInited) {
2,515!
1758
    udfdDeinitResidentFuncs();
2,515✔
1759
  }
1760
  if (udfSourceDirInited) {
2,515!
1761
    udfdDestroyUdfSourceDir();
2,515✔
1762
  }
1763
  if (openClientRpcFinished) {
2,515!
1764
    udfdCloseClientRpc();
2,515✔
1765
  }
1766
  if (cfgInitialized) {
2,515!
1767
    taosCleanupCfg();
2,515✔
1768
  }
1769
  if (logInitialized) {
2,515!
1770
    taosCloseLog();
2,515✔
1771
  }
1772

1773
  return code;
2,515✔
1774
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc