• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.19
/source/libs/function/src/udfd.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
// clang-format off
17
#include "uv.h"
18
#include "os.h"
19
#include "fnLog.h"
20
#include "thash.h"
21

22
#include "tudf.h"
23
#include "tudfInt.h"
24
#include "version.h"
25

26
#include "tdatablock.h"
27
#include "tdataformat.h"
28
#include "tglobal.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "tmisce.h"
32
#include "tversion.h"
33
// clang-format on
34

35
#define UDFD_MAX_SCRIPT_PLUGINS 64
36
#define UDFD_MAX_SCRIPT_TYPE    1
37
#define UDFD_MAX_PLUGIN_FUNCS   9
38

39
typedef struct SUdfCPluginCtx {
40
  uv_lib_t lib;
41

42
  TUdfScalarProcFunc scalarProcFunc;
43

44
  TUdfAggStartFunc   aggStartFunc;
45
  TUdfAggProcessFunc aggProcFunc;
46
  TUdfAggFinishFunc  aggFinishFunc;
47
  TUdfAggMergeFunc   aggMergeFunc;
48

49
  TUdfInitFunc    initFunc;
50
  TUdfDestroyFunc destroyFunc;
51
} SUdfCPluginCtx;
52

53
// Plugin-level "open" hook of the C plugin. Both arguments are ignored and the
// call always reports success (0).
int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) {
  return 0;
}
54

55
// Plugin-level "close" hook of the C plugin; does nothing and always returns 0.
int32_t udfdCPluginClose() {
  return 0;
}
56

57
// Resolves the "<udfName>_init" and "<udfName>_destroy" symbols from the already
// opened library in udfCtx and stores them in udfCtx->initFunc / destroyFunc.
// Returns 0 when both lookups succeed; a failing uv_dlsym result is handed to
// TAOS_CHECK_RETURN (macro defined elsewhere; presumably returns it to the caller).
int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);

  // +6 / +9 leave room for the suffix and the terminating NUL
  char initSym[TSDB_FUNC_NAME_LEN + 6] = {0};
  snprintf(initSym, sizeof(initSym), "%s%s", udfName, "_init");
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, initSym, (void **)(&udfCtx->initFunc)));

  char destroySym[TSDB_FUNC_NAME_LEN + 9] = {0};
  snprintf(destroySym, sizeof(destroySym), "%s%s", udfName, "_destroy");
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, destroySym, (void **)(&udfCtx->destroyFunc)));

  return 0;
}
70

71
// Resolves the entry points of an aggregate UDF from the already opened library:
//   "<udfName>"        -> aggProcFunc   (required)
//   "<udfName>_start"  -> aggStartFunc  (required)
//   "<udfName>_finish" -> aggFinishFunc (required)
//   "<udfName>_merge"  -> aggMergeFunc  (optional, a missing symbol is only logged)
// Returns 0 on success; a failing required lookup is handed to TAOS_CHECK_RETURN
// (macro defined elsewhere; presumably returns the non-zero code to the caller).
int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);
  char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
  snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)));

  char  startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
  char *startSuffix = "_start";
  snprintf(startFuncName, sizeof(startFuncName), "%s%s", processFuncName, startSuffix);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)));

  char  finishFuncName[TSDB_FUNC_NAME_LEN + 8] = {0};
  char *finishSuffix = "_finish";
  snprintf(finishFuncName, sizeof(finishFuncName), "%s%s", processFuncName, finishSuffix);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)));

  char  mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
  char *mergeSuffix = "_merge";
  snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix);
  if (uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)) != 0) {
    // uv_dlsym only returns -1 on failure; the real reason is kept in the library
    // handle and must be read with uv_dlerror (uv_strerror(-1) would be unrelated).
    fnInfo("uv_dlsym function %s. error: %s", mergeFuncName, uv_dlerror(&udfCtx->lib));
  }
  return 0;
}
96

97
// Creates the C-plugin context for one UDF.
//
// Steps: allocate a zeroed SUdfCPluginCtx, open the shared library at udf->path,
// resolve the init/destroy hooks, resolve the scalar or aggregate entry points
// depending on udf->funcType, then run the UDF's init hook.
//
// On success stores the context in *pUdfCtx and returns 0. On any failure the
// library is closed, the context is freed, *pUdfCtx is left untouched and a
// non-zero code is returned (TSDB_CODE_UDF_LOAD_UDF_FAILURE for load/lookup
// problems, terrno for allocation failure, or the init hook's own error code).
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(udf, pUdfCtx);
  int32_t         err = 0;
  SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx));
  if (NULL == udfCtx) {
    return terrno;
  }
  if (uv_dlopen(udf->path, &udfCtx->lib) != 0) {
    // libuv reports dl failures as -1 and keeps the message in the handle; the
    // handle still has to go through uv_dlclose (at _exit) to release that message.
    fnError("can not load library %s. error: %s", udf->path, uv_dlerror(&udfCtx->lib));
    err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
    goto _exit;
  }
  const char *udfName = udf->name;

  err = udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName);
  if (err != 0) {
    fnError("can not load init/destroy functions. error: %d", err);
    err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
    goto _exit;
  }

  if (udf->funcType == UDF_FUNC_TYPE_SCALAR) {
    char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
    snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
    if (uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)) != 0) {
      // err is still 0 at this point, so the message must come from uv_dlerror
      fnError("can not load library function %s. error: %s", processFuncName, uv_dlerror(&udfCtx->lib));
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
      goto _exit;
    }
  } else if (udf->funcType == UDF_FUNC_TYPE_AGG) {
    err = udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName);
    if (err != 0) {
      fnError("can not load aggregation functions. error: %d", err);
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
      goto _exit;
    }
  }

  if (udfCtx->initFunc) {
    err = (udfCtx->initFunc)();
    if (err != 0) {
      fnError("udf init function failed. error: %d", err);
      goto _exit;
    }
  }
  *pUdfCtx = udfCtx;
  return 0;

_exit:
  uv_dlclose(&udfCtx->lib);
  taosMemoryFree(udfCtx);
  return err;
}
150

151
int32_t udfdCPluginUdfDestroy(void *udfCtx) {
464✔
152
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx);
928!
153
  SUdfCPluginCtx *ctx = udfCtx;
464✔
154
  int32_t         code = 0;
464✔
155
  if (ctx->destroyFunc) {
464!
156
    code = (ctx->destroyFunc)();
464✔
157
  }
158
  uv_dlclose(&ctx->lib);
464✔
159
  taosMemoryFree(ctx);
464✔
160
  return code;
464✔
161
}
162

163
// Forwards a scalar call to the UDF's process function and returns its result.
// Returns TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the context has no scalar function.
int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(block, resultCol, udfCtx);
  SUdfCPluginCtx *pluginCtx = udfCtx;
  if (pluginCtx->scalarProcFunc == NULL) {
    fnError("udfd c plugin scalar proc not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->scalarProcFunc(block, resultCol);
}
173

174
// Forwards the aggregate "start" step to the UDF and returns its result.
// Returns TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the context has no start function.
int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(buf, udfCtx);
  SUdfCPluginCtx *pluginCtx = udfCtx;
  if (pluginCtx->aggStartFunc == NULL) {
    fnError("udfd c plugin aggregation start not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggStartFunc(buf);
}
185

186
// Forwards one aggregate "process" step (input block + current intermediate
// buffer -> new intermediate buffer) to the UDF and returns its result.
// Returns TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the context has no process function.
int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(block, interBuf, newInterBuf, udfCtx);
  SUdfCPluginCtx *pluginCtx = udfCtx;
  if (pluginCtx->aggProcFunc == NULL) {
    fnError("udfd c plugin aggregation process not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggProcFunc(block, interBuf, newInterBuf);
}
196

197
// Forwards the aggregate "merge" step (two intermediate buffers -> one output
// buffer) to the UDF and returns its result. The merge symbol is optional at
// load time, so a missing function is reported here as
// TSDB_CODE_UDF_FUNC_EXEC_FAILURE.
int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
                               void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx);
  SUdfCPluginCtx *pluginCtx = udfCtx;
  if (pluginCtx->aggMergeFunc == NULL) {
    fnError("udfd c plugin aggregation merge not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
}
208

209
// Forwards the aggregate "finish" step (intermediate buffer -> result) to the UDF
// and returns its result. Returns TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the context
// has no finish function.
int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx);
  SUdfCPluginCtx *pluginCtx = udfCtx;
  if (pluginCtx->aggFinishFunc == NULL) {
    fnError("udfd c plugin aggregation finish not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggFinishFunc(buf, resultData);
}
220

221
// for c, the function pointer are filled directly and libloaded = true;
222
// for others, dlopen/dlsym to find function pointers
223
typedef struct SUdfScriptPlugin {
224
  int8_t scriptType;
225

226
  char     libPath[PATH_MAX];
227
  bool     libLoaded;
228
  uv_lib_t lib;
229

230
  TScriptUdfScalarProcFunc udfScalarProcFunc;
231
  TScriptUdfAggStartFunc   udfAggStartFunc;
232
  TScriptUdfAggProcessFunc udfAggProcFunc;
233
  TScriptUdfAggMergeFunc   udfAggMergeFunc;
234
  TScriptUdfAggFinishFunc  udfAggFinishFunc;
235

236
  TScriptUdfInitFunc    udfInitFunc;
237
  TScriptUdfDestoryFunc udfDestroyFunc;
238

239
  TScriptOpenFunc  openFunc;
240
  TScriptCloseFunc closeFunc;
241
} SUdfScriptPlugin;
242

243
typedef struct SUdfdContext {
244
  uv_loop_t  *loop;
245
  uv_pipe_t   ctrlPipe;
246
  uv_signal_t intrSignal;
247
  char        listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
248
  uv_pipe_t   listeningPipe;
249

250
  void     *clientRpc;
251
  SCorEpSet mgmtEp;
252

253
  uv_mutex_t udfsMutex;
254
  SHashObj  *udfsHash;
255

256
  uv_mutex_t        scriptPluginsMutex;
257
  SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS];
258

259
  SArray *residentFuncs;
260

261
  char udfDataDir[PATH_MAX];
262
  bool printVersion;
263
} SUdfdContext;
264

265
SUdfdContext global;
266

267
struct SUdfdUvConn;
268
struct SUvUdfWork;
269

270
typedef struct SUdfdUvConn {
271
  uv_stream_t *client;
272
  char        *inputBuf;
273
  int32_t      inputLen;
274
  int32_t      inputCap;
275
  int32_t      inputTotal;
276

277
  struct SUvUdfWork *pWorkList;  // head of work list
278
} SUdfdUvConn;
279

280
typedef struct SUvUdfWork {
281
  SUdfdUvConn *conn;
282
  uv_buf_t     input;
283
  uv_buf_t     output;
284

285
  struct SUvUdfWork *pWorkNext;
286
} SUvUdfWork;
287

288
// Load state of an SUdf.
typedef enum {
  UDF_STATE_INIT = 0,  // freshly created, or a previous load attempt failed
  UDF_STATE_LOADING,   // a setup request is currently loading it
  UDF_STATE_READY      // loaded successfully
} EUdfState;
289

290
typedef struct SUdf {
291
  char    name[TSDB_FUNC_NAME_LEN + 1];
292
  int32_t version;
293
  int64_t createdTime;
294

295
  int8_t  funcType;
296
  int8_t  scriptType;
297
  int8_t  outputType;
298
  int32_t outputLen;
299
  int32_t bufSize;
300

301
  char path[PATH_MAX];
302

303
  int32_t    refCount;
304
  EUdfState  state;
305
  uv_mutex_t lock;
306
  uv_cond_t  condReady;
307
  bool       resident;
308

309
  SUdfScriptPlugin *scriptPlugin;
310
  void             *scriptUdfCtx;
311

312
  int64_t lastFetchTime;  // last fetch time in milliseconds
313
  bool    expired;
314
} SUdf;
315

316
typedef struct SUdfcFuncHandle {
317
  SUdf *udf;
318
} SUdfcFuncHandle;
319

320
// Kind of RPC exchange udfd performs with the mnode.
typedef enum EUdfdRpcReqRspType {
  UDFD_RPC_MNODE_CONNECT = 0,  // connect request
  UDFD_RPC_RETRIVE_FUNC,       // retrieve a function definition
} EUdfdRpcReqRspType;
324

325
typedef struct SUdfdRpcSendRecvInfo {
326
  EUdfdRpcReqRspType rpcType;
327
  int32_t            code;
328
  void              *param;
329
  uv_sem_t           resultSem;
330
} SUdfdRpcSendRecvInfo;
331

332
static void    udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
333
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
334
static int32_t udfdConnectToMnode();
335
static bool    udfdRpcRfp(int32_t code, tmsg_t msgType);
336
static int     initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
337
static int32_t udfdOpenClientRpc();
338
static void    udfdCloseClientRpc();
339

340
static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
341
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
342
static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
343
static void udfdProcessRequest(uv_work_t *req);
344
static void udfdOnWrite(uv_write_t *req, int status);
345
static void udfdSendResponse(uv_work_t *work, int status);
346
static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
347
static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe);
348
static void udfdHandleRequest(SUdfdUvConn *conn);
349
static void udfdPipeCloseCb(uv_handle_t *pipe);
350
// Error path for a client connection: closes its stream; udfdPipeCloseCb runs
// once libuv has closed the handle.
static void udfdUvHandleError(SUdfdUvConn *conn) {
  uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb);
}
351
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
352
static void udfdOnNewConnection(uv_stream_t *server, int status);
353

354
static void    udfdIntrSignalHandler(uv_signal_t *handle, int signum);
355
static void    removeListeningPipe();
356

357
static void    udfdPrintVersion();
358
static int32_t udfdParseArgs(int32_t argc, char *argv[]);
359
static int32_t udfdInitLog();
360

361
static void    udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
362
static void    udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
363
static int32_t udfdUvInit();
364
static void    udfdCloseWalkCb(uv_handle_t *handle, void *arg);
365
static void    udfdRun();
366
static void    udfdConnectMnodeThreadFunc(void *args);
367

368
int32_t udfdNewUdf(SUdf **pUdf,  const char *udfName);
369
void  udfdGetFuncBodyPath(const SUdf *udf, char *path);
370

371
// Fills `plugin` with the built-in C plugin: every entry point is one of the
// udfdCPlugin* functions of this file. Finishes by calling the plugin's open hook
// with LD_LIBRARY_PATH = tsUdfdLdLibPath and returns that hook's result.
int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RCODE(plugin);
  plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;

  // plugin-wide lifecycle
  plugin->openFunc = udfdCPluginOpen;
  plugin->closeFunc = udfdCPluginClose;

  // per-UDF lifecycle
  plugin->udfInitFunc = udfdCPluginUdfInit;
  plugin->udfDestroyFunc = udfdCPluginUdfDestroy;

  // per-call entry points
  plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc;
  plugin->udfAggStartFunc = udfdCPluginUdfAggStart;
  plugin->udfAggProcFunc = udfdCPluginUdfAggProc;
  plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge;
  plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish;

  SScriptUdfEnvItem envItems[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}};
  return plugin->openFunc(envItems, 1);
}
389

390
// Opens the shared library `libPath` into `pLib` and resolves `numOfFuncs`
// symbols: funcName[i] is looked up and stored through func[i].
//
// Returns TSDB_CODE_UDF_LOAD_UDF_FAILURE when the library cannot be opened (the
// handle is cleaned up before returning). A symbol that cannot be resolved is
// logged but does NOT fail the call; the caller is expected to check the
// individual function pointers before using them.
int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) {
  TAOS_UDF_CHECK_PTR_RCODE(libPath, pLib, funcName, func);
  if (uv_dlopen(libPath, pLib) != 0) {
    // uv_dlopen returns -1 on failure; the message lives in the handle (uv_dlerror)
    // and is released by uv_dlclose, which is safe to call after a failed open.
    fnError("can not load library %s. error: %s", libPath, uv_dlerror(pLib));
    uv_dlclose(pLib);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }

  for (int i = 0; i < numOfFuncs; ++i) {
    if (uv_dlsym(pLib, funcName[i], func[i]) != 0) {
      fnError("load library function failed. lib %s function %s", libPath, funcName[i]);
    }
  }
  return 0;
}
406

407
// Loads the Python UDF plugin library ("libtaospyudf.so"), resolves its nine
// entry points into `plugin`, and calls the plugin's open hook with
//   PYTHONPATH = <global.udfDataDir><sep><tsUdfdLdLibPath>   (sep is ';' on Windows, ':' otherwise)
//   LOGDIR     = tsLogDir
// Returns 0 on success and marks plugin->libLoaded. On failure the library is
// closed (when it had been opened) and the error code is returned.
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RCODE(plugin);
  plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
  // todo: windows support
  snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so");
  plugin->libLoaded = false;

  // funcName[i] is stored through funcs[i]; keep both tables in the same order
  const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen",         "pyClose",         "pyUdfInit",
                                                 "pyUdfDestroy",   "pyUdfScalarProc", "pyUdfAggStart",
                                                 "pyUdfAggFinish", "pyUdfAggProc",    "pyUdfAggMerge"};
  void      **funcs[UDFD_MAX_PLUGIN_FUNCS] = {
           (void **)&plugin->openFunc,         (void **)&plugin->closeFunc,         (void **)&plugin->udfInitFunc,
           (void **)&plugin->udfDestroyFunc,   (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc,
           (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc,    (void **)&plugin->udfAggMergeFunc};
  int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS);
  if (err != 0) {
    fnError("can not load python plugin. lib path %s", plugin->libPath);
    return err;
  }

  if (plugin->openFunc) {
    // size_t, not int16_t: the sum of two path lengths must not be narrowed, or a
    // long path would wrap into a bogus (possibly negative) buffer size.
    // Layout: global.udfDataDir + separator + tsUdfdLdLibPath + NUL
    size_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1;
    char  *pythonPath = taosMemoryMalloc(lenPythonPath);
    if (pythonPath == NULL) {
      uv_dlclose(&plugin->lib);
      return terrno;
    }
#ifdef WINDOWS
    snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath);
#else
    snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath);
#endif
    SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}};
    err = plugin->openFunc(items, 2);
    taosMemoryFree(pythonPath);
  }
  if (err != 0) {
    fnError("udf script python plugin open func failed. error: %d", err);
    uv_dlclose(&plugin->lib);
    return err;
  }
  plugin->libLoaded = true;

  return 0;
}
452

453
// Shuts down the built-in C plugin: calls its close hook (a failure is only
// logged) and clears every function pointer in `plugin`.
void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RVOID(plugin);
  if (plugin->closeFunc != NULL && plugin->closeFunc() != 0) {
    fnError("udf script c plugin close func failed.line:%d", __LINE__);
  }

  // plugin-wide lifecycle
  plugin->openFunc = NULL;
  plugin->closeFunc = NULL;
  // per-UDF lifecycle
  plugin->udfInitFunc = NULL;
  plugin->udfDestroyFunc = NULL;
  // per-call entry points
  plugin->udfScalarProcFunc = NULL;
  plugin->udfAggStartFunc = NULL;
  plugin->udfAggProcFunc = NULL;
  plugin->udfAggMergeFunc = NULL;
  plugin->udfAggFinishFunc = NULL;
}
471

472
// Shuts down the Python plugin: calls its close hook (a failure is only logged),
// closes the plugin library, clears libLoaded and every function pointer.
void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RVOID(plugin);
  if (plugin->closeFunc != NULL && plugin->closeFunc() != 0) {
    fnError("udf script python plugin close func failed.line:%d", __LINE__);
  }
  uv_dlclose(&plugin->lib);
  plugin->libLoaded = false;

  // plugin-wide lifecycle
  plugin->openFunc = NULL;
  plugin->closeFunc = NULL;
  // per-UDF lifecycle
  plugin->udfInitFunc = NULL;
  plugin->udfDestroyFunc = NULL;
  // per-call entry points
  plugin->udfScalarProcFunc = NULL;
  plugin->udfAggStartFunc = NULL;
  plugin->udfAggProcFunc = NULL;
  plugin->udfAggMergeFunc = NULL;
  plugin->udfAggFinishFunc = NULL;
}
493

494
int32_t udfdInitScriptPlugin(int8_t scriptType) {
15✔
495
  SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
15✔
496
  if (plugin == NULL) {
15!
497
    return terrno;
×
498
  }
499
  int32_t err = 0;
15✔
500
  switch (scriptType) {
15!
501
    case TSDB_FUNC_SCRIPT_BIN_LIB:
13✔
502
      err = udfdInitializeCPlugin(plugin);
13✔
503
      if (err != 0) {
13!
504
        fnError("udf script c plugin init failed. error: %d", err);
×
505
        taosMemoryFree(plugin);
×
506
        return err;
×
507
      }
508
      break;
13✔
509
    case TSDB_FUNC_SCRIPT_PYTHON: {
2✔
510
      err = udfdInitializePythonPlugin(plugin);
2✔
511
      if (err != 0) {
2!
512
        fnError("udf script python plugin init failed. error: %d", err);
×
513
        taosMemoryFree(plugin);
×
514
        return err;
×
515
      }
516
      break;
2✔
517
    }
518
    default:
×
519
      fnError("udf script type %d not supported", scriptType);
×
520
      taosMemoryFree(plugin);
×
521
      return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
×
522
  }
523

524
  global.scriptPlugins[scriptType] = plugin;
15✔
525
  return TSDB_CODE_SUCCESS;
15✔
526
}
527

528
void udfdDeinitScriptPlugins() {
2,531✔
529
  SUdfScriptPlugin *plugin = NULL;
2,531✔
530
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON];
2,531✔
531
  if (plugin != NULL) {
2,531✔
532
    udfdDeinitPythonPlugin(plugin);
2✔
533
    taosMemoryFree(plugin);
2✔
534
    global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON] = NULL;
2✔
535
  }
536

537
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB];
2,531✔
538
  if (plugin != NULL) {
2,531✔
539
    udfdDeinitCPlugin(plugin);
13✔
540
    taosMemoryFree(plugin);
13✔
541
    global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB] = NULL;
13✔
542
  }
543
  return;
2,531✔
544
}
545

546
// Work callback: decodes the request stored in the work item's input buffer and
// dispatches it to the setup / call / teardown handler. A request that cannot be
// decoded is dropped after freeing the input buffer; an unknown request type is
// ignored.
void udfdProcessRequest(uv_work_t *req) {
  TAOS_UDF_CHECK_PTR_RVOID(req);
  SUvUdfWork *work = (SUvUdfWork *)(req->data);
  if (work == NULL) {
    fnError("udf work is NULL");
    return;
  }

  SUdfRequest request = {0};
  if (decodeUdfRequest(work->input.base, &request) == NULL) {
    taosMemoryFreeClear(work->input.base);
    fnError("udf request decode failed");
    return;
  }

  if (request.type == UDF_TASK_SETUP) {
    udfdProcessSetupRequest(work, &request);
  } else if (request.type == UDF_TASK_CALL) {
    udfdProcessCallRequest(work, &request);
  } else if (request.type == UDF_TASK_TEARDOWN) {
    udfdProcessTeardownRequest(work, &request);
  }
}
580

581
// Fills the plugin-facing SScriptUdfInfo from an SUdf. `name` and `path` are
// shared pointers into `udf`, not copies. The function type is translated from
// TSDB_FUNC_TYPE_* to UDF_FUNC_TYPE_*; any other value leaves udfInfo->funcType
// untouched.
static void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
  // identity
  udfInfo->name = udf->name;
  udfInfo->path = udf->path;
  udfInfo->version = udf->version;
  udfInfo->createdTime = udf->createdTime;

  // kind of function
  udfInfo->scriptType = udf->scriptType;
  if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
    udfInfo->funcType = UDF_FUNC_TYPE_AGG;
  } else if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
    udfInfo->funcType = UDF_FUNC_TYPE_SCALAR;
  }

  // result description
  udfInfo->outputType = udf->outputType;
  udfInfo->outputLen = udf->outputLen;
  udfInfo->bufSize = udf->bufSize;
}
596

597
// Loads one UDF so that it can be called:
//   1. fetches its definition from the mnode into `udf`,
//   2. makes sure the script plugin for udf->scriptType exists (created lazily
//      under global.scriptPluginsMutex),
//   3. asks the plugin to initialize the UDF, storing the plugin's context in
//      udf->scriptUdfCtx.
// Returns 0 on success, TSDB_CODE_UDF_LOAD_UDF_FAILURE when the definition cannot
// be fetched or the plugin has no init function, TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED
// for an out-of-range script type, or the plugin's own error code.
static int32_t udfdInitUdf(char *udfName, SUdf *udf) {
  TAOS_UDF_CHECK_PTR_RCODE(udfName, udf);
  int32_t err = 0;
  err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf);
  if (err != 0) {
    fnError("can not retrieve udf from mnode. udf name %s", udfName);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }
  // scriptType is a signed 8-bit value used as an array index below, so reject
  // negative values as well as values above the supported maximum.
  if (udf->scriptType < 0 || udf->scriptType > UDFD_MAX_SCRIPT_TYPE) {
    fnError("udf name %s script type %d not supported", udfName, udf->scriptType);
    return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
  }

  uv_mutex_lock(&global.scriptPluginsMutex);
  SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType];
  if (scriptPlugin == NULL) {
    err = udfdInitScriptPlugin(udf->scriptType);
    if (err != 0) {
      fnError("udf name %s init script plugin failed. error %d", udfName, err);
      uv_mutex_unlock(&global.scriptPluginsMutex);
      return err;
    }
    // pick up the freshly published plugin while still holding the mutex
    scriptPlugin = global.scriptPlugins[udf->scriptType];
  }
  uv_mutex_unlock(&global.scriptPluginsMutex);
  udf->scriptPlugin = scriptPlugin;

  // A dlsym-loaded plugin may be missing symbols (lookup failures are only
  // logged when the plugin library is loaded), so never call through NULL.
  if (scriptPlugin == NULL || scriptPlugin->udfInitFunc == NULL) {
    fnError("udf name %s init failed. error %d", udfName, TSDB_CODE_UDF_LOAD_UDF_FAILURE);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }

  SScriptUdfInfo info = {0};
  convertUdf2UdfInfo(udf, &info);
  err = scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx);
  if (err != 0) {
    fnError("udf name %s init failed. error %d", udfName, err);
    return err;
  }

  fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx);
  return 0;
}
634

635
// Allocates a new SUdf named `udfName` in state UDF_STATE_INIT with refCount 1,
// lastFetchTime = now, and an initialized lock/condition variable. The UDF is
// marked resident when its name appears in global.residentFuncs.
//
// On success stores the object in *pUdf and returns 0. On failure returns terrno
// (allocation) or TSDB_CODE_UDF_UV_EXEC_FAILURE (mutex/cond init); nothing is
// leaked and *pUdf is left untouched.
int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(pUdf, udfName);
  SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
  if (NULL == udfNew) {
    return terrno;
  }
  udfNew->refCount = 1;
  udfNew->lastFetchTime = taosGetTimestampMs();
  tstrncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);

  udfNew->state = UDF_STATE_INIT;
  if (uv_mutex_init(&udfNew->lock) != 0) {
    taosMemoryFree(udfNew);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }
  if (uv_cond_init(&udfNew->condReady) != 0) {
    // the mutex was already created; release it before dropping the object
    uv_mutex_destroy(&udfNew->lock);
    taosMemoryFree(udfNew);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  udfNew->resident = false;
  udfNew->expired = false;
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
    char *funcName = taosArrayGet(global.residentFuncs, i);
    if (strcmp(udfName, funcName) == 0) {
      udfNew->resident = true;
      break;
    }
  }
  *pUdf = udfNew;
  return 0;
}
661

662
void udfdFreeUdf(void *pData) {
×
663
  SUdf *pSudf = (SUdf *)pData;
×
664
  if (pSudf == NULL) {
×
665
    return;
×
666
  }
667

668
  if (pSudf->scriptPlugin != NULL) {
×
669
    if(pSudf->scriptPlugin->udfDestroyFunc(pSudf->scriptUdfCtx) != 0) {
×
670
      fnError("udfdFreeUdf: udfd destroy udf %s failed", pSudf->name);
×
671
    }
672
  }
673

674
  uv_mutex_destroy(&pSudf->lock);
×
675
  uv_cond_destroy(&pSudf->condReady);
×
676
  taosMemoryFree(pSudf);
×
677
}
678

679
// Looks up `udfName` in global.udfsHash (under global.udfsMutex).
//  - A cached entry fetched within the last 10 seconds is reused: its refCount is
//    incremented and it is returned through *ppUdf.
//  - An older entry is flagged expired and removed from the hash, then a new one
//    is created as for a cache miss.
//  - On a miss a new SUdf is created with udfdNewUdf and inserted into the hash.
// Returns 0 on success, otherwise the error from udfdNewUdf or taosHashPut.
// NOTE(review): when taosHashPut fails, the SUdf just created stays referenced by
// *ppUdf and is not freed here - confirm who owns it on that path.
int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(ppUdf, udfName);
  uv_mutex_lock(&global.udfsMutex);

  SUdf  **ppCached = taosHashGet(global.udfsHash, udfName, strlen(udfName));
  int64_t now = taosGetTimestampMs();
  if (ppCached != NULL) {
    SUdf *cached = *ppCached;
    bool  isExpired = now - cached->lastFetchTime > 10 * 1000;  // 10s
    if (!isExpired) {
      ++cached->refCount;
      *ppUdf = cached;
      uv_mutex_unlock(&global.udfsMutex);
      fnInfo("udfd reuse existing udf. udf  %s udf version %d, udf created time %" PRIx64, (*ppUdf)->name, (*ppUdf)->version,
             (*ppUdf)->createdTime);
      return 0;
    }

    cached->expired = true;
    fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
           cached->name, cached->version, cached->createdTime);
    if (taosHashRemove(global.udfsHash, udfName, strlen(udfName)) != 0) {
      fnError("udfdGetOrCreateUdf: udfd remove udf %s failed", udfName);
    }
  }

  int32_t code = udfdNewUdf(ppUdf, udfName);
  if (code == 0) {
    code = taosHashPut(global.udfsHash, udfName, strlen(udfName), ppUdf, POINTER_BYTES);
  }
  uv_mutex_unlock(&global.udfsMutex);

  if (code != 0) {
    return code;
  }
  return 0;
}
718

719
// Handles a UDF setup request.
//
// Finds or creates the SUdf for the requested name, loads it if nobody has done
// so yet (other requests for the same UDF wait on condReady while it is in
// UDF_STATE_LOADING), allocates a handle for the client and encodes the response
// into uvUdf->output. The request's input buffer is freed once the response has
// been encoded.
//
// Any failure is reported to the client as TSDB_CODE_UDF_FUNC_EXEC_FAILURE in the
// response code. If the response itself cannot be encoded or allocated, the
// function returns without producing an output buffer.
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
  // TODO: tracable id from client. connect, setup, call, teardown
  fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);

  SUdfSetupRequest *setup = &request->setup;
  int32_t           code = TSDB_CODE_SUCCESS;
  SUdf             *udf = NULL;
  // initialized up front: the error path jumps to _send before the allocation
  SUdfcFuncHandle  *handle = NULL;

  code = udfdGetOrCreateUdf(&udf, setup->udfName);
  if (code != 0) {
    fnError("udfdGetOrCreateUdf failed. udf name %s", setup->udfName);
    goto _send;
  }
  uv_mutex_lock(&udf->lock);
  if (udf->state == UDF_STATE_INIT) {
    udf->state = UDF_STATE_LOADING;
    code = udfdInitUdf(setup->udfName, udf);
    if (code == 0) {
      udf->state = UDF_STATE_READY;
    } else {
      udf->state = UDF_STATE_INIT;
    }
    uv_cond_broadcast(&udf->condReady);
    uv_mutex_unlock(&udf->lock);
  } else {
    while (udf->state == UDF_STATE_LOADING) {
      uv_cond_wait(&udf->condReady, &udf->lock);
    }
    uv_mutex_unlock(&udf->lock);
  }
  handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
  if (handle == NULL) {
    fnError("udfdProcessSetupRequest: malloc failed.");
    code = terrno;
  } else {
    handle->udf = udf;
  }

_send:
  ;
  SUdfResponse rsp = {0};
  rsp.seqNum = request->seqNum;
  rsp.type = request->type;
  rsp.code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
  rsp.setupRsp.udfHandle = (int64_t)(handle);
  if (udf != NULL) {
    // udf stays NULL when it could not be created; the response then carries zeros
    rsp.setupRsp.outputType = udf->outputType;
    rsp.setupRsp.bytes = udf->outputLen;
    rsp.setupRsp.bufSize = udf->bufSize;
  }

  // first pass computes the encoded length, second pass writes into the buffer
  int32_t len = encodeUdfResponse(NULL, &rsp);
  if (len < 0) {
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
    return;
  }
  rsp.msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessSetupRequest: malloc failed. len %d", len);
    return;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, &rsp) < 0) {
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    return;
  }

  uvUdf->output = uv_buf_init(bufBegin, len);

  taosMemoryFreeClear(uvUdf->input.base);
  return;
}
791

792
// Sanity-checks the column produced by a scalar UDF against its input block.
// The amount of checking depends on tsSafetyCheckLevel:
//   NEVER  - nothing is checked;
//   other  - the output row count must equal the input row count;
//   BYROW  - additionally, for non-NULL rows: variable-length columns must have a
//            payload and an in-range offset for every such row; fixed-length
//            columns get one size check (bytes * rows <= dataLen) at the first
//            non-NULL row, after which the scan stops.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE on a row count
// mismatch. TAOS_UDF_CHECK_CONDITION is defined elsewhere; presumably it returns
// the given code when the condition does not hold.
static int32_t checkUDFScalaResult(SSDataBlock *block, SUdfColumn *output) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }
  if (output->colData.numOfRows != block->info.rows) {
    fnError("udf scala result num of rows %d not equal to input rows %" PRId64, output->colData.numOfRows, block->info.rows);
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  if (tsSafetyCheckLevel != TSDB_SAFETY_CHECK_LEVELL_BYROW) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < output->colData.numOfRows; ++i) {
    if (udfColDataIsNull(output, i)) {
      continue;
    }
    if (IS_VAR_DATA_TYPE(output->colMeta.type)) {
      TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.payload != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
      TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.varOffsets[i] >= 0 &&
                                   output->colData.varLenCol.varOffsets[i] < output->colData.varLenCol.payloadLen,
                               TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
    } else {
      TAOS_UDF_CHECK_CONDITION(
          output->colMeta.bytes * output->colData.numOfRows <= output->colData.fixLenCol.dataLen,
          TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}
821

822
// Sanity-checks the intermediate buffer produced by an aggregate UDF step.
//
// Skipped when the safety level is NEVER. Otherwise the buffer must report
// zero or one result and carry a non-NULL, non-empty payload.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when a check fails.
static int32_t checkUDFAggResult(SSDataBlock *block, SUdfInterBuf *output) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }

  bool countIsValid = (output->numOfResult == 0) || (output->numOfResult == 1);
  if (!countIsValid) {
    fnError("udf agg result num of rows %d not equal to 1", output->numOfResult);
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }

  TAOS_UDF_CHECK_CONDITION(output->buf != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
  TAOS_UDF_CHECK_CONDITION(output->bufLen > 0, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
  return TSDB_CODE_SUCCESS;
}
834

835
// Executes one UDF call request and stores the encoded response in
// uvUdf->output.
//
// The call type selects the plugin entry point: scalar proc, or aggregate
// init / proc / merge / finish. Any non-zero status from the plugin, from the
// result checks or from buffer preparation is reported to the client as
// TSDB_CODE_UDF_FUNC_EXEC_FAILURE. The request's data block, the response's
// result data and the request input buffer are released before returning,
// whether or not the response could be encoded.
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
  SUdfCallRequest *call = &request->call;
  fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle,
          request->seqNum);
  SUdfcFuncHandle  *handle = (SUdfcFuncHandle *)(call->udfHandle);
  SUdf             *udf = handle->udf;
  SUdfResponse      response = {0};
  SUdfResponse     *rsp = &response;
  SUdfCallResponse *subRsp = &rsp->callRsp;

  int32_t code = TSDB_CODE_SUCCESS;
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
      SUdfColumn output = {0};
      output.colMeta.bytes = udf->outputLen;
      output.colMeta.type = udf->outputType;
      output.colMeta.precision = 0;
      output.colMeta.scale = 0;
      // Keep the status: a failed capacity reservation must not be reported
      // to the client as a successful call with an empty result.
      code = udfColEnsureCapacity(&output, call->block.info.rows);
      if (code == TSDB_CODE_SUCCESS) {
        SUdfDataBlock input = {0};
        code = convertDataBlockToUdfDataBlock(&call->block, &input);
        if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
        freeUdfDataDataBlock(&input);
        if (code == TSDB_CODE_SUCCESS) code = checkUDFScalaResult(&call->block, &output);
        if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
      }
      freeUdfColumn(&output);
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx);
      } else {
        code = terrno;
      }
      subRsp->resultBuf = outBuf;
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      SUdfDataBlock input = {0};
      // Keep the conversion status so a failure reaches the client.
      code = convertDataBlockToUdfDataBlock(&call->block, &input);
      if (code == TSDB_CODE_SUCCESS) {
        SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
        if (outBuf.buf != NULL) {
          code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx);
          if (code == TSDB_CODE_SUCCESS) code = checkUDFAggResult(&call->block, &outBuf);
          subRsp->resultBuf = outBuf;
        } else {
          code = terrno;
        }
      }
      // Release the incoming state exactly once on every path, including the
      // conversion and allocation failures.
      freeUdfInterBuf(&call->interBuf);
      freeUdfDataDataBlock(&input);

      break;
    }
    case TSDB_UDF_CALL_AGG_MERGE: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx);
        subRsp->resultBuf = outBuf;
      } else {
        code = terrno;
      }
      // Both incoming buffers are released whether or not the merge ran.
      freeUdfInterBuf(&call->interBuf);
      freeUdfInterBuf(&call->interBuf2);

      break;
    }
    case TSDB_UDF_CALL_AGG_FIN: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx);
        subRsp->resultBuf = outBuf;
      } else {
        code = terrno;
      }
      freeUdfInterBuf(&call->interBuf);

      break;
    }
    default:
      break;
  }

  rsp->seqNum = request->seqNum;
  rsp->type = request->type;
  rsp->code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
  subRsp->callType = call->callType;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t len = encodeUdfResponse(NULL, rsp);
  if (len < 0) {
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
    goto _exit;
  }
  rsp->msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessCallRequest: malloc failed. len %d", len);
    goto _exit;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, rsp) < 0) {
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    goto _exit;
  }

  uvUdf->output = uv_buf_init(bufBegin, len);

_exit:
  // Release whatever the request and response still hold for this call type.
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
      blockDataFreeRes(&call->block);
      blockDataFreeRes(&subRsp->resultData);
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      blockDataFreeRes(&call->block);
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_MERGE: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_FIN: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    default:
      break;
  }

  taosMemoryFreeClear(uvUdf->input.base);
  return;
}
977

978
// Handles a teardown request: drops one reference on the UDF behind the
// handle, unloads the UDF when it is no longer needed, frees the handle and
// stores the encoded response in uvUdf->output.
//
// The UDF is unloaded when its reference count reaches zero and it is either
// not resident or already expired. The request input buffer is released on
// every path, including when the response cannot be encoded.
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
  SUdfTeardownRequest *teardown = &request->teardown;
  fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
  SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
  SUdf            *udf = handle->udf;
  bool             unloadUdf = false;
  int32_t          code = TSDB_CODE_SUCCESS;

  uv_mutex_lock(&global.udfsMutex);
  udf->refCount--;
  if (udf->refCount == 0 && (!udf->resident || udf->expired)) {
    unloadUdf = true;
    code = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
    if (code != 0) {
      fnError("udf name %s remove from hash failed, err:%0x %s", udf->name, code, tstrerror(code));
      uv_mutex_unlock(&global.udfsMutex);
      // The UDF could not be taken out of the table, so it is not destroyed.
      goto _send;
    }
  }
  uv_mutex_unlock(&global.udfsMutex);
  if (unloadUdf) {
    fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx));
    uv_cond_destroy(&udf->condReady);
    uv_mutex_destroy(&udf->lock);
    code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
    fnDebug("udfd destroy function returns %d", code);
    taosMemoryFree(udf);
  }

_send:
  taosMemoryFree(handle);
  SUdfResponse  response = {0};
  SUdfResponse *rsp = &response;
  rsp->seqNum = request->seqNum;
  rsp->type = request->type;
  rsp->code = code;
  // First pass with a NULL buffer only computes the encoded length.
  int32_t len = encodeUdfResponse(NULL, rsp);
  if (len < 0) {
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
    goto _exit;
  }
  rsp->msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessTeardownRequest: malloc failed. len %d", len);
    goto _exit;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, rsp) < 0) {
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    goto _exit;
  }
  uvUdf->output = uv_buf_init(bufBegin, len);

_exit:
  // Previously the error returns above skipped this and leaked the input.
  taosMemoryFree(uvUdf->input.base);
  uvUdf->input.base = NULL;
  return;
}
1037

1038
void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
669✔
1039
  TAOS_UDF_CHECK_PTR_RVOID(udf, path);
2,007!
1040
  if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
669✔
1041
#ifdef WINDOWS
1042
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
1043
#else
1044
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
485✔
1045
             udf->createdTime);
485✔
1046
#endif
1047
  } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
184!
1048
#ifdef WINDOWS
1049
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
1050
#else
1051
    snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
184✔
1052
#endif
1053
  } else {
1054
#ifdef WINDOWS
1055
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
1056
#else
1057
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
×
1058
#endif
1059
  }
1060
}
1061

1062
// Writes the UDF body carried by pFuncInfo to its file and records the path
// in udf->path.
//
// An existing file at the computed path is reused without rewriting it.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_NO_DISKSPACE when no data space is
// available, or TSDB_CODE_FILE_CORRUPTED when the file cannot be opened,
// fully written or closed.
int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
  TAOS_UDF_CHECK_PTR_RCODE(pFuncInfo, udf);
  if (!osDataSpaceAvailable()) {
    terrno = TSDB_CODE_NO_DISKSPACE;
    fnError("udfd create shared library failed since %s", terrstr());
    return terrno;
  }

  char path[PATH_MAX] = {0};
  udfdGetFuncBodyPath(udf, path);
  bool fileExist = !(taosStatFile(path, NULL, NULL, NULL) < 0);
  if (fileExist) {
    tstrncpy(udf->path, path, PATH_MAX);
    fnInfo("udfd func body file. reuse existing file %s", path);
    return TSDB_CODE_SUCCESS;
  }

  TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
  if (file == NULL) {
    // Print the text that belongs to the errno value logged next to it;
    // terrno is not an errno value and must not be passed to strerror().
    fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
    return TSDB_CODE_FILE_CORRUPTED;
  }
  int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
  if (count != pFuncInfo->codeSize) {
    fnError("udfd write udf shared library failed");
    // Do not leak the handle on a short write.
    // NOTE(review): the partial file stays on disk and would satisfy the
    // reuse check above on the next call - consider removing it here.
    (void)taosCloseFile(&file);
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (taosCloseFile(&file) != 0) {
    fnError("udfdSaveFuncBodyToFile, udfd close file failed");
    return TSDB_CODE_FILE_CORRUPTED;
  }

  tstrncpy(udf->path, path, PATH_MAX);
  return TSDB_CODE_SUCCESS;
}
1097

1098
// RPC response callback for requests sent by udfd to the mnode.
//
// Refreshes the cached mnode endpoint set when the transport reports a new
// one, stores the outcome in the SUdfdRpcSendRecvInfo attached to the message
// (pMsg->info.ahandle), frees the response content and posts the info's
// semaphore.
//
// For a connect response the server time must be within 900 seconds of the
// local time and the endpoint set must not be empty. For a retrieve-function
// response the first function entry is copied into the SUdf passed as the
// request parameter and its body is saved to disk.
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
  TAOS_UDF_CHECK_PTR_RVOID(parent, pMsg);
  SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;

  if (pEpSet) {
    if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&global.mgmtEp, pEpSet);
    }
  }

  if (pMsg->code != TSDB_CODE_SUCCESS) {
    fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
    msgInfo->code = pMsg->code;
    goto _return;
  }

  if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
    SConnectRsp connectRsp = {0};
    int32_t     ret = tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
    if (ret < 0) {
      fnError("udfd deserialize connect response failed");
      // Record the failure; otherwise the waiter would read a zero code.
      msgInfo->code = ret;
      goto _return;
    }

    int32_t now = taosGetTimestampSec();
    int32_t delta = abs(now - connectRsp.svrTimestamp);
    if (delta > 900) {
      msgInfo->code = TSDB_CODE_TIME_UNSYNCED;
      goto _return;
    }

    if (connectRsp.epSet.numOfEps == 0) {
      msgInfo->code = TSDB_CODE_APP_ERROR;
      goto _return;
    }

    if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
      updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
    }
    msgInfo->code = 0;
  } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
    SRetrieveFuncRsp retrieveRsp = {0};
    int32_t          ret = tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
    if (ret < 0) {
      fnError("udfd deserialize retrieve func response failed");
      // Record the failure; otherwise the waiter would read a zero code.
      msgInfo->code = ret;
      goto _return;
    }

    SFuncInfo      *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
    SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0);
    if (pFuncInfo == NULL || pFuncExtraInfo == NULL) {
      // A response without a first entry cannot be used; do not dereference.
      fnError("udfd retrieve func response has no function info");
      msgInfo->code = TSDB_CODE_APP_ERROR;
    } else {
      SUdf *udf = msgInfo->param;
      udf->funcType = pFuncInfo->funcType;
      udf->scriptType = pFuncInfo->scriptType;
      udf->outputType = pFuncInfo->outputType;
      udf->outputLen = pFuncInfo->outputLen;
      udf->bufSize = pFuncInfo->bufSize;

      udf->version = pFuncExtraInfo->funcVersion;
      udf->createdTime = pFuncExtraInfo->funcCreatedTime;
      msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf);
      if (msgInfo->code != 0) {
        udf->lastFetchTime = 0;
      }
    }
    if (pFuncInfo != NULL) {
      tFreeSFuncInfo(pFuncInfo);
    }
    taosArrayDestroy(retrieveRsp.pFuncInfos);
    taosArrayDestroy(retrieveRsp.pFuncExtraInfos);
  }

_return:
  rpcFreeCont(pMsg->pCont);
  uv_sem_post(&msgInfo->resultSem);
  return;
}
1169

1170
// Sends a retrieve-function request for `udfName` to the mnode and blocks on
// a semaphore until the response callback has filled `udf`.
//
// Returns 0 on success, otherwise the code of the step that failed: request
// construction, sending, or the code the response callback stored.
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
  TAOS_UDF_CHECK_PTR_RCODE(clientRpc, udfName, udf);
  SRetrieveFuncReq retrieveReq = {0};
  retrieveReq.numOfFuncs = 1;
  retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
  if (taosArrayPush(retrieveReq.pFuncNames, udfName) == NULL) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }

  // First pass with a NULL buffer only computes the serialized length.
  int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
  if (contLen < 0) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    // Do not serialize into a NULL buffer.
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }
  if (tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq) < 0) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    rpcFreeCont(pReq);
    return terrno;
  }
  taosArrayDestroy(retrieveReq.pFuncNames);

  SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
  if (NULL == msgInfo) {
    // Read terrno before the cleanup call; the request was never sent, so
    // its content is still ours to free.
    int32_t allocCode = terrno;
    rpcFreeCont(pReq);
    return allocCode;
  }
  msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
  msgInfo->param = udf;
  if (uv_sem_init(&msgInfo->resultSem, 0) != 0) {
    taosMemoryFree(msgInfo);
    rpcFreeCont(pReq);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  SRpcMsg rpcMsg = {0};
  rpcMsg.pCont = pReq;
  rpcMsg.contLen = contLen;
  rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
  rpcMsg.info.ahandle = msgInfo;
  int32_t code = rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
  if (code == 0) {
    // The response callback posts the semaphore after storing its result.
    uv_sem_wait(&msgInfo->resultSem);
    code = msgInfo->code;
  }
  // The semaphore was initialized above, so destroy it on the failure path too.
  uv_sem_destroy(&msgInfo->resultSem);
  taosMemoryFree(msgInfo);
  return code;
}
1218

1219
// Retry filter installed as the `rfp` hook of the udfd RPC client.
//
// Returns true only when `code` is one of the listed connectivity or
// node-state errors and `msgType` is not one of the scheduler query/fetch
// messages.
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
  bool retriableCode = code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
                       code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
                       code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND ||
                       code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!retriableCode) {
    return false;
  }

  bool schedulerMsg = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                      msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY;
  return !schedulerMsg;
}
1232

1233
// Fills pEpSet with the mnode endpoints parsed from the two configured
// endpoint strings.
//
// An over-long endpoint string or an unparsable first endpoint is an error.
// An unparsable second endpoint is only logged. At least one endpoint must
// end up in the set.
//
// Returns 0 on success. On failure terrno is set to
// TSDB_CODE_TSC_INVALID_FQDN and a non-zero value is returned (terrno for an
// unparsable first endpoint, -1 otherwise).
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  SEpSet *mnodeEps = &(pEpSet->epSet);

  pEpSet->version = 0;
  mnodeEps->numOfEps = 0;
  mnodeEps->inUse = 0;

  bool hasFirst = (firstEp != NULL) && (firstEp[0] != 0);
  if (hasFirst) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    if (taosGetFqdnPortFromEp(firstEp, &mnodeEps->eps[0]) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    mnodeEps->numOfEps++;
  }

  bool hasSecond = (secondEp != NULL) && (secondEp[0] != 0);
  if (hasSecond) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // The second endpoint goes into the next free slot.
    if (taosGetFqdnPortFromEp(secondEp, &mnodeEps->eps[mnodeEps->numOfEps]) == TSDB_CODE_SUCCESS) {
      mnodeEps->numOfEps++;
    } else {
      fnError("invalid ep %s", secondEp);
    }
  }

  if (mnodeEps->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}
1277

1278
int32_t udfdOpenClientRpc() {
2,531✔
1279
  SRpcInit rpcInit = {0};
2,531✔
1280
  rpcInit.label = "UDFD";
2,531✔
1281
  rpcInit.numOfThreads = 1;
2,531✔
1282
  rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
2,531✔
1283
  rpcInit.sessions = 1024;
2,531✔
1284
  rpcInit.connType = TAOS_CONN_CLIENT;
2,531✔
1285
  rpcInit.idleTime = tsShellActivityTimer * 1000;
2,531✔
1286
  rpcInit.user = TSDB_DEFAULT_USER;
2,531✔
1287
  rpcInit.parent = &global;
2,531✔
1288
  rpcInit.rfp = udfdRpcRfp;
2,531✔
1289
  rpcInit.compressSize = tsCompressMsgSize;
2,531✔
1290

1291
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
2,531✔
1292
  connLimitNum = TMAX(connLimitNum, 10);
2,531✔
1293
  connLimitNum = TMIN(connLimitNum, 500);
2,531✔
1294
  rpcInit.connLimitNum = connLimitNum;
2,531✔
1295
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
2,531✔
1296
  TAOS_CHECK_RETURN(taosVersionStrToInt(td_version, &rpcInit.compatibilityVer));
2,531!
1297
  global.clientRpc = rpcOpen(&rpcInit);
2,531✔
1298
  if (global.clientRpc == NULL) {
2,531!
1299
    fnError("failed to init dnode rpc client");
×
1300
    return terrno;
×
1301
  }
1302
  return 0;
2,531✔
1303
}
1304

1305
void udfdCloseClientRpc() {
2,531✔
1306
  fnInfo("udfd begin closing rpc");
2,531!
1307
  rpcClose(global.clientRpc);
2,531✔
1308
  fnInfo("udfd finish closing rpc");
2,531!
1309
}
2,531✔
1310

1311
// Write-completion callback for a response.
//
// Logs a failed write, unlinks the finished work item from its connection's
// pending list (when the connection still exists) and frees the response
// buffer, the work item and the write request.
void udfdOnWrite(uv_write_t *req, int status) {
  TAOS_UDF_CHECK_PTR_RVOID(req);
  SUvUdfWork *finished = (SUvUdfWork *)req->data;
  if (status < 0) {
    fnError("udfd send response error, length: %zu code: %s", finished->output.len, uv_err_name(status));
  }

  // remove work from the connection work list
  if (finished->conn != NULL) {
    // Walk the links themselves so the head needs no special case.
    SUvUdfWork **link = &finished->conn->pWorkList;
    while (*link && (*link != finished)) {
      link = &((*link)->pWorkNext);
    }

    if (*link != finished) {
      fnError("work not in conn any more");
    } else {
      *link = finished->pWorkNext;
    }
  }

  taosMemoryFree(finished->output.base);
  taosMemoryFree(finished);
  taosMemoryFree(req);
}
1332

1333
void udfdSendResponse(uv_work_t *work, int status) {
33,612✔
1334
  TAOS_UDF_CHECK_PTR_RVOID(work);
67,224!
1335
  SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
33,612✔
1336

1337
  if (udfWork->conn != NULL) {
33,612!
1338
    uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
33,612✔
1339
    if(write_req == NULL) {
33,612!
1340
      fnError("udfd send response error, malloc failed");
×
1341
      taosMemoryFree(work);
×
1342
      return;
×
1343
    }
1344
    write_req->data = udfWork;
33,612✔
1345
    int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
33,612✔
1346
    if (code != 0) {
33,612!
1347
      fnError("udfd send response error %s", uv_strerror(code));
×
1348
      taosMemoryFree(write_req);
×
1349
   }
1350
  }
1351
  taosMemoryFree(work);
33,612✔
1352
}
1353

1354
// libuv allocation callback for a client pipe.
//
// A message starts with a header of sizeof(int32_t) + sizeof(int64_t) bytes.
// Three cases:
//   - no buffer yet: allocate room for the header only;
//   - header still incomplete: hand out the rest of the header buffer;
//   - otherwise: grow the buffer to the announced total length and hand out
//     the remaining space.
// On allocation failure an empty buffer (base NULL, len 0) is returned.
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(handle, buf);
  SUdfdUvConn *ctx = handle->data;
  int32_t      msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (ctx->inputCap == 0) {
    ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
    if (ctx->inputBuf) {
      ctx->inputLen = 0;
      ctx->inputCap = msgHeadSize;
      ctx->inputTotal = -1;  // total length is unknown until the header is read

      buf->base = ctx->inputBuf;
      buf->len = ctx->inputCap;
    } else {
      fnError("udfd can not allocate enough memory");
      buf->base = NULL;
      buf->len = 0;
    }
  } else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) {
    buf->base = ctx->inputBuf + ctx->inputLen;
    buf->len = msgHeadSize - ctx->inputLen;
  } else {
    // Commit the new capacity only after the reallocation succeeded, so the
    // recorded capacity always describes the buffer that is actually held.
    int32_t newCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
    void   *inputBuf = taosMemoryRealloc(ctx->inputBuf, newCap);
    if (inputBuf) {
      ctx->inputBuf = inputBuf;
      ctx->inputCap = newCap;
      buf->base = ctx->inputBuf + ctx->inputLen;
      buf->len = ctx->inputCap - ctx->inputLen;
    } else {
      fnError("udfd can not allocate enough memory");
      buf->base = NULL;
      buf->len = 0;
    }
  }
}
1387

1388
// Reports whether the connection has received one complete request.
//
// Once at least sizeof(int32_t) bytes are buffered and the total is still
// unknown (-1), the total message length is read from the start of the
// buffer. The message is complete when the received length, the buffer
// capacity and the total length all agree.
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
  if (pipe == NULL) {
    fnError("udfd pipe is NULL, LINE:%d", __LINE__);
    return false;
  }

  bool totalUnknown = (pipe->inputTotal == -1);
  if (totalUnknown && pipe->inputLen >= sizeof(int32_t)) {
    pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
  }

  bool bufferFull = (pipe->inputLen == pipe->inputCap);
  if (!(bufferFull && pipe->inputTotal == pipe->inputCap)) {
    return false;
  }

  fnDebug("receive request complete. length %d", pipe->inputLen);
  return true;
}
1402

1403
// Hands the complete request buffered on `conn` to the libuv thread pool.
//
// A work item is pushed onto the head of the connection's pending list and
// takes over the connection's input buffer; the connection's input state is
// reset for the next message. If the work cannot be queued, the item is
// unlinked again and everything allocated here, plus the detached input
// buffer, is freed.
void udfdHandleRequest(SUdfdUvConn *conn) {
  TAOS_UDF_CHECK_PTR_RVOID(conn);
  char   *inputBuf = conn->inputBuf;
  int32_t inputLen = conn->inputLen;

  uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
  if (work == NULL) {
    fnError("udfd malloc work failed");
    return;
  }
  SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
  if (udfWork == NULL) {
    fnError("udfd malloc udf work failed");
    taosMemoryFree(work);
    return;
  }
  udfWork->conn = conn;
  udfWork->pWorkNext = conn->pWorkList;
  conn->pWorkList = udfWork;
  udfWork->input = uv_buf_init(inputBuf, inputLen);
  // The item comes from malloc: give the response buffer a defined empty
  // value so later code never sends or frees an indeterminate pointer.
  udfWork->output = uv_buf_init(NULL, 0);
  conn->inputBuf = NULL;
  conn->inputLen = 0;
  conn->inputCap = 0;
  conn->inputTotal = -1;
  work->data = udfWork;
  if (uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse) != 0) {
    fnError("udfd queue work failed");
    // The item was pushed at the head above; unlink it before freeing so the
    // connection's list does not keep a dangling pointer.
    conn->pWorkList = udfWork->pWorkNext;
    // The input buffer was detached from the connection and has no other owner.
    taosMemoryFree(udfWork->input.base);
    taosMemoryFree(work);
    taosMemoryFree(udfWork);
  }
}
1435

1436
// Close callback for a client pipe.
//
// Detaches every pending work item from the connection by clearing its
// `conn` pointer, then frees the client handle, any partially received input
// and the connection object.
void udfdPipeCloseCb(uv_handle_t *pipe) {
  TAOS_UDF_CHECK_PTR_RVOID(pipe);
  SUdfdUvConn *conn = pipe->data;

  for (SUvUdfWork *pending = conn->pWorkList; pending != NULL; pending = pending->pWorkNext) {
    pending->conn = NULL;
  }

  taosMemoryFree(conn->client);
  taosMemoryFree(conn->inputBuf);
  taosMemoryFree(conn);
}
1449

1450
// Read callback for a client pipe.
//
// A zero-length read is ignored. A negative `nread` (EOF or error) is logged
// and passed to udfdUvHandleError. Otherwise the received bytes are accounted
// for, and once a complete request is buffered it is dispatched.
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(client, buf);
  fnDebug("udfd read %zd bytes from client", nread);
  if (nread == 0) return;

  SUdfdUvConn *conn = client->data;

  if (nread < 0) {
    if (nread != UV_EOF) {
      fnError("Receive error %s", uv_err_name(nread));
    } else {
      fnInfo("udfd pipe read EOF");
    }
    udfdUvHandleError(conn);
    return;
  }

  conn->inputLen += nread;
  if (isUdfdUvMsgComplete(conn)) {
    udfdHandleRequest(conn);
  }
  // An incomplete message simply waits for the next read.
}
1476

1477
void udfdOnNewConnection(uv_stream_t *server, int status) {
762✔
1478
  TAOS_UDF_CHECK_PTR_RVOID(server);
1,524!
1479
  if (status < 0) {
762!
1480
    fnError("udfd new connection error. code: %s", uv_strerror(status));
×
1481
    return;
×
1482
  }
1483
  int32_t code = 0;
762✔
1484

1485
  uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
762✔
1486
  if(client == NULL) {
762!
1487
    fnError("udfd pipe malloc failed");
×
1488
    return;
×
1489
  }
1490
  code = uv_pipe_init(global.loop, client, 0);
762✔
1491
  if (code) {
762!
1492
    fnError("udfd pipe init error %s", uv_strerror(code));
×
1493
    taosMemoryFree(client);
×
1494
    return;
×
1495
  }
1496
  if (uv_accept(server, (uv_stream_t *)client) == 0) {
762!
1497
    SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
762✔
1498
    if(ctx == NULL) {
762!
1499
      fnError("udfd conn malloc failed");
×
1500
      goto _exit;
×
1501
    }
1502
    ctx->pWorkList = NULL;
762✔
1503
    ctx->client = (uv_stream_t *)client;
762✔
1504
    ctx->inputBuf = 0;
762✔
1505
    ctx->inputLen = 0;
762✔
1506
    ctx->inputCap = 0;
762✔
1507
    client->data = ctx;
762✔
1508
    ctx->client = (uv_stream_t *)client;
762✔
1509
    code = uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
762✔
1510
    if (code) {
762!
1511
      fnError("udfd read start error %s", uv_strerror(code));
×
1512
      udfdUvHandleError(ctx);
×
1513
      taosMemoryFree(ctx);
×
1514
      taosMemoryFree(client);
×
1515
    }
1516
    return;
762✔
1517
  }
1518
_exit:
×
1519
    uv_close((uv_handle_t *)client, NULL);
×
1520
    taosMemoryFree(client);
×
1521
}
1522

1523
// Signal callback (registered for SIGINT): removes the listening pipe file,
// stops the signal watcher and stops the event loop.
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
  TAOS_UDF_CHECK_PTR_RVOID(handle);
  fnInfo("udfd signal received: %d\n", signum);
  uv_fs_t req;
  // A NULL callback makes the unlink run synchronously.
  int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
  // Release what libuv attached to the request, as removeListeningPipe does.
  uv_fs_req_cleanup(&req);
  if (code) {
    fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(code), __LINE__);
  }
  code = uv_signal_stop(handle);
  if (code) {
    fnError("stop signal handler failed, reason:%s", uv_strerror(code));
  }
  uv_stop(global.loop);
}
1537

1538
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
2,531✔
1539
  for (int32_t i = 1; i < argc; ++i) {
5,062✔
1540
    if (strcmp(argv[i], "-c") == 0) {
2,531!
1541
      if (i < argc - 1) {
2,531!
1542
        if (strlen(argv[++i]) >= PATH_MAX) {
2,531!
1543
          (void)printf("config file path overflow");
×
1544
          return -1;
×
1545
        }
1546
        tstrncpy(configDir, argv[i], PATH_MAX);
2,531✔
1547
      } else {
1548
        (void)printf("'-c' requires a parameter, default is %s\n", configDir);
×
1549
        return -1;
×
1550
      }
1551
    } else if (strcmp(argv[i], "-V") == 0) {
×
1552
      global.printVersion = true;
×
1553
    } else {
1554
    }
1555
  }
1556

1557
  return 0;
2,531✔
1558
}
1559

1560
static void udfdPrintVersion() {
×
1561
  (void)printf("udfd version: %s compatible_version: %s\n", td_version, td_compatible_version);
×
1562
  (void)printf("git: %s\n", td_gitinfo);
×
1563
  (void)printf("build: %s\n", td_buildinfo);
×
1564
}
×
1565

1566
static int32_t udfdInitLog() {
2,531✔
1567
  char logName[12] = {0};
2,531✔
1568
  snprintf(logName, sizeof(logName), "%slog", "udfd");
2,531✔
1569
  return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
2,531✔
1570
}
1571

1572
// Allocation callback for the control pipe: allocates a buffer of the
// suggested size. On allocation failure only buf->base is set (to NULL) and
// an error is logged.
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(buf);

  buf->base = taosMemoryMalloc(suggested_size);
  if (buf->base != NULL) {
    buf->len = suggested_size;
    return;
  }

  fnError("udfd ctrl pipe alloc buffer failed");
}
1581

1582
// Read callback for the control pipe.
//
// A negative `nread` (error or EOF on the pipe) closes the pipe and stops the
// event loop. Any data actually read is only logged and discarded. The read
// buffer is freed in both cases.
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(q, buf);
  if (nread < 0) {
    fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t *)q, NULL);
    uv_stop(global.loop);
    return;
  }
  // nread is a signed ssize_t, so it takes %zd rather than %zu.
  fnError("udfd ctrl pipe read %zd bytes", nread);
  taosMemoryFree(buf->base);
}
1594

1595
static void removeListeningPipe() {
5,062✔
1596
  uv_fs_t req;
1597
  int     err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
5,062✔
1598
  uv_fs_req_cleanup(&req);
5,062✔
1599
  if(err) {
5,062✔
1600
    fnInfo("remove listening pipe %s : %s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__);
5,043!
1601
  }
1602
}
5,062✔
1603

1604
static int32_t udfdUvInit() {
2,531✔
1605
  TAOS_CHECK_RETURN(uv_loop_init(global.loop));
2,531!
1606

1607
  if (tsStartUdfd) {  // udfd is started by taosd, which shall exit when taosd exit
2,531!
1608
    TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1));
2,531!
1609
    TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0));
2,531!
1610
    TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb));
2,531!
1611
  }
1612
  getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
2,531✔
1613

1614
  removeListeningPipe();
2,531✔
1615

1616
  TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.listeningPipe, 0));
2,531!
1617

1618
  TAOS_CHECK_RETURN(uv_signal_init(global.loop, &global.intrSignal));
2,531!
1619
  TAOS_CHECK_RETURN(uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT));
2,531!
1620

1621
  int r;
1622
  fnInfo("bind to pipe %s", global.listenPipeName);
2,531!
1623
  if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
2,531!
1624
    fnError("Bind error %s", uv_err_name(r));
×
1625
    removeListeningPipe();
×
1626
    return -2;
×
1627
  }
1628
  if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
2,531!
1629
    fnError("Listen error %s", uv_err_name(r));
×
1630
    removeListeningPipe();
×
1631
    return -3;
×
1632
  }
1633
  return 0;
2,531✔
1634
}
1635

1636
// uv_walk() visitor: request close on every handle that is not already closing.
// `arg` is unused.
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
5,062✔
1641

1642
static int32_t udfdGlobalDataInit() {
2,531✔
1643
  uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
2,531✔
1644
  if (loop == NULL) {
2,531!
1645
    fnError("udfd init uv loop failed, mem overflow");
×
1646
    return terrno;
×
1647
  }
1648
  global.loop = loop;
2,531✔
1649

1650
  if (uv_mutex_init(&global.scriptPluginsMutex) != 0) {
2,531!
1651
    fnError("udfd init script plugins mutex failed");
×
1652
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1653
  }
1654

1655
  global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
2,531✔
1656
  if (global.udfsHash == NULL) {
2,531!
1657
    return terrno;
×
1658
  }
1659
  // taosHashSetFreeFp(global.udfsHash, udfdFreeUdf);
1660

1661
  if (uv_mutex_init(&global.udfsMutex) != 0) {
2,531!
1662
    fnError("udfd init udfs mutex failed");
×
1663
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1664
  }
1665

1666
  return 0;
2,531✔
1667
}
1668

1669
static void udfdGlobalDataDeinit() {
2,531✔
1670
  taosHashCleanup(global.udfsHash);
2,531✔
1671
  uv_mutex_destroy(&global.udfsMutex);
2,531✔
1672
  uv_mutex_destroy(&global.scriptPluginsMutex);
2,531✔
1673
  taosMemoryFreeClear(global.loop);
2,531!
1674
  fnInfo("udfd global data deinit");
2,531!
1675
}
2,531✔
1676

1677
static void udfdRun() {
2,531✔
1678
  fnInfo("start udfd event loop");
2,531!
1679
  int32_t code = uv_run(global.loop, UV_RUN_DEFAULT);
2,531✔
1680
  if(code != 0) {
2,531!
1681
    fnError("udfd event loop still has active handles or requests.");
2,531!
1682
  }
1683
  fnInfo("udfd event loop stopped.");
2,531!
1684

1685
  (void)uv_loop_close(global.loop);
2,531✔
1686

1687
  uv_walk(global.loop, udfdCloseWalkCb, NULL);
2,531✔
1688
  code = uv_run(global.loop, UV_RUN_DEFAULT);
2,531✔
1689
  if(code != 0) {
2,531!
1690
    fnError("udfd event loop still has active handles or requests.");
×
1691
  }
1692
  (void)uv_loop_close(global.loop);
2,531✔
1693
}
2,531✔
1694

1695
int32_t udfdInitResidentFuncs() {
2,531✔
1696
  if (strlen(tsUdfdResFuncs) == 0) {
2,531!
1697
    return TSDB_CODE_SUCCESS;
2,531✔
1698
  }
1699

1700
  global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
×
1701
  char *pSave = tsUdfdResFuncs;
×
1702
  char *token;
1703
  while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
×
1704
    char func[TSDB_FUNC_NAME_LEN + 1] = {0};
×
1705
    tstrncpy(func, token, TSDB_FUNC_NAME_LEN);
×
1706
    fnInfo("udfd add resident function %s", func);
×
1707
    if(taosArrayPush(global.residentFuncs, func) == NULL)
×
1708
    {
1709
      taosArrayDestroy(global.residentFuncs);
×
1710
      return terrno;
×
1711
    }
1712
  }
1713

1714
  return TSDB_CODE_SUCCESS;
×
1715
}
1716

1717
void udfdDeinitResidentFuncs() {
2,531✔
1718
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
2,531!
1719
    char  *funcName = taosArrayGet(global.residentFuncs, i);
×
1720
    SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
×
1721
    if (udfInHash) {
×
1722
      SUdf   *udf = *udfInHash;
×
1723
      int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
×
1724
      fnDebug("udfd destroy function returns %d", code);
×
1725
      if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0)
×
1726
      {
1727
        fnError("udfd remove resident function %s failed", funcName);
×
1728
      }
1729
      taosMemoryFree(udf);
×
1730
    }
1731
  }
1732
  taosArrayDestroy(global.residentFuncs);
2,531✔
1733
  fnInfo("udfd resident functions are deinit");
2,531!
1734
}
2,531✔
1735

1736
int32_t udfdCreateUdfSourceDir() {
2,531✔
1737
  snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir);
2,531✔
1738
  int32_t code = taosMkDir(global.udfDataDir);
2,531✔
1739
  if (code != TSDB_CODE_SUCCESS) {
2,531!
1740
    snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir);
×
1741
    code = taosMkDir(global.udfDataDir);
×
1742
  }
1743
  fnInfo("udfd create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code));
2,531!
1744

1745
  return code;
2,531✔
1746
}
1747

1748
void udfdDestroyUdfSourceDir() {
2,531✔
1749
  fnInfo("destory udf source directory %s", global.udfDataDir);
2,531!
1750
  taosRemoveDir(global.udfDataDir);
2,531✔
1751
}
2,531✔
1752

1753
int main(int argc, char *argv[]) {
2,531✔
1754
  int  code = 0;
2,531✔
1755
  bool logInitialized = false;
2,531✔
1756
  bool cfgInitialized = false;
2,531✔
1757
  bool openClientRpcFinished = false;
2,531✔
1758
  bool residentFuncsInited = false;
2,531✔
1759
  bool udfSourceDirInited = false;
2,531✔
1760
  bool globalDataInited = false;
2,531✔
1761

1762
  if (!taosCheckSystemIsLittleEnd()) {
2,531!
1763
    (void)printf("failed to start since on non-little-end machines\n");
×
1764
    return -1;
×
1765
  }
1766

1767
  if (udfdParseArgs(argc, argv) != 0) {
2,531!
1768
    (void)printf("failed to start since parse args error\n");
×
1769
    return -1;
×
1770
  }
1771

1772
  if (global.printVersion) {
2,531!
1773
    udfdPrintVersion();
×
1774
    return 0;
×
1775
  }
1776

1777
  if (udfdInitLog() != 0) {
2,531!
1778
    // ignore create log failed, because this error no matter
1779
    (void)printf("failed to init udfd log.");
×
1780
  } else {
1781
    logInitialized = true;  // log is initialized
2,531✔
1782
  }
1783

1784
  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
2,531!
1785
    fnError("failed to start since read config error");
×
1786
    code = -2;
×
1787
    goto _exit;
×
1788
  }
1789
  cfgInitialized = true;  // cfg is initialized
2,531✔
1790
  fnInfo("udfd start with config file %s", configDir);
2,531!
1791

1792
  if (initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp) != 0) {
2,531!
1793
    fnError("init ep set from cfg failed");
×
1794
    code = -3;
×
1795
    goto _exit;
×
1796
  }
1797
  fnInfo("udfd start with mnode ep %s", global.mgmtEp.epSet.eps[0].fqdn);
2,531!
1798
  if (udfdOpenClientRpc() != 0) {
2,531!
1799
    fnError("open rpc connection to mnode failed");
×
1800
    code = -4;
×
1801
    goto _exit;
×
1802
  }
1803
  fnInfo("udfd rpc client is opened");
2,531!
1804
  openClientRpcFinished = true;  // rpc is opened
2,531✔
1805

1806
  if (udfdCreateUdfSourceDir() != 0) {
2,531!
1807
    fnError("create udf source directory failed");
×
1808
    code = -5;
×
1809
    goto _exit;
×
1810
  }
1811
  udfSourceDirInited = true;  // udf source dir is created
2,531✔
1812
  fnInfo("udfd udf source directory is created");
2,531!
1813

1814
  if (udfdGlobalDataInit() != 0) {
2,531!
1815
    fnError("init global data failed");
×
1816
    code = -6;
×
1817
    goto _exit;
×
1818
  }
1819
  globalDataInited = true;  // global data is inited
2,531✔
1820
  fnInfo("udfd global data is inited");
2,531!
1821

1822
  if (udfdUvInit() != 0) {
2,531!
1823
    fnError("uv init failure");
×
1824
    code = -7;
×
1825
    goto _exit;
×
1826
  }
1827
  fnInfo("udfd uv is inited");
2,531!
1828

1829
  if (udfdInitResidentFuncs() != 0) {
2,531!
1830
    fnError("init resident functions failed");
×
1831
    code = -8;
×
1832
    goto _exit;
×
1833
  }
1834
  residentFuncsInited = true;  // resident functions are inited
2,531✔
1835
  fnInfo("udfd resident functions are inited");
2,531!
1836

1837
  udfdRun();
2,531✔
1838
  fnInfo("udfd exit normally");
2,531!
1839

1840
  removeListeningPipe();
2,531✔
1841
  udfdDeinitScriptPlugins();
2,531✔
1842

1843
_exit:
2,531✔
1844
  if (globalDataInited) {
2,531!
1845
    udfdGlobalDataDeinit();
2,531✔
1846
  }
1847
  if (residentFuncsInited) {
2,531!
1848
    udfdDeinitResidentFuncs();
2,531✔
1849
  }
1850
  if (udfSourceDirInited) {
2,531!
1851
    udfdDestroyUdfSourceDir();
2,531✔
1852
  }
1853
  if (openClientRpcFinished) {
2,531!
1854
    udfdCloseClientRpc();
2,531✔
1855
  }
1856
  if (cfgInitialized) {
2,531!
1857
    taosCleanupCfg();
2,531✔
1858
  }
1859
  if (logInitialized) {
2,531!
1860
    taosCloseLog();
2,531✔
1861
  }
1862

1863
  return code;
2,531✔
1864
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc